rebase: bump google.golang.org/grpc from 1.42.0 to 1.43.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.42.0 to 1.43.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.42.0...v1.43.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot] 2022-01-10 20:13:10 +00:00 committed by mergify[bot]
parent 097e339d69
commit bbce265ead
20 changed files with 239 additions and 143 deletions

2
go.mod
View File

@ -24,7 +24,7 @@ require (
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 golang.org/x/crypto v0.0.0-20210817164053-32db794688a5
golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8 golang.org/x/sys v0.0.0-20211029165221-6e7872819dc8
google.golang.org/grpc v1.42.0 google.golang.org/grpc v1.43.0
google.golang.org/protobuf v1.27.1 google.golang.org/protobuf v1.27.1
k8s.io/api v0.23.0 k8s.io/api v0.23.0
k8s.io/apimachinery v0.23.0 k8s.io/apimachinery v0.23.0

4
go.sum
View File

@ -1607,8 +1607,8 @@ google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A= google.golang.org/grpc v1.43.0 h1:Eeu7bZtDZ2DpRCsLhUlcrLnvYaMK1Gz86a+hMVvELmM=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=

View File

@ -174,25 +174,32 @@ type ClientConn interface {
// BuildOptions contains additional information for Build. // BuildOptions contains additional information for Build.
type BuildOptions struct { type BuildOptions struct {
// DialCreds is the transport credential the Balancer implementation can // DialCreds is the transport credentials to use when communicating with a
// use to dial to a remote load balancer server. The Balancer implementations // remote load balancer server. Balancer implementations which do not
// can ignore this if it does not need to talk to another party securely. // communicate with a remote load balancer server can ignore this field.
DialCreds credentials.TransportCredentials DialCreds credentials.TransportCredentials
// CredsBundle is the credentials bundle that the Balancer can use. // CredsBundle is the credentials bundle to use when communicating with a
// remote load balancer server. Balancer implementations which do not
// communicate with a remote load balancer server can ignore this field.
CredsBundle credentials.Bundle CredsBundle credentials.Bundle
// Dialer is the custom dialer the Balancer implementation can use to dial // Dialer is the custom dialer to use when communicating with a remote load
// to a remote load balancer server. The Balancer implementations // balancer server. Balancer implementations which do not communicate with a
// can ignore this if it doesn't need to talk to remote balancer. // remote load balancer server can ignore this field.
Dialer func(context.Context, string) (net.Conn, error) Dialer func(context.Context, string) (net.Conn, error)
// ChannelzParentID is the entity parent's channelz unique identification number. // Authority is the server name to use as part of the authentication
// handshake when communicating with a remote load balancer server. Balancer
// implementations which do not communicate with a remote load balancer
// server can ignore this field.
Authority string
// ChannelzParentID is the parent ClientConn's channelz ID.
ChannelzParentID int64 ChannelzParentID int64
// CustomUserAgent is the custom user agent set on the parent ClientConn. // CustomUserAgent is the custom user agent set on the parent ClientConn.
// The balancer should set the same custom user agent if it creates a // The balancer should set the same custom user agent if it creates a
// ClientConn. // ClientConn.
CustomUserAgent string CustomUserAgent string
// Target contains the parsed address info of the dial target. It is the same resolver.Target as // Target contains the parsed address info of the dial target. It is the
// passed to the resolver. // same resolver.Target as passed to the resolver. See the documentation for
// See the documentation for the resolver.Target type for details about what it contains. // the resolver.Target type for details about what it contains.
Target resolver.Target Target resolver.Target
} }

View File

@ -83,13 +83,13 @@ var (
// errTransportCredsAndBundle indicates that creds bundle is used together // errTransportCredsAndBundle indicates that creds bundle is used together
// with other individual Transport Credentials. // with other individual Transport Credentials.
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials") errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
// errTransportCredentialsMissing indicates that users want to transmit security // errNoTransportCredsInBundle indicated that the configured creds bundle
// information (e.g., OAuth2 token) which requires secure connection on an insecure // returned a transport credentials which was nil.
// connection. errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
// errTransportCredentialsMissing indicates that users want to transmit
// security information (e.g., OAuth2 token) which requires secure
// connection on an insecure connection.
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)") errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
// and grpc.WithInsecure() are both called for a connection.
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
) )
const ( const (
@ -177,17 +177,20 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.csMgr.channelzID = cc.channelzID cc.csMgr.channelzID = cc.channelzID
} }
if !cc.dopts.insecure {
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil { if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity return nil, errNoTransportSecurity
} }
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil { if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
return nil, errTransportCredsAndBundle return nil, errTransportCredsAndBundle
} }
} else { if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil { return nil, errNoTransportCredsInBundle
return nil, errCredentialsConflict
} }
transportCreds := cc.dopts.copts.TransportCredentials
if transportCreds == nil {
transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
}
if transportCreds.Info().SecurityProtocol == "insecure" {
for _, cd := range cc.dopts.copts.PerRPCCredentials { for _, cd := range cc.dopts.copts.PerRPCCredentials {
if cd.RequireTransportSecurity() { if cd.RequireTransportSecurity() {
return nil, errTransportCredentialsMissing return nil, errTransportCredentialsMissing
@ -282,6 +285,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
DialCreds: credsClone, DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle, CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer, Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent, CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID, ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget, Target: cc.parsedTarget,
@ -629,7 +633,10 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
} }
var ret error var ret error
if cc.dopts.disableServiceConfig || s.ServiceConfig == nil { if cc.dopts.disableServiceConfig {
channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
cc.maybeApplyDefaultServiceConfig(s.Addresses)
} else if s.ServiceConfig == nil {
cc.maybeApplyDefaultServiceConfig(s.Addresses) cc.maybeApplyDefaultServiceConfig(s.Addresses)
// TODO: do we need to apply a failing LB policy if there is no // TODO: do we need to apply a failing LB policy if there is no
// default, per the error handling design? // default, per the error handling design?

View File

@ -178,8 +178,18 @@ type TransportCredentials interface {
// //
// This API is experimental. // This API is experimental.
type Bundle interface { type Bundle interface {
// TransportCredentials returns the transport credentials from the Bundle.
//
// Implementations must return non-nil transport credentials. If transport
// security is not needed by the Bundle, implementations may choose to
// return insecure.NewCredentials().
TransportCredentials() TransportCredentials TransportCredentials() TransportCredentials
// PerRPCCredentials returns the per-RPC credentials from the Bundle.
//
// May be nil if per-RPC credentials are not needed.
PerRPCCredentials() PerRPCCredentials PerRPCCredentials() PerRPCCredentials
// NewWithMode should make a copy of Bundle, and switch mode. Modifying the // NewWithMode should make a copy of Bundle, and switch mode. Modifying the
// existing Bundle may cause races. // existing Bundle may cause races.
// //

View File

@ -0,0 +1,77 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package insecure provides an implementation of the
// credentials.TransportCredentials interface which disables transport security.
//
// Experimental
//
// Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release.
package insecure
import (
"context"
"net"
"google.golang.org/grpc/credentials"
)
// NewCredentials returns a credentials which disables transport security.
//
// Note that using this credentials with per-RPC credentials which require
// transport security is incompatible and will cause grpc.Dial() to fail.
func NewCredentials() credentials.TransportCredentials {
return insecureTC{}
}
// insecureTC implements the insecure transport credentials. The handshake
// methods simply return the passed in net.Conn and set the security level to
// NoSecurity.
type insecureTC struct{}
func (insecureTC) ClientHandshake(ctx context.Context, _ string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
return conn, info{credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}}, nil
}
func (insecureTC) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
return conn, info{credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}}, nil
}
func (insecureTC) Info() credentials.ProtocolInfo {
return credentials.ProtocolInfo{SecurityProtocol: "insecure"}
}
func (insecureTC) Clone() credentials.TransportCredentials {
return insecureTC{}
}
func (insecureTC) OverrideServerName(string) error {
return nil
}
// info contains the auth information for an insecure connection.
// It implements the AuthInfo interface.
type info struct {
credentials.CommonAuthInfo
}
// AuthType returns the type of info as a string.
func (info) AuthType() string {
return "insecure"
}

View File

@ -27,9 +27,9 @@ import (
"google.golang.org/grpc/backoff" "google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal" "google.golang.org/grpc/internal"
internalbackoff "google.golang.org/grpc/internal/backoff" internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/transport" "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
@ -50,7 +50,6 @@ type dialOptions struct {
bs internalbackoff.Strategy bs internalbackoff.Strategy
block bool block bool
returnLastError bool returnLastError bool
insecure bool
timeout time.Duration timeout time.Duration
scChan <-chan ServiceConfig scChan <-chan ServiceConfig
authority string authority string
@ -228,18 +227,14 @@ func WithServiceConfig(c <-chan ServiceConfig) DialOption {
}) })
} }
// WithConnectParams configures the dialer to use the provided ConnectParams. // WithConnectParams configures the ClientConn to use the provided ConnectParams
// for creating and maintaining connections to servers.
// //
// The backoff configuration specified as part of the ConnectParams overrides // The backoff configuration specified as part of the ConnectParams overrides
// all defaults specified in // all defaults specified in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. Consider // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md. Consider
// using the backoff.DefaultConfig as a base, in cases where you want to // using the backoff.DefaultConfig as a base, in cases where you want to
// override only a subset of the backoff configuration. // override only a subset of the backoff configuration.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithConnectParams(p ConnectParams) DialOption { func WithConnectParams(p ConnectParams) DialOption {
return newFuncDialOption(func(o *dialOptions) { return newFuncDialOption(func(o *dialOptions) {
o.bs = internalbackoff.Exponential{Config: p.Backoff} o.bs = internalbackoff.Exponential{Config: p.Backoff}
@ -303,11 +298,17 @@ func WithReturnConnectionError() DialOption {
} }
// WithInsecure returns a DialOption which disables transport security for this // WithInsecure returns a DialOption which disables transport security for this
// ClientConn. Note that transport security is required unless WithInsecure is // ClientConn. Under the hood, it uses insecure.NewCredentials().
// set. //
// Note that using this DialOption with per-RPC credentials (through
// WithCredentialsBundle or WithPerRPCCredentials) which require transport
// security is incompatible and will cause grpc.Dial() to fail.
//
// Deprecated: use insecure.NewCredentials() instead.
// Will be supported throughout 1.x.
func WithInsecure() DialOption { func WithInsecure() DialOption {
return newFuncDialOption(func(o *dialOptions) { return newFuncDialOption(func(o *dialOptions) {
o.insecure = true o.copts.TransportCredentials = insecure.NewCredentials()
}) })
} }
@ -580,7 +581,6 @@ func withHealthCheckFunc(f internal.HealthChecker) DialOption {
func defaultDialOptions() dialOptions { func defaultDialOptions() dialOptions {
return dialOptions{ return dialOptions{
disableRetry: !envconfig.Retry,
healthCheckFunc: internal.HealthCheckFunc, healthCheckFunc: internal.HealthCheckFunc,
copts: transport.ConnectOptions{ copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize, WriteBufferSize: defaultWriteBufSize,

View File

@ -204,9 +204,9 @@ func RegisterChannel(c Channel, pid int64, ref string) int64 {
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
} }
if pid == 0 { if pid == 0 {
db.get().addChannel(id, cn, true, pid, ref) db.get().addChannel(id, cn, true, pid)
} else { } else {
db.get().addChannel(id, cn, false, pid, ref) db.get().addChannel(id, cn, false, pid)
} }
return id return id
} }
@ -228,7 +228,7 @@ func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
pid: pid, pid: pid,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
} }
db.get().addSubChannel(id, sc, pid, ref) db.get().addSubChannel(id, sc, pid)
return id return id
} }
@ -258,7 +258,7 @@ func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
} }
id := idGen.genID() id := idGen.genID()
ls := &listenSocket{refName: ref, s: s, id: id, pid: pid} ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
db.get().addListenSocket(id, ls, pid, ref) db.get().addListenSocket(id, ls, pid)
return id return id
} }
@ -273,11 +273,11 @@ func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
} }
id := idGen.genID() id := idGen.genID()
ns := &normalSocket{refName: ref, s: s, id: id, pid: pid} ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
db.get().addNormalSocket(id, ns, pid, ref) db.get().addNormalSocket(id, ns, pid)
return id return id
} }
// RemoveEntry removes an entry with unique channelz trakcing id to be id from // RemoveEntry removes an entry with unique channelz tracking id to be id from
// channelz database. // channelz database.
func RemoveEntry(id int64) { func RemoveEntry(id int64) {
db.get().removeEntry(id) db.get().removeEntry(id)
@ -333,7 +333,7 @@ func (c *channelMap) addServer(id int64, s *server) {
c.mu.Unlock() c.mu.Unlock()
} }
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) { func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) {
c.mu.Lock() c.mu.Lock()
cn.cm = c cn.cm = c
cn.trace.cm = c cn.trace.cm = c
@ -346,7 +346,7 @@ func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid in
c.mu.Unlock() c.mu.Unlock()
} }
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) { func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) {
c.mu.Lock() c.mu.Lock()
sc.cm = c sc.cm = c
sc.trace.cm = c sc.trace.cm = c
@ -355,7 +355,7 @@ func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref stri
c.mu.Unlock() c.mu.Unlock()
} }
func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) { func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) {
c.mu.Lock() c.mu.Lock()
ls.cm = c ls.cm = c
c.listenSockets[id] = ls c.listenSockets[id] = ls
@ -363,7 +363,7 @@ func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref
c.mu.Unlock() c.mu.Unlock()
} }
func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) { func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) {
c.mu.Lock() c.mu.Lock()
ns.cm = c ns.cm = c
c.normalSockets[id] = ns c.normalSockets[id] = ns

View File

@ -22,20 +22,14 @@ package envconfig
import ( import (
"os" "os"
"strings" "strings"
xdsenv "google.golang.org/grpc/internal/xds/env"
) )
const ( const (
prefix = "GRPC_GO_" prefix = "GRPC_GO_"
retryStr = prefix + "RETRY"
txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS" txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS"
) )
var ( var (
// Retry is enabled unless explicitly disabled via "GRPC_GO_RETRY=off" or
// if XDS retry support is explicitly disabled.
Retry = !strings.EqualFold(os.Getenv(retryStr), "off") && xdsenv.RetrySupport
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false"). // TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false") TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
) )

View File

@ -16,9 +16,7 @@
* *
*/ */
// Package env acts a single source of definition for all environment variables package envconfig
// related to the xDS implementation in gRPC.
package env
import ( import (
"os" "os"
@ -26,72 +24,67 @@ import (
) )
const ( const (
// BootstrapFileNameEnv is the env variable to set bootstrap file name. // XDSBootstrapFileNameEnv is the env variable to set bootstrap file name.
// Do not use this and read from env directly. Its value is read and kept in // Do not use this and read from env directly. Its value is read and kept in
// variable BootstrapFileName. // variable BootstrapFileName.
// //
// When both bootstrap FileName and FileContent are set, FileName is used. // When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP" XDSBootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
// BootstrapFileContentEnv is the env variable to set bootstrapp file // XDSBootstrapFileContentEnv is the env variable to set bootstrapp file
// content. Do not use this and read from env directly. Its value is read // content. Do not use this and read from env directly. Its value is read
// and kept in variable BootstrapFileName. // and kept in variable BootstrapFileName.
// //
// When both bootstrap FileName and FileContent are set, FileName is used. // When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG" XDSBootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT" clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
retrySupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_RBAC" rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_RBAC"
federationEnv = "GRPC_EXPERIMENTAL_XDS_FEDERATION"
c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI" c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
) )
var ( var (
// BootstrapFileName holds the name of the file which contains xDS bootstrap // XDSBootstrapFileName holds the name of the file which contains xDS
// configuration. Users can specify the location of the bootstrap file by // bootstrap configuration. Users can specify the location of the bootstrap
// setting the environment variable "GRPC_XDS_BOOTSTRAP". // file by setting the environment variable "GRPC_XDS_BOOTSTRAP".
// //
// When both bootstrap FileName and FileContent are set, FileName is used. // When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileName = os.Getenv(BootstrapFileNameEnv) XDSBootstrapFileName = os.Getenv(XDSBootstrapFileNameEnv)
// BootstrapFileContent holds the content of the xDS bootstrap // XDSBootstrapFileContent holds the content of the xDS bootstrap
// configuration. Users can specify the bootstrap config by // configuration. Users can specify the bootstrap config by setting the
// setting the environment variable "GRPC_XDS_BOOTSTRAP_CONFIG". // environment variable "GRPC_XDS_BOOTSTRAP_CONFIG".
// //
// When both bootstrap FileName and FileContent are set, FileName is used. // When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv) XDSBootstrapFileContent = os.Getenv(XDSBootstrapFileContentEnv)
// RingHashSupport indicates whether ring hash support is enabled, which can // XDSRingHash indicates whether ring hash support is enabled, which can be
// be disabled by setting the environment variable // disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false". // "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
RingHashSupport = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false") XDSRingHash = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false")
// ClientSideSecuritySupport is used to control processing of security // XDSClientSideSecurity is used to control processing of security
// configuration on the client-side. // configuration on the client-side.
// //
// Note that there is no env var protection for the server-side because we // Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use // have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server. // the new API to get security integration on the server.
ClientSideSecuritySupport = !strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "false") XDSClientSideSecurity = !strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "false")
// AggregateAndDNSSupportEnv indicates whether processing of aggregated // XDSAggregateAndDNS indicates whether processing of aggregated cluster
// cluster and DNS cluster is enabled, which can be enabled by setting the // and DNS cluster is enabled, which can be enabled by setting the
// environment variable // environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to // "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true". // "true".
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true") XDSAggregateAndDNS = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
// RetrySupport indicates whether xDS retry is enabled. // XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
RetrySupport = !strings.EqualFold(os.Getenv(retrySupportEnv), "false")
// RBACSupport indicates whether xDS configured RBAC HTTP Filter is enabled,
// which can be disabled by setting the environment variable // which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false". // "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
RBACSupport = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false") XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
// XDSFederation indicates whether federation support is enabled.
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")
// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing. // C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv) C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
) )

View File

@ -136,12 +136,10 @@ type inFlow struct {
// newLimit updates the inflow window to a new value n. // newLimit updates the inflow window to a new value n.
// It assumes that n is always greater than the old limit. // It assumes that n is always greater than the old limit.
func (f *inFlow) newLimit(n uint32) uint32 { func (f *inFlow) newLimit(n uint32) {
f.mu.Lock() f.mu.Lock()
d := n - f.limit
f.limit = n f.limit = n
f.mu.Unlock() f.mu.Unlock()
return d
} }
func (f *inFlow) maybeAdjust(n uint32) uint32 { func (f *inFlow) maybeAdjust(n uint32) uint32 {

View File

@ -201,6 +201,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
} }
}() }()
// gRPC, resolver, balancer etc. can specify arbitrary data in the
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the dialer and credential handshaker. This makes it possible for
// address specific arbitrary data to reach custom dialers and credential handshakers.
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent) conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
if err != nil { if err != nil {
if opts.FailOnNonTempDialError { if opts.FailOnNonTempDialError {
@ -245,11 +251,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
} }
} }
if transportCreds != nil { if transportCreds != nil {
// gRPC, resolver, balancer etc. can specify arbitrary data in the
// Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the credential handshaker. This makes it possible for
// address specific arbitrary data to reach the credential handshaker.
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
rawConn := conn rawConn := conn
// Pull the deadline from the connectCtx, which will be used for // Pull the deadline from the connectCtx, which will be used for
// timeouts in the authentication protocol handshake. Can ignore the // timeouts in the authentication protocol handshake. Can ignore the
@ -587,7 +588,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
return nil, err return nil, err
} }
return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err) return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
} }
for k, v := range data { for k, v := range data {
// Capital header names are illegal in HTTP/2. // Capital header names are illegal in HTTP/2.
@ -1556,7 +1557,7 @@ func minTime(a, b time.Duration) time.Duration {
return b return b
} }
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. // keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() { func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}} p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then. // True iff a ping has been sent, and no data has been received since then.

View File

@ -73,7 +73,6 @@ type http2Server struct {
writerDone chan struct{} // sync point to enable testing. writerDone chan struct{} // sync point to enable testing.
remoteAddr net.Addr remoteAddr net.Addr
localAddr net.Addr localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle inTapHandle tap.ServerInHandle
framer *framer framer *framer
@ -123,6 +122,11 @@ type http2Server struct {
bufferPool *bufferPool bufferPool *bufferPool
connectionID uint64 connectionID uint64
// maxStreamMu guards the maximum stream ID
// This lock may not be taken if mu is already held.
maxStreamMu sync.Mutex
maxStreamID uint32 // max stream ID ever seen
} }
// NewServerTransport creates a http2 transport with conn and configuration // NewServerTransport creates a http2 transport with conn and configuration
@ -334,6 +338,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
// operateHeader takes action on the decoded headers. // operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
streamID := frame.Header().StreamID streamID := frame.Header().StreamID
// frame.Truncated is set to true when framer detects that the current header // frame.Truncated is set to true when framer detects that the current header
@ -348,6 +356,15 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return false return false
} }
if streamID%2 != 1 || streamID <= t.maxStreamID {
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
return true
}
t.maxStreamID = streamID
buf := newRecvBuffer() buf := newRecvBuffer()
s := &Stream{ s := &Stream{
id: streamID, id: streamID,
@ -355,7 +372,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
buf: buf, buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)}, fc: &inFlow{limit: uint32(t.initialWindowSize)},
} }
var ( var (
// If a gRPC Response-Headers has already been received, then it means // If a gRPC Response-Headers has already been received, then it means
// that the peer is speaking gRPC and we are in gRPC mode. // that the peer is speaking gRPC and we are in gRPC mode.
@ -498,16 +514,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.cancel() s.cancel()
return false return false
} }
if streamID%2 != 1 || streamID <= t.maxStreamID {
t.mu.Unlock()
// illegal gRPC stream id.
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
}
s.cancel()
return true
}
t.maxStreamID = streamID
if httpMethod != http.MethodPost { if httpMethod != http.MethodPost {
t.mu.Unlock() t.mu.Unlock()
if logger.V(logLevel) { if logger.V(logLevel) {
@ -1293,20 +1299,23 @@ var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// Handles outgoing GoAway and returns true if loopy needs to put itself // Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode. // in draining mode.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.maxStreamMu.Lock()
t.mu.Lock() t.mu.Lock()
if t.state == closing { // TODO(mmukhi): This seems unnecessary. if t.state == closing { // TODO(mmukhi): This seems unnecessary.
t.mu.Unlock() t.mu.Unlock()
t.maxStreamMu.Unlock()
// The transport is closing. // The transport is closing.
return false, ErrConnClosing return false, ErrConnClosing
} }
sid := t.maxStreamID
if !g.headsUp { if !g.headsUp {
// Stop accepting more streams now. // Stop accepting more streams now.
t.state = draining t.state = draining
sid := t.maxStreamID
if len(t.activeStreams) == 0 { if len(t.activeStreams) == 0 {
g.closeConn = true g.closeConn = true
} }
t.mu.Unlock() t.mu.Unlock()
t.maxStreamMu.Unlock()
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err return false, err
} }
@ -1319,6 +1328,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return true, nil return true, nil
} }
t.mu.Unlock() t.mu.Unlock()
t.maxStreamMu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32, // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer // Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have // to expire. During this time accept new streams since they might have

View File

@ -37,7 +37,7 @@ var (
httpProxyFromEnvironment = http.ProxyFromEnvironment httpProxyFromEnvironment = http.ProxyFromEnvironment
) )
func mapAddress(ctx context.Context, address string) (*url.URL, error) { func mapAddress(address string) (*url.URL, error) {
req := &http.Request{ req := &http.Request{
URL: &url.URL{ URL: &url.URL{
Scheme: "https", Scheme: "https",
@ -114,7 +114,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
// connection. // connection.
func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) { func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
newAddr := addr newAddr := addr
proxyURL, err := mapAddress(ctx, addr) proxyURL, err := mapAddress(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -125,7 +125,7 @@ func (b *pickfirstBalancer) Close() {
} }
func (b *pickfirstBalancer) ExitIdle() { func (b *pickfirstBalancer) ExitIdle() {
if b.state == connectivity.Idle { if b.sc != nil && b.state == connectivity.Idle {
b.sc.Connect() b.sc.Connect()
} }
} }

View File

@ -102,8 +102,8 @@ done
# The go_package option in grpc/lookup/v1/rls.proto doesn't match the # The go_package option in grpc/lookup/v1/rls.proto doesn't match the
# current location. Move it into the right place. # current location. Move it into the right place.
mkdir -p ${WORKDIR}/out/google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1 mkdir -p ${WORKDIR}/out/google.golang.org/grpc/internal/proto/grpc_lookup_v1
mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1 mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/google.golang.org/grpc/internal/proto/grpc_lookup_v1
# grpc_testingv3/testv3.pb.go is not re-generated because it was # grpc_testingv3/testv3.pb.go is not re-generated because it was
# intentionally generated by an older version of protoc-gen-go. # intentionally generated by an older version of protoc-gen-go.

View File

@ -712,13 +712,11 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
if err != nil { if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
} }
} else {
size = len(d)
}
if size > maxReceiveMessageSize { if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java // TODO: Revisit the error code. Currently keep it consistent with java
// implementation. // implementation.
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", size, maxReceiveMessageSize) return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
} }
return d, nil return d, nil
} }

View File

@ -29,6 +29,7 @@ package status
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
spb "google.golang.org/genproto/googleapis/rpc/status" spb "google.golang.org/genproto/googleapis/rpc/status"
@ -117,18 +118,18 @@ func Code(err error) codes.Code {
return codes.Unknown return codes.Unknown
} }
// FromContextError converts a context error into a Status. It returns a // FromContextError converts a context error or wrapped context error into a
// Status with codes.OK if err is nil, or a Status with codes.Unknown if err is // Status. It returns a Status with codes.OK if err is nil, or a Status with
// non-nil and not a context error. // codes.Unknown if err is non-nil and not a context error.
func FromContextError(err error) *Status { func FromContextError(err error) *Status {
switch err { if err == nil {
case nil:
return nil return nil
case context.DeadlineExceeded: }
if errors.Is(err, context.DeadlineExceeded) {
return New(codes.DeadlineExceeded, err.Error()) return New(codes.DeadlineExceeded, err.Error())
case context.Canceled: }
if errors.Is(err, context.Canceled) {
return New(codes.Canceled, err.Error()) return New(codes.Canceled, err.Error())
default: }
return New(codes.Unknown, err.Error()) return New(codes.Unknown, err.Error())
} }
}

View File

@ -19,4 +19,4 @@
package grpc package grpc
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.42.0" const Version = "1.43.0"

4
vendor/modules.txt vendored
View File

@ -588,7 +588,7 @@ google.golang.org/appengine/urlfetch
google.golang.org/genproto/googleapis/api/httpbody google.golang.org/genproto/googleapis/api/httpbody
google.golang.org/genproto/googleapis/rpc/status google.golang.org/genproto/googleapis/rpc/status
google.golang.org/genproto/protobuf/field_mask google.golang.org/genproto/protobuf/field_mask
# google.golang.org/grpc v1.42.0 # google.golang.org/grpc v1.43.0
## explicit; go 1.14 ## explicit; go 1.14
google.golang.org/grpc google.golang.org/grpc
google.golang.org/grpc/attributes google.golang.org/grpc/attributes
@ -601,6 +601,7 @@ google.golang.org/grpc/binarylog/grpc_binarylog_v1
google.golang.org/grpc/codes google.golang.org/grpc/codes
google.golang.org/grpc/connectivity google.golang.org/grpc/connectivity
google.golang.org/grpc/credentials google.golang.org/grpc/credentials
google.golang.org/grpc/credentials/insecure
google.golang.org/grpc/encoding google.golang.org/grpc/encoding
google.golang.org/grpc/encoding/gzip google.golang.org/grpc/encoding/gzip
google.golang.org/grpc/encoding/proto google.golang.org/grpc/encoding/proto
@ -629,7 +630,6 @@ google.golang.org/grpc/internal/status
google.golang.org/grpc/internal/syscall google.golang.org/grpc/internal/syscall
google.golang.org/grpc/internal/transport google.golang.org/grpc/internal/transport
google.golang.org/grpc/internal/transport/networktype google.golang.org/grpc/internal/transport/networktype
google.golang.org/grpc/internal/xds/env
google.golang.org/grpc/keepalive google.golang.org/grpc/keepalive
google.golang.org/grpc/metadata google.golang.org/grpc/metadata
google.golang.org/grpc/peer google.golang.org/grpc/peer