mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 10:33:35 +00:00
rebase: bump google.golang.org/grpc from 1.51.0 to 1.52.0
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.51.0 to 1.52.0. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.51.0...v1.52.0) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
committed by
mergify[bot]
parent
c69fd4ab63
commit
36fe1450f1
2
vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
generated
vendored
2
vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
generated
vendored
@ -121,7 +121,7 @@ func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated
|
||||
// but not counted towards the size limit.
|
||||
continue
|
||||
}
|
||||
currentEntryLen := uint64(len(entry.Value))
|
||||
currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
|
||||
if currentEntryLen > bytesLimit {
|
||||
break
|
||||
}
|
||||
|
6
vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
generated
vendored
6
vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
generated
vendored
@ -32,7 +32,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
|
||||
grpclbstate "google.golang.org/grpc/balancer/grpclb/grpclbstate"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal/backoff"
|
||||
"google.golang.org/grpc/internal/envconfig"
|
||||
@ -140,10 +140,10 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
|
||||
disableServiceConfig: opts.DisableServiceConfig,
|
||||
}
|
||||
|
||||
if target.Authority == "" {
|
||||
if target.URL.Host == "" {
|
||||
d.resolver = defaultResolver
|
||||
} else {
|
||||
d.resolver, err = customAuthorityResolver(target.Authority)
|
||||
d.resolver, err = customAuthorityResolver(target.URL.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
9
vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
generated
vendored
9
vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go
generated
vendored
@ -20,13 +20,20 @@
|
||||
// name without scheme back to gRPC as resolved address.
|
||||
package passthrough
|
||||
|
||||
import "google.golang.org/grpc/resolver"
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
const scheme = "passthrough"
|
||||
|
||||
type passthroughBuilder struct{}
|
||||
|
||||
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
||||
if target.Endpoint == "" && opts.Dialer == nil {
|
||||
return nil, errors.New("passthrough: received empty target in Build()")
|
||||
}
|
||||
r := &passthroughResolver{
|
||||
target: target,
|
||||
cc: cc,
|
||||
|
4
vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
generated
vendored
4
vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
generated
vendored
@ -34,8 +34,8 @@ type builder struct {
|
||||
}
|
||||
|
||||
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
|
||||
if target.Authority != "" {
|
||||
return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.Authority)
|
||||
if target.URL.Host != "" {
|
||||
return nil, fmt.Errorf("invalid (non-empty) authority: %v", target.URL.Host)
|
||||
}
|
||||
|
||||
// gRPC was parsing the dial target manually before PR #4817, and we
|
||||
|
46
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
46
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
@ -191,7 +191,7 @@ type goAway struct {
|
||||
code http2.ErrCode
|
||||
debugData []byte
|
||||
headsUp bool
|
||||
closeConn bool
|
||||
closeConn error // if set, loopyWriter will exit, resulting in conn closure
|
||||
}
|
||||
|
||||
func (*goAway) isTransportResponseFrame() bool { return false }
|
||||
@ -209,6 +209,14 @@ type outFlowControlSizeRequest struct {
|
||||
|
||||
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
|
||||
|
||||
// closeConnection is an instruction to tell the loopy writer to flush the
|
||||
// framer and exit, which will cause the transport's connection to be closed
|
||||
// (by the client or server). The transport itself will close after the reader
|
||||
// encounters the EOF caused by the connection closure.
|
||||
type closeConnection struct{}
|
||||
|
||||
func (closeConnection) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type outStreamState int
|
||||
|
||||
const (
|
||||
@ -408,7 +416,7 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
|
||||
select {
|
||||
case <-c.ch:
|
||||
case <-c.done:
|
||||
return nil, ErrConnClosing
|
||||
return nil, errors.New("transport closed by client")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -519,18 +527,6 @@ const minBatchSize = 1000
|
||||
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
|
||||
// if the batch size is too low to give stream goroutines a chance to fill it up.
|
||||
func (l *loopyWriter) run() (err error) {
|
||||
defer func() {
|
||||
if err == ErrConnClosing {
|
||||
// Don't log ErrConnClosing as error since it happens
|
||||
// 1. When the connection is closed by some other known issue.
|
||||
// 2. User closed the connection.
|
||||
// 3. A graceful close of connection.
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: loopyWriter.run returning. %v", err)
|
||||
}
|
||||
err = nil
|
||||
}
|
||||
}()
|
||||
for {
|
||||
it, err := l.cbuf.get(true)
|
||||
if err != nil {
|
||||
@ -574,7 +570,6 @@ func (l *loopyWriter) run() (err error) {
|
||||
}
|
||||
l.framer.writer.Flush()
|
||||
break hasdata
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -662,11 +657,10 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
|
||||
func (l *loopyWriter) originateStream(str *outStream) error {
|
||||
hdr := str.itl.dequeue().(*headerFrame)
|
||||
if err := hdr.initStream(str.id); err != nil {
|
||||
if err == ErrConnClosing {
|
||||
return err
|
||||
if err == errStreamDrain { // errStreamDrain need not close transport
|
||||
return nil
|
||||
}
|
||||
// Other errors(errStreamDrain) need not close transport.
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
|
||||
return err
|
||||
@ -764,7 +758,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
|
||||
}
|
||||
}
|
||||
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
|
||||
return ErrConnClosing
|
||||
return errors.New("finished processing active streams while in draining mode")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -799,7 +793,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
|
||||
if l.side == clientSide {
|
||||
l.draining = true
|
||||
if len(l.estdStreams) == 0 {
|
||||
return ErrConnClosing
|
||||
return errors.New("received GOAWAY with no active streams")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -817,6 +811,14 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loopyWriter) closeConnectionHandler() error {
|
||||
l.framer.writer.Flush()
|
||||
// Exit loopyWriter entirely by returning an error here. This will lead to
|
||||
// the transport closing the connection, and, ultimately, transport
|
||||
// closure.
|
||||
return ErrConnClosing
|
||||
}
|
||||
|
||||
func (l *loopyWriter) handle(i interface{}) error {
|
||||
switch i := i.(type) {
|
||||
case *incomingWindowUpdate:
|
||||
@ -845,6 +847,8 @@ func (l *loopyWriter) handle(i interface{}) error {
|
||||
return l.goAwayHandler(i)
|
||||
case *outFlowControlSizeRequest:
|
||||
return l.outFlowControlSizeRequestHandler(i)
|
||||
case closeConnection:
|
||||
return l.closeConnectionHandler()
|
||||
default:
|
||||
return fmt.Errorf("transport: unknown control message type %T", i)
|
||||
}
|
||||
|
45
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
45
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
@ -46,24 +46,32 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// NewServerHandlerTransport returns a ServerTransport handling gRPC
|
||||
// from inside an http.Handler. It requires that the http Server
|
||||
// supports HTTP/2.
|
||||
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
|
||||
// inside an http.Handler, or writes an HTTP error to w and returns an error.
|
||||
// It requires that the http Server supports HTTP/2.
|
||||
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
|
||||
if r.ProtoMajor != 2 {
|
||||
return nil, errors.New("gRPC requires HTTP/2")
|
||||
msg := "gRPC requires HTTP/2"
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
if r.Method != "POST" {
|
||||
return nil, errors.New("invalid gRPC request method")
|
||||
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
// TODO: do we assume contentType is lowercase? we did before
|
||||
contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
|
||||
if !validContentType {
|
||||
return nil, errors.New("invalid gRPC request content-type")
|
||||
msg := fmt.Sprintf("invalid gRPC request content-type %q", contentType)
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
if _, ok := w.(http.Flusher); !ok {
|
||||
return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
|
||||
msg := "gRPC requires a ResponseWriter supporting http.Flusher"
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
|
||||
st := &serverHandlerTransport{
|
||||
@ -79,7 +87,9 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
|
||||
if v := r.Header.Get("grpc-timeout"); v != "" {
|
||||
to, err := decodeTimeout(v)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
|
||||
msg := fmt.Sprintf("malformed time-out: %v", err)
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, status.Error(codes.Internal, msg)
|
||||
}
|
||||
st.timeoutSet = true
|
||||
st.timeout = to
|
||||
@ -97,7 +107,9 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
|
||||
for _, v := range vv {
|
||||
v, err := decodeMetadataHeader(k, v)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err)
|
||||
msg := fmt.Sprintf("malformed binary metadata %q in header %q: %v", v, k, err)
|
||||
http.Error(w, msg, http.StatusBadRequest)
|
||||
return nil, status.Error(codes.Internal, msg)
|
||||
}
|
||||
metakv = append(metakv, k, v)
|
||||
}
|
||||
@ -141,12 +153,15 @@ type serverHandlerTransport struct {
|
||||
stats []stats.Handler
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) Close() {
|
||||
ht.closeOnce.Do(ht.closeCloseChanOnce)
|
||||
func (ht *serverHandlerTransport) Close(err error) {
|
||||
ht.closeOnce.Do(func() {
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("Closing serverHandlerTransport: %v", err)
|
||||
}
|
||||
close(ht.closedCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
|
||||
|
||||
func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
|
||||
|
||||
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
|
||||
@ -236,7 +251,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
||||
})
|
||||
}
|
||||
}
|
||||
ht.Close()
|
||||
ht.Close(errors.New("finished writing status"))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -346,7 +361,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
|
||||
case <-ht.req.Context().Done():
|
||||
}
|
||||
cancel()
|
||||
ht.Close()
|
||||
ht.Close(errors.New("request is done processing"))
|
||||
}()
|
||||
|
||||
req := ht.req
|
||||
|
46
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
46
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@ -59,11 +59,15 @@ var clientConnectionCounter uint64
|
||||
|
||||
// http2Client implements the ClientTransport interface with HTTP2.
|
||||
type http2Client struct {
|
||||
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
|
||||
userAgent string
|
||||
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
ctxDone <-chan struct{} // Cache the ctx.Done() chan.
|
||||
userAgent string
|
||||
// address contains the resolver returned address for this transport.
|
||||
// If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
|
||||
// passed to `NewStream`, when determining the :authority header.
|
||||
address resolver.Address
|
||||
md metadata.MD
|
||||
conn net.Conn // underlying communication channel
|
||||
loopy *loopyWriter
|
||||
@ -238,8 +242,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
go func(conn net.Conn) {
|
||||
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
|
||||
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
|
||||
if connectCtx.Err() != nil {
|
||||
if err := connectCtx.Err(); err != nil {
|
||||
// connectCtx expired before exiting the function. Hard close the connection.
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
}(conn)
|
||||
@ -314,6 +321,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
cancel: cancel,
|
||||
userAgent: opts.UserAgent,
|
||||
registeredCompressors: grpcutil.RegisteredCompressors(),
|
||||
address: addr,
|
||||
conn: conn,
|
||||
remoteAddr: conn.RemoteAddr(),
|
||||
localAddr: conn.LocalAddr(),
|
||||
@ -440,10 +448,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
go func() {
|
||||
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
|
||||
err := t.loopy.run()
|
||||
if err != nil {
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
|
||||
}
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
|
||||
}
|
||||
// Do not close the transport. Let reader goroutine handle it since
|
||||
// there might be data in the buffers.
|
||||
@ -702,6 +708,18 @@ func (e NewStreamError) Error() string {
|
||||
// streams. All non-nil errors returned will be *NewStreamError.
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
|
||||
ctx = peer.NewContext(ctx, t.getPeer())
|
||||
|
||||
// ServerName field of the resolver returned address takes precedence over
|
||||
// Host field of CallHdr to determine the :authority header. This is because,
|
||||
// the ServerName field takes precedence for server authentication during
|
||||
// TLS handshake, and the :authority header should match the value used
|
||||
// for server authentication.
|
||||
if t.address.ServerName != "" {
|
||||
newCallHdr := *callHdr
|
||||
newCallHdr.Host = t.address.ServerName
|
||||
callHdr = &newCallHdr
|
||||
}
|
||||
|
||||
headerFields, err := t.createHeaderFields(ctx, callHdr)
|
||||
if err != nil {
|
||||
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
|
||||
@ -934,6 +952,9 @@ func (t *http2Client) Close(err error) {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: closing: %v", err)
|
||||
}
|
||||
// Call t.onClose ASAP to prevent the client from attempting to create new
|
||||
// streams.
|
||||
t.onClose()
|
||||
@ -986,11 +1007,14 @@ func (t *http2Client) GracefulClose() {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: GracefulClose called")
|
||||
}
|
||||
t.state = draining
|
||||
active := len(t.activeStreams)
|
||||
t.mu.Unlock()
|
||||
if active == 0 {
|
||||
t.Close(ErrConnClosing)
|
||||
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
|
||||
return
|
||||
}
|
||||
t.controlBuf.put(&incomingGoAway{})
|
||||
|
96
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
96
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
@ -21,6 +21,7 @@ package transport
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
@ -41,6 +42,7 @@ import (
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
@ -101,13 +103,13 @@ type http2Server struct {
|
||||
|
||||
mu sync.Mutex // guard the following
|
||||
|
||||
// drainChan is initialized when Drain() is called the first time.
|
||||
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
|
||||
// Then an independent goroutine will be launched to later send the second GoAway.
|
||||
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
|
||||
// Thus call to Drain() will be a no-op if drainChan is already initialized since draining is
|
||||
// already underway.
|
||||
drainChan chan struct{}
|
||||
// drainEvent is initialized when Drain() is called the first time. After
|
||||
// which the server writes out the first GoAway(with ID 2^31-1) frame. Then
|
||||
// an independent goroutine will be launched to later send the second
|
||||
// GoAway. During this time we don't want to write another first GoAway(with
|
||||
// ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
|
||||
// already initialized since draining is already underway.
|
||||
drainEvent *grpcsync.Event
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
// idle is the time instant when the connection went idle.
|
||||
@ -293,7 +295,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
t.Close()
|
||||
t.Close(err)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -331,10 +333,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
go func() {
|
||||
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
|
||||
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
|
||||
if err := t.loopy.run(); err != nil {
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
|
||||
}
|
||||
err := t.loopy.run()
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
|
||||
}
|
||||
t.conn.Close()
|
||||
t.controlBuf.finish()
|
||||
@ -344,8 +345,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// operateHeader takes action on the decoded headers.
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
|
||||
// operateHeaders takes action on the decoded headers. Returns an error if fatal
|
||||
// error encountered and transport needs to close, otherwise returns nil.
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
|
||||
// Acquire max stream ID lock for entire duration
|
||||
t.maxStreamMu.Lock()
|
||||
defer t.maxStreamMu.Unlock()
|
||||
@ -361,15 +363,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
rstCode: http2.ErrCodeFrameSize,
|
||||
onWrite: func() {},
|
||||
})
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
if streamID%2 != 1 || streamID <= t.maxStreamID {
|
||||
// illegal gRPC stream id.
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
|
||||
}
|
||||
return true
|
||||
return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
|
||||
}
|
||||
t.maxStreamID = streamID
|
||||
|
||||
@ -453,7 +452,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
status: status.New(codes.Internal, errMsg),
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
if !isGRPC || headerError {
|
||||
@ -463,7 +462,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
rstCode: http2.ErrCodeProtocol,
|
||||
onWrite: func() {},
|
||||
})
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
// "If :authority is missing, Host must be renamed to :authority." - A41
|
||||
@ -503,7 +502,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
if t.state != reachable {
|
||||
t.mu.Unlock()
|
||||
s.cancel()
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
if uint32(len(t.activeStreams)) >= t.maxStreams {
|
||||
t.mu.Unlock()
|
||||
@ -514,7 +513,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
onWrite: func() {},
|
||||
})
|
||||
s.cancel()
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
if httpMethod != http.MethodPost {
|
||||
t.mu.Unlock()
|
||||
@ -530,7 +529,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
s.cancel()
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
if t.inTapHandle != nil {
|
||||
var err error
|
||||
@ -550,7 +549,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
status: stat,
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
}
|
||||
t.activeStreams[streamID] = s
|
||||
@ -597,7 +596,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
wq: s.wq,
|
||||
})
|
||||
handle(s)
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleStreams receives incoming streams using the given handler. This is
|
||||
@ -630,19 +629,16 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||
continue
|
||||
}
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
t.Close()
|
||||
t.Close(err)
|
||||
return
|
||||
}
|
||||
if logger.V(logLevel) {
|
||||
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
|
||||
}
|
||||
t.Close()
|
||||
t.Close(err)
|
||||
return
|
||||
}
|
||||
switch frame := frame.(type) {
|
||||
case *http2.MetaHeadersFrame:
|
||||
if t.operateHeaders(frame, handle, traceCtx) {
|
||||
t.Close()
|
||||
if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
|
||||
t.Close(err)
|
||||
break
|
||||
}
|
||||
case *http2.DataFrame:
|
||||
@ -843,8 +839,8 @@ const (
|
||||
|
||||
func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||
if f.IsAck() {
|
||||
if f.Data == goAwayPing.data && t.drainChan != nil {
|
||||
close(t.drainChan)
|
||||
if f.Data == goAwayPing.data && t.drainEvent != nil {
|
||||
t.drainEvent.Fire()
|
||||
return
|
||||
}
|
||||
// Maybe it's a BDP ping.
|
||||
@ -886,10 +882,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||
|
||||
if t.pingStrikes > maxPingStrikes {
|
||||
// Send goaway and close the connection.
|
||||
if logger.V(logLevel) {
|
||||
logger.Errorf("transport: Got too many pings from the client, closing the connection.")
|
||||
}
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
|
||||
}
|
||||
}
|
||||
|
||||
@ -1153,7 +1146,7 @@ func (t *http2Server) keepalive() {
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: closing server transport due to maximum connection age.")
|
||||
}
|
||||
t.Close()
|
||||
t.controlBuf.put(closeConnection{})
|
||||
case <-t.done:
|
||||
}
|
||||
return
|
||||
@ -1169,10 +1162,7 @@ func (t *http2Server) keepalive() {
|
||||
continue
|
||||
}
|
||||
if outstandingPing && kpTimeoutLeft <= 0 {
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: closing server transport due to idleness.")
|
||||
}
|
||||
t.Close()
|
||||
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
|
||||
return
|
||||
}
|
||||
if !outstandingPing {
|
||||
@ -1199,12 +1189,15 @@ func (t *http2Server) keepalive() {
|
||||
// Close starts shutting down the http2Server transport.
|
||||
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
|
||||
// could cause some resource issue. Revisit this later.
|
||||
func (t *http2Server) Close() {
|
||||
func (t *http2Server) Close(err error) {
|
||||
t.mu.Lock()
|
||||
if t.state == closing {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if logger.V(logLevel) {
|
||||
logger.Infof("transport: closing: %v", err)
|
||||
}
|
||||
t.state = closing
|
||||
streams := t.activeStreams
|
||||
t.activeStreams = nil
|
||||
@ -1295,10 +1288,10 @@ func (t *http2Server) RemoteAddr() net.Addr {
|
||||
func (t *http2Server) Drain() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if t.drainChan != nil {
|
||||
if t.drainEvent != nil {
|
||||
return
|
||||
}
|
||||
t.drainChan = make(chan struct{})
|
||||
t.drainEvent = grpcsync.NewEvent()
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true})
|
||||
}
|
||||
|
||||
@ -1319,19 +1312,20 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||
// Stop accepting more streams now.
|
||||
t.state = draining
|
||||
sid := t.maxStreamID
|
||||
retErr := g.closeConn
|
||||
if len(t.activeStreams) == 0 {
|
||||
g.closeConn = true
|
||||
retErr = errors.New("second GOAWAY written and no active streams left to process")
|
||||
}
|
||||
t.mu.Unlock()
|
||||
t.maxStreamMu.Unlock()
|
||||
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if g.closeConn {
|
||||
if retErr != nil {
|
||||
// Abruptly close the connection following the GoAway (via
|
||||
// loopywriter). But flush out what's inside the buffer first.
|
||||
t.framer.writer.Flush()
|
||||
return false, fmt.Errorf("transport: Connection closing")
|
||||
return false, retErr
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
@ -1353,7 +1347,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||
timer := time.NewTimer(time.Minute)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-t.drainChan:
|
||||
case <-t.drainEvent.Done():
|
||||
case <-timer.C:
|
||||
case <-t.done:
|
||||
return
|
||||
|
2
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
2
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@ -701,7 +701,7 @@ type ServerTransport interface {
|
||||
// Close tears down the transport. Once it is called, the transport
|
||||
// should not be accessed any more. All the pending streams and their
|
||||
// handlers will be terminated asynchronously.
|
||||
Close()
|
||||
Close(err error)
|
||||
|
||||
// RemoteAddr returns the remote network address.
|
||||
RemoteAddr() net.Addr
|
||||
|
Reference in New Issue
Block a user