2018-01-09 18:57:14 +00:00
/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package transport
import (
"bytes"
2019-01-15 16:20:41 +00:00
"context"
2018-01-09 18:57:14 +00:00
"errors"
"fmt"
"io"
"math"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
2018-07-18 14:47:22 +00:00
2019-07-18 12:13:24 +00:00
spb "google.golang.org/genproto/googleapis/rpc/status"
2018-01-09 18:57:14 +00:00
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
2018-07-18 14:47:22 +00:00
"google.golang.org/grpc/grpclog"
2019-07-18 12:13:24 +00:00
"google.golang.org/grpc/internal"
2018-07-18 14:47:22 +00:00
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
2018-01-09 18:57:14 +00:00
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
)
2018-11-26 18:23:56 +00:00
var (
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state.
ErrIllegalHeaderWrite = errors . New ( "transport: the stream is done or WriteHeader was already called" )
// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
// than the limit set by peer.
ErrHeaderListSizeLimitViolation = errors . New ( "transport: trying to send header list size larger than the limit set by peer" )
2019-07-18 12:13:24 +00:00
// statusRawProto is a function to get to the raw status proto wrapped in a
// status.Status without a proto.Clone().
statusRawProto = internal . StatusRawProto . ( func ( * status . Status ) * spb . Status )
2018-11-26 18:23:56 +00:00
)
2018-01-09 18:57:14 +00:00
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context . Context
2018-07-18 14:47:22 +00:00
ctxDone <- chan struct { } // Cache the context.Done() chan
2018-01-09 18:57:14 +00:00
cancel context . CancelFunc
conn net . Conn
2018-07-18 14:47:22 +00:00
loopy * loopyWriter
readerDone chan struct { } // sync point to enable testing.
writerDone chan struct { } // sync point to enable testing.
2018-01-09 18:57:14 +00:00
remoteAddr net . Addr
localAddr net . Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials . AuthInfo // auth info about the connection
inTapHandle tap . ServerInHandle
framer * framer
// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf * controlBuffer
2018-07-18 14:47:22 +00:00
fc * trInFlow
stats stats . Handler
2018-01-09 18:57:14 +00:00
// Flag to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
// Keepalive and max-age parameters for the server.
kp keepalive . ServerParameters
// Keepalive enforcement policy.
kep keepalive . EnforcementPolicy
// The time instance last ping was received.
lastPingAt time . Time
// Number of times the client has violated keepalive ping policy so far.
pingStrikes uint8
// Flag to signify that number of ping strikes should be reset to 0.
// This is set whenever data or header frames are sent.
// 1 means yes.
2018-11-26 18:23:56 +00:00
resetPingStrikes uint32 // Accessed atomically.
initialWindowSize int32
bdpEst * bdpEstimator
maxSendHeaderListSize * uint32
2018-01-09 18:57:14 +00:00
mu sync . Mutex // guard the following
// drainChan is initialized when drain(...) is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct { }
state transportState
activeStreams map [ uint32 ] * Stream
// idle is the time instant when the connection went idle.
// This is either the beginning of the connection or when the number of
// RPCs go down to 0.
// When the connection is busy, this value is set to 0.
idle time . Time
2018-07-18 14:47:22 +00:00
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
2018-11-26 18:23:56 +00:00
czData * channelzData
2019-07-18 12:13:24 +00:00
bufferPool * bufferPool
2018-01-09 18:57:14 +00:00
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server ( conn net . Conn , config * ServerConfig ) ( _ ServerTransport , err error ) {
2018-11-26 18:23:56 +00:00
writeBufSize := config . WriteBufferSize
readBufSize := config . ReadBufferSize
maxHeaderListSize := defaultServerMaxHeaderListSize
if config . MaxHeaderListSize != nil {
maxHeaderListSize = * config . MaxHeaderListSize
2018-01-09 18:57:14 +00:00
}
2018-11-26 18:23:56 +00:00
framer := newFramer ( conn , writeBufSize , readBufSize , maxHeaderListSize )
2018-01-09 18:57:14 +00:00
// Send initial settings as connection preface to client.
2019-09-20 10:45:13 +00:00
isettings := [ ] http2 . Setting { {
ID : http2 . SettingMaxFrameSize ,
Val : http2MaxFrameLen ,
} }
2018-01-09 18:57:14 +00:00
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config . MaxStreams
if maxStreams == 0 {
maxStreams = math . MaxUint32
} else {
isettings = append ( isettings , http2 . Setting {
ID : http2 . SettingMaxConcurrentStreams ,
Val : maxStreams ,
} )
}
dynamicWindow := true
iwz := int32 ( initialWindowSize )
if config . InitialWindowSize >= defaultWindowSize {
iwz = config . InitialWindowSize
dynamicWindow = false
}
icwz := int32 ( initialWindowSize )
if config . InitialConnWindowSize >= defaultWindowSize {
icwz = config . InitialConnWindowSize
dynamicWindow = false
}
if iwz != defaultWindowSize {
isettings = append ( isettings , http2 . Setting {
ID : http2 . SettingInitialWindowSize ,
Val : uint32 ( iwz ) } )
}
2018-11-26 18:23:56 +00:00
if config . MaxHeaderListSize != nil {
isettings = append ( isettings , http2 . Setting {
ID : http2 . SettingMaxHeaderListSize ,
Val : * config . MaxHeaderListSize ,
} )
}
2018-01-09 18:57:14 +00:00
if err := framer . fr . WriteSettings ( isettings ... ) ; err != nil {
return nil , connectionErrorf ( false , err , "transport: %v" , err )
}
// Adjust the connection flow control window if needed.
if delta := uint32 ( icwz - defaultWindowSize ) ; delta > 0 {
if err := framer . fr . WriteWindowUpdate ( 0 , delta ) ; err != nil {
return nil , connectionErrorf ( false , err , "transport: %v" , err )
}
}
kp := config . KeepaliveParams
if kp . MaxConnectionIdle == 0 {
kp . MaxConnectionIdle = defaultMaxConnectionIdle
}
if kp . MaxConnectionAge == 0 {
kp . MaxConnectionAge = defaultMaxConnectionAge
}
// Add a jitter to MaxConnectionAge.
kp . MaxConnectionAge += getJitter ( kp . MaxConnectionAge )
if kp . MaxConnectionAgeGrace == 0 {
kp . MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
}
if kp . Time == 0 {
kp . Time = defaultServerKeepaliveTime
}
if kp . Timeout == 0 {
kp . Timeout = defaultServerKeepaliveTimeout
}
kep := config . KeepalivePolicy
if kep . MinTime == 0 {
kep . MinTime = defaultKeepalivePolicyMinTime
}
ctx , cancel := context . WithCancel ( context . Background ( ) )
t := & http2Server {
ctx : ctx ,
cancel : cancel ,
2018-07-18 14:47:22 +00:00
ctxDone : ctx . Done ( ) ,
2018-01-09 18:57:14 +00:00
conn : conn ,
remoteAddr : conn . RemoteAddr ( ) ,
localAddr : conn . LocalAddr ( ) ,
authInfo : config . AuthInfo ,
framer : framer ,
2018-07-18 14:47:22 +00:00
readerDone : make ( chan struct { } ) ,
writerDone : make ( chan struct { } ) ,
2018-01-09 18:57:14 +00:00
maxStreams : maxStreams ,
inTapHandle : config . InTapHandle ,
2018-07-18 14:47:22 +00:00
fc : & trInFlow { limit : uint32 ( icwz ) } ,
2018-01-09 18:57:14 +00:00
state : reachable ,
activeStreams : make ( map [ uint32 ] * Stream ) ,
stats : config . StatsHandler ,
kp : kp ,
idle : time . Now ( ) ,
kep : kep ,
initialWindowSize : iwz ,
2018-11-26 18:23:56 +00:00
czData : new ( channelzData ) ,
2019-07-18 12:13:24 +00:00
bufferPool : newBufferPool ( ) ,
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
t . controlBuf = newControlBuffer ( t . ctxDone )
2018-01-09 18:57:14 +00:00
if dynamicWindow {
t . bdpEst = & bdpEstimator {
bdp : initialWindowSize ,
updateFlowControl : t . updateFlowControl ,
}
}
if t . stats != nil {
t . ctx = t . stats . TagConn ( t . ctx , & stats . ConnTagInfo {
RemoteAddr : t . remoteAddr ,
LocalAddr : t . localAddr ,
} )
connBegin := & stats . ConnBegin { }
t . stats . HandleConn ( t . ctx , connBegin )
}
2018-07-18 14:47:22 +00:00
if channelz . IsOn ( ) {
2019-01-15 16:20:41 +00:00
t . channelzID = channelz . RegisterNormalSocket ( t , config . ChannelzParentID , fmt . Sprintf ( "%s -> %s" , t . remoteAddr , t . localAddr ) )
2018-07-18 14:47:22 +00:00
}
2018-01-09 18:57:14 +00:00
t . framer . writer . Flush ( )
defer func ( ) {
if err != nil {
t . Close ( )
}
} ( )
// Check the validity of client preface.
preface := make ( [ ] byte , len ( clientPreface ) )
if _ , err := io . ReadFull ( t . conn , preface ) ; err != nil {
return nil , connectionErrorf ( false , err , "transport: http2Server.HandleStreams failed to receive the preface from client: %v" , err )
}
if ! bytes . Equal ( preface , clientPreface ) {
return nil , connectionErrorf ( false , nil , "transport: http2Server.HandleStreams received bogus greeting from client: %q" , preface )
}
frame , err := t . framer . fr . ReadFrame ( )
if err == io . EOF || err == io . ErrUnexpectedEOF {
return nil , err
}
if err != nil {
return nil , connectionErrorf ( false , err , "transport: http2Server.HandleStreams failed to read initial settings frame: %v" , err )
}
atomic . StoreUint32 ( & t . activity , 1 )
sf , ok := frame . ( * http2 . SettingsFrame )
if ! ok {
return nil , connectionErrorf ( false , nil , "transport: http2Server.HandleStreams saw invalid preface type %T from client" , frame )
}
t . handleSettings ( sf )
go func ( ) {
2018-07-18 14:47:22 +00:00
t . loopy = newLoopyWriter ( serverSide , t . framer , t . controlBuf , t . bdpEst )
t . loopy . ssGoAwayHandler = t . outgoingGoAwayHandler
if err := t . loopy . run ( ) ; err != nil {
errorf ( "transport: loopyWriter.run returning. Err: %v" , err )
}
2018-01-09 18:57:14 +00:00
t . conn . Close ( )
2018-07-18 14:47:22 +00:00
close ( t . writerDone )
2018-01-09 18:57:14 +00:00
} ( )
go t . keepalive ( )
return t , nil
}
// operateHeader takes action on the decoded headers.
2018-11-26 18:23:56 +00:00
func ( t * http2Server ) operateHeaders ( frame * http2 . MetaHeadersFrame , handle func ( * Stream ) , traceCtx func ( context . Context , string ) context . Context ) ( fatal bool ) {
2018-01-09 18:57:14 +00:00
streamID := frame . Header ( ) . StreamID
2019-05-31 09:45:11 +00:00
state := & decodeState {
serverSide : true ,
}
2018-11-26 18:23:56 +00:00
if err := state . decodeHeader ( frame ) ; err != nil {
if se , ok := status . FromError ( err ) ; ok {
t . controlBuf . put ( & cleanupStream {
streamID : streamID ,
rst : true ,
rstCode : statusCodeConvTab [ se . Code ( ) ] ,
onWrite : func ( ) { } ,
} )
2018-01-09 18:57:14 +00:00
}
2018-11-26 18:23:56 +00:00
return false
2018-01-09 18:57:14 +00:00
}
buf := newRecvBuffer ( )
s := & Stream {
2018-03-06 22:33:18 +00:00
id : streamID ,
st : t ,
buf : buf ,
fc : & inFlow { limit : uint32 ( t . initialWindowSize ) } ,
2019-05-31 09:45:11 +00:00
recvCompress : state . data . encoding ,
method : state . data . method ,
contentSubtype : state . data . contentSubtype ,
2018-01-09 18:57:14 +00:00
}
if frame . StreamEnded ( ) {
// s is just created by the caller. No lock needed.
s . state = streamReadDone
}
2019-05-31 09:45:11 +00:00
if state . data . timeoutSet {
s . ctx , s . cancel = context . WithTimeout ( t . ctx , state . data . timeout )
2018-01-09 18:57:14 +00:00
} else {
s . ctx , s . cancel = context . WithCancel ( t . ctx )
}
pr := & peer . Peer {
Addr : t . remoteAddr ,
}
// Attach Auth info if there is any.
if t . authInfo != nil {
pr . AuthInfo = t . authInfo
}
s . ctx = peer . NewContext ( s . ctx , pr )
// Attach the received metadata to the context.
2019-05-31 09:45:11 +00:00
if len ( state . data . mdata ) > 0 {
s . ctx = metadata . NewIncomingContext ( s . ctx , state . data . mdata )
2018-01-09 18:57:14 +00:00
}
2019-05-31 09:45:11 +00:00
if state . data . statsTags != nil {
s . ctx = stats . SetIncomingTags ( s . ctx , state . data . statsTags )
2018-01-09 18:57:14 +00:00
}
2019-05-31 09:45:11 +00:00
if state . data . statsTrace != nil {
s . ctx = stats . SetIncomingTrace ( s . ctx , state . data . statsTrace )
2018-01-09 18:57:14 +00:00
}
if t . inTapHandle != nil {
var err error
info := & tap . Info {
2019-05-31 09:45:11 +00:00
FullMethodName : state . data . method ,
2018-01-09 18:57:14 +00:00
}
s . ctx , err = t . inTapHandle ( s . ctx , info )
if err != nil {
warningf ( "transport: http2Server.operateHeaders got an error from InTapHandle: %v" , err )
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & cleanupStream {
streamID : s . id ,
rst : true ,
rstCode : http2 . ErrCodeRefusedStream ,
onWrite : func ( ) { } ,
} )
2018-11-26 18:23:56 +00:00
return false
2018-01-09 18:57:14 +00:00
}
}
t . mu . Lock ( )
if t . state != reachable {
t . mu . Unlock ( )
2018-11-26 18:23:56 +00:00
return false
2018-01-09 18:57:14 +00:00
}
if uint32 ( len ( t . activeStreams ) ) >= t . maxStreams {
t . mu . Unlock ( )
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & cleanupStream {
streamID : streamID ,
rst : true ,
rstCode : http2 . ErrCodeRefusedStream ,
onWrite : func ( ) { } ,
} )
2018-11-26 18:23:56 +00:00
return false
2018-01-09 18:57:14 +00:00
}
if streamID % 2 != 1 || streamID <= t . maxStreamID {
t . mu . Unlock ( )
// illegal gRPC stream id.
errorf ( "transport: http2Server.HandleStreams received an illegal stream id: %v" , streamID )
return true
}
t . maxStreamID = streamID
t . activeStreams [ streamID ] = s
if len ( t . activeStreams ) == 1 {
t . idle = time . Time { }
}
t . mu . Unlock ( )
2018-07-18 14:47:22 +00:00
if channelz . IsOn ( ) {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . streamsStarted , 1 )
atomic . StoreInt64 ( & t . czData . lastStreamCreatedTime , time . Now ( ) . UnixNano ( ) )
2018-07-18 14:47:22 +00:00
}
2018-01-09 18:57:14 +00:00
s . requestRead = func ( n int ) {
t . adjustWindow ( s , uint32 ( n ) )
}
s . ctx = traceCtx ( s . ctx , s . method )
if t . stats != nil {
s . ctx = t . stats . TagRPC ( s . ctx , & stats . RPCTagInfo { FullMethodName : s . method } )
inHeader := & stats . InHeader {
FullMethod : s . method ,
RemoteAddr : t . remoteAddr ,
LocalAddr : t . localAddr ,
Compression : s . recvCompress ,
WireLength : int ( frame . Header ( ) . Length ) ,
}
t . stats . HandleRPC ( s . ctx , inHeader )
}
2018-07-18 14:47:22 +00:00
s . ctxDone = s . ctx . Done ( )
s . wq = newWriteQuota ( defaultWriteQuota , s . ctxDone )
2018-01-09 18:57:14 +00:00
s . trReader = & transportReader {
reader : & recvBufferReader {
2019-07-18 12:13:24 +00:00
ctx : s . ctx ,
ctxDone : s . ctxDone ,
recv : s . buf ,
freeBuffer : t . bufferPool . put ,
2018-01-09 18:57:14 +00:00
} ,
windowHandler : func ( n int ) {
t . updateWindow ( s , uint32 ( n ) )
} ,
}
2018-07-18 14:47:22 +00:00
// Register the stream with loopy.
t . controlBuf . put ( & registerStream {
streamID : s . id ,
wq : s . wq ,
} )
2018-01-09 18:57:14 +00:00
handle ( s )
2018-11-26 18:23:56 +00:00
return false
2018-01-09 18:57:14 +00:00
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func ( t * http2Server ) HandleStreams ( handle func ( * Stream ) , traceCtx func ( context . Context , string ) context . Context ) {
2018-07-18 14:47:22 +00:00
defer close ( t . readerDone )
2018-01-09 18:57:14 +00:00
for {
2019-09-20 10:45:13 +00:00
t . controlBuf . throttle ( )
2018-01-09 18:57:14 +00:00
frame , err := t . framer . fr . ReadFrame ( )
atomic . StoreUint32 ( & t . activity , 1 )
if err != nil {
if se , ok := err . ( http2 . StreamError ) ; ok {
2018-07-18 14:47:22 +00:00
warningf ( "transport: http2Server.HandleStreams encountered http2.StreamError: %v" , se )
2018-01-09 18:57:14 +00:00
t . mu . Lock ( )
s := t . activeStreams [ se . StreamID ]
t . mu . Unlock ( )
if s != nil {
2019-05-31 09:45:11 +00:00
t . closeStream ( s , true , se . Code , false )
2018-07-18 14:47:22 +00:00
} else {
t . controlBuf . put ( & cleanupStream {
streamID : se . StreamID ,
rst : true ,
rstCode : se . Code ,
onWrite : func ( ) { } ,
} )
2018-01-09 18:57:14 +00:00
}
continue
}
if err == io . EOF || err == io . ErrUnexpectedEOF {
t . Close ( )
return
}
warningf ( "transport: http2Server.HandleStreams failed to read frame: %v" , err )
t . Close ( )
return
}
switch frame := frame . ( type ) {
case * http2 . MetaHeadersFrame :
if t . operateHeaders ( frame , handle , traceCtx ) {
t . Close ( )
break
}
case * http2 . DataFrame :
t . handleData ( frame )
case * http2 . RSTStreamFrame :
t . handleRSTStream ( frame )
case * http2 . SettingsFrame :
t . handleSettings ( frame )
case * http2 . PingFrame :
t . handlePing ( frame )
case * http2 . WindowUpdateFrame :
t . handleWindowUpdate ( frame )
case * http2 . GoAwayFrame :
// TODO: Handle GoAway from the client appropriately.
default :
errorf ( "transport: http2Server.HandleStreams found unhandled frame type %v." , frame )
}
}
}
// getStream looks up the active stream that frame f belongs to. The boolean
// result is false when the transport no longer has an active-streams map or
// when no stream with that ID is registered.
func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	streams := t.activeStreams
	if streams == nil {
		// The transport is closing.
		return nil, false
	}
	if stream, found := streams[f.Header().StreamID]; found {
		return stream, true
	}
	// The stream is already done.
	return nil, false
}
// adjustWindow sends out extra window update over the initial window size
// of stream if the application is requesting data larger in size than
// the window.
func ( t * http2Server ) adjustWindow ( s * Stream , n uint32 ) {
if w := s . fc . maybeAdjust ( n ) ; w > 0 {
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & outgoingWindowUpdate { streamID : s . id , increment : w } )
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
2018-01-09 18:57:14 +00:00
}
// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
func ( t * http2Server ) updateWindow ( s * Stream , n uint32 ) {
if w := s . fc . onRead ( n ) ; w > 0 {
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & outgoingWindowUpdate { streamID : s . id ,
increment : w ,
} )
2018-01-09 18:57:14 +00:00
}
}
// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func ( t * http2Server ) updateFlowControl ( n uint32 ) {
t . mu . Lock ( )
for _ , s := range t . activeStreams {
s . fc . newLimit ( n )
}
t . initialWindowSize = int32 ( n )
t . mu . Unlock ( )
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & outgoingWindowUpdate {
streamID : 0 ,
increment : t . fc . newLimit ( n ) ,
} )
t . controlBuf . put ( & outgoingSettings {
2018-01-09 18:57:14 +00:00
ss : [ ] http2 . Setting {
{
ID : http2 . SettingInitialWindowSize ,
2018-07-18 14:47:22 +00:00
Val : n ,
2018-01-09 18:57:14 +00:00
} ,
} ,
} )
}
func ( t * http2Server ) handleData ( f * http2 . DataFrame ) {
size := f . Header ( ) . Length
var sendBDPPing bool
if t . bdpEst != nil {
2018-07-18 14:47:22 +00:00
sendBDPPing = t . bdpEst . add ( size )
2018-01-09 18:57:14 +00:00
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
// whether user application has read the data or not. Such a
// restriction is already imposed on the stream's flow control,
// and therefore the sender will be blocked anyways.
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
2018-07-18 14:47:22 +00:00
if w := t . fc . onData ( size ) ; w > 0 {
t . controlBuf . put ( & outgoingWindowUpdate {
streamID : 0 ,
increment : w ,
} )
}
2018-01-09 18:57:14 +00:00
if sendBDPPing {
2018-07-18 14:47:22 +00:00
// Avoid excessive ping detection (e.g. in an L7 proxy)
// by sending a window update prior to the BDP ping.
if w := t . fc . reset ( ) ; w > 0 {
t . controlBuf . put ( & outgoingWindowUpdate {
streamID : 0 ,
increment : w ,
} )
2018-01-09 18:57:14 +00:00
}
t . controlBuf . put ( bdpPing )
}
// Select the right stream to dispatch.
s , ok := t . getStream ( f )
if ! ok {
return
}
if size > 0 {
2018-07-18 14:47:22 +00:00
if err := s . fc . onData ( size ) ; err != nil {
2019-05-31 09:45:11 +00:00
t . closeStream ( s , true , http2 . ErrCodeFlowControl , false )
2018-01-09 18:57:14 +00:00
return
}
if f . Header ( ) . Flags . Has ( http2 . FlagDataPadded ) {
2018-07-18 14:47:22 +00:00
if w := s . fc . onRead ( size - uint32 ( len ( f . Data ( ) ) ) ) ; w > 0 {
t . controlBuf . put ( & outgoingWindowUpdate { s . id , w } )
2018-01-09 18:57:14 +00:00
}
}
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len ( f . Data ( ) ) > 0 {
2019-07-18 12:13:24 +00:00
buffer := t . bufferPool . get ( )
buffer . Reset ( )
buffer . Write ( f . Data ( ) )
s . write ( recvMsg { buffer : buffer } )
2018-01-09 18:57:14 +00:00
}
}
if f . Header ( ) . Flags . Has ( http2 . FlagDataEndStream ) {
// Received the end of stream from the client.
2018-07-18 14:47:22 +00:00
s . compareAndSwapState ( streamActive , streamReadDone )
2018-01-09 18:57:14 +00:00
s . write ( recvMsg { err : io . EOF } )
}
}
func ( t * http2Server ) handleRSTStream ( f * http2 . RSTStreamFrame ) {
2019-05-31 09:45:11 +00:00
// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
if s , ok := t . getStream ( f ) ; ok {
t . closeStream ( s , false , 0 , false )
2018-01-09 18:57:14 +00:00
return
}
2019-05-31 09:45:11 +00:00
// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
t . controlBuf . put ( & cleanupStream {
streamID : f . Header ( ) . StreamID ,
rst : false ,
rstCode : 0 ,
onWrite : func ( ) { } ,
} )
2018-01-09 18:57:14 +00:00
}
func ( t * http2Server ) handleSettings ( f * http2 . SettingsFrame ) {
if f . IsAck ( ) {
return
}
2018-07-18 14:47:22 +00:00
var ss [ ] http2 . Setting
2018-11-26 18:23:56 +00:00
var updateFuncs [ ] func ( )
2018-01-09 18:57:14 +00:00
f . ForeachSetting ( func ( s http2 . Setting ) error {
2018-11-26 18:23:56 +00:00
switch s . ID {
case http2 . SettingMaxHeaderListSize :
updateFuncs = append ( updateFuncs , func ( ) {
t . maxSendHeaderListSize = new ( uint32 )
* t . maxSendHeaderListSize = s . Val
} )
default :
ss = append ( ss , s )
}
2018-01-09 18:57:14 +00:00
return nil
} )
2018-11-26 18:23:56 +00:00
t . controlBuf . executeAndPut ( func ( interface { } ) bool {
for _ , f := range updateFuncs {
f ( )
}
return true
} , & incomingSettings {
2018-07-18 14:47:22 +00:00
ss : ss ,
} )
2018-01-09 18:57:14 +00:00
}
const (
	// maxPingStrikes is the number of ping policy violations tolerated
	// before the connection is closed with a GoAway.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum interval expected between pings when
	// there are no active streams and pings without streams are not
	// permitted.
	defaultPingTimeout = 2 * time.Hour
)
// handlePing processes an incoming PING frame.
//
// For acks: an ack carrying the GoAway ping payload while a drain is underway
// closes drainChan (at most once); any other ack is passed to the BDP
// estimator when one is configured.
//
// For regular pings: an ack is queued, and the keepalive enforcement policy
// is checked. Once the number of strikes exceeds maxPingStrikes a GoAway with
// ENHANCE_YOUR_CALM is queued and the connection is asked to close.
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data {
			// drainChan is guarded by t.mu. A peer may send the same ack more
			// than once, so only close the channel if it is still open;
			// closing an already closed channel would panic.
			t.mu.Lock()
			drainChan := t.drainChan
			if drainChan != nil {
				select {
				case <-drainChan:
					// Already closed by an earlier ack.
				default:
					close(drainChan)
				}
			}
			t.mu.Unlock()
			if drainChan != nil {
				return
			}
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A reset ping strikes means that we don't need to check for policy
	// violation for this ping and the pingStrikes counter should be set
	// to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active thus, this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		errorf("transport: Got too many pings from the client, closing the connection.")
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
	}
}
func ( t * http2Server ) handleWindowUpdate ( f * http2 . WindowUpdateFrame ) {
2018-07-18 14:47:22 +00:00
t . controlBuf . put ( & incomingWindowUpdate {
streamID : f . Header ( ) . StreamID ,
increment : f . Increment ,
} )
}
func appendHeaderFieldsFromMD ( headerFields [ ] hpack . HeaderField , md metadata . MD ) [ ] hpack . HeaderField {
for k , vv := range md {
if isReservedHeader ( k ) {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
continue
}
for _ , v := range vv {
headerFields = append ( headerFields , hpack . HeaderField { Name : k , Value : encodeMetadataHeader ( k , v ) } )
}
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
return headerFields
2018-01-09 18:57:14 +00:00
}
2018-11-26 18:23:56 +00:00
// checkForHeaderListSize reports whether the header frame passed in it fits
// within the header list size limit set by the client. It always returns true
// when no limit has been recorded. it must hold a *headerFrame.
func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
	limit := t.maxSendHeaderListSize
	if limit == nil {
		return true
	}
	frame := it.(*headerFrame)
	var total int64
	for _, field := range frame.hf {
		total += int64(field.Size())
		if total > int64(*limit) {
			errorf("header list size to send violates the maximum size (%d bytes) set by client", *limit)
			return false
		}
	}
	return true
}
2018-01-09 18:57:14 +00:00
// WriteHeader sends the header metedata md back to the client.
func ( t * http2Server ) WriteHeader ( s * Stream , md metadata . MD ) error {
2018-07-18 14:47:22 +00:00
if s . updateHeaderSent ( ) || s . getState ( ) == streamDone {
2018-01-09 18:57:14 +00:00
return ErrIllegalHeaderWrite
}
2018-07-18 14:47:22 +00:00
s . hdrMu . Lock ( )
2018-01-09 18:57:14 +00:00
if md . Len ( ) > 0 {
if s . header . Len ( ) > 0 {
s . header = metadata . Join ( s . header , md )
} else {
s . header = md
}
}
2018-11-26 18:23:56 +00:00
if err := t . writeHeaderLocked ( s ) ; err != nil {
s . hdrMu . Unlock ( )
return err
}
2018-07-18 14:47:22 +00:00
s . hdrMu . Unlock ( )
return nil
}
2019-09-20 10:45:13 +00:00
// setResetPingStrikes atomically raises the resetPingStrikes flag, which
// handlePing consumes to reset the ping strike counter.
func (t *http2Server) setResetPingStrikes() {
	const flagSet uint32 = 1
	atomic.StoreUint32(&t.resetPingStrikes, flagSet)
}
2018-11-26 18:23:56 +00:00
func ( t * http2Server ) writeHeaderLocked ( s * Stream ) error {
2018-01-09 18:57:14 +00:00
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make ( [ ] hpack . HeaderField , 0 , 2 ) // at least :status, content-type will be there if none else.
headerFields = append ( headerFields , hpack . HeaderField { Name : ":status" , Value : "200" } )
2018-03-06 22:33:18 +00:00
headerFields = append ( headerFields , hpack . HeaderField { Name : "content-type" , Value : contentType ( s . contentSubtype ) } )
2018-01-09 18:57:14 +00:00
if s . sendCompress != "" {
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-encoding" , Value : s . sendCompress } )
}
2018-07-18 14:47:22 +00:00
headerFields = appendHeaderFieldsFromMD ( headerFields , s . header )
2018-11-26 18:23:56 +00:00
success , err := t . controlBuf . executeAndPut ( t . checkForHeaderListSize , & headerFrame {
2018-01-09 18:57:14 +00:00
streamID : s . id ,
hf : headerFields ,
endStream : false ,
2019-09-20 10:45:13 +00:00
onWrite : t . setResetPingStrikes ,
2018-01-09 18:57:14 +00:00
} )
2018-11-26 18:23:56 +00:00
if ! success {
if err != nil {
return err
}
2019-05-31 09:45:11 +00:00
t . closeStream ( s , true , http2 . ErrCodeInternal , false )
2018-11-26 18:23:56 +00:00
return ErrHeaderListSizeLimitViolation
}
2018-01-09 18:57:14 +00:00
if t . stats != nil {
2018-03-06 22:33:18 +00:00
// Note: WireLength is not set in outHeader.
// TODO(mmukhi): Revisit this later, if needed.
outHeader := & stats . OutHeader { }
2018-01-09 18:57:14 +00:00
t . stats . HandleRPC ( s . Context ( ) , outHeader )
}
2018-11-26 18:23:56 +00:00
return nil
2018-01-09 18:57:14 +00:00
}
// WriteStatus sends stream status to the client and terminates the stream.
// There is no further I/O operations being able to perform on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func ( t * http2Server ) WriteStatus ( s * Stream , st * status . Status ) error {
2018-07-18 14:47:22 +00:00
if s . getState ( ) == streamDone {
2018-01-09 18:57:14 +00:00
return nil
}
2018-07-18 14:47:22 +00:00
s . hdrMu . Lock ( )
2018-01-09 18:57:14 +00:00
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make ( [ ] hpack . HeaderField , 0 , 2 ) // grpc-status and grpc-message will be there if none else.
2018-07-18 14:47:22 +00:00
if ! s . updateHeaderSent ( ) { // No headers have been sent.
if len ( s . header ) > 0 { // Send a separate header frame.
2018-11-26 18:23:56 +00:00
if err := t . writeHeaderLocked ( s ) ; err != nil {
s . hdrMu . Unlock ( )
return err
}
2018-07-18 14:47:22 +00:00
} else { // Send a trailer only response.
headerFields = append ( headerFields , hpack . HeaderField { Name : ":status" , Value : "200" } )
headerFields = append ( headerFields , hpack . HeaderField { Name : "content-type" , Value : contentType ( s . contentSubtype ) } )
}
2018-01-09 18:57:14 +00:00
}
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-status" , Value : strconv . Itoa ( int ( st . Code ( ) ) ) } )
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-message" , Value : encodeGrpcMessage ( st . Message ( ) ) } )
2019-07-18 12:13:24 +00:00
if p := statusRawProto ( st ) ; p != nil && len ( p . Details ) > 0 {
2018-01-09 18:57:14 +00:00
stBytes , err := proto . Marshal ( p )
if err != nil {
// TODO: return error instead, when callers are able to handle it.
2018-07-18 14:47:22 +00:00
grpclog . Errorf ( "transport: failed to marshal rpc status: %v, error: %v" , p , err )
} else {
headerFields = append ( headerFields , hpack . HeaderField { Name : "grpc-status-details-bin" , Value : encodeBinHeader ( stBytes ) } )
2018-01-09 18:57:14 +00:00
}
}
// Attach the trailer metadata.
2018-07-18 14:47:22 +00:00
headerFields = appendHeaderFieldsFromMD ( headerFields , s . trailer )
trailingHeader := & headerFrame {
2018-01-09 18:57:14 +00:00
streamID : s . id ,
hf : headerFields ,
endStream : true ,
2019-09-20 10:45:13 +00:00
onWrite : t . setResetPingStrikes ,
2018-07-18 14:47:22 +00:00
}
s . hdrMu . Unlock ( )
2018-11-26 18:23:56 +00:00
success , err := t . controlBuf . execute ( t . checkForHeaderListSize , trailingHeader )
if ! success {
if err != nil {
return err
}
2019-05-31 09:45:11 +00:00
t . closeStream ( s , true , http2 . ErrCodeInternal , false )
2018-11-26 18:23:56 +00:00
return ErrHeaderListSizeLimitViolation
}
2019-05-31 09:45:11 +00:00
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s . getState ( ) == streamActive
t . finishStream ( s , rst , http2 . ErrCodeNo , trailingHeader , true )
2018-01-09 18:57:14 +00:00
if t . stats != nil {
t . stats . HandleRPC ( s . Context ( ) , & stats . OutTrailer { } )
}
return nil
}
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func ( t * http2Server ) Write ( s * Stream , hdr [ ] byte , data [ ] byte , opts * Options ) error {
2018-07-18 14:47:22 +00:00
if ! s . isHeaderSent ( ) { // Headers haven't been written yet.
if err := t . WriteHeader ( s , nil ) ; err != nil {
2019-05-31 09:45:11 +00:00
if _ , ok := err . ( ConnectionError ) ; ok {
return err
}
2018-07-18 14:47:22 +00:00
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
2018-11-26 18:23:56 +00:00
return status . Errorf ( codes . Internal , "transport: %v" , err )
2018-07-18 14:47:22 +00:00
}
} else {
// Writing headers checks for this condition.
if s . getState ( ) == streamDone {
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s . cancel ( )
select {
case <- t . ctx . Done ( ) :
return ErrConnClosing
default :
}
return ContextErr ( s . ctx . Err ( ) )
}
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
// Add some data to header frame so that we can equally distribute bytes across frames.
2018-01-09 18:57:14 +00:00
emptyLen := http2MaxFrameLen - len ( hdr )
if emptyLen > len ( data ) {
emptyLen = len ( data )
}
hdr = append ( hdr , data [ : emptyLen ] ... )
data = data [ emptyLen : ]
2018-07-18 14:47:22 +00:00
df := & dataFrame {
2019-09-20 10:45:13 +00:00
streamID : s . id ,
h : hdr ,
d : data ,
onEachWrite : t . setResetPingStrikes ,
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
if err := s . wq . get ( int32 ( len ( hdr ) + len ( data ) ) ) ; err != nil {
select {
case <- t . ctx . Done ( ) :
return ErrConnClosing
default :
}
return ContextErr ( s . ctx . Err ( ) )
2018-01-09 18:57:14 +00:00
}
2018-07-18 14:47:22 +00:00
return t . controlBuf . put ( df )
2018-01-09 18:57:14 +00:00
}
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
func ( t * http2Server ) keepalive ( ) {
p := & ping { }
var pingSent bool
maxIdle := time . NewTimer ( t . kp . MaxConnectionIdle )
maxAge := time . NewTimer ( t . kp . MaxConnectionAge )
keepalive := time . NewTimer ( t . kp . Time )
// NOTE: All exit paths of this function should reset their
// respective timers. A failure to do so will cause the
// following clean-up to deadlock and eventually leak.
defer func ( ) {
if ! maxIdle . Stop ( ) {
<- maxIdle . C
}
if ! maxAge . Stop ( ) {
<- maxAge . C
}
if ! keepalive . Stop ( ) {
<- keepalive . C
}
} ( )
for {
select {
case <- maxIdle . C :
t . mu . Lock ( )
idle := t . idle
if idle . IsZero ( ) { // The connection is non-idle.
t . mu . Unlock ( )
maxIdle . Reset ( t . kp . MaxConnectionIdle )
continue
}
val := t . kp . MaxConnectionIdle - time . Since ( idle )
t . mu . Unlock ( )
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t . drain ( http2 . ErrCodeNo , [ ] byte { } )
2018-07-18 14:47:22 +00:00
// Resetting the timer so that the clean-up doesn't deadlock.
2018-01-09 18:57:14 +00:00
maxIdle . Reset ( infinity )
return
}
maxIdle . Reset ( val )
case <- maxAge . C :
t . drain ( http2 . ErrCodeNo , [ ] byte { } )
maxAge . Reset ( t . kp . MaxConnectionAgeGrace )
select {
case <- maxAge . C :
// Close the connection after grace period.
2019-09-20 10:45:13 +00:00
infof ( "transport: closing server transport due to maximum connection age." )
2018-01-09 18:57:14 +00:00
t . Close ( )
2018-07-18 14:47:22 +00:00
// Resetting the timer so that the clean-up doesn't deadlock.
2018-01-09 18:57:14 +00:00
maxAge . Reset ( infinity )
case <- t . ctx . Done ( ) :
}
return
case <- keepalive . C :
if atomic . CompareAndSwapUint32 ( & t . activity , 1 , 0 ) {
pingSent = false
keepalive . Reset ( t . kp . Time )
continue
}
if pingSent {
2019-09-20 10:45:13 +00:00
infof ( "transport: closing server transport due to idleness." )
2018-01-09 18:57:14 +00:00
t . Close ( )
2018-07-18 14:47:22 +00:00
// Resetting the timer so that the clean-up doesn't deadlock.
2018-01-09 18:57:14 +00:00
keepalive . Reset ( infinity )
return
}
pingSent = true
2018-07-18 14:47:22 +00:00
if channelz . IsOn ( ) {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . kpCount , 1 )
2018-07-18 14:47:22 +00:00
}
2018-01-09 18:57:14 +00:00
t . controlBuf . put ( p )
keepalive . Reset ( t . kp . Timeout )
case <- t . ctx . Done ( ) :
return
}
}
}
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func ( t * http2Server ) Close ( ) error {
t . mu . Lock ( )
if t . state == closing {
t . mu . Unlock ( )
return errors . New ( "transport: Close() was already called" )
}
t . state = closing
streams := t . activeStreams
t . activeStreams = nil
t . mu . Unlock ( )
2018-07-18 14:47:22 +00:00
t . controlBuf . finish ( )
2018-01-09 18:57:14 +00:00
t . cancel ( )
err := t . conn . Close ( )
2018-07-18 14:47:22 +00:00
if channelz . IsOn ( ) {
channelz . RemoveEntry ( t . channelzID )
}
2018-01-09 18:57:14 +00:00
// Cancel all active streams.
for _ , s := range streams {
s . cancel ( )
}
if t . stats != nil {
connEnd := & stats . ConnEnd { }
t . stats . HandleConn ( t . ctx , connEnd )
}
return err
}
2019-04-03 07:57:13 +00:00
// deleteStream deletes the stream s from transport's active streams.
2019-07-18 12:13:24 +00:00
func ( t * http2Server ) deleteStream ( s * Stream , eosReceived bool ) {
2019-05-31 09:45:11 +00:00
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s . cancel ( )
t . mu . Lock ( )
if _ , ok := t . activeStreams [ s . id ] ; ok {
delete ( t . activeStreams , s . id )
if len ( t . activeStreams ) == 0 {
t . idle = time . Now ( )
}
2019-04-03 07:57:13 +00:00
}
t . mu . Unlock ( )
if channelz . IsOn ( ) {
if eosReceived {
atomic . AddInt64 ( & t . czData . streamsSucceeded , 1 )
} else {
atomic . AddInt64 ( & t . czData . streamsFailed , 1 )
}
}
2019-05-31 09:45:11 +00:00
}
2019-04-03 07:57:13 +00:00
2019-05-31 09:45:11 +00:00
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
func ( t * http2Server ) finishStream ( s * Stream , rst bool , rstCode http2 . ErrCode , hdr * headerFrame , eosReceived bool ) {
2019-07-18 12:13:24 +00:00
oldState := s . swapState ( streamDone )
2019-05-31 09:45:11 +00:00
if oldState == streamDone {
2019-07-18 12:13:24 +00:00
// If the stream was already done, return.
2019-05-31 09:45:11 +00:00
return
}
2019-04-03 07:57:13 +00:00
2019-05-31 09:45:11 +00:00
hdr . cleanup = & cleanupStream {
2018-07-18 14:47:22 +00:00
streamID : s . id ,
rst : rst ,
rstCode : rstCode ,
2019-07-18 12:13:24 +00:00
onWrite : func ( ) {
t . deleteStream ( s , eosReceived )
} ,
2018-07-18 14:47:22 +00:00
}
2019-04-03 07:57:13 +00:00
t . controlBuf . put ( hdr )
2018-01-09 18:57:14 +00:00
}
2019-05-31 09:45:11 +00:00
// closeStream clears the footprint of a stream when the stream is not needed any more.
func ( t * http2Server ) closeStream ( s * Stream , rst bool , rstCode http2 . ErrCode , eosReceived bool ) {
2019-07-18 12:13:24 +00:00
s . swapState ( streamDone )
2019-05-31 09:45:11 +00:00
t . deleteStream ( s , eosReceived )
2019-07-18 12:13:24 +00:00
2019-05-31 09:45:11 +00:00
t . controlBuf . put ( & cleanupStream {
streamID : s . id ,
rst : rst ,
rstCode : rstCode ,
onWrite : func ( ) { } ,
} )
}
2018-01-09 18:57:14 +00:00
// RemoteAddr returns the remote network address stored on the transport.
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}
// Drain starts draining the transport by calling drain with the no-error
// code and empty debug data.
func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}
// drain queues a heads-up goAway carrying code and debugData on controlBuf.
// Only the first call has any effect: a non-nil drainChan means draining has
// already been started.
func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainChan == nil {
		t.drainChan = make(chan struct{})
		t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
	}
}
2018-07-18 14:47:22 +00:00
// goAwayPing is the ping written right after the heads-up GoAway in
// outgoingGoAwayHandler.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// outgoingGoAwayHandler handles an outgoing GoAway and returns true if loopy
// needs to put itself in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	lastID := t.maxStreamID

	if g.headsUp {
		t.mu.Unlock()
		// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
		// Follow that with a ping and wait for the ack to come back or a timer
		// to expire. During this time accept new streams since they might have
		// originated before the GoAway reaches the client.
		// After getting the ack or timer expiration send out another GoAway this
		// time with an ID of the max stream server intends to process.
		if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
			return false, err
		}
		if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
			return false, err
		}
		go func() {
			deadline := time.NewTimer(time.Minute)
			defer deadline.Stop()
			select {
			case <-t.drainChan:
			case <-deadline.C:
			case <-t.ctx.Done():
				// The transport is going away; no second GoAway is queued.
				return
			}
			t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
		}()
		return false, nil
	}

	// Stop accepting more streams now.
	t.state = draining
	if len(t.activeStreams) == 0 {
		g.closeConn = true
	}
	t.mu.Unlock()
	if err := t.framer.fr.WriteGoAway(lastID, g.code, g.debugData); err != nil {
		return false, err
	}
	if g.closeConn {
		// Abruptly close the connection following the GoAway (via
		// loopywriter). But flush out what's inside the buffer first.
		t.framer.writer.Flush()
		return false, fmt.Errorf("transport: Connection closing")
	}
	return true, nil
}
func ( t * http2Server ) ChannelzMetric ( ) * channelz . SocketInternalMetric {
s := channelz . SocketInternalMetric {
2018-11-26 18:23:56 +00:00
StreamsStarted : atomic . LoadInt64 ( & t . czData . streamsStarted ) ,
StreamsSucceeded : atomic . LoadInt64 ( & t . czData . streamsSucceeded ) ,
StreamsFailed : atomic . LoadInt64 ( & t . czData . streamsFailed ) ,
MessagesSent : atomic . LoadInt64 ( & t . czData . msgSent ) ,
MessagesReceived : atomic . LoadInt64 ( & t . czData . msgRecv ) ,
KeepAlivesSent : atomic . LoadInt64 ( & t . czData . kpCount ) ,
LastRemoteStreamCreatedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & t . czData . lastStreamCreatedTime ) ) ,
LastMessageSentTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & t . czData . lastMsgSentTime ) ) ,
LastMessageReceivedTimestamp : time . Unix ( 0 , atomic . LoadInt64 ( & t . czData . lastMsgRecvTime ) ) ,
2018-07-18 14:47:22 +00:00
LocalFlowControlWindow : int64 ( t . fc . getSize ( ) ) ,
2018-11-26 18:23:56 +00:00
SocketOptions : channelz . GetSocketOption ( t . conn ) ,
LocalAddr : t . localAddr ,
RemoteAddr : t . remoteAddr ,
2018-07-18 14:47:22 +00:00
// RemoteName :
}
2018-11-26 18:23:56 +00:00
if au , ok := t . authInfo . ( credentials . ChannelzSecurityInfo ) ; ok {
s . Security = au . GetSecurityValue ( )
}
2018-07-18 14:47:22 +00:00
s . RemoteFlowControlWindow = t . getOutFlowWindow ( )
return & s
}
func ( t * http2Server ) IncrMsgSent ( ) {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . msgSent , 1 )
atomic . StoreInt64 ( & t . czData . lastMsgSentTime , time . Now ( ) . UnixNano ( ) )
2018-07-18 14:47:22 +00:00
}
func ( t * http2Server ) IncrMsgRecv ( ) {
2018-11-26 18:23:56 +00:00
atomic . AddInt64 ( & t . czData . msgRecv , 1 )
atomic . StoreInt64 ( & t . czData . lastMsgRecvTime , time . Now ( ) . UnixNano ( ) )
2018-07-18 14:47:22 +00:00
}
func ( t * http2Server ) getOutFlowWindow ( ) int64 {
2019-04-03 07:57:13 +00:00
resp := make ( chan uint32 , 1 )
2018-07-18 14:47:22 +00:00
timer := time . NewTimer ( time . Second )
defer timer . Stop ( )
t . controlBuf . put ( & outFlowControlSizeRequest { resp } )
select {
case sz := <- resp :
return int64 ( sz )
case <- t . ctxDone :
return - 1
case <- timer . C :
return - 2
}
}
2018-01-09 18:57:14 +00:00
func getJitter ( v time . Duration ) time . Duration {
if v == infinity {
return 0
}
// Generate a jitter between +/- 10% of the value.
r := int64 ( v / 10 )
2018-07-18 14:47:22 +00:00
j := grpcrand . Int63n ( 2 * r ) - r
2018-01-09 18:57:14 +00:00
return time . Duration ( j )
}