rebase: bump google.golang.org/grpc from 1.52.3 to 1.53.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.52.3 to 1.53.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.52.3...v1.53.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-02-13 21:00:11 +00:00
committed by mergify[bot]
parent db8320ce51
commit d09eae2efa
43 changed files with 971 additions and 623 deletions

View File

@ -26,7 +26,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@ -79,7 +79,7 @@ func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
// Build is an internal only method for building the proto message out of the
// input event. It's made public to enable other library to reuse as much logic
// in TruncatingMethodLogger as possible.
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
m := c.toProto()
timestamp, _ := ptypes.TimestampProto(time.Now())
m.Timestamp = timestamp
@ -87,11 +87,11 @@ func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
m.SequenceIdWithinCall = ml.idWithinCallGen.next()
switch pay := m.Payload.(type) {
case *pb.GrpcLogEntry_ClientHeader:
case *binlogpb.GrpcLogEntry_ClientHeader:
m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
case *pb.GrpcLogEntry_ServerHeader:
case *binlogpb.GrpcLogEntry_ServerHeader:
m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
case *pb.GrpcLogEntry_Message:
case *binlogpb.GrpcLogEntry_Message:
m.PayloadTruncated = ml.truncateMessage(pay.Message)
}
return m
@ -102,7 +102,7 @@ func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
ml.sink.Write(ml.Build(c))
}
func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
if ml.headerMaxLen == maxUInt {
return false
}
@ -132,7 +132,7 @@ func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated
return truncated
}
func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
if ml.messageMaxLen == maxUInt {
return false
}
@ -145,7 +145,7 @@ func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated
// LogEntryConfig represents the configuration for binary log entry.
type LogEntryConfig interface {
toProto() *pb.GrpcLogEntry
toProto() *binlogpb.GrpcLogEntry
}
// ClientHeader configs the binary log entry to be a ClientHeader entry.
@ -159,10 +159,10 @@ type ClientHeader struct {
PeerAddr net.Addr
}
func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
// This function doesn't need to set all the fields (e.g. seq ID). The Log
// function will set the fields when necessary.
clientHeader := &pb.ClientHeader{
clientHeader := &binlogpb.ClientHeader{
Metadata: mdToMetadataProto(c.Header),
MethodName: c.MethodName,
Authority: c.Authority,
@ -170,16 +170,16 @@ func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
if c.Timeout > 0 {
clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Payload: &pb.GrpcLogEntry_ClientHeader{
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Payload: &binlogpb.GrpcLogEntry_ClientHeader{
ClientHeader: clientHeader,
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
@ -195,19 +195,19 @@ type ServerHeader struct {
PeerAddr net.Addr
}
func (c *ServerHeader) toProto() *pb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Payload: &pb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{
func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Payload: &binlogpb.GrpcLogEntry_ServerHeader{
ServerHeader: &binlogpb.ServerHeader{
Metadata: mdToMetadataProto(c.Header),
},
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
@ -223,7 +223,7 @@ type ClientMessage struct {
Message interface{}
}
func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
var (
data []byte
err error
@ -238,19 +238,19 @@ func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
} else {
grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
Payload: &pb.GrpcLogEntry_Message{
Message: &pb.Message{
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
Payload: &binlogpb.GrpcLogEntry_Message{
Message: &binlogpb.Message{
Length: uint32(len(data)),
Data: data,
},
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
return ret
}
@ -263,7 +263,7 @@ type ServerMessage struct {
Message interface{}
}
func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
var (
data []byte
err error
@ -278,19 +278,19 @@ func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
} else {
grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
Payload: &pb.GrpcLogEntry_Message{
Message: &pb.Message{
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
Payload: &binlogpb.GrpcLogEntry_Message{
Message: &binlogpb.Message{
Length: uint32(len(data)),
Data: data,
},
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
return ret
}
@ -300,15 +300,15 @@ type ClientHalfClose struct {
OnClientSide bool
}
func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
Payload: nil, // No payload here.
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
return ret
}
@ -324,7 +324,7 @@ type ServerTrailer struct {
PeerAddr net.Addr
}
func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
st, ok := status.FromError(c.Err)
if !ok {
grpclogLogger.Info("binarylogging: error in trailer is not a status error")
@ -340,10 +340,10 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
}
}
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Payload: &pb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Payload: &binlogpb.GrpcLogEntry_Trailer{
Trailer: &binlogpb.Trailer{
Metadata: mdToMetadataProto(c.Trailer),
StatusCode: uint32(st.Code()),
StatusMessage: st.Message(),
@ -352,9 +352,9 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
},
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr)
@ -367,15 +367,15 @@ type Cancel struct {
OnClientSide bool
}
func (c *Cancel) toProto() *pb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL,
func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
Payload: nil,
}
if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER
ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
}
return ret
}
@ -392,15 +392,15 @@ func metadataKeyOmit(key string) bool {
return strings.HasPrefix(key, "grpc-")
}
func mdToMetadataProto(md metadata.MD) *pb.Metadata {
ret := &pb.Metadata{}
func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
ret := &binlogpb.Metadata{}
for k, vv := range md {
if metadataKeyOmit(k) {
continue
}
for _, v := range vv {
ret.Entry = append(ret.Entry,
&pb.MetadataEntry{
&binlogpb.MetadataEntry{
Key: k,
Value: []byte(v),
},
@ -410,26 +410,26 @@ func mdToMetadataProto(md metadata.MD) *pb.Metadata {
return ret
}
func addrToProto(addr net.Addr) *pb.Address {
ret := &pb.Address{}
func addrToProto(addr net.Addr) *binlogpb.Address {
ret := &binlogpb.Address{}
switch a := addr.(type) {
case *net.TCPAddr:
if a.IP.To4() != nil {
ret.Type = pb.Address_TYPE_IPV4
ret.Type = binlogpb.Address_TYPE_IPV4
} else if a.IP.To16() != nil {
ret.Type = pb.Address_TYPE_IPV6
ret.Type = binlogpb.Address_TYPE_IPV6
} else {
ret.Type = pb.Address_TYPE_UNKNOWN
ret.Type = binlogpb.Address_TYPE_UNKNOWN
// Do not set address and port fields.
break
}
ret.Address = a.IP.String()
ret.IpPort = uint32(a.Port)
case *net.UnixAddr:
ret.Type = pb.Address_TYPE_UNIX
ret.Type = binlogpb.Address_TYPE_UNIX
ret.Address = a.String()
default:
ret.Type = pb.Address_TYPE_UNKNOWN
ret.Type = binlogpb.Address_TYPE_UNKNOWN
}
return ret
}

View File

@ -26,7 +26,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
)
var (
@ -42,15 +42,15 @@ type Sink interface {
// Write will be called to write the log entry into the sink.
//
// It should be thread-safe so it can be called in parallel.
Write(*pb.GrpcLogEntry) error
Write(*binlogpb.GrpcLogEntry) error
// Close will be called when the Sink is replaced by a new Sink.
Close() error
}
type noopSink struct{}
func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
func (ns *noopSink) Close() error { return nil }
func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
func (ns *noopSink) Close() error { return nil }
// newWriterSink creates a binary log sink with the given writer.
//
@ -66,7 +66,7 @@ type writerSink struct {
out io.Writer
}
func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
b, err := proto.Marshal(e)
if err != nil {
grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
@ -96,7 +96,7 @@ type bufferedSink struct {
done chan struct{}
}
func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error {
func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
fs.mu.Lock()
defer fs.mu.Unlock()
if !fs.flusherStarted {

View File

@ -21,19 +21,42 @@ package envconfig
import (
"os"
"strconv"
"strings"
)
const (
prefix = "GRPC_GO_"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
advertiseCompressorsStr = prefix + "ADVERTISE_COMPRESSORS"
)
var (
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
TXTErrIgnore = boolFromEnv("GRPC_GO_IGNORE_TXT_ERRORS", true)
// AdvertiseCompressors is set if registered compressor should be advertised
// ("GRPC_GO_ADVERTISE_COMPRESSORS" is not "false").
AdvertiseCompressors = !strings.EqualFold(os.Getenv(advertiseCompressorsStr), "false")
AdvertiseCompressors = boolFromEnv("GRPC_GO_ADVERTISE_COMPRESSORS", true)
// RingHashCap indicates the maximum ring size which defaults to 4096
// entries but may be overridden by setting the environment variable
// "GRPC_RING_HASH_CAP". This does not override the default bounds
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
)
func boolFromEnv(envVar string, def bool) bool {
if def {
// The default is true; return true unless the variable is "false".
return !strings.EqualFold(os.Getenv(envVar), "false")
}
// The default is false; return false unless the variable is "true".
return strings.EqualFold(os.Getenv(envVar), "true")
}
func uint64FromEnv(envVar string, def, min, max uint64) uint64 {
v, err := strconv.ParseUint(os.Getenv(envVar), 10, 64)
if err != nil {
return def
}
if v < min {
return min
}
if v > max {
return max
}
return v
}

View File

@ -20,7 +20,6 @@ package envconfig
import (
"os"
"strings"
)
const (
@ -36,16 +35,6 @@ const (
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_RBAC"
outlierDetectionSupportEnv = "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION"
federationEnv = "GRPC_EXPERIMENTAL_XDS_FEDERATION"
rlsInXDSEnv = "GRPC_EXPERIMENTAL_XDS_RLS_LB"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
)
var (
@ -64,38 +53,40 @@ var (
// XDSRingHash indicates whether ring hash support is enabled, which can be
// disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
XDSRingHash = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false")
XDSRingHash = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", true)
// XDSClientSideSecurity is used to control processing of security
// configuration on the client-side.
//
// Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
XDSClientSideSecurity = !strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "false")
XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true)
// XDSAggregateAndDNS indicates whether processing of aggregated cluster
// and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
XDSAggregateAndDNS = !strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "false")
XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true)
// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
// which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
XDSRBAC = boolFromEnv("GRPC_XDS_EXPERIMENTAL_RBAC", true)
// XDSOutlierDetection indicates whether outlier detection support is
// enabled, which can be disabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false".
XDSOutlierDetection = !strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "false")
// XDSFederation indicates whether federation support is enabled.
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")
XDSOutlierDetection = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION", true)
// XDSFederation indicates whether federation support is enabled, which can
// be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_XDS_FEDERATION" to "true".
XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", false)
// XDSRLS indicates whether processing of Cluster Specifier plugins and
// support for the RLS CLuster Specifier is enabled, which can be enabled by
// setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to
// "true".
XDSRLS = strings.EqualFold(os.Getenv(rlsInXDSEnv), "true")
XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", false)
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")
)

View File

@ -77,6 +77,9 @@ var (
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
ClearGlobalDialOptions func()
// JoinDialOptions combines the dial options passed as arguments into a
// single dial option.
JoinDialOptions interface{} // func(...grpc.DialOption) grpc.DialOption
// JoinServerOptions combines the server options passed as arguments into a
// single server option.
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption

View File

@ -116,7 +116,7 @@ type dnsBuilder struct{}
// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
host, port, err := parseTarget(target.Endpoint, defaultPort)
host, port, err := parseTarget(target.Endpoint(), defaultPort)
if err != nil {
return nil, err
}

View File

@ -31,7 +31,7 @@ const scheme = "passthrough"
type passthroughBuilder struct{}
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.Endpoint == "" && opts.Dialer == nil {
if target.Endpoint() == "" && opts.Dialer == nil {
return nil, errors.New("passthrough: received empty target in Build()")
}
r := &passthroughResolver{
@ -52,7 +52,7 @@ type passthroughResolver struct {
}
func (r *passthroughResolver) start() {
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint()}}})
}
func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}

View File

@ -527,6 +527,9 @@ const minBatchSize = 1000
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
// Always flush the writer before exiting in case there are pending frames
// to be sent.
defer l.framer.writer.Flush()
for {
it, err := l.cbuf.get(true)
if err != nil {
@ -650,16 +653,18 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
itl: &itemList{},
wq: h.wq,
}
str.itl.enqueue(h)
return l.originateStream(str)
return l.originateStream(str, h)
}
func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
// l.draining is set when handling GoAway. In which case, we want to avoid
// creating new streams.
if l.draining {
// TODO: provide a better error with the reason we are in draining.
hdr.onOrphaned(errStreamDrain)
return nil
}
if err := hdr.initStream(str.id); err != nil {
if err == errStreamDrain { // errStreamDrain need not close transport
return nil
}
return err
}
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
@ -757,7 +762,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
return err
}
}
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
if l.draining && len(l.estdStreams) == 0 {
return errors.New("finished processing active streams while in draining mode")
}
return nil
@ -812,7 +817,6 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
}
func (l *loopyWriter) closeConnectionHandler() error {
l.framer.writer.Flush()
// Exit loopyWriter entirely by returning an error here. This will lead to
// the transport closing the connection, and, ultimately, transport
// closure.

View File

@ -47,3 +47,9 @@ const (
defaultClientMaxHeaderListSize = uint32(16 << 20)
defaultServerMaxHeaderListSize = uint32(16 << 20)
)
// MaxStreamID is the upper bound for the stream ID before the current
// transport gracefully closes and new transport is created for subsequent RPCs.
// This is set to 75% of 2^31-1. Streams are identified with an unsigned 31-bit
// integer. It's exported so that tests can override it.
var MaxStreamID = uint32(math.MaxInt32 * 3 / 4)

View File

@ -65,7 +65,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
if !validContentType {
msg := fmt.Sprintf("invalid gRPC request content-type %q", contentType)
http.Error(w, msg, http.StatusBadRequest)
http.Error(w, msg, http.StatusUnsupportedMediaType)
return nil, errors.New(msg)
}
if _, ok := w.(http.Flusher); !ok {
@ -87,7 +87,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
if err != nil {
msg := fmt.Sprintf("malformed time-out: %v", err)
msg := fmt.Sprintf("malformed grpc-timeout: %v", err)
http.Error(w, msg, http.StatusBadRequest)
return nil, status.Error(codes.Internal, msg)
}

View File

@ -140,8 +140,7 @@ type http2Client struct {
channelzID *channelz.Identifier
czData *channelzData
onGoAway func(GoAwayReason)
onClose func()
onClose func(GoAwayReason)
bufferPool *bufferPool
@ -197,7 +196,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@ -217,7 +216,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
}
return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
}
// Any further errors will close the underlying connection
@ -343,7 +342,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
onGoAway: onGoAway,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
@ -744,15 +742,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
endStream: false,
initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
// TODO: handle transport closure in loopy instead and remove this
// initStream is never called when transport is draining.
if t.state == closing {
t.mu.Unlock()
// Do a quick cleanup.
err := error(errStreamDrain)
if state == closing {
err = ErrConnClosing
}
cleanup(err)
return err
cleanup(ErrConnClosing)
return ErrConnClosing
}
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
@ -770,6 +765,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
}
firstTry := true
var ch chan struct{}
transportDrainRequired := false
checkForStreamQuota := func(it interface{}) bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
@ -785,6 +781,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
h := it.(*headerFrame)
h.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
@ -864,6 +865,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
sh.HandleRPC(s.ctx, outHeader)
}
}
if transportDrainRequired {
if logger.V(logLevel) {
logger.Infof("transport: t.nextID > MaxStreamID. Draining")
}
t.GracefulClose()
}
return s, nil
}
@ -957,7 +964,9 @@ func (t *http2Client) Close(err error) {
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
t.onClose()
if t.state != draining {
t.onClose(GoAwayInvalid)
}
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
@ -1010,6 +1019,7 @@ func (t *http2Client) GracefulClose() {
if logger.V(logLevel) {
logger.Infof("transport: GracefulClose called")
}
t.onClose(GoAwayInvalid)
t.state = draining
active := len(t.activeStreams)
t.mu.Unlock()
@ -1172,7 +1182,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
statusCode, ok := http2ErrConvTab[f.ErrCode]
if !ok {
if logger.V(logLevel) {
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error: %v", f.ErrCode)
}
statusCode = codes.Unknown
}
@ -1290,8 +1300,10 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// Notify the clientconn about the GOAWAY before we set the state to
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
t.state = draining
if t.state != draining {
t.onClose(t.goAwayReason)
t.state = draining
}
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@ -1780,3 +1792,9 @@ func (t *http2Client) getOutFlowWindow() int64 {
return -2
}
}
func (t *http2Client) stateForTesting() transportState {
t.mu.Lock()
defer t.mu.Unlock()
return t.state
}

View File

@ -380,13 +380,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
var (
// If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode.
isGRPC = false
mdata = make(map[string][]string)
httpMethod string
// headerError is set if an error is encountered while parsing the headers
headerError bool
// if false, content-type was missing or invalid
isGRPC = false
contentType = ""
mdata = make(map[string][]string)
httpMethod string
// these are set if an error is encountered while parsing the headers
protocolError bool
headerError *status.Status
timeoutSet bool
timeout time.Duration
@ -397,6 +398,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
case "content-type":
contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
if !validContentType {
contentType = hf.Value
break
}
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
@ -412,7 +414,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
timeoutSet = true
var err error
if timeout, err = decodeTimeout(hf.Value); err != nil {
headerError = true
headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
}
// "Transports must consider requests containing the Connection header
// as malformed." - A41
@ -420,14 +422,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
}
headerError = true
protocolError = true
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = true
headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
@ -446,7 +448,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
logger.Errorf("transport: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 400,
httpStatus: http.StatusBadRequest,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
@ -455,7 +457,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return nil
}
if !isGRPC || headerError {
if protocolError {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
@ -464,6 +466,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
})
return nil
}
if !isGRPC {
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusUnsupportedMediaType,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
rst: !frame.StreamEnded(),
})
return nil
}
if headerError != nil {
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusBadRequest,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: headerError,
rst: !frame.StreamEnded(),
})
return nil
}
// "If :authority is missing, Host must be renamed to :authority." - A41
if len(mdata[":authority"]) == 0 {

View File

@ -583,8 +583,8 @@ type ConnectOptions struct {
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onGoAway, onClose)
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
}
// Options provides additional hints and information for message