rebase: bump google.golang.org/grpc from 1.50.1 to 1.51.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.50.1 to 1.51.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.50.1...v1.51.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot] 2022-12-07 15:45:49 +00:00 committed by mergify[bot]
parent 31f0ac6e2d
commit f003c37b21
42 changed files with 507 additions and 267 deletions

2
go.mod
View File

@ -30,7 +30,7 @@ require (
golang.org/x/crypto v0.3.0 golang.org/x/crypto v0.3.0
golang.org/x/net v0.2.0 golang.org/x/net v0.2.0
golang.org/x/sys v0.3.0 golang.org/x/sys v0.3.0
google.golang.org/grpc v1.50.1 google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1 google.golang.org/protobuf v1.28.1
k8s.io/api v0.25.4 k8s.io/api v0.25.4
k8s.io/apimachinery v0.25.4 k8s.io/apimachinery v0.25.4

4
go.sum
View File

@ -1617,8 +1617,8 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K
google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.50.1 h1:DS/BukOZWp8s6p4Dt/tOaJaTQyPyOoCcrjroHuCeLzY= google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@ -19,7 +19,7 @@
// Package attributes defines a generic key/value store used in various gRPC // Package attributes defines a generic key/value store used in various gRPC
// components. // components.
// //
// Experimental // # Experimental
// //
// Notice: This package is EXPERIMENTAL and may be changed or removed in a // Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -48,7 +48,7 @@ type BackoffConfig struct {
// here for more details: // here for more details:
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -110,6 +110,11 @@ type SubConn interface {
UpdateAddresses([]resolver.Address) UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn. // Connect starts the connecting for this SubConn.
Connect() Connect()
// GetOrBuildProducer returns a reference to the existing Producer for this
// ProducerBuilder in this SubConn, or, if one does not currently exist,
// creates a new one and returns it. Returns a close function which must
// be called when the Producer is no longer needed.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
} }
// NewSubConnOptions contains options to create new SubConn. // NewSubConnOptions contains options to create new SubConn.
@ -371,3 +376,21 @@ type ClientConnState struct {
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a // ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data. // problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state") var ErrBadResolverState = errors.New("bad resolver state")
// A ProducerBuilder is a simple constructor for a Producer. It is used by the
// SubConn to create producers when needed.
type ProducerBuilder interface {
// Build creates a Producer. The first parameter is always a
// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
// associated SubConn), but is declared as interface{} to avoid a
// dependency cycle. Should also return a close function that will be
// called when all references to the Producer have been given up.
Build(grpcClientConnInterface interface{}) (p Producer, close func())
}
// A Producer is a type shared among potentially many consumers. It is
// associated with a SubConn, and an implementation will typically contain
// other methods to provide additional functionality, e.g. configuration or
// subscription registration.
type Producer interface {
}

View File

@ -55,7 +55,11 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
cse.numIdle += updateVal cse.numIdle += updateVal
} }
} }
return cse.CurrentState()
}
// CurrentState returns the current aggregate conn state by evaluating the counters
func (cse *ConnectivityStateEvaluator) CurrentState() connectivity.State {
// Evaluate. // Evaluate.
if cse.numReady > 0 { if cse.numReady > 0 {
return connectivity.Ready return connectivity.Ready

View File

@ -19,17 +19,20 @@
package grpc package grpc
import ( import (
"context"
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
) )
// ccBalancerWrapper sits between the ClientConn and the Balancer. // ccBalancerWrapper sits between the ClientConn and the Balancer.
@ -305,7 +308,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err return nil, err
} }
acbw := &acBalancerWrapper{ac: ac} acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
acbw.ac.mu.Lock() acbw.ac.mu.Lock()
ac.acbw = acbw ac.acbw = acbw
acbw.ac.mu.Unlock() acbw.ac.mu.Unlock()
@ -361,6 +364,7 @@ func (ccb *ccBalancerWrapper) Target() string {
type acBalancerWrapper struct { type acBalancerWrapper struct {
mu sync.Mutex mu sync.Mutex
ac *addrConn ac *addrConn
producers map[balancer.ProducerBuilder]*refCountedProducer
} }
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
@ -414,3 +418,64 @@ func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
defer acbw.mu.Unlock() defer acbw.mu.Unlock()
return acbw.ac return acbw.ac
} }
var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected")
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, returns errSubConnNotReady.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
transport := acbw.ac.getReadyTransport()
if transport == nil {
return nil, errSubConnNotReady
}
return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}
// Invoke performs a unary RPC. If the addrConn is not ready, returns
// errSubConnNotReady.
func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error {
cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(args); err != nil {
return err
}
return cs.RecvMsg(reply)
}
type refCountedProducer struct {
producer balancer.Producer
refs int // number of current refs to the producer
close func() // underlying producer's close function
}
func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
acbw.mu.Lock()
defer acbw.mu.Unlock()
// Look up existing producer from this builder.
pData := acbw.producers[pb]
if pData == nil {
// Not found; create a new one and add it to the producers map.
p, close := pb.Build(acbw)
pData = &refCountedProducer{producer: p, close: close}
acbw.producers[pb] = pData
}
// Account for this new reference.
pData.refs++
// Return a cleanup function wrapped in a OnceFunc to remove this reference
// and delete the refCountedProducer from the map if the total reference
// count goes to zero.
unref := func() {
acbw.mu.Lock()
pData.refs--
if pData.refs == 0 {
defer pData.close() // Run outside the acbw mutex
delete(acbw.producers, pb)
}
acbw.mu.Unlock()
}
return pData.producer, grpcsync.OnceFunc(unref)
}

View File

@ -261,6 +261,7 @@ type GrpcLogEntry struct {
// according to the type of the log entry. // according to the type of the log entry.
// //
// Types that are assignable to Payload: // Types that are assignable to Payload:
//
// *GrpcLogEntry_ClientHeader // *GrpcLogEntry_ClientHeader
// *GrpcLogEntry_ServerHeader // *GrpcLogEntry_ServerHeader
// *GrpcLogEntry_Message // *GrpcLogEntry_Message

View File

@ -23,7 +23,7 @@
// https://github.com/grpc/proposal/blob/master/A14-channelz.md, is provided by // https://github.com/grpc/proposal/blob/master/A14-channelz.md, is provided by
// the `internal/channelz` package. // the `internal/channelz` package.
// //
// Experimental // # Experimental
// //
// Notice: All APIs in this package are experimental and may be removed in a // Notice: All APIs in this package are experimental and may be removed in a
// later release. // later release.

View File

@ -503,7 +503,7 @@ type ClientConn struct {
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter. // ctx expires. A true value is returned in former case and false in latter.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -522,7 +522,7 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connec
// GetState returns the connectivity.State of ClientConn. // GetState returns the connectivity.State of ClientConn.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release. // release.
@ -534,7 +534,7 @@ func (cc *ClientConn) GetState() connectivity.State {
// the channel is idle. Does not wait for the connection attempts to begin // the channel is idle. Does not wait for the connection attempts to begin
// before returning. // before returning.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later // Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release. // release.
@ -761,7 +761,7 @@ func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
// Target returns the target string of the ClientConn. // Target returns the target string of the ClientConn.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -998,7 +998,7 @@ func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
// However, if a previously unavailable network becomes available, this may be // However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect. // used to trigger an immediate reconnect.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -1228,38 +1228,33 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
// address was not successfully connected, or updates ac appropriately with the // address was not successfully connected, or updates ac appropriately with the
// new transport. // new transport.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
// TODO: Delete prefaceReceived and move the logic to wait for it into the
// transport.
prefaceReceived := grpcsync.NewEvent()
connClosed := grpcsync.NewEvent()
addr.ServerName = ac.cc.getServerName(addr) addr.ServerName = ac.cc.getServerName(addr)
hctx, hcancel := context.WithCancel(ac.ctx) hctx, hcancel := context.WithCancel(ac.ctx)
hcStarted := false // protected by ac.mu
onClose := func() { onClose := grpcsync.OnceFunc(func() {
ac.mu.Lock() ac.mu.Lock()
defer ac.mu.Unlock() defer ac.mu.Unlock()
defer connClosed.Fire() if ac.state == connectivity.Shutdown {
defer hcancel() // Already shut down. tearDown() already cleared the transport and
if !hcStarted || hctx.Err() != nil { // canceled hctx via ac.ctx, and we expected this connection to be
// We didn't start the health check or set the state to READY, so // closed, so do nothing here.
// no need to do anything else here. return
// }
// OR, we have already cancelled the health check context, meaning hcancel()
// we have already called onClose once for this transport. In this if ac.transport == nil {
// case it would be dangerous to clear the transport and update the // We're still connecting to this address, which could error. Do
// state, since there may be a new transport in this addrConn. // not update the connectivity state or resolve; these will happen
// at the end of the tryAllAddrs connection loop in the event of an
// error.
return return
} }
ac.transport = nil ac.transport = nil
// Refresh the name resolver // Refresh the name resolver on any connection loss.
ac.cc.resolveNow(resolver.ResolveNowOptions{}) ac.cc.resolveNow(resolver.ResolveNowOptions{})
if ac.state != connectivity.Shutdown { // Always go idle and wait for the LB policy to initiate a new
// connection attempt.
ac.updateConnectivityState(connectivity.Idle, nil) ac.updateConnectivityState(connectivity.Idle, nil)
} })
}
onGoAway := func(r transport.GoAwayReason) { onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock() ac.mu.Lock()
ac.adjustParams(r) ac.adjustParams(r)
@ -1271,7 +1266,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
defer cancel() defer cancel()
copts.ChannelzParentID = ac.channelzID copts.ChannelzParentID = ac.channelzID
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose) newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onGoAway, onClose)
if err != nil { if err != nil {
// newTr is either nil, or closed. // newTr is either nil, or closed.
hcancel() hcancel()
@ -1279,35 +1274,13 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
return err return err
} }
select {
case <-connectCtx.Done():
// We didn't get the preface in time.
// The error we pass to Close() is immaterial since there are no open
// streams at this point, so no trailers with error details will be sent
// out. We just need to pass a non-nil error.
newTr.Close(transport.ErrConnClosing)
if connectCtx.Err() == context.DeadlineExceeded {
err := errors.New("failed to receive server preface within timeout")
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s: %v", addr, err)
return err
}
return nil
case <-prefaceReceived.Done():
// We got the preface - huzzah! things are good.
ac.mu.Lock() ac.mu.Lock()
defer ac.mu.Unlock() defer ac.mu.Unlock()
if connClosed.HasFired() {
// onClose called first; go idle but do nothing else.
if ac.state != connectivity.Shutdown {
ac.updateConnectivityState(connectivity.Idle, nil)
}
return nil
}
if ac.state == connectivity.Shutdown { if ac.state == connectivity.Shutdown {
// This can happen if the subConn was removed while in `Connecting` // This can happen if the subConn was removed while in `Connecting`
// state. tearDown() would have set the state to `Shutdown`, but // state. tearDown() would have set the state to `Shutdown`, but
// would not have closed the transport since ac.transport would not // would not have closed the transport since ac.transport would not
// been set at that point. // have been set at that point.
// //
// We run this in a goroutine because newTr.Close() calls onClose() // We run this in a goroutine because newTr.Close() calls onClose()
// inline, which requires locking ac.mu. // inline, which requires locking ac.mu.
@ -1318,21 +1291,17 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
go newTr.Close(transport.ErrConnClosing) go newTr.Close(transport.ErrConnClosing)
return nil return nil
} }
if hctx.Err() != nil {
// onClose was already called for this connection, but the connection
// was successfully established first. Consider it a success and set
// the new state to Idle.
ac.updateConnectivityState(connectivity.Idle, nil)
return nil
}
ac.curAddr = addr ac.curAddr = addr
ac.transport = newTr ac.transport = newTr
hcStarted = true
ac.startHealthCheck(hctx) // Will set state to READY if appropriate. ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
return nil return nil
case <-connClosed.Done():
// The transport has already closed. If we received the preface, too,
// this is not an error.
select {
case <-prefaceReceived.Done():
return nil
default:
return errors.New("connection closed before server preface received")
}
}
} }
// startHealthCheck starts the health checking stream (RPC) to watch the health // startHealthCheck starts the health checking stream (RPC) to watch the health
@ -1583,7 +1552,7 @@ func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err) channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
} else { } else {
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
rb = cc.getResolver(parsedTarget.Scheme) rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb != nil { if rb != nil {
cc.parsedTarget = parsedTarget cc.parsedTarget = parsedTarget
return rb, nil return rb, nil
@ -1604,9 +1573,9 @@ func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
return nil, err return nil, err
} }
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
rb = cc.getResolver(parsedTarget.Scheme) rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb == nil { if rb == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme) return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
} }
cc.parsedTarget = parsedTarget cc.parsedTarget = parsedTarget
return rb, nil return rb, nil

View File

@ -36,16 +36,16 @@ import (
// PerRPCCredentials defines the common interface for the credentials which need to // PerRPCCredentials defines the common interface for the credentials which need to
// attach security information to every RPC (e.g., oauth2). // attach security information to every RPC (e.g., oauth2).
type PerRPCCredentials interface { type PerRPCCredentials interface {
// GetRequestMetadata gets the current request metadata, refreshing // GetRequestMetadata gets the current request metadata, refreshing tokens
// tokens if required. This should be called by the transport layer on // if required. This should be called by the transport layer on each
// each request, and the data should be populated in headers or other // request, and the data should be populated in headers or other
// context. If a status code is returned, it will be used as the status // context. If a status code is returned, it will be used as the status for
// for the RPC. uri is the URI of the entry point for the request. // the RPC (restricted to an allowable set of codes as defined by gRFC
// When supported by the underlying implementation, ctx can be used for // A54). uri is the URI of the entry point for the request. When supported
// timeout and cancellation. Additionally, RequestInfo data will be // by the underlying implementation, ctx can be used for timeout and
// available via ctx to this call. // cancellation. Additionally, RequestInfo data will be available via ctx
// TODO(zhaoq): Define the set of the qualified keys instead of leaving // to this call. TODO(zhaoq): Define the set of the qualified keys instead
// it as an arbitrary string. // of leaving it as an arbitrary string.
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
// RequireTransportSecurity indicates whether the credentials requires // RequireTransportSecurity indicates whether the credentials requires
// transport security. // transport security.

View File

@ -195,7 +195,7 @@ func NewServerTLSFromFile(certFile, keyFile string) (TransportCredentials, error
// TLSChannelzSecurityValue defines the struct that TLS protocol should return // TLSChannelzSecurityValue defines the struct that TLS protocol should return
// from GetSecurityValue(), containing security info like cipher and certificate used. // from GetSecurityValue(), containing security info like cipher and certificate used.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -19,7 +19,7 @@
// Package encoding defines the interface for the compressor and codec, and // Package encoding defines the interface for the compressor and codec, and
// functions to register and retrieve compressors and codecs. // functions to register and retrieve compressors and codecs.
// //
// Experimental // # Experimental
// //
// Notice: This package is EXPERIMENTAL and may be changed or removed in a // Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -28,6 +28,8 @@ package encoding
import ( import (
"io" "io"
"strings" "strings"
"google.golang.org/grpc/internal/grpcutil"
) )
// Identity specifies the optional encoding for uncompressed streams. // Identity specifies the optional encoding for uncompressed streams.
@ -73,6 +75,7 @@ var registeredCompressor = make(map[string]Compressor)
// registered with the same name, the one registered last will take effect. // registered with the same name, the one registered last will take effect.
func RegisterCompressor(c Compressor) { func RegisterCompressor(c Compressor) {
registeredCompressor[c.Name()] = c registeredCompressor[c.Name()] = c
grpcutil.RegisteredCompressorNames = append(grpcutil.RegisteredCompressorNames, c.Name())
} }
// GetCompressor returns Compressor for the given compressor name. // GetCompressor returns Compressor for the given compressor name.

View File

@ -19,7 +19,7 @@
// Package gzip implements and registers the gzip compressor // Package gzip implements and registers the gzip compressor
// during the initialization. // during the initialization.
// //
// Experimental // # Experimental
// //
// Notice: This package is EXPERIMENTAL and may be changed or removed in a // Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -242,7 +242,7 @@ func (g *loggerT) V(l int) bool {
// DepthLoggerV2, the below functions will be called with the appropriate stack // DepthLoggerV2, the below functions will be called with the appropriate stack
// depth set for trivial functions the logger may ignore. // depth set for trivial functions the logger may ignore.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -27,9 +27,13 @@ import (
const ( const (
prefix = "GRPC_GO_" prefix = "GRPC_GO_"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS" txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
advertiseCompressorsStr = prefix + "ADVERTISE_COMPRESSORS"
) )
var ( var (
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false"). // TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false") TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
// AdvertiseCompressors is set if registered compressor should be advertised
// ("GRPC_GO_ADVERTISE_COMPRESSORS" is not "false").
AdvertiseCompressors = !strings.EqualFold(os.Getenv(advertiseCompressorsStr), "false")
) )

View File

@ -110,7 +110,7 @@ type LoggerV2 interface {
// This is a copy of the DepthLoggerV2 defined in the external grpclog package. // This is a copy of the DepthLoggerV2 defined in the external grpclog package.
// It is defined here to avoid a circular dependency. // It is defined here to avoid a circular dependency.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -0,0 +1,32 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcsync
import (
"sync"
)
// OnceFunc returns a function wrapping f which ensures f is only executed
// once even if the returned function is executed multiple times.
func OnceFunc(f func()) func() {
var once sync.Once
return func() {
once.Do(f)
}
}

View File

@ -0,0 +1,47 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcutil
import (
"strings"
"google.golang.org/grpc/internal/envconfig"
)
// RegisteredCompressorNames holds names of the registered compressors.
var RegisteredCompressorNames []string
// IsCompressorNameRegistered returns true when name is available in registry.
func IsCompressorNameRegistered(name string) bool {
for _, compressor := range RegisteredCompressorNames {
if compressor == name {
return true
}
}
return false
}
// RegisteredCompressors returns a string of registered compressor names
// separated by comma.
func RegisteredCompressors() string {
if !envconfig.AdvertiseCompressors {
return ""
}
return strings.Join(RegisteredCompressorNames, ",")
}

View File

@ -25,7 +25,6 @@ import (
// ParseMethod splits service and method from the input. It expects format // ParseMethod splits service and method from the input. It expects format
// "/service/method". // "/service/method".
//
func ParseMethod(methodName string) (service, method string, _ error) { func ParseMethod(methodName string) (service, method string, _ error) {
if !strings.HasPrefix(methodName, "/") { if !strings.HasPrefix(methodName, "/") {
return "", "", errors.New("invalid method name: should start with /") return "", "", errors.New("invalid method name: should start with /")

View File

@ -164,3 +164,13 @@ func (e *Error) Is(target error) bool {
} }
return proto.Equal(e.s.s, tse.s.s) return proto.Equal(e.s.s, tse.s.s)
} }
// IsRestrictedControlPlaneCode returns whether the status includes a code
// restricted for control plane usage as defined by gRFC A54.
func IsRestrictedControlPlaneCode(s *Status) bool {
switch s.Code() {
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.DataLoss:
return true
}
return false
}

View File

@ -442,10 +442,10 @@ func (ht *serverHandlerTransport) Drain() {
// mapRecvMsgError returns the non-nil err into the appropriate // mapRecvMsgError returns the non-nil err into the appropriate
// error value as expected by callers of *grpc.parser.recvMsg. // error value as expected by callers of *grpc.parser.recvMsg.
// In particular, in can only be: // In particular, in can only be:
// * io.EOF // - io.EOF
// * io.ErrUnexpectedEOF // - io.ErrUnexpectedEOF
// * of type transport.ConnectionError // - of type transport.ConnectionError
// * an error from the status package // - an error from the status package
func mapRecvMsgError(err error) error { func mapRecvMsgError(err error) error {
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
return err return err

View File

@ -38,8 +38,10 @@ import (
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials" icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil" "google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata" imetadata "google.golang.org/grpc/internal/metadata"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/syscall" "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
@ -99,16 +101,13 @@ type http2Client struct {
maxSendHeaderListSize *uint32 maxSendHeaderListSize *uint32
bdpEst *bdpEstimator bdpEst *bdpEstimator
// onPrefaceReceipt is a callback that client transport calls upon
// receiving server preface to signal that a succefull HTTP2
// connection was established.
onPrefaceReceipt func()
maxConcurrentStreams uint32 maxConcurrentStreams uint32
streamQuota int64 streamQuota int64
streamsQuotaAvailable chan struct{} streamsQuotaAvailable chan struct{}
waitingStreams uint32 waitingStreams uint32
nextID uint32 nextID uint32
registeredCompressors string
// Do not access controlBuf with mu held. // Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables mu sync.Mutex // guard the following variables
@ -194,7 +193,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction // and starts to receive messages on it. Non-nil error returns if construction
// fails. // fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) { func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
scheme := "http" scheme := "http"
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer func() { defer func() {
@ -216,12 +215,35 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
} }
return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
} }
// Any further errors will close the underlying connection // Any further errors will close the underlying connection
defer func(conn net.Conn) { defer func(conn net.Conn) {
if err != nil { if err != nil {
conn.Close() conn.Close()
} }
}(conn) }(conn)
// The following defer and goroutine monitor the connectCtx for cancelation
// and deadline. On context expiration, the connection is hard closed and
// this function will naturally fail as a result. Otherwise, the defer
// waits for the goroutine to exit to prevent the context from being
// monitored (and to prevent the connection from ever being closed) after
// returning from this function.
ctxMonitorDone := grpcsync.NewEvent()
newClientCtx, newClientDone := context.WithCancel(connectCtx)
defer func() {
newClientDone() // Awaken the goroutine below if connectCtx hasn't expired.
<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
}()
go func(conn net.Conn) {
defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
if connectCtx.Err() != nil {
// connectCtx expired before exiting the function. Hard close the connection.
conn.Close()
}
}(conn)
kp := opts.KeepaliveParams kp := opts.KeepaliveParams
// Validate keepalive parameters. // Validate keepalive parameters.
if kp.Time == 0 { if kp.Time == 0 {
@ -253,15 +275,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
} }
} }
if transportCreds != nil { if transportCreds != nil {
rawConn := conn conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
// Pull the deadline from the connectCtx, which will be used for
// timeouts in the authentication protocol handshake. Can ignore the
// boolean as the deadline will return the zero value, which will make
// the conn not timeout on I/O operations.
deadline, _ := connectCtx.Deadline()
rawConn.SetDeadline(deadline)
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)
rawConn.SetDeadline(time.Time{})
if err != nil { if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err) return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
} }
@ -299,6 +313,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
ctxDone: ctx.Done(), // Cache Done chan. ctxDone: ctx.Done(), // Cache Done chan.
cancel: cancel, cancel: cancel,
userAgent: opts.UserAgent, userAgent: opts.UserAgent,
registeredCompressors: grpcutil.RegisteredCompressors(),
conn: conn, conn: conn,
remoteAddr: conn.RemoteAddr(), remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(), localAddr: conn.LocalAddr(),
@ -315,16 +330,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
kp: kp, kp: kp,
statsHandlers: opts.StatsHandlers, statsHandlers: opts.StatsHandlers,
initialWindowSize: initialWindowSize, initialWindowSize: initialWindowSize,
onPrefaceReceipt: onPrefaceReceipt,
nextID: 1, nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient, maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient, streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1), streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData), czData: new(channelzData),
onGoAway: onGoAway, onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled, keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(), bufferPool: newBufferPool(),
onClose: onClose,
} }
// Add peer information to the http2client context. // Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer()) t.ctx = peer.NewContext(t.ctx, t.getPeer())
@ -363,21 +377,32 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
t.kpDormancyCond = sync.NewCond(&t.mu) t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive() go t.keepalive()
} }
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it // Start the reader goroutine for incoming messages. Each transport has a
// dispatches the frame to the corresponding stream entity. // dedicated goroutine which reads HTTP2 frames from the network. Then it
go t.reader() // dispatches the frame to the corresponding stream entity. When the
// server preface is received, readerErrCh is closed. If an error occurs
// first, an error is pushed to the channel. This must be checked before
// returning from this function.
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)
defer func() {
if err == nil {
err = <-readerErrCh
}
if err != nil {
t.Close(err)
}
}()
// Send connection preface to server. // Send connection preface to server.
n, err := t.conn.Write(clientPreface) n, err := t.conn.Write(clientPreface)
if err != nil { if err != nil {
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err) err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
t.Close(err)
return nil, err return nil, err
} }
if n != len(clientPreface) { if n != len(clientPreface) {
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
t.Close(err)
return nil, err return nil, err
} }
var ss []http2.Setting var ss []http2.Setting
@ -397,14 +422,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
err = t.framer.fr.WriteSettings(ss...) err = t.framer.fr.WriteSettings(ss...)
if err != nil { if err != nil {
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
t.Close(err)
return nil, err return nil, err
} }
// Adjust the connection flow control window if needed. // Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 { if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err) err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
t.Close(err)
return nil, err return nil, err
} }
} }
@ -507,9 +530,22 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)}) headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
} }
registeredCompressors := t.registeredCompressors
if callHdr.SendCompress != "" { if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress}) // Include the outgoing compressor name when compressor is not registered
// via encoding.RegisterCompressor. This is possible when client uses
// WithCompressor dial option.
if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
if registeredCompressors != "" {
registeredCompressors += ","
}
registeredCompressors += callHdr.SendCompress
}
}
if registeredCompressors != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
} }
if dl, ok := ctx.Deadline(); ok { if dl, ok := ctx.Deadline(); ok {
// Send out timeout regardless its value. The server can detect timeout context by itself. // Send out timeout regardless its value. The server can detect timeout context by itself.
@ -589,7 +625,11 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
for _, c := range t.perRPCCreds { for _, c := range t.perRPCCreds {
data, err := c.GetRequestMetadata(ctx, audience) data, err := c.GetRequestMetadata(ctx, audience)
if err != nil { if err != nil {
if _, ok := status.FromError(err); ok { if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
}
return nil, err return nil, err
} }
@ -618,7 +658,14 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
} }
data, err := callCreds.GetRequestMetadata(ctx, audience) data, err := callCreds.GetRequestMetadata(ctx, audience)
if err != nil { if err != nil {
return nil, status.Errorf(codes.Internal, "transport: %v", err) if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
}
return nil, err
}
return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
} }
callAuthData = make(map[string]string, len(data)) callAuthData = make(map[string]string, len(data))
for k, v := range data { for k, v := range data {
@ -880,19 +927,15 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// Close kicks off the shutdown process of the transport. This should be called // Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be // only once on a transport. Once it is called, the transport should not be
// accessed any more. // accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close(err error) { func (t *http2Client) Close(err error) {
t.mu.Lock() t.mu.Lock()
// Make sure we only Close once. // Make sure we only close once.
if t.state == closing { if t.state == closing {
t.mu.Unlock() t.mu.Unlock()
return return
} }
// Call t.onClose before setting the state to closing to prevent the client // Call t.onClose ASAP to prevent the client from attempting to create new
// from attempting to create new streams ASAP. // streams.
t.onClose() t.onClose()
t.state = closing t.state = closing
streams := t.activeStreams streams := t.activeStreams
@ -1482,33 +1525,35 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true) t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
} }
// reader runs as a separate goroutine in charge of reading data from network // readServerPreface reads and handles the initial settings frame from the
// connection. // server.
// func (t *http2Client) readServerPreface() error {
// TODO(zhaoq): currently one reader per transport. Investigate whether this is
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
defer close(t.readerDone)
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame() frame, err := t.framer.fr.ReadFrame()
if err != nil { if err != nil {
err = connectionErrorf(true, err, "error reading server preface: %v", err) return connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
} }
sf, ok := frame.(*http2.SettingsFrame) sf, ok := frame.(*http2.SettingsFrame)
if !ok { if !ok {
// this kicks off resetTransport, so must be last before return return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)) }
t.handleSettings(sf, true)
return nil
}
// reader verifies the server preface and reads all subsequent data from
// network connection. If the server preface is not read successfully, an
// error is pushed to errCh; otherwise errCh is closed with no error.
func (t *http2Client) reader(errCh chan<- error) {
defer close(t.readerDone)
if err := t.readServerPreface(); err != nil {
errCh <- err
return return
} }
t.onPrefaceReceipt() close(errCh)
t.handleSettings(sf, true) if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
// loop to keep reading incoming messages on this transport. // loop to keep reading incoming messages on this transport.
for { for {

View File

@ -43,6 +43,10 @@ import (
"google.golang.org/grpc/tap" "google.golang.org/grpc/tap"
) )
// ErrNoHeaders is used as a signal that a trailers only response was received,
// and is not a real error.
var ErrNoHeaders = errors.New("stream has no headers")
const logLevel = 2 const logLevel = 2
type bufferPool struct { type bufferPool struct {
@ -366,9 +370,15 @@ func (s *Stream) Header() (metadata.MD, error) {
return s.header.Copy(), nil return s.header.Copy(), nil
} }
s.waitOnHeader() s.waitOnHeader()
if !s.headerValid { if !s.headerValid {
return nil, s.status.Err() return nil, s.status.Err()
} }
if s.noHeaders {
return nil, ErrNoHeaders
}
return s.header.Copy(), nil return s.header.Copy(), nil
} }
@ -573,8 +583,8 @@ type ConnectOptions struct {
// NewClientTransport establishes the transport with the required ConnectOptions // NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller. // and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) { func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose) return newHTTP2Client(connectCtx, ctx, addr, opts, onGoAway, onClose)
} }
// Options provides additional hints and information for message // Options provides additional hints and information for message

View File

@ -45,6 +45,7 @@ type MD map[string][]string
// - uppercase letters: A-Z (normalized to lower) // - uppercase letters: A-Z (normalized to lower)
// - lowercase letters: a-z // - lowercase letters: a-z
// - special characters: -_. // - special characters: -_.
//
// Uppercase letters are automatically converted to lowercase. // Uppercase letters are automatically converted to lowercase.
// //
// Keys beginning with "grpc-" are reserved for grpc-internal use only and may // Keys beginning with "grpc-" are reserved for grpc-internal use only and may
@ -66,6 +67,7 @@ func New(m map[string]string) MD {
// - uppercase letters: A-Z (normalized to lower) // - uppercase letters: A-Z (normalized to lower)
// - lowercase letters: a-z // - lowercase letters: a-z
// - special characters: -_. // - special characters: -_.
//
// Uppercase letters are automatically converted to lowercase. // Uppercase letters are automatically converted to lowercase.
// //
// Keys beginning with "grpc-" are reserved for grpc-internal use only and may // Keys beginning with "grpc-" are reserved for grpc-internal use only and may
@ -196,7 +198,7 @@ func FromIncomingContext(ctx context.Context) (MD, bool) {
// ValueFromIncomingContext returns the metadata value corresponding to the metadata // ValueFromIncomingContext returns the metadata value corresponding to the metadata
// key from the incoming metadata if it exists. Key must be lower-case. // key from the incoming metadata if it exists. Key must be lower-case.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/channelz"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport" "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
@ -129,8 +130,12 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if err == balancer.ErrNoSubConnAvailable { if err == balancer.ErrNoSubConnAvailable {
continue continue
} }
if _, ok := status.FromError(err); ok { if st, ok := status.FromError(err); ok {
// Status error: end the RPC unconditionally with this status. // Status error: end the RPC unconditionally with this status.
// First restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
}
return nil, nil, dropError{error: err} return nil, nil, dropError{error: err}
} }
// For all other errors, wait for ready RPCs should block and other // For all other errors, wait for ready RPCs should block and other

View File

@ -25,7 +25,7 @@ import (
// PreparedMsg is responsible for creating a Marshalled and Compressed object. // PreparedMsg is responsible for creating a Marshalled and Compressed object.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -53,6 +53,7 @@ type ServerReflectionRequest struct {
// defined field and then handles them using corresponding methods. // defined field and then handles them using corresponding methods.
// //
// Types that are assignable to MessageRequest: // Types that are assignable to MessageRequest:
//
// *ServerReflectionRequest_FileByFilename // *ServerReflectionRequest_FileByFilename
// *ServerReflectionRequest_FileContainingSymbol // *ServerReflectionRequest_FileContainingSymbol
// *ServerReflectionRequest_FileContainingExtension // *ServerReflectionRequest_FileContainingExtension
@ -263,6 +264,7 @@ type ServerReflectionResponse struct {
// message_request in the request. // message_request in the request.
// //
// Types that are assignable to MessageResponse: // Types that are assignable to MessageResponse:
//
// *ServerReflectionResponse_FileDescriptorResponse // *ServerReflectionResponse_FileDescriptorResponse
// *ServerReflectionResponse_AllExtensionNumbersResponse // *ServerReflectionResponse_AllExtensionNumbersResponse
// *ServerReflectionResponse_ListServicesResponse // *ServerReflectionResponse_ListServicesResponse

View File

@ -23,6 +23,7 @@ The service implemented is defined in:
https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto. https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto.
To register server reflection on a gRPC server: To register server reflection on a gRPC server:
import "google.golang.org/grpc/reflection" import "google.golang.org/grpc/reflection"
s := grpc.NewServer() s := grpc.NewServer()
@ -32,7 +33,6 @@ To register server reflection on a gRPC server:
reflection.Register(s) reflection.Register(s)
s.Serve(lis) s.Serve(lis)
*/ */
package reflection // import "google.golang.org/grpc/reflection" package reflection // import "google.golang.org/grpc/reflection"
@ -74,7 +74,7 @@ func Register(s GRPCServer) {
// for a custom implementation to return zero values for the // for a custom implementation to return zero values for the
// grpc.ServiceInfo values in the map. // grpc.ServiceInfo values in the map.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -85,7 +85,7 @@ type ServiceInfoProvider interface {
// ExtensionResolver is the interface used to query details about extensions. // ExtensionResolver is the interface used to query details about extensions.
// This interface is satisfied by protoregistry.GlobalTypes. // This interface is satisfied by protoregistry.GlobalTypes.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -96,7 +96,7 @@ type ExtensionResolver interface {
// ServerOptions represents the options used to construct a reflection server. // ServerOptions represents the options used to construct a reflection server.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -120,7 +120,7 @@ type ServerOptions struct {
// This can be used to customize behavior of the reflection service. Most usages // This can be used to customize behavior of the reflection service. Most usages
// should prefer to use Register instead. // should prefer to use Register instead.
// //
// Experimental // # Experimental
// //
// Notice: This function is EXPERIMENTAL and may be changed or removed in a // Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -96,7 +96,7 @@ const (
// Address represents a server the client connects to. // Address represents a server the client connects to.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -198,7 +198,7 @@ func Header(md *metadata.MD) CallOption {
// HeaderCallOption is a CallOption for collecting response header metadata. // HeaderCallOption is a CallOption for collecting response header metadata.
// The metadata field will be populated *after* the RPC completes. // The metadata field will be populated *after* the RPC completes.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -220,7 +220,7 @@ func Trailer(md *metadata.MD) CallOption {
// TrailerCallOption is a CallOption for collecting response trailer metadata. // TrailerCallOption is a CallOption for collecting response trailer metadata.
// The metadata field will be populated *after* the RPC completes. // The metadata field will be populated *after* the RPC completes.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -242,7 +242,7 @@ func Peer(p *peer.Peer) CallOption {
// PeerCallOption is a CallOption for collecting the identity of the remote // PeerCallOption is a CallOption for collecting the identity of the remote
// peer. The peer field will be populated *after* the RPC completes. // peer. The peer field will be populated *after* the RPC completes.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -282,7 +282,7 @@ func FailFast(failFast bool) CallOption {
// FailFastCallOption is a CallOption for indicating whether an RPC should fail // FailFastCallOption is a CallOption for indicating whether an RPC should fail
// fast or not. // fast or not.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -305,7 +305,7 @@ func MaxCallRecvMsgSize(bytes int) CallOption {
// MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message // MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message
// size in bytes the client can receive. // size in bytes the client can receive.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -328,7 +328,7 @@ func MaxCallSendMsgSize(bytes int) CallOption {
// MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message // MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message
// size in bytes the client can send. // size in bytes the client can send.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -351,7 +351,7 @@ func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
// PerRPCCredsCallOption is a CallOption that indicates the per-RPC // PerRPCCredsCallOption is a CallOption that indicates the per-RPC
// credentials to use for the call. // credentials to use for the call.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -369,7 +369,7 @@ func (o PerRPCCredsCallOption) after(c *callInfo, attempt *csAttempt) {}
// sending the request. If WithCompressor is also set, UseCompressor has // sending the request. If WithCompressor is also set, UseCompressor has
// higher priority. // higher priority.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -379,7 +379,7 @@ func UseCompressor(name string) CallOption {
// CompressorCallOption is a CallOption that indicates the compressor to use. // CompressorCallOption is a CallOption that indicates the compressor to use.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -416,7 +416,7 @@ func CallContentSubtype(contentSubtype string) CallOption {
// ContentSubtypeCallOption is a CallOption that indicates the content-subtype // ContentSubtypeCallOption is a CallOption that indicates the content-subtype
// used for marshaling messages. // used for marshaling messages.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -444,7 +444,7 @@ func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}
// This function is provided for advanced users; prefer to use only // This function is provided for advanced users; prefer to use only
// CallContentSubtype to select a registered codec instead. // CallContentSubtype to select a registered codec instead.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -455,7 +455,7 @@ func ForceCodec(codec encoding.Codec) CallOption {
// ForceCodecCallOption is a CallOption that indicates the codec used for // ForceCodecCallOption is a CallOption that indicates the codec used for
// marshaling messages. // marshaling messages.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -480,7 +480,7 @@ func CallCustomCodec(codec Codec) CallOption {
// CustomCodecCallOption is a CallOption that indicates the codec used for // CustomCodecCallOption is a CallOption that indicates the codec used for
// marshaling messages. // marshaling messages.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -497,7 +497,7 @@ func (o CustomCodecCallOption) after(c *callInfo, attempt *csAttempt) {}
// MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory // MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
// used for buffering this RPC's requests for retry purposes. // used for buffering this RPC's requests for retry purposes.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -508,7 +508,7 @@ func MaxRetryRPCBufferSize(bytes int) CallOption {
// MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of // MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of
// memory to be used for caching this RPC for retry purposes. // memory to be used for caching this RPC for retry purposes.
// //
// Experimental // # Experimental
// //
// Notice: This type is EXPERIMENTAL and may be changed or removed in a // Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
@ -548,10 +548,11 @@ type parser struct {
// format. The caller owns the returned msg memory. // format. The caller owns the returned msg memory.
// //
// If there is an error, possible values are: // If there is an error, possible values are:
// * io.EOF, when no messages remain // - io.EOF, when no messages remain
// * io.ErrUnexpectedEOF // - io.ErrUnexpectedEOF
// * of type transport.ConnectionError // - of type transport.ConnectionError
// * an error from the status package // - an error from the status package
//
// No other error values or types must be returned, which also means // No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible // that the underlying io.Reader must not return an incompatible
// error. // error.

View File

@ -19,7 +19,7 @@
// Package serviceconfig defines types and methods for operating on gRPC // Package serviceconfig defines types and methods for operating on gRPC
// service configs. // service configs.
// //
// Experimental // # Experimental
// //
// Notice: This package is EXPERIMENTAL and may be changed or removed in a // Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -39,6 +39,7 @@ import (
imetadata "google.golang.org/grpc/internal/metadata" imetadata "google.golang.org/grpc/internal/metadata"
iresolver "google.golang.org/grpc/internal/resolver" iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/serviceconfig"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport" "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
@ -195,6 +196,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method} rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo) rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil { if err != nil {
if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
}
return nil, err
}
return nil, toRPCErr(err) return nil, toRPCErr(err)
} }
@ -744,17 +752,25 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
func (cs *clientStream) Header() (metadata.MD, error) { func (cs *clientStream) Header() (metadata.MD, error) {
var m metadata.MD var m metadata.MD
noHeader := false
err := cs.withRetry(func(a *csAttempt) error { err := cs.withRetry(func(a *csAttempt) error {
var err error var err error
m, err = a.s.Header() m, err = a.s.Header()
if err == transport.ErrNoHeaders {
noHeader = true
return nil
}
return toRPCErr(err) return toRPCErr(err)
}, cs.commitAttemptLocked) }, cs.commitAttemptLocked)
if err != nil { if err != nil {
cs.finish(err) cs.finish(err)
return nil, err return nil, err
} }
if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
// Only log if binary log is on and header has not been logged. if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader {
// Only log if binary log is on and header has not been logged, and
// there is actually headers to log.
logEntry := &binarylog.ServerHeader{ logEntry := &binarylog.ServerHeader{
OnClientSide: true, OnClientSide: true,
Header: m, Header: m,

View File

@ -19,7 +19,7 @@
// Package tap defines the function handles which are executed on the transport // Package tap defines the function handles which are executed on the transport
// layer of gRPC-Go and related information. // layer of gRPC-Go and related information.
// //
// Experimental // # Experimental
// //
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.

View File

@ -19,4 +19,4 @@
package grpc package grpc
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.50.1" const Version = "1.51.0"

View File

@ -67,7 +67,9 @@ elif [[ "$#" -ne 0 ]]; then
fi fi
# - Ensure all source files contain a copyright message. # - Ensure all source files contain a copyright message.
not git grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)\|DO NOT EDIT" -- '*.go' # (Done in two parts because Darwin "git grep" has broken support for compound
# exclusion matches.)
(grep -L "DO NOT EDIT" $(git grep -L "\(Copyright [0-9]\{4,\} gRPC authors\)" -- '*.go') || true) | fail_on_output
# - Make sure all tests in grpc and grpc/test use leakcheck via Teardown. # - Make sure all tests in grpc and grpc/test use leakcheck via Teardown.
not grep 'func Test[^(]' *_test.go not grep 'func Test[^(]' *_test.go
@ -81,7 +83,7 @@ not git grep -l 'x/net/context' -- "*.go"
git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^stress\|grpcrand\|^benchmark\|wrr_test' git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^stress\|grpcrand\|^benchmark\|wrr_test'
# - Do not call grpclog directly. Use grpclog.Component instead. # - Do not call grpclog directly. Use grpclog.Component instead.
git grep -l 'grpclog.I\|grpclog.W\|grpclog.E\|grpclog.F\|grpclog.V' -- "*.go" | not grep -v '^grpclog/component.go\|^internal/grpctest/tlogger_test.go' git grep -l -e 'grpclog.I' --or -e 'grpclog.W' --or -e 'grpclog.E' --or -e 'grpclog.F' --or -e 'grpclog.V' -- "*.go" | not grep -v '^grpclog/component.go\|^internal/grpctest/tlogger_test.go'
# - Ensure all ptypes proto packages are renamed when importing. # - Ensure all ptypes proto packages are renamed when importing.
not git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go" not git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go"

2
vendor/modules.txt vendored
View File

@ -693,7 +693,7 @@ google.golang.org/appengine/urlfetch
google.golang.org/genproto/googleapis/api/httpbody google.golang.org/genproto/googleapis/api/httpbody
google.golang.org/genproto/googleapis/rpc/status google.golang.org/genproto/googleapis/rpc/status
google.golang.org/genproto/protobuf/field_mask google.golang.org/genproto/protobuf/field_mask
# google.golang.org/grpc v1.50.1 # google.golang.org/grpc v1.51.0
## explicit; go 1.17 ## explicit; go 1.17
google.golang.org/grpc google.golang.org/grpc
google.golang.org/grpc/attributes google.golang.org/grpc/attributes