2018-01-09 18:57:14 +00:00
/ *
*
* Copyright 2014 gRPC authors .
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
*
* /
package transport
import (
2019-01-15 16:20:41 +00:00
"context"
"fmt"
2018-01-09 18:57:14 +00:00
"io"
"math"
"net"
2018-11-26 18:23:56 +00:00
"strconv"
2018-01-09 18:57:14 +00:00
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
2018-07-18 14:47:22 +00:00
2018-01-09 18:57:14 +00:00
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
2018-07-18 14:47:22 +00:00
"google.golang.org/grpc/internal/channelz"
2019-01-15 16:20:41 +00:00
"google.golang.org/grpc/internal/syscall"
2018-01-09 18:57:14 +00:00
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
// http2Client implements the ClientTransport interface with HTTP2.
//
// One http2Client wraps a single underlying net.Conn. Fields above mu are set
// up by newHTTP2Client; the fields listed after mu are guarded by it.
type http2Client struct {
// ctx is derived (with cancel) from the context passed to newHTTP2Client;
// cancel is invoked by Close and when construction fails.
ctx context . Context
cancel context . CancelFunc
2018-07-18 14:47:22 +00:00
ctxDone <- chan struct { } // Cache the ctx.Done() chan.
2018-01-09 18:57:14 +00:00
// userAgent is sent as the "user-agent" header on every stream.
userAgent string
// md holds the address metadata; when it is a *metadata.MD its entries are
// added to the headers of every stream.
md interface { }
conn net . Conn // underlying communication channel
2018-07-18 14:47:22 +00:00
// loopy is created inside the writer goroutine started by newHTTP2Client.
loopy * loopyWriter
2018-01-09 18:57:14 +00:00
remoteAddr net . Addr
localAddr net . Addr
authInfo credentials . AuthInfo // auth info about the connection
2018-07-18 14:47:22 +00:00
readerDone chan struct { } // sync point to enable testing.
writerDone chan struct { } // sync point to enable testing.
2018-01-09 18:57:14 +00:00
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct { }
// awakenKeepalive is used to wake up keepalive after it has gone dormant.
// It has capacity 1 and is kept full so that sends on it normally fail.
awakenKeepalive chan struct { }
framer * framer
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf * controlBuffer
2018-07-18 14:47:22 +00:00
// fc is the connection-level inbound flow control state.
fc * trInFlow
2018-01-09 18:57:14 +00:00
// The scheme used: https if TLS is on, http otherwise.
scheme string
// isSecure is true once a transport credentials handshake has succeeded.
isSecure bool
2018-11-26 18:23:56 +00:00
// perRPCCreds are the per-RPC credentials from the connect options plus the
// one from the credentials bundle, if any.
perRPCCreds [ ] credentials . PerRPCCredentials
2018-01-09 18:57:14 +00:00
// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
2018-07-18 14:47:22 +00:00
activity uint32 // Accessed atomically.
// kp holds the keepalive parameters with defaults filled in.
kp keepalive . ClientParameters
// keepaliveEnabled is true when kp.Time is not infinity.
keepaliveEnabled bool
2018-01-09 18:57:14 +00:00
statsHandler stats . Handler
// initialWindowSize is the inbound window given to each new stream; it is
// rewritten by updateFlowControl.
initialWindowSize int32
2018-11-26 18:23:56 +00:00
// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
maxSendHeaderListSize * uint32
2018-07-18 14:47:22 +00:00
// bdpEst is non-nil only when neither window size was configured by the
// user (dynamic window mode).
bdpEst * bdpEstimator
2019-04-03 07:57:13 +00:00
// onPrefaceReceipt is a callback that client transport calls upon
2018-01-09 18:57:14 +00:00
// receiving server preface to signal that a successful HTTP2
// connection was established.
2019-04-03 07:57:13 +00:00
onPrefaceReceipt func ( )
2018-01-09 18:57:14 +00:00
2018-07-18 14:47:22 +00:00
maxConcurrentStreams uint32
// streamQuota is the number of additional streams that may be started; it
// can go negative if the server lowers its limit.
streamQuota int64
// streamsQuotaAvailable is signalled (non-blocking send) when quota is
// available and at least one NewStream call is waiting.
streamsQuotaAvailable chan struct { }
// waitingStreams counts NewStream calls currently waiting for quota.
waitingStreams uint32
// nextID is the ID for the next stream; it starts at 1 and grows by 2.
nextID uint32
mu sync . Mutex // guard the following variables
state transportState
2018-01-09 18:57:14 +00:00
// activeStreams maps stream ID to stream; Close sets it to nil.
activeStreams map [ uint32 ] * Stream
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
prevGoAwayID uint32
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
2018-07-18 14:47:22 +00:00
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
2018-11-26 18:23:56 +00:00
czData * channelzData
// onGoAway and onClose are callbacks supplied to newHTTP2Client; onClose is
// invoked at the end of Close.
onGoAway func ( GoAwayReason )
onClose func ( )
2018-01-09 18:57:14 +00:00
}
// dial opens the underlying connection to addr. When the caller supplied a
// custom dialer fn it is used as-is; otherwise a plain TCP connection is made
// with a zero-value net.Dialer. In both cases ctx governs the dial.
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
	if fn == nil {
		var d net.Dialer
		return d.DialContext(ctx, "tcp", addr)
	}
	return fn(ctx, addr)
}
// isTemporary reports whether err should be treated as a transient failure.
//
// An error exposing Temporary() is classified by that method alone. Failing
// that, an error exposing Timeout() is classified by it. Any other error
// (including nil) is considered temporary.
func isTemporary(err error) bool {
	type temporary interface {
		Temporary() bool
	}
	type timeout interface {
		Timeout() bool
	}

	if e, ok := err.(temporary); ok {
		return e.Temporary()
	}
	if e, ok := err.(timeout); ok {
		// Timeouts may be resolved upon retry, and are thus treated as
		// temporary.
		return e.Timeout()
	}
	return true
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
//
// connectCtx is used for dialing and for the credentials handshake; ctx is the
// parent of the transport's own cancellable context. onPrefaceReceipt,
// onGoAway and onClose are stored on the transport as callbacks.
//
// The result error is a named return so that the deferred cleanups below
// (cancel the context, close the connection) fire on any error return.
2019-04-03 07:57:13 +00:00
func newHTTP2Client ( connectCtx , ctx context . Context , addr TargetInfo , opts ConnectOptions , onPrefaceReceipt func ( ) , onGoAway func ( GoAwayReason ) , onClose func ( ) ) ( _ * http2Client , err error ) {
2018-01-09 18:57:14 +00:00
scheme := "http"
ctx , cancel := context . WithCancel ( ctx )
// Release the transport context if construction fails.
defer func ( ) {
if err != nil {
cancel ( )
}
} ( )
conn , err := dial ( connectCtx , opts . Dialer , addr . Addr )
if err != nil {
// With FailOnNonTempDialError the error is classified by isTemporary;
// otherwise true is always passed as the first argument (presumably the
// "temporary" flag of the connection error).
if opts . FailOnNonTempDialError {
return nil , connectionErrorf ( isTemporary ( err ) , err , "transport: error while dialing: %v" , err )
}
return nil , connectionErrorf ( true , err , "transport: Error while dialing %v" , err )
}
// Any further errors will close the underlying connection
// Note: the deferred call captures the conn returned by dial, not the
// (possibly different) conn returned later by ClientHandshake.
defer func ( conn net . Conn ) {
if err != nil {
conn . Close ( )
}
} ( conn )
2019-01-15 16:20:41 +00:00
kp := opts . KeepaliveParams
// Validate keepalive parameters.
// Zero values are replaced by the package defaults.
if kp . Time == 0 {
kp . Time = defaultClientKeepaliveTime
}
if kp . Timeout == 0 {
kp . Timeout = defaultClientKeepaliveTimeout
}
// Keepalive is enabled unless Time is infinity; when enabled, the keepalive
// timeout is also applied to the connection as TCP_USER_TIMEOUT.
keepaliveEnabled := false
if kp . Time != infinity {
if err = syscall . SetTCPUserTimeout ( conn , kp . Timeout ) ; err != nil {
return nil , connectionErrorf ( false , err , "transport: failed to set TCP_USER_TIMEOUT: %v" , err )
}
keepaliveEnabled = true
}
2018-01-09 18:57:14 +00:00
var (
isSecure bool
authInfo credentials . AuthInfo
)
2018-11-26 18:23:56 +00:00
// A credentials bundle, when present, overrides the transport credentials
// and contributes one extra per-RPC credential.
transportCreds := opts . TransportCredentials
perRPCCreds := opts . PerRPCCredentials
if b := opts . CredsBundle ; b != nil {
if t := b . TransportCredentials ( ) ; t != nil {
transportCreds = t
}
if t := b . PerRPCCredentials ( ) ; t != nil {
perRPCCreds = append ( perRPCCreds , t )
}
}
if transportCreds != nil {
2018-01-09 18:57:14 +00:00
scheme = "https"
2018-11-26 18:23:56 +00:00
// The handshake replaces conn with the connection it returns.
conn , authInfo , err = transportCreds . ClientHandshake ( connectCtx , addr . Authority , conn )
2018-01-09 18:57:14 +00:00
if err != nil {
2018-07-18 14:47:22 +00:00
return nil , connectionErrorf ( isTemporary ( err ) , err , "transport: authentication handshake failed: %v" , err )
2018-01-09 18:57:14 +00:00
}
isSecure = true
}
// dynamicWindow stays true only if neither the connection window nor the
// stream window (checked further below) was configured to at least the
// default size.
dynamicWindow := true
icwz := int32 ( initialWindowSize )
if opts . InitialConnWindowSize >= defaultWindowSize {
icwz = opts . InitialConnWindowSize
dynamicWindow = false
}
2018-11-26 18:23:56 +00:00
writeBufSize := opts . WriteBufferSize
readBufSize := opts . ReadBufferSize
maxHeaderListSize := defaultClientMaxHeaderListSize
if opts . MaxHeaderListSize != nil {
maxHeaderListSize = * opts . MaxHeaderListSize
2018-01-09 18:57:14 +00:00
}
t := & http2Client {
2018-07-18 14:47:22 +00:00
ctx : ctx ,
ctxDone : ctx . Done ( ) , // Cache Done chan.
cancel : cancel ,
userAgent : opts . UserAgent ,
md : addr . Metadata ,
conn : conn ,
remoteAddr : conn . RemoteAddr ( ) ,
localAddr : conn . LocalAddr ( ) ,
authInfo : authInfo ,
readerDone : make ( chan struct { } ) ,
writerDone : make ( chan struct { } ) ,
goAway : make ( chan struct { } ) ,
awakenKeepalive : make ( chan struct { } , 1 ) ,
2018-11-26 18:23:56 +00:00
framer : newFramer ( conn , writeBufSize , readBufSize , maxHeaderListSize ) ,
2018-07-18 14:47:22 +00:00
fc : & trInFlow { limit : uint32 ( icwz ) } ,
scheme : scheme ,
activeStreams : make ( map [ uint32 ] * Stream ) ,
isSecure : isSecure ,
2018-11-26 18:23:56 +00:00
perRPCCreds : perRPCCreds ,
2018-07-18 14:47:22 +00:00
kp : kp ,
statsHandler : opts . StatsHandler ,
initialWindowSize : initialWindowSize ,
2019-04-03 07:57:13 +00:00
onPrefaceReceipt : onPrefaceReceipt ,
2018-07-18 14:47:22 +00:00
nextID : 1 ,
maxConcurrentStreams : defaultMaxStreamsClient ,
streamQuota : defaultMaxStreamsClient ,
streamsQuotaAvailable : make ( chan struct { } , 1 ) ,
2018-11-26 18:23:56 +00:00
czData : new ( channelzData ) ,
onGoAway : onGoAway ,
onClose : onClose ,
2019-01-15 16:20:41 +00:00
keepaliveEnabled : keepaliveEnabled ,
2018-07-18 14:47:22 +00:00
}
t . controlBuf = newControlBuffer ( t . ctxDone )
2018-01-09 18:57:14 +00:00
if opts . InitialWindowSize >= defaultWindowSize {
t . initialWindowSize = opts . InitialWindowSize
dynamicWindow = false
}
// The BDP estimator is created only in dynamic window mode.
if dynamicWindow {
t . bdpEst = & bdpEstimator {
bdp : initialWindowSize ,
updateFlowControl : t . updateFlowControl ,
}
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
t . awakenKeepalive <- struct { } { }
if t . statsHandler != nil {
// TagConn may return a new context; t.ctxDone still refers to the Done
// channel cached from the context created above.
t . ctx = t . statsHandler . TagConn ( t . ctx , & stats . ConnTagInfo {
RemoteAddr : t . remoteAddr ,
LocalAddr : t . localAddr ,
} )
connBegin := & stats . ConnBegin {
Client : true ,
}
t . statsHandler . HandleConn ( t . ctx , connBegin )
}
2018-07-18 14:47:22 +00:00
if channelz . IsOn ( ) {
2019-01-15 16:20:41 +00:00
t . channelzID = channelz . RegisterNormalSocket ( t , opts . ChannelzParentID , fmt . Sprintf ( "%s -> %s" , t . localAddr , t . remoteAddr ) )
2018-07-18 14:47:22 +00:00
}
2019-01-15 16:20:41 +00:00
if t . keepaliveEnabled {
2018-07-18 14:47:22 +00:00
go t . keepalive ( )
}
2018-01-09 18:57:14 +00:00
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
go t . reader ( )
2018-11-26 18:23:56 +00:00
2018-01-09 18:57:14 +00:00
// Send connection preface to server.
// From here on background goroutines are running, so failure paths call
// t.Close() before returning.
n , err := t . conn . Write ( clientPreface )
if err != nil {
t . Close ( )
return nil , connectionErrorf ( true , err , "transport: failed to write client preface: %v" , err )
}
if n != len ( clientPreface ) {
t . Close ( )
// NOTE(review): err is nil on this path (the check above passed), so a
// nil error is handed to connectionErrorf here.
return nil , connectionErrorf ( true , err , "transport: preface mismatch, wrote %d bytes; want %d" , n , len ( clientPreface ) )
}
2018-11-26 18:23:56 +00:00
// Only settings that differ from the defaults are advertised.
var ss [ ] http2 . Setting
2018-01-09 18:57:14 +00:00
if t . initialWindowSize != defaultWindowSize {
2018-11-26 18:23:56 +00:00
ss = append ( ss , http2 . Setting {
2018-01-09 18:57:14 +00:00
ID : http2 . SettingInitialWindowSize ,
Val : uint32 ( t . initialWindowSize ) ,
} )
}
2018-11-26 18:23:56 +00:00
if opts . MaxHeaderListSize != nil {
ss = append ( ss , http2 . Setting {
ID : http2 . SettingMaxHeaderListSize ,
Val : * opts . MaxHeaderListSize ,
} )
}
err = t . framer . fr . WriteSettings ( ss ... )
2018-01-09 18:57:14 +00:00
if err != nil {
t . Close ( )
return nil , connectionErrorf ( true , err , "transport: failed to write initial settings frame: %v" , err )
}
// Adjust the connection flow control window if needed.
if delta := uint32 ( icwz - defaultWindowSize ) ; delta > 0 {
if err := t . framer . fr . WriteWindowUpdate ( 0 , delta ) ; err != nil {
t . Close ( )
return nil , connectionErrorf ( true , err , "transport: failed to write window update: %v" , err )
}
}
2018-11-26 18:23:56 +00:00
2019-04-03 07:57:13 +00:00
// NOTE(review): unlike the failure paths above, this one neither calls
// t.Close() nor wraps the error with connectionErrorf; it relies on the
// deferred cleanups to cancel the context and close the dialed conn.
if err := t . framer . writer . Flush ( ) ; err != nil {
return nil , err
}
2018-01-09 18:57:14 +00:00
// Writer goroutine: runs the loopy writer until it returns, then signals
// writerDone.
go func ( ) {
2018-07-18 14:47:22 +00:00
t . loopy = newLoopyWriter ( clientSide , t . framer , t . controlBuf , t . bdpEst )
err := t . loopy . run ( )
if err != nil {
errorf ( "transport: loopyWriter.run returning. Err: %v" , err )
}
// If it's a connection error, let reader goroutine handle it
// since there might be data in the buffers.
if _ , ok := err . ( net . Error ) ; ! ok {
t . conn . Close ( )
}
close ( t . writerDone )
2018-01-09 18:57:14 +00:00
} ( )
return t , nil
}
func ( t * http2Client ) newStream ( ctx context . Context , callHdr * CallHdr ) * Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := & Stream {
2018-03-06 22:33:18 +00:00
done : make ( chan struct { } ) ,
method : callHdr . Method ,
sendCompress : callHdr . SendCompress ,
buf : newRecvBuffer ( ) ,
headerChan : make ( chan struct { } ) ,
contentSubtype : callHdr . ContentSubtype ,
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
s . wq = newWriteQuota ( defaultWriteQuota , s . done )
2018-01-09 18:57:14 +00:00
s . requestRead = func ( n int ) {
t . adjustWindow ( s , uint32 ( n ) )
}
// The client side stream context should have exactly the same life cycle with the user provided context.
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
// So we use the original context here instead of creating a copy.
s . ctx = ctx
s . trReader = & transportReader {
reader : & recvBufferReader {
2018-07-18 14:47:22 +00:00
ctx : s . ctx ,
ctxDone : s . ctx . Done ( ) ,
recv : s . buf ,
2019-04-03 07:57:13 +00:00
closeStream : func ( err error ) {
t . CloseStream ( s , err )
} ,
2018-01-09 18:57:14 +00:00
} ,
windowHandler : func ( n int ) {
t . updateWindow ( s , uint32 ( n ) )
} ,
}
return s
}
2018-07-18 14:47:22 +00:00
func ( t * http2Client ) getPeer ( ) * peer . Peer {
2018-01-09 18:57:14 +00:00
pr := & peer . Peer {
Addr : t . remoteAddr ,
}
// Attach Auth info if there is any.
if t . authInfo != nil {
pr . AuthInfo = t . authInfo
}
2018-07-18 14:47:22 +00:00
return pr
}
2018-03-06 22:33:18 +00:00
2018-07-18 14:47:22 +00:00
func ( t * http2Client ) createHeaderFields ( ctx context . Context , callHdr * CallHdr ) ( [ ] hpack . HeaderField , error ) {
aud := t . createAudience ( callHdr )
authData , err := t . getTrAuthData ( ctx , aud )
if err != nil {
return nil , err
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
callAuthData , err := t . getCallAuthData ( ctx , aud , callHdr )
if err != nil {
2018-01-09 18:57:14 +00:00
return nil , err
}
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
// Make the slice of certain predictable size to reduce allocations made by append.
hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
hfLen += len ( authData ) + len ( callAuthData )
headerFields := make ( [ ] hpack . HeaderField , 0 , hfLen )
headerFields = append ( headerFields , hpack . HeaderField { Name : ":method" , Value : "POST" } )
headerFields = append ( headerFields , hpack . HeaderField { Name : ":scheme" , Value : t . scheme } )
headerFields = append ( headerFields , hpack . HeaderField { Name : ":path" , Value : callHdr . Method } )
headerFields = append ( headerFields , hpack . HeaderField { Name : ":authority" , Value : callHdr . Host } )
2018-03-06 22:33:18 +00:00
headerFields = append ( headerFields , hpack . HeaderField { Name : "content-type" , Value : contentType ( callHdr . ContentSubtype ) } )
2018-01-09 18:57:14 +00:00
headerFields = append ( headerFields , hpack . HeaderField { Name : "user-agent" , Value : t . userAgent } )
headerFields = append ( headerFields , hpack . HeaderField { Name : "te" , Value : "trailers" } )
2018-11-26 18:23:56 +00:00
if callHdr . PreviousAttempts > 0 {
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-previous-rpc-attempts" , Value : strconv . Itoa ( callHdr . PreviousAttempts ) } )
}
2018-01-09 18:57:14 +00:00
if callHdr . SendCompress != "" {
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-encoding" , Value : callHdr . SendCompress } )
}
if dl , ok := ctx . Deadline ( ) ; ok {
// Send out timeout regardless its value. The server can detect timeout context by itself.
// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
2019-04-03 07:57:13 +00:00
timeout := time . Until ( dl )
2018-01-09 18:57:14 +00:00
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-timeout" , Value : encodeTimeout ( timeout ) } )
}
for k , v := range authData {
headerFields = append ( headerFields , hpack . HeaderField { Name : k , Value : encodeMetadataHeader ( k , v ) } )
}
for k , v := range callAuthData {
headerFields = append ( headerFields , hpack . HeaderField { Name : k , Value : encodeMetadataHeader ( k , v ) } )
}
if b := stats . OutgoingTags ( ctx ) ; b != nil {
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-tags-bin" , Value : encodeBinHeader ( b ) } )
}
if b := stats . OutgoingTrace ( ctx ) ; b != nil {
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-trace-bin" , Value : encodeBinHeader ( b ) } )
}
2018-03-06 22:33:18 +00:00
if md , added , ok := metadata . FromOutgoingContextRaw ( ctx ) ; ok {
var k string
for _ , vv := range added {
for i , v := range vv {
if i % 2 == 0 {
k = v
continue
}
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader ( k ) {
continue
}
headerFields = append ( headerFields , hpack . HeaderField { Name : strings . ToLower ( k ) , Value : encodeMetadataHeader ( k , v ) } )
}
}
2018-01-09 18:57:14 +00:00
for k , vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader ( k ) {
continue
}
for _ , v := range vv {
headerFields = append ( headerFields , hpack . HeaderField { Name : k , Value : encodeMetadataHeader ( k , v ) } )
}
}
}
if md , ok := t . md . ( * metadata . MD ) ; ok {
for k , vv := range * md {
if isReservedHeader ( k ) {
continue
}
for _ , v := range vv {
headerFields = append ( headerFields , hpack . HeaderField { Name : k , Value : encodeMetadataHeader ( k , v ) } )
}
}
}
2018-07-18 14:47:22 +00:00
return headerFields , nil
}
func ( t * http2Client ) createAudience ( callHdr * CallHdr ) string {
// Create an audience string only if needed.
2018-11-26 18:23:56 +00:00
if len ( t . perRPCCreds ) == 0 && callHdr . Creds == nil {
2018-07-18 14:47:22 +00:00
return ""
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
// Construct URI required to get auth request metadata.
// Omit port if it is the default one.
host := strings . TrimSuffix ( callHdr . Host , ":443" )
pos := strings . LastIndex ( callHdr . Method , "/" )
if pos == - 1 {
pos = len ( callHdr . Method )
}
return "https://" + host + callHdr . Method [ : pos ]
}
func ( t * http2Client ) getTrAuthData ( ctx context . Context , audience string ) ( map [ string ] string , error ) {
authData := map [ string ] string { }
2018-11-26 18:23:56 +00:00
for _ , c := range t . perRPCCreds {
2018-07-18 14:47:22 +00:00
data , err := c . GetRequestMetadata ( ctx , audience )
if err != nil {
if _ , ok := status . FromError ( err ) ; ok {
return nil , err
}
2018-11-26 18:23:56 +00:00
return nil , status . Errorf ( codes . Unauthenticated , "transport: %v" , err )
2018-07-18 14:47:22 +00:00
}
for k , v := range data {
// Capital header names are illegal in HTTP/2.
k = strings . ToLower ( k )
authData [ k ] = v
}
}
return authData , nil
}
func ( t * http2Client ) getCallAuthData ( ctx context . Context , audience string , callHdr * CallHdr ) ( map [ string ] string , error ) {
callAuthData := map [ string ] string { }
// Check if credentials.PerRPCCredentials were provided via call options.
// Note: if these credentials are provided both via dial options and call
// options, then both sets of credentials will be applied.
if callCreds := callHdr . Creds ; callCreds != nil {
if ! t . isSecure && callCreds . RequireTransportSecurity ( ) {
2018-11-26 18:23:56 +00:00
return nil , status . Error ( codes . Unauthenticated , "transport: cannot send secure credentials on an insecure connection" )
2018-07-18 14:47:22 +00:00
}
data , err := callCreds . GetRequestMetadata ( ctx , audience )
if err != nil {
2018-11-26 18:23:56 +00:00
return nil , status . Errorf ( codes . Internal , "transport: %v" , err )
2018-07-18 14:47:22 +00:00
}
for k , v := range data {
// Capital header names are illegal in HTTP/2
k = strings . ToLower ( k )
callAuthData [ k ] = v
}
}
return callAuthData , nil
}
// NewStream creates a stream and registers it into the transport as "active"
// streams.
func ( t * http2Client ) NewStream ( ctx context . Context , callHdr * CallHdr ) ( _ * Stream , err error ) {
ctx = peer . NewContext ( ctx , t . getPeer ( ) )
headerFields , err := t . createHeaderFields ( ctx , callHdr )
if err != nil {
return nil , err
2018-01-09 18:57:14 +00:00
}
s := t . newStream ( ctx , callHdr )
2018-07-18 14:47:22 +00:00
cleanup := func ( err error ) {
if s . swapState ( streamDone ) == streamDone {
// If it was already done, return.
return
}
// The stream was unprocessed by the server.
atomic . StoreUint32 ( & s . unprocessed , 1 )
s . write ( recvMsg { err : err } )
close ( s . done )
// If headerChan isn't closed, then close it.
2019-06-14 09:24:43 +00:00
if atomic . CompareAndSwapUint32 ( & s . headerChanClosed , 0 , 1 ) {
2018-07-18 14:47:22 +00:00
close ( s . headerChan )
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
hdr := & headerFrame {
2018-01-09 18:57:14 +00:00
hf : headerFields ,
endStream : false ,
2018-07-18 14:47:22 +00:00
initStream : func ( id uint32 ) ( bool , error ) {
t . mu . Lock ( )
if state := t . state ; state != reachable {
t . mu . Unlock ( )
// Do a quick cleanup.
err := error ( errStreamDrain )
if state == closing {
err = ErrConnClosing
}
cleanup ( err )
return false , err
}
t . activeStreams [ id ] = s
if channelz . IsOn ( ) {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . streamsStarted , 1 )
atomic . StoreInt64 ( & t . czData . lastStreamCreatedTime , time . Now ( ) . UnixNano ( ) )
2018-07-18 14:47:22 +00:00
}
var sendPing bool
// If the number of active streams change from 0 to 1, then check if keepalive
// has gone dormant. If so, wake it up.
if len ( t . activeStreams ) == 1 && t . keepaliveEnabled {
select {
case t . awakenKeepalive <- struct { } { } :
sendPing = true
// Fill the awakenKeepalive channel again as this channel must be
// kept non-writable except at the point that the keepalive()
// goroutine is waiting either to be awaken or shutdown.
t . awakenKeepalive <- struct { } { }
default :
}
}
t . mu . Unlock ( )
return sendPing , nil
} ,
onOrphaned : cleanup ,
wq : s . wq ,
}
firstTry := true
var ch chan struct { }
checkForStreamQuota := func ( it interface { } ) bool {
if t . streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
t . waitingStreams ++
}
ch = t . streamsQuotaAvailable
return false
}
if ! firstTry {
t . waitingStreams --
}
t . streamQuota --
h := it . ( * headerFrame )
h . streamID = t . nextID
t . nextID += 2
s . id = h . streamID
s . fc = & inFlow { limit : uint32 ( t . initialWindowSize ) }
if t . streamQuota > 0 && t . waitingStreams > 0 {
select {
case t . streamsQuotaAvailable <- struct { } { } :
default :
}
}
return true
}
2018-11-26 18:23:56 +00:00
var hdrListSizeErr error
checkForHeaderListSize := func ( it interface { } ) bool {
if t . maxSendHeaderListSize == nil {
return true
}
hdrFrame := it . ( * headerFrame )
var sz int64
for _ , f := range hdrFrame . hf {
if sz += int64 ( f . Size ( ) ) ; sz > int64 ( * t . maxSendHeaderListSize ) {
hdrListSizeErr = status . Errorf ( codes . Internal , "header list size to send violates the maximum size (%d bytes) set by server" , * t . maxSendHeaderListSize )
return false
}
}
return true
}
2018-07-18 14:47:22 +00:00
for {
2018-11-26 18:23:56 +00:00
success , err := t . controlBuf . executeAndPut ( func ( it interface { } ) bool {
if ! checkForStreamQuota ( it ) {
return false
}
if ! checkForHeaderListSize ( it ) {
return false
}
return true
} , hdr )
2018-07-18 14:47:22 +00:00
if err != nil {
return nil , err
}
if success {
break
}
2018-11-26 18:23:56 +00:00
if hdrListSizeErr != nil {
return nil , hdrListSizeErr
}
2018-07-18 14:47:22 +00:00
firstTry = false
select {
case <- ch :
case <- s . ctx . Done ( ) :
return nil , ContextErr ( s . ctx . Err ( ) )
case <- t . goAway :
return nil , errStreamDrain
case <- t . ctx . Done ( ) :
return nil , ErrConnClosing
}
}
2018-01-09 18:57:14 +00:00
if t . statsHandler != nil {
outHeader := & stats . OutHeader {
Client : true ,
FullMethod : callHdr . Method ,
RemoteAddr : t . remoteAddr ,
LocalAddr : t . localAddr ,
Compression : callHdr . SendCompress ,
}
t . statsHandler . HandleRPC ( s . ctx , outHeader )
}
return s , nil
}
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func ( t * http2Client ) CloseStream ( s * Stream , err error ) {
2018-07-18 14:47:22 +00:00
var (
rst bool
rstCode http2 . ErrCode
)
2018-01-09 18:57:14 +00:00
if err != nil {
2018-07-18 14:47:22 +00:00
rst = true
rstCode = http2 . ErrCodeCancel
2018-01-09 18:57:14 +00:00
}
2018-11-26 18:23:56 +00:00
t . closeStream ( s , err , rst , rstCode , status . Convert ( err ) , nil , false )
2018-07-18 14:47:22 +00:00
}
func ( t * http2Client ) closeStream ( s * Stream , err error , rst bool , rstCode http2 . ErrCode , st * status . Status , mdata map [ string ] [ ] string , eosReceived bool ) {
// Set stream status to done.
if s . swapState ( streamDone ) == streamDone {
2018-11-26 18:23:56 +00:00
// If it was already done, return. If multiple closeStream calls
// happen simultaneously, wait for the first to finish.
<- s . done
2018-01-09 18:57:14 +00:00
return
}
2018-07-18 14:47:22 +00:00
// status and trailers can be updated here without any synchronization because the stream goroutine will
// only read it after it sees an io.EOF error from read or write and we'll write those errors
// only after updating this.
s . status = st
if len ( mdata ) > 0 {
s . trailer = mdata
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
if err != nil {
// This will unblock reads eventually.
s . write ( recvMsg { err : err } )
}
// If headerChan isn't closed, then close it.
2019-06-14 09:24:43 +00:00
if atomic . CompareAndSwapUint32 ( & s . headerChanClosed , 0 , 1 ) {
2018-11-26 18:23:56 +00:00
s . noHeaders = true
2018-01-09 18:57:14 +00:00
close ( s . headerChan )
}
2018-07-18 14:47:22 +00:00
cleanup := & cleanupStream {
streamID : s . id ,
onWrite : func ( ) {
t . mu . Lock ( )
if t . activeStreams != nil {
delete ( t . activeStreams , s . id )
}
t . mu . Unlock ( )
if channelz . IsOn ( ) {
if eosReceived {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . streamsSucceeded , 1 )
2018-07-18 14:47:22 +00:00
} else {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . streamsFailed , 1 )
2018-07-18 14:47:22 +00:00
}
}
} ,
rst : rst ,
rstCode : rstCode ,
}
addBackStreamQuota := func ( interface { } ) bool {
t . streamQuota ++
if t . streamQuota > 0 && t . waitingStreams > 0 {
select {
case t . streamsQuotaAvailable <- struct { } { } :
default :
}
}
return true
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
t . controlBuf . executeAndPut ( addBackStreamQuota , cleanup )
2018-11-26 18:23:56 +00:00
// This will unblock write.
close ( s . done )
2018-01-09 18:57:14 +00:00
}
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
2018-11-26 18:23:56 +00:00
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
2018-01-09 18:57:14 +00:00
func ( t * http2Client ) Close ( ) error {
t . mu . Lock ( )
2018-07-18 14:47:22 +00:00
// Make sure we only Close once.
2018-01-09 18:57:14 +00:00
if t . state == closing {
t . mu . Unlock ( )
return nil
}
t . state = closing
streams := t . activeStreams
t . activeStreams = nil
t . mu . Unlock ( )
2018-07-18 14:47:22 +00:00
t . controlBuf . finish ( )
t . cancel ( )
err := t . conn . Close ( )
if channelz . IsOn ( ) {
channelz . RemoveEntry ( t . channelzID )
}
2018-01-09 18:57:14 +00:00
// Notify all active streams.
for _ , s := range streams {
2018-11-26 18:23:56 +00:00
t . closeStream ( s , ErrConnClosing , false , http2 . ErrCodeNo , status . New ( codes . Unavailable , ErrConnClosing . Desc ) , nil , false )
2018-01-09 18:57:14 +00:00
}
if t . statsHandler != nil {
connEnd := & stats . ConnEnd {
Client : true ,
}
t . statsHandler . HandleConn ( t . ctx , connEnd )
}
2019-04-03 07:57:13 +00:00
t . onClose ( )
2018-01-09 18:57:14 +00:00
return err
}
// GracefulClose sets the state to draining, which prevents new streams from
// being created and causes the transport to be closed when the last active
// stream is closed. If there are no active streams, the transport is closed
// immediately. This does nothing if the transport is already draining or
// closing.
2019-05-31 09:45:11 +00:00
func ( t * http2Client ) GracefulClose ( ) {
2018-01-09 18:57:14 +00:00
t . mu . Lock ( )
2018-07-18 14:47:22 +00:00
// Make sure we move to draining only from active.
if t . state == draining || t . state == closing {
2018-01-09 18:57:14 +00:00
t . mu . Unlock ( )
2019-05-31 09:45:11 +00:00
return
2018-01-09 18:57:14 +00:00
}
t . state = draining
active := len ( t . activeStreams )
t . mu . Unlock ( )
if active == 0 {
2019-05-31 09:45:11 +00:00
t . Close ( )
return
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & incomingGoAway { } )
2018-01-09 18:57:14 +00:00
}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func ( t * http2Client ) Write ( s * Stream , hdr [ ] byte , data [ ] byte , opts * Options ) error {
2018-07-18 14:47:22 +00:00
if opts . Last {
// If it's the last message, update stream state.
if ! s . compareAndSwapState ( streamActive , streamWriteDone ) {
return errStreamDone
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
} else if s . getState ( ) != streamActive {
return errStreamDone
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
df := & dataFrame {
streamID : s . id ,
endStream : opts . Last ,
}
if hdr != nil || data != nil { // If it's not an empty data frame.
// Add some data to grpc message header so that we can equally
// distribute bytes across frames.
emptyLen := http2MaxFrameLen - len ( hdr )
if emptyLen > len ( data ) {
emptyLen = len ( data )
}
hdr = append ( hdr , data [ : emptyLen ] ... )
data = data [ emptyLen : ]
df . h , df . d = hdr , data
// TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
if err := s . wq . get ( int32 ( len ( hdr ) + len ( data ) ) ) ; err != nil {
return err
}
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
return t . controlBuf . put ( df )
2018-01-09 18:57:14 +00:00
}
// getStream looks up the active stream that frame f belongs to, keyed by the
// stream ID in the frame header. The boolean is false when no such stream is
// registered.
func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
	id := f.Header().StreamID

	t.mu.Lock()
	s, found := t.activeStreams[id]
	t.mu.Unlock()
	return s, found
}
// adjustWindow sends out extra window update over the initial window size
// of stream if the application is requesting data larger in size than
// the window.
func ( t * http2Client ) adjustWindow ( s * Stream , n uint32 ) {
if w := s . fc . maybeAdjust ( n ) ; w > 0 {
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & outgoingWindowUpdate { streamID : s . id , increment : w } )
2018-01-09 18:57:14 +00:00
}
}
2018-07-18 14:47:22 +00:00
// updateWindow adjusts the inbound quota for the stream.
// Window updates will be sent out when the cumulative quota
// exceeds the corresponding threshold.
2018-01-09 18:57:14 +00:00
func ( t * http2Client ) updateWindow ( s * Stream , n uint32 ) {
if w := s . fc . onRead ( n ) ; w > 0 {
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & outgoingWindowUpdate { streamID : s . id , increment : w } )
2018-01-09 18:57:14 +00:00
}
}
// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func ( t * http2Client ) updateFlowControl ( n uint32 ) {
t . mu . Lock ( )
for _ , s := range t . activeStreams {
s . fc . newLimit ( n )
}
t . mu . Unlock ( )
2018-07-18 14:47:22 +00:00
updateIWS := func ( interface { } ) bool {
t . initialWindowSize = int32 ( n )
return true
}
t . controlBuf . executeAndPut ( updateIWS , & outgoingWindowUpdate { streamID : 0 , increment : t . fc . newLimit ( n ) } )
t . controlBuf . put ( & outgoingSettings {
2018-01-09 18:57:14 +00:00
ss : [ ] http2 . Setting {
{
ID : http2 . SettingInitialWindowSize ,
2018-07-18 14:47:22 +00:00
Val : n ,
2018-01-09 18:57:14 +00:00
} ,
} ,
} )
}
func ( t * http2Client ) handleData ( f * http2 . DataFrame ) {
size := f . Header ( ) . Length
var sendBDPPing bool
if t . bdpEst != nil {
2018-07-18 14:47:22 +00:00
sendBDPPing = t . bdpEst . add ( size )
2018-01-09 18:57:14 +00:00
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
// whether user application has read the data or not. Such a
// restriction is already imposed on the stream's flow control,
// and therefore the sender will be blocked anyways.
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
//
2018-07-18 14:47:22 +00:00
if w := t . fc . onData ( size ) ; w > 0 {
t . controlBuf . put ( & outgoingWindowUpdate {
streamID : 0 ,
increment : w ,
} )
}
2018-01-09 18:57:14 +00:00
if sendBDPPing {
2018-07-18 14:47:22 +00:00
// Avoid excessive ping detection (e.g. in an L7 proxy)
// by sending a window update prior to the BDP ping.
if w := t . fc . reset ( ) ; w > 0 {
t . controlBuf . put ( & outgoingWindowUpdate {
streamID : 0 ,
increment : w ,
} )
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
2018-01-09 18:57:14 +00:00
t . controlBuf . put ( bdpPing )
}
// Select the right stream to dispatch.
s , ok := t . getStream ( f )
if ! ok {
return
}
if size > 0 {
2018-07-18 14:47:22 +00:00
if err := s . fc . onData ( size ) ; err != nil {
t . closeStream ( s , io . EOF , true , http2 . ErrCodeFlowControl , status . New ( codes . Internal , err . Error ( ) ) , nil , false )
2018-01-09 18:57:14 +00:00
return
}
if f . Header ( ) . Flags . Has ( http2 . FlagDataPadded ) {
2018-07-18 14:47:22 +00:00
if w := s . fc . onRead ( size - uint32 ( len ( f . Data ( ) ) ) ) ; w > 0 {
t . controlBuf . put ( & outgoingWindowUpdate { s . id , w } )
2018-01-09 18:57:14 +00:00
}
}
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len ( f . Data ( ) ) > 0 {
data := make ( [ ] byte , len ( f . Data ( ) ) )
copy ( data , f . Data ( ) )
s . write ( recvMsg { data : data } )
}
}
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
if f . FrameHeader . Flags . Has ( http2 . FlagDataEndStream ) {
2018-07-18 14:47:22 +00:00
t . closeStream ( s , io . EOF , false , http2 . ErrCodeNo , status . New ( codes . Internal , "server closed the stream without sending trailers" ) , nil , true )
2018-01-09 18:57:14 +00:00
}
}
func ( t * http2Client ) handleRSTStream ( f * http2 . RSTStreamFrame ) {
s , ok := t . getStream ( f )
if ! ok {
return
}
2018-07-18 14:47:22 +00:00
if f . ErrCode == http2 . ErrCodeRefusedStream {
2018-01-09 18:57:14 +00:00
// The stream was unprocessed by the server.
2018-07-18 14:47:22 +00:00
atomic . StoreUint32 ( & s . unprocessed , 1 )
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
statusCode , ok := http2ErrConvTab [ f . ErrCode ]
2018-01-09 18:57:14 +00:00
if ! ok {
warningf ( "transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v" , f . ErrCode )
statusCode = codes . Unknown
}
2018-11-26 18:23:56 +00:00
if statusCode == codes . Canceled {
// Our deadline was already exceeded, and that was likely the cause of
// this cancelation. Alter the status code accordingly.
if d , ok := s . ctx . Deadline ( ) ; ok && d . After ( time . Now ( ) ) {
statusCode = codes . DeadlineExceeded
}
}
2018-07-18 14:47:22 +00:00
t . closeStream ( s , io . EOF , false , http2 . ErrCodeNo , status . Newf ( statusCode , "stream terminated by RST_STREAM with error code: %v" , f . ErrCode ) , nil , false )
2018-01-09 18:57:14 +00:00
}
func ( t * http2Client ) handleSettings ( f * http2 . SettingsFrame , isFirst bool ) {
if f . IsAck ( ) {
return
}
2018-07-18 14:47:22 +00:00
var maxStreams * uint32
var ss [ ] http2 . Setting
2018-11-26 18:23:56 +00:00
var updateFuncs [ ] func ( )
2018-01-09 18:57:14 +00:00
f . ForeachSetting ( func ( s http2 . Setting ) error {
2018-11-26 18:23:56 +00:00
switch s . ID {
case http2 . SettingMaxConcurrentStreams :
2018-07-18 14:47:22 +00:00
maxStreams = new ( uint32 )
* maxStreams = s . Val
2018-11-26 18:23:56 +00:00
case http2 . SettingMaxHeaderListSize :
updateFuncs = append ( updateFuncs , func ( ) {
t . maxSendHeaderListSize = new ( uint32 )
* t . maxSendHeaderListSize = s . Val
} )
default :
ss = append ( ss , s )
2018-01-09 18:57:14 +00:00
}
return nil
} )
2018-07-18 14:47:22 +00:00
if isFirst && maxStreams == nil {
maxStreams = new ( uint32 )
* maxStreams = math . MaxUint32
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
sf := & incomingSettings {
ss : ss ,
}
2018-11-26 18:23:56 +00:00
if maxStreams != nil {
updateStreamQuota := func ( ) {
delta := int64 ( * maxStreams ) - int64 ( t . maxConcurrentStreams )
t . maxConcurrentStreams = * maxStreams
t . streamQuota += delta
if delta > 0 && t . waitingStreams > 0 {
close ( t . streamsQuotaAvailable ) // wake all of them up.
t . streamsQuotaAvailable = make ( chan struct { } , 1 )
}
}
updateFuncs = append ( updateFuncs , updateStreamQuota )
2018-07-18 14:47:22 +00:00
}
2018-11-26 18:23:56 +00:00
t . controlBuf . executeAndPut ( func ( interface { } ) bool {
for _ , f := range updateFuncs {
f ( )
2018-07-18 14:47:22 +00:00
}
return true
2018-11-26 18:23:56 +00:00
} , sf )
2018-01-09 18:57:14 +00:00
}
// handlePing processes a PING frame. An ACK is passed to the BDP estimator
// when one is configured, since it may answer a BDP ping. Any other ping is
// answered by queueing an ACK that echoes the received payload.
func (t *http2Client) handlePing(f *http2.PingFrame) {
	if !f.IsAck() {
		t.controlBuf.put(&ping{ack: true, data: f.Data})
		return
	}
	// Maybe it's a BDP ping.
	if t.bdpEst != nil {
		t.bdpEst.calculate(f.Data)
	}
}
// handleGoAway processes a GOAWAY frame from the server.
//
// On the first GOAWAY it records the reason, closes t.goAway, moves the
// transport to the draining state, queues an incomingGoAway item and calls
// t.onGoAway. For every GOAWAY it then closes (with errStreamDrain and
// statusGoAway) each active stream whose ID is above the frame's LastStreamID
// and not above the previous GOAWAY's ID, marking those streams as
// unprocessed. The transport is closed outright when the frame is invalid or
// when no active streams remain afterwards. Nothing is done if the transport
// is already closing.
func ( t * http2Client ) handleGoAway ( f * http2 . GoAwayFrame ) {
t . mu . Lock ( )
2018-07-18 14:47:22 +00:00
if t . state == closing {
2018-01-09 18:57:14 +00:00
t . mu . Unlock ( )
return
}
if f . ErrCode == http2 . ErrCodeEnhanceYourCalm {
infof ( "Client received GoAway with http2.ErrCodeEnhanceYourCalm." )
}
id := f . LastStreamID
// A non-zero, even last-stream ID is treated as invalid and closes the
// transport (presumably because client-initiated stream IDs are odd).
if id > 0 && id % 2 != 1 {
t . mu . Unlock ( )
t . Close ( )
return
}
// A client can receive multiple GoAways from the server (see
// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
// sent after an RTT delay with the ID of the last stream the server will
// process.
//
// Therefore, when we get the first GoAway we don't necessarily close any
// streams. While in case of second GoAway we close all streams created after
// the GoAwayId. This way streams that were in-flight while the GoAway from
// server was being sent don't get killed.
select {
case <- t . goAway : // t.goAway has been closed (i.e., multiple GoAways).
// If there are multiple GoAways the first one should always have an ID
// greater than the following ones; otherwise close the transport.
if id > t . prevGoAwayID {
t . mu . Unlock ( )
t . Close ( )
return
}
default :
// First GoAway on this transport.
t . setGoAwayReason ( f )
close ( t . goAway )
t . state = draining
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & incomingGoAway { } )
2018-11-26 18:23:56 +00:00
// NOTE(review): an earlier comment here said this "has to be a new
// goroutine because we're still using the current goroutine to read in
// the transport", but the call below is synchronous and runs with t.mu
// held. Confirm that onGoAway neither blocks nor re-acquires t.mu.
t . onGoAway ( t . goAwayReason )
2018-01-09 18:57:14 +00:00
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
upperLimit := t . prevGoAwayID
if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math . MaxUint32 // Kill all streams after the GoAway ID.
}
for streamID , stream := range t . activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
2018-07-18 14:47:22 +00:00
atomic . StoreUint32 ( & stream . unprocessed , 1 )
t . closeStream ( stream , errStreamDrain , false , http2 . ErrCodeNo , statusGoAway , nil , false )
2018-01-09 18:57:14 +00:00
}
}
t . prevGoAwayID = id
// Sample the stream count under the lock; Close is called only after the
// lock has been released.
active := len ( t . activeStreams )
t . mu . Unlock ( )
if active == 0 {
t . Close ( )
}
}
// setGoAwayReason derives t.goAwayReason from the received GoAway frame:
// GoAwayTooManyPings when the error code is ENHANCE_YOUR_CALM and the debug
// data is exactly "too_many_pings", GoAwayNoReason otherwise.
// The caller is expected to hold the transport's mutex.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
	t.goAwayReason = GoAwayNoReason
	if f.ErrCode != http2.ErrCodeEnhanceYourCalm {
		return
	}
	if string(f.DebugData()) == "too_many_pings" {
		t.goAwayReason = GoAwayTooManyPings
	}
}
// GetGoAwayReason returns the reason recorded for the GoAway received on this
// transport. The value is read under the transport's mutex.
func (t *http2Client) GetGoAwayReason() GoAwayReason {
	t.mu.Lock()
	reason := t.goAwayReason
	t.mu.Unlock()
	return reason
}
func ( t * http2Client ) handleWindowUpdate ( f * http2 . WindowUpdateFrame ) {
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & incomingWindowUpdate {
streamID : f . Header ( ) . StreamID ,
increment : f . Increment ,
} )
2018-01-09 18:57:14 +00:00
}
// operateHeaders takes action on the decoded headers.
func ( t * http2Client ) operateHeaders ( frame * http2 . MetaHeadersFrame ) {
s , ok := t . getStream ( frame )
if ! ok {
return
}
2019-05-31 09:45:11 +00:00
endStream := frame . StreamEnded ( )
2018-07-18 14:47:22 +00:00
atomic . StoreUint32 ( & s . bytesReceived , 1 )
2019-06-14 09:24:43 +00:00
initialHeader := atomic . LoadUint32 ( & s . headerChanClosed ) == 0
2019-05-31 09:45:11 +00:00
if ! initialHeader && ! endStream {
2019-06-14 09:24:43 +00:00
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
2019-05-31 09:45:11 +00:00
st := status . New ( codes . Internal , "a HEADERS frame cannot appear in the middle of a stream" )
t . closeStream ( s , st . Err ( ) , true , http2 . ErrCodeProtocol , st , nil , false )
return
}
state := & decodeState { }
2019-06-14 09:24:43 +00:00
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
2019-05-31 09:45:11 +00:00
state . data . isGRPC = ! initialHeader
2018-11-26 18:23:56 +00:00
if err := state . decodeHeader ( frame ) ; err != nil {
2019-05-31 09:45:11 +00:00
t . closeStream ( s , err , true , http2 . ErrCodeProtocol , status . Convert ( err ) , nil , endStream )
2018-01-09 18:57:14 +00:00
return
}
2019-06-14 09:24:43 +00:00
isHeader := false
2018-01-09 18:57:14 +00:00
defer func ( ) {
if t . statsHandler != nil {
if isHeader {
inHeader := & stats . InHeader {
Client : true ,
WireLength : int ( frame . Header ( ) . Length ) ,
}
t . statsHandler . HandleRPC ( s . ctx , inHeader )
} else {
inTrailer := & stats . InTrailer {
Client : true ,
WireLength : int ( frame . Header ( ) . Length ) ,
}
t . statsHandler . HandleRPC ( s . ctx , inTrailer )
}
}
} ( )
2019-05-31 09:45:11 +00:00
2019-06-14 09:24:43 +00:00
// If headerChan hasn't been closed yet
if atomic . CompareAndSwapUint32 ( & s . headerChanClosed , 0 , 1 ) {
2018-03-06 22:33:18 +00:00
if ! endStream {
2019-06-14 09:24:43 +00:00
// HEADERS frame block carries a Response-Headers.
2018-03-06 22:33:18 +00:00
isHeader = true
2018-07-18 14:47:22 +00:00
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
2019-05-31 09:45:11 +00:00
s . recvCompress = state . data . encoding
if len ( state . data . mdata ) > 0 {
s . header = state . data . mdata
2018-03-06 22:33:18 +00:00
}
2019-06-14 09:24:43 +00:00
} else {
// HEADERS frame block carries a Trailers-Only.
s . noHeaders = true
2018-01-09 18:57:14 +00:00
}
close ( s . headerChan )
}
2019-05-31 09:45:11 +00:00
2019-06-14 09:24:43 +00:00
if ! endStream {
return
}
2018-11-26 18:23:56 +00:00
// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s . getState ( ) == streamActive
2019-05-31 09:45:11 +00:00
t . closeStream ( s , io . EOF , rst , http2 . ErrCodeNo , state . status ( ) , state . data . mdata , true )
2018-01-09 18:57:14 +00:00
}
// reader runs as a separate goroutine in charge of reading data from network
// connection.
//
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func ( t * http2Client ) reader ( ) {
2018-07-18 14:47:22 +00:00
defer close ( t . readerDone )
2018-01-09 18:57:14 +00:00
// Check the validity of server preface.
frame , err := t . framer . fr . ReadFrame ( )
if err != nil {
2018-11-26 18:23:56 +00:00
t . Close ( ) // this kicks off resetTransport, so must be last before return
2018-01-09 18:57:14 +00:00
return
}
2018-11-26 18:23:56 +00:00
t . conn . SetReadDeadline ( time . Time { } ) // reset deadline once we get the settings frame (we didn't time out, yay!)
2018-07-18 14:47:22 +00:00
if t . keepaliveEnabled {
atomic . CompareAndSwapUint32 ( & t . activity , 0 , 1 )
}
2018-01-09 18:57:14 +00:00
sf , ok := frame . ( * http2 . SettingsFrame )
if ! ok {
2018-11-26 18:23:56 +00:00
t . Close ( ) // this kicks off resetTransport, so must be last before return
2018-01-09 18:57:14 +00:00
return
}
2019-04-03 07:57:13 +00:00
t . onPrefaceReceipt ( )
2018-01-09 18:57:14 +00:00
t . handleSettings ( sf , true )
// loop to keep reading incoming messages on this transport.
for {
frame , err := t . framer . fr . ReadFrame ( )
2018-07-18 14:47:22 +00:00
if t . keepaliveEnabled {
atomic . CompareAndSwapUint32 ( & t . activity , 0 , 1 )
}
2018-01-09 18:57:14 +00:00
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
// is malformed http2.
if se , ok := err . ( http2 . StreamError ) ; ok {
t . mu . Lock ( )
s := t . activeStreams [ se . StreamID ]
t . mu . Unlock ( )
if s != nil {
// use error detail to provide better err message
2018-11-26 18:23:56 +00:00
code := http2ErrConvTab [ se . Code ]
msg := t . framer . fr . ErrorDetail ( ) . Error ( )
t . closeStream ( s , status . Error ( code , msg ) , true , http2 . ErrCodeProtocol , status . New ( code , msg ) , nil , false )
2018-01-09 18:57:14 +00:00
}
continue
} else {
// Transport error.
t . Close ( )
return
}
}
switch frame := frame . ( type ) {
case * http2 . MetaHeadersFrame :
t . operateHeaders ( frame )
case * http2 . DataFrame :
t . handleData ( frame )
case * http2 . RSTStreamFrame :
t . handleRSTStream ( frame )
case * http2 . SettingsFrame :
t . handleSettings ( frame , false )
case * http2 . PingFrame :
t . handlePing ( frame )
case * http2 . GoAwayFrame :
t . handleGoAway ( frame )
case * http2 . WindowUpdateFrame :
t . handleWindowUpdate ( frame )
default :
errorf ( "transport: http2Client.reader got unhandled frame type %v." , frame )
}
}
}
// keepalive runs in a separate goroutine and makes sure the connection is
// alive by sending pings.
//
// Each time the t.kp.Time timer fires without recorded activity, a ping is
// sent (or the goroutine goes dormant until it is awakened, when there are no
// active streams and pings without streams are not permitted). If no activity
// is recorded within t.kp.Timeout after that, the transport is closed. The
// goroutine exits when the transport's context is done.
func ( t * http2Client ) keepalive ( ) {
p := & ping { data : [ 8 ] byte { } }
timer := time . NewTimer ( t . kp . Time )
for {
select {
case <- timer . C :
// Activity since the last check: clear the flag and wait another
// full keepalive interval.
if atomic . CompareAndSwapUint32 ( & t . activity , 1 , 0 ) {
timer . Reset ( t . kp . Time )
continue
}
// Check if keepalive should go dormant.
t . mu . Lock ( )
if len ( t . activeStreams ) < 1 && ! t . kp . PermitWithoutStream {
// Make awakenKeepalive writable.
<- t . awakenKeepalive
t . mu . Unlock ( )
// Dormant: block until another goroutine writes to awakenKeepalive
// or the transport shuts down.
select {
case <- t . awakenKeepalive :
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
case <- t . ctx . Done ( ) :
return
}
} else {
t . mu . Unlock ( )
2018-07-18 14:47:22 +00:00
if channelz . IsOn ( ) {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . kpCount , 1 )
2018-07-18 14:47:22 +00:00
}
2018-01-09 18:57:14 +00:00
// Send ping.
t . controlBuf . put ( p )
}
// By the time control gets here a ping has been sent one way or the other.
timer . Reset ( t . kp . Timeout )
select {
case <- timer . C :
// Activity within the timeout means the connection is alive; go back
// to waiting for the regular keepalive interval.
if atomic . CompareAndSwapUint32 ( & t . activity , 1 , 0 ) {
timer . Reset ( t . kp . Time )
continue
}
// No activity within the timeout: give up on the connection.
t . Close ( )
return
case <- t . ctx . Done ( ) :
// Stop the timer and, if it had already fired, drain its channel.
if ! timer . Stop ( ) {
<- timer . C
}
return
}
case <- t . ctx . Done ( ) :
// Stop the timer and, if it had already fired, drain its channel.
if ! timer . Stop ( ) {
<- timer . C
}
return
}
}
}
// Error returns the Done channel of the transport's context; it is closed
// once that context is done.
func (t *http2Client) Error() <-chan struct{} {
	done := t.ctx.Done()
	return done
}
// GoAway returns the transport's goAway channel, which handleGoAway closes
// when the first GOAWAY frame is received from the server.
func (t *http2Client) GoAway() <-chan struct{} {
	var ch <-chan struct{} = t.goAway
	return ch
}
2018-07-18 14:47:22 +00:00
func ( t * http2Client ) ChannelzMetric ( ) * channelz . SocketInternalMetric {
s := channelz . SocketInternalMetric {
2018-11-26 18:23:56 +00:00
StreamsStarted : atomic . LoadInt64 ( & t . czData . streamsStarted ) ,
StreamsSucceeded : atomic . LoadInt64 ( & t . czData . streamsSucceeded ) ,
StreamsFailed : atomic . LoadInt64 ( & t . czData . streamsFailed ) ,
MessagesSent : atomic . LoadInt64 ( & t . czData . msgSent ) ,
MessagesReceived : atomic . LoadInt64 ( & t . czData . msgRecv ) ,
KeepAlivesSent : atomic . LoadInt64 ( & t . czData . kpCount ) ,
LastLocalStreamCreatedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & t . czData . lastStreamCreatedTime ) ) ,
LastMessageSentTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & t . czData . lastMsgSentTime ) ) ,
LastMessageReceivedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & t . czData . lastMsgRecvTime ) ) ,
2018-07-18 14:47:22 +00:00
LocalFlowControlWindow : int64 ( t . fc . getSize ( ) ) ,
2018-11-26 18:23:56 +00:00
SocketOptions : channelz . GetSocketOption ( t . conn ) ,
LocalAddr : t . localAddr ,
RemoteAddr : t . remoteAddr ,
2018-07-18 14:47:22 +00:00
// RemoteName :
}
2018-11-26 18:23:56 +00:00
if au , ok := t . authInfo . ( credentials . ChannelzSecurityInfo ) ; ok {
s . Security = au . GetSecurityValue ( )
}
2018-07-18 14:47:22 +00:00
s . RemoteFlowControlWindow = t . getOutFlowWindow ( )
return & s
}
2019-05-31 09:45:11 +00:00
// RemoteAddr returns the remote address stored on the transport.
func (t *http2Client) RemoteAddr() net.Addr {
	return t.remoteAddr
}
2018-07-18 14:47:22 +00:00
func ( t * http2Client ) IncrMsgSent ( ) {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . msgSent , 1 )
atomic . StoreInt64 ( & t . czData . lastMsgSentTime , time . Now ( ) . UnixNano ( ) )
2018-07-18 14:47:22 +00:00
}
func ( t * http2Client ) IncrMsgRecv ( ) {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . msgRecv , 1 )
atomic . StoreInt64 ( & t . czData . lastMsgRecvTime , time . Now ( ) . UnixNano ( ) )
2018-07-18 14:47:22 +00:00
}
// getOutFlowWindow asks for the outbound flow control window size by queueing
// an outFlowControlSizeRequest on the control buffer and waiting for the
// reply. It returns -1 if the transport's context is done first, and -2 if no
// reply arrives within one second.
func (t *http2Client) getOutFlowWindow() int64 {
	// Buffered so the responder never blocks if we have already given up.
	reply := make(chan uint32, 1)
	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()

	t.controlBuf.put(&outFlowControlSizeRequest{reply})
	select {
	case <-t.ctxDone:
		return -1
	case <-deadline.C:
		return -2
	case size := <-reply:
		return int64(size)
	}
}