mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 18:43:34 +00:00
rebase: bump google.golang.org/grpc from 1.59.0 to 1.60.1
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.59.0 to 1.60.1. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.59.0...v1.60.1) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
committed by
mergify[bot]
parent
c807059618
commit
0ec64b7552
500
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
500
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
@ -33,9 +33,7 @@ import (
|
||||
"google.golang.org/grpc/balancer/base"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/backoff"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/internal/idle"
|
||||
@ -48,9 +46,9 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
|
||||
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
|
||||
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
|
||||
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
|
||||
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
|
||||
)
|
||||
|
||||
const (
|
||||
@ -119,23 +117,8 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DialContext creates a client connection to the given target. By default, it's
|
||||
// a non-blocking dial (the function won't wait for connections to be
|
||||
// established, and connecting happens in the background). To make it a blocking
|
||||
// dial, use WithBlock() dial option.
|
||||
//
|
||||
// In the non-blocking case, the ctx does not act against the connection. It
|
||||
// only controls the setup steps.
|
||||
//
|
||||
// In the blocking case, ctx can be used to cancel or expire the pending
|
||||
// connection. Once this function returns, the cancellation and expiration of
|
||||
// ctx will be noop. Users should call ClientConn.Close to terminate all the
|
||||
// pending operations after this function returns.
|
||||
//
|
||||
// The target name syntax is defined in
|
||||
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
|
||||
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
|
||||
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
|
||||
// newClient returns a new client in idle mode.
|
||||
func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
|
||||
cc := &ClientConn{
|
||||
target: target,
|
||||
conns: make(map[*addrConn]struct{}),
|
||||
@ -143,23 +126,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
czData: new(channelzData),
|
||||
}
|
||||
|
||||
// We start the channel off in idle mode, but kick it out of idle at the end
|
||||
// of this method, instead of waiting for the first RPC. Other gRPC
|
||||
// implementations do wait for the first RPC to kick the channel out of
|
||||
// idle. But doing so would be a major behavior change for our users who are
|
||||
// used to seeing the channel active after Dial.
|
||||
//
|
||||
// Taking this approach of kicking it out of idle at the end of this method
|
||||
// allows us to share the code between channel creation and exiting idle
|
||||
// mode. This will also make it easy for us to switch to starting the
|
||||
// channel off in idle, if at all we ever get to do that.
|
||||
cc.idlenessState = ccIdlenessStateIdle
|
||||
|
||||
cc.retryThrottler.Store((*retryThrottler)(nil))
|
||||
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
|
||||
cc.ctx, cc.cancel = context.WithCancel(context.Background())
|
||||
cc.exitIdleCond = sync.NewCond(&cc.mu)
|
||||
|
||||
// Apply dial options.
|
||||
disableGlobalOpts := false
|
||||
for _, opt := range opts {
|
||||
if _, ok := opt.(*disableGlobalDialOptions); ok {
|
||||
@ -177,21 +148,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
for _, opt := range opts {
|
||||
opt.apply(&cc.dopts)
|
||||
}
|
||||
|
||||
chainUnaryClientInterceptors(cc)
|
||||
chainStreamClientInterceptors(cc)
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
cc.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// Register ClientConn with channelz.
|
||||
cc.channelzRegistration(target)
|
||||
|
||||
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
|
||||
|
||||
if err := cc.validateTransportCredentials(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -205,10 +164,80 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
}
|
||||
cc.mkp = cc.dopts.copts.KeepaliveParams
|
||||
|
||||
if cc.dopts.copts.UserAgent != "" {
|
||||
cc.dopts.copts.UserAgent += " " + grpcUA
|
||||
} else {
|
||||
cc.dopts.copts.UserAgent = grpcUA
|
||||
// Register ClientConn with channelz.
|
||||
cc.channelzRegistration(target)
|
||||
|
||||
// TODO: Ideally it should be impossible to error from this function after
|
||||
// channelz registration. This will require removing some channelz logs
|
||||
// from the following functions that can error. Errors can be returned to
|
||||
// the user, and successful logs can be emitted here, after the checks have
|
||||
// passed and channelz is subsequently registered.
|
||||
|
||||
// Determine the resolver to use.
|
||||
if err := cc.parseTargetAndFindResolver(); err != nil {
|
||||
channelz.RemoveEntry(cc.channelzID)
|
||||
return nil, err
|
||||
}
|
||||
if err = cc.determineAuthority(); err != nil {
|
||||
channelz.RemoveEntry(cc.channelzID)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
|
||||
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
|
||||
|
||||
cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
|
||||
cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
// DialContext creates a client connection to the given target. By default, it's
|
||||
// a non-blocking dial (the function won't wait for connections to be
|
||||
// established, and connecting happens in the background). To make it a blocking
|
||||
// dial, use WithBlock() dial option.
|
||||
//
|
||||
// In the non-blocking case, the ctx does not act against the connection. It
|
||||
// only controls the setup steps.
|
||||
//
|
||||
// In the blocking case, ctx can be used to cancel or expire the pending
|
||||
// connection. Once this function returns, the cancellation and expiration of
|
||||
// ctx will be noop. Users should call ClientConn.Close to terminate all the
|
||||
// pending operations after this function returns.
|
||||
//
|
||||
// The target name syntax is defined in
|
||||
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
|
||||
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
|
||||
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
|
||||
cc, err := newClient(target, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We start the channel off in idle mode, but kick it out of idle now,
|
||||
// instead of waiting for the first RPC. Other gRPC implementations do wait
|
||||
// for the first RPC to kick the channel out of idle. But doing so would be
|
||||
// a major behavior change for our users who are used to seeing the channel
|
||||
// active after Dial.
|
||||
//
|
||||
// Taking this approach of kicking it out of idle at the end of this method
|
||||
// allows us to share the code between channel creation and exiting idle
|
||||
// mode. This will also make it easy for us to switch to starting the
|
||||
// channel off in idle, i.e. by making newClient exported.
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
cc.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// This creates the name resolver, load balancer, etc.
|
||||
if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return now for non-blocking dials.
|
||||
if !cc.dopts.block {
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
if cc.dopts.timeout > 0 {
|
||||
@ -231,49 +260,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
}
|
||||
}()
|
||||
|
||||
if cc.dopts.bs == nil {
|
||||
cc.dopts.bs = backoff.DefaultExponential
|
||||
}
|
||||
|
||||
// Determine the resolver to use.
|
||||
if err := cc.parseTargetAndFindResolver(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = cc.determineAuthority(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cc.dopts.scChan != nil {
|
||||
// Blocking wait for the initial service config.
|
||||
select {
|
||||
case sc, ok := <-cc.dopts.scChan:
|
||||
if ok {
|
||||
cc.sc = &sc
|
||||
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
if cc.dopts.scChan != nil {
|
||||
go cc.scWatcher()
|
||||
}
|
||||
|
||||
// This creates the name resolver, load balancer, blocking picker etc.
|
||||
if err := cc.exitIdleMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Configure idleness support with configured idle timeout or default idle
|
||||
// timeout duration. Idleness can be explicitly disabled by the user, by
|
||||
// setting the dial option to 0.
|
||||
cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger})
|
||||
|
||||
// Return early for non-blocking dials.
|
||||
if !cc.dopts.block {
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
// A blocking dial blocks until the clientConn is ready.
|
||||
for {
|
||||
s := cc.GetState()
|
||||
@ -320,8 +306,8 @@ func (cc *ClientConn) addTraceEvent(msg string) {
|
||||
|
||||
type idler ClientConn
|
||||
|
||||
func (i *idler) EnterIdleMode() error {
|
||||
return (*ClientConn)(i).enterIdleMode()
|
||||
func (i *idler) EnterIdleMode() {
|
||||
(*ClientConn)(i).enterIdleMode()
|
||||
}
|
||||
|
||||
func (i *idler) ExitIdleMode() error {
|
||||
@ -329,117 +315,71 @@ func (i *idler) ExitIdleMode() error {
|
||||
}
|
||||
|
||||
// exitIdleMode moves the channel out of idle mode by recreating the name
|
||||
// resolver and load balancer.
|
||||
func (cc *ClientConn) exitIdleMode() error {
|
||||
// resolver and load balancer. This should never be called directly; use
|
||||
// cc.idlenessMgr.ExitIdleMode instead.
|
||||
func (cc *ClientConn) exitIdleMode() (err error) {
|
||||
cc.mu.Lock()
|
||||
if cc.conns == nil {
|
||||
cc.mu.Unlock()
|
||||
return errConnClosing
|
||||
}
|
||||
if cc.idlenessState != ccIdlenessStateIdle {
|
||||
channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState)
|
||||
cc.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// When Close() and exitIdleMode() race against each other, one of the
|
||||
// following two can happen:
|
||||
// - Close() wins the race and runs first. exitIdleMode() runs after, and
|
||||
// sees that the ClientConn is already closed and hence returns early.
|
||||
// - exitIdleMode() wins the race and runs first and recreates the balancer
|
||||
// and releases the lock before recreating the resolver. If Close() runs
|
||||
// in this window, it will wait for exitIdleMode to complete.
|
||||
//
|
||||
// We achieve this synchronization using the below condition variable.
|
||||
cc.mu.Lock()
|
||||
cc.idlenessState = ccIdlenessStateActive
|
||||
cc.exitIdleCond.Signal()
|
||||
cc.mu.Unlock()
|
||||
}()
|
||||
|
||||
cc.idlenessState = ccIdlenessStateExitingIdle
|
||||
exitedIdle := false
|
||||
if cc.blockingpicker == nil {
|
||||
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
|
||||
} else {
|
||||
cc.blockingpicker.exitIdleMode()
|
||||
exitedIdle = true
|
||||
}
|
||||
|
||||
var credsClone credentials.TransportCredentials
|
||||
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
|
||||
credsClone = creds.Clone()
|
||||
}
|
||||
if cc.balancerWrapper == nil {
|
||||
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
|
||||
DialCreds: credsClone,
|
||||
CredsBundle: cc.dopts.copts.CredsBundle,
|
||||
Dialer: cc.dopts.copts.Dialer,
|
||||
Authority: cc.authority,
|
||||
CustomUserAgent: cc.dopts.copts.UserAgent,
|
||||
ChannelzParentID: cc.channelzID,
|
||||
Target: cc.parsedTarget,
|
||||
})
|
||||
} else {
|
||||
cc.balancerWrapper.exitIdleMode()
|
||||
}
|
||||
cc.firstResolveEvent = grpcsync.NewEvent()
|
||||
cc.mu.Unlock()
|
||||
|
||||
// This needs to be called without cc.mu because this builds a new resolver
|
||||
// which might update state or report error inline which needs to be handled
|
||||
// by cc.updateResolverState() which also grabs cc.mu.
|
||||
if err := cc.initResolverWrapper(credsClone); err != nil {
|
||||
// which might update state or report error inline, which would then need to
|
||||
// acquire cc.mu.
|
||||
if err := cc.resolverWrapper.start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if exitedIdle {
|
||||
cc.addTraceEvent("exiting idle mode")
|
||||
}
|
||||
cc.addTraceEvent("exiting idle mode")
|
||||
return nil
|
||||
}
|
||||
|
||||
// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
|
||||
// name resolver, load balancer and any subchannels.
|
||||
func (cc *ClientConn) enterIdleMode() error {
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
|
||||
if cc.conns == nil {
|
||||
return ErrClientConnClosing
|
||||
}
|
||||
if cc.idlenessState != ccIdlenessStateActive {
|
||||
channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
|
||||
return nil
|
||||
}
|
||||
|
||||
// initIdleStateLocked initializes common state to how it should be while idle.
|
||||
func (cc *ClientConn) initIdleStateLocked() {
|
||||
cc.resolverWrapper = newCCResolverWrapper(cc)
|
||||
cc.balancerWrapper = newCCBalancerWrapper(cc)
|
||||
cc.firstResolveEvent = grpcsync.NewEvent()
|
||||
// cc.conns == nil is a proxy for the ClientConn being closed. So, instead
|
||||
// of setting it to nil here, we recreate the map. This also means that we
|
||||
// don't have to do this when exiting idle mode.
|
||||
conns := cc.conns
|
||||
cc.conns = make(map[*addrConn]struct{})
|
||||
}
|
||||
|
||||
// TODO: Currently, we close the resolver wrapper upon entering idle mode
|
||||
// and create a new one upon exiting idle mode. This means that the
|
||||
// `cc.resolverWrapper` field would be overwritten everytime we exit idle
|
||||
// mode. While this means that we need to hold `cc.mu` when accessing
|
||||
// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
|
||||
// try to do the same for the balancer and picker wrappers too.
|
||||
cc.resolverWrapper.close()
|
||||
cc.blockingpicker.enterIdleMode()
|
||||
cc.balancerWrapper.enterIdleMode()
|
||||
// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
|
||||
// name resolver, load balancer, and any subchannels. This should never be
|
||||
// called directly; use cc.idlenessMgr.EnterIdleMode instead.
|
||||
func (cc *ClientConn) enterIdleMode() {
|
||||
cc.mu.Lock()
|
||||
|
||||
if cc.conns == nil {
|
||||
cc.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
conns := cc.conns
|
||||
|
||||
rWrapper := cc.resolverWrapper
|
||||
rWrapper.close()
|
||||
cc.pickerWrapper.reset()
|
||||
bWrapper := cc.balancerWrapper
|
||||
bWrapper.close()
|
||||
cc.csMgr.updateState(connectivity.Idle)
|
||||
cc.idlenessState = ccIdlenessStateIdle
|
||||
cc.addTraceEvent("entering idle mode")
|
||||
|
||||
go func() {
|
||||
for ac := range conns {
|
||||
ac.tearDown(errConnIdling)
|
||||
}
|
||||
}()
|
||||
cc.initIdleStateLocked()
|
||||
|
||||
return nil
|
||||
cc.mu.Unlock()
|
||||
|
||||
// Block until the name resolver and LB policy are closed.
|
||||
<-rWrapper.serializer.Done()
|
||||
<-bWrapper.serializer.Done()
|
||||
|
||||
// Close all subchannels after the LB policy is closed.
|
||||
for ac := range conns {
|
||||
ac.tearDown(errConnIdling)
|
||||
}
|
||||
}
|
||||
|
||||
// validateTransportCredentials performs a series of checks on the configured
|
||||
@ -649,66 +589,35 @@ type ClientConn struct {
|
||||
dopts dialOptions // Default and user specified dial options.
|
||||
channelzID *channelz.Identifier // Channelz identifier for the channel.
|
||||
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
|
||||
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
|
||||
idlenessMgr idle.Manager
|
||||
idlenessMgr *idle.Manager
|
||||
|
||||
// The following provide their own synchronization, and therefore don't
|
||||
// require cc.mu to be held to access them.
|
||||
csMgr *connectivityStateManager
|
||||
blockingpicker *pickerWrapper
|
||||
pickerWrapper *pickerWrapper
|
||||
safeConfigSelector iresolver.SafeConfigSelector
|
||||
czData *channelzData
|
||||
retryThrottler atomic.Value // Updated from service config.
|
||||
|
||||
// firstResolveEvent is used to track whether the name resolver sent us at
|
||||
// least one update. RPCs block on this event.
|
||||
firstResolveEvent *grpcsync.Event
|
||||
|
||||
// mu protects the following fields.
|
||||
// TODO: split mu so the same mutex isn't used for everything.
|
||||
mu sync.RWMutex
|
||||
resolverWrapper *ccResolverWrapper // Initialized in Dial; cleared in Close.
|
||||
resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close.
|
||||
balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close.
|
||||
sc *ServiceConfig // Latest service config received from the resolver.
|
||||
conns map[*addrConn]struct{} // Set to nil on close.
|
||||
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
|
||||
idlenessState ccIdlenessState // Tracks idleness state of the channel.
|
||||
exitIdleCond *sync.Cond // Signalled when channel exits idle.
|
||||
// firstResolveEvent is used to track whether the name resolver sent us at
|
||||
// least one update. RPCs block on this event. May be accessed without mu
|
||||
// if we know we cannot be asked to enter idle mode while accessing it (e.g.
|
||||
// when the idle manager has already been closed, or if we are already
|
||||
// entering idle mode).
|
||||
firstResolveEvent *grpcsync.Event
|
||||
|
||||
lceMu sync.Mutex // protects lastConnectionError
|
||||
lastConnectionError error
|
||||
}
|
||||
|
||||
// ccIdlenessState tracks the idleness state of the channel.
|
||||
//
|
||||
// Channels start off in `active` and move to `idle` after a period of
|
||||
// inactivity. When moving back to `active` upon an incoming RPC, they
|
||||
// transition through `exiting_idle`. This state is useful for synchronization
|
||||
// with Close().
|
||||
//
|
||||
// This state tracking is mostly for self-protection. The idlenessManager is
|
||||
// expected to keep track of the state as well, and is expected not to call into
|
||||
// the ClientConn unnecessarily.
|
||||
type ccIdlenessState int8
|
||||
|
||||
const (
|
||||
ccIdlenessStateActive ccIdlenessState = iota
|
||||
ccIdlenessStateIdle
|
||||
ccIdlenessStateExitingIdle
|
||||
)
|
||||
|
||||
func (s ccIdlenessState) String() string {
|
||||
switch s {
|
||||
case ccIdlenessStateActive:
|
||||
return "active"
|
||||
case ccIdlenessStateIdle:
|
||||
return "idle"
|
||||
case ccIdlenessStateExitingIdle:
|
||||
return "exitingIdle"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
|
||||
// ctx expires. A true value is returned in former case and false in latter.
|
||||
//
|
||||
@ -748,29 +657,15 @@ func (cc *ClientConn) GetState() connectivity.State {
|
||||
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
|
||||
// release.
|
||||
func (cc *ClientConn) Connect() {
|
||||
cc.exitIdleMode()
|
||||
if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
|
||||
cc.addTraceEvent(err.Error())
|
||||
return
|
||||
}
|
||||
// If the ClientConn was not in idle mode, we need to call ExitIdle on the
|
||||
// LB policy so that connections can be created.
|
||||
cc.balancerWrapper.exitIdleMode()
|
||||
}
|
||||
|
||||
func (cc *ClientConn) scWatcher() {
|
||||
for {
|
||||
select {
|
||||
case sc, ok := <-cc.dopts.scChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
cc.mu.Lock()
|
||||
// TODO: load balance policy runtime change is ignored.
|
||||
// We may revisit this decision in the future.
|
||||
cc.sc = &sc
|
||||
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
|
||||
cc.mu.Unlock()
|
||||
case <-cc.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
cc.mu.Lock()
|
||||
cc.balancerWrapper.exitIdle()
|
||||
cc.mu.Unlock()
|
||||
}
|
||||
|
||||
// waitForResolvedAddrs blocks until the resolver has provided addresses or the
|
||||
@ -804,11 +699,11 @@ func init() {
|
||||
internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
|
||||
return cc.csMgr.pubSub.Subscribe(s)
|
||||
}
|
||||
internal.EnterIdleModeForTesting = func(cc *ClientConn) error {
|
||||
return cc.enterIdleMode()
|
||||
internal.EnterIdleModeForTesting = func(cc *ClientConn) {
|
||||
cc.idlenessMgr.EnterIdleModeForTesting()
|
||||
}
|
||||
internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
|
||||
return cc.exitIdleMode()
|
||||
return cc.idlenessMgr.ExitIdleMode()
|
||||
}
|
||||
}
|
||||
|
||||
@ -824,9 +719,8 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
|
||||
func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
|
||||
defer cc.firstResolveEvent.Fire()
|
||||
cc.mu.Lock()
|
||||
// Check if the ClientConn is already closed. Some fields (e.g.
|
||||
// balancerWrapper) are set to nil when closing the ClientConn, and could
|
||||
// cause nil pointer panic if we don't have this check.
|
||||
@ -872,7 +766,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
|
||||
if cc.sc == nil {
|
||||
// Apply the failing LB only if we haven't received valid service config
|
||||
// from the name resolver in the past.
|
||||
cc.applyFailingLB(s.ServiceConfig)
|
||||
cc.applyFailingLBLocked(s.ServiceConfig)
|
||||
cc.mu.Unlock()
|
||||
return ret
|
||||
}
|
||||
@ -894,15 +788,13 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
|
||||
return ret
|
||||
}
|
||||
|
||||
// applyFailingLB is akin to configuring an LB policy on the channel which
|
||||
// applyFailingLBLocked is akin to configuring an LB policy on the channel which
|
||||
// always fails RPCs. Here, an actual LB policy is not configured, but an always
|
||||
// erroring picker is configured, which returns errors with information about
|
||||
// what was invalid in the received service config. A config selector with no
|
||||
// service config is configured, and the connectivity state of the channel is
|
||||
// set to TransientFailure.
|
||||
//
|
||||
// Caller must hold cc.mu.
|
||||
func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
|
||||
func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {
|
||||
var err error
|
||||
if sc.Err != nil {
|
||||
err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
|
||||
@ -910,14 +802,10 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
|
||||
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
|
||||
}
|
||||
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
|
||||
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
|
||||
cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
|
||||
cc.csMgr.updateState(connectivity.TransientFailure)
|
||||
}
|
||||
|
||||
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
|
||||
cc.balancerWrapper.updateSubConnState(sc, s, err)
|
||||
}
|
||||
|
||||
// Makes a copy of the input addresses slice and clears out the balancer
|
||||
// attributes field. Addresses are passed during subconn creation and address
|
||||
// update operations. In both cases, we will clear the balancer attributes by
|
||||
@ -932,10 +820,14 @@ func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Ad
|
||||
return out
|
||||
}
|
||||
|
||||
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
|
||||
// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns.
|
||||
//
|
||||
// Caller needs to make sure len(addrs) > 0.
|
||||
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
|
||||
func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
|
||||
if cc.conns == nil {
|
||||
return nil, ErrClientConnClosing
|
||||
}
|
||||
|
||||
ac := &addrConn{
|
||||
state: connectivity.Idle,
|
||||
cc: cc,
|
||||
@ -947,12 +839,6 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
|
||||
stateChan: make(chan struct{}),
|
||||
}
|
||||
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
|
||||
// Track ac in cc. This needs to be done before any getTransport(...) is called.
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
if cc.conns == nil {
|
||||
return nil, ErrClientConnClosing
|
||||
}
|
||||
|
||||
var err error
|
||||
ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "")
|
||||
@ -968,6 +854,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
|
||||
},
|
||||
})
|
||||
|
||||
// Track ac in cc. This needs to be done before any getTransport(...) is called.
|
||||
cc.conns[ac] = struct{}{}
|
||||
return ac, nil
|
||||
}
|
||||
@ -1174,7 +1061,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
|
||||
}
|
||||
|
||||
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
|
||||
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
|
||||
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
|
||||
Ctx: ctx,
|
||||
FullMethodName: method,
|
||||
})
|
||||
@ -1216,12 +1103,12 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
|
||||
|
||||
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
|
||||
cc.mu.RLock()
|
||||
r := cc.resolverWrapper
|
||||
cc.resolverWrapper.resolveNow(o)
|
||||
cc.mu.RUnlock()
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
go r.resolveNow(o)
|
||||
}
|
||||
|
||||
func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) {
|
||||
cc.resolverWrapper.resolveNow(o)
|
||||
}
|
||||
|
||||
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
|
||||
@ -1253,40 +1140,32 @@ func (cc *ClientConn) Close() error {
|
||||
<-cc.csMgr.pubSub.Done()
|
||||
}()
|
||||
|
||||
// Prevent calls to enter/exit idle immediately, and ensure we are not
|
||||
// currently entering/exiting idle mode.
|
||||
cc.idlenessMgr.Close()
|
||||
|
||||
cc.mu.Lock()
|
||||
if cc.conns == nil {
|
||||
cc.mu.Unlock()
|
||||
return ErrClientConnClosing
|
||||
}
|
||||
|
||||
for cc.idlenessState == ccIdlenessStateExitingIdle {
|
||||
cc.exitIdleCond.Wait()
|
||||
}
|
||||
|
||||
conns := cc.conns
|
||||
cc.conns = nil
|
||||
cc.csMgr.updateState(connectivity.Shutdown)
|
||||
|
||||
pWrapper := cc.blockingpicker
|
||||
rWrapper := cc.resolverWrapper
|
||||
bWrapper := cc.balancerWrapper
|
||||
idlenessMgr := cc.idlenessMgr
|
||||
// We can safely unlock and continue to access all fields now as
|
||||
// cc.conns==nil, preventing any further operations on cc.
|
||||
cc.mu.Unlock()
|
||||
|
||||
cc.resolverWrapper.close()
|
||||
// The order of closing matters here since the balancer wrapper assumes the
|
||||
// picker is closed before it is closed.
|
||||
if pWrapper != nil {
|
||||
pWrapper.close()
|
||||
}
|
||||
if bWrapper != nil {
|
||||
bWrapper.close()
|
||||
}
|
||||
if rWrapper != nil {
|
||||
rWrapper.close()
|
||||
}
|
||||
if idlenessMgr != nil {
|
||||
idlenessMgr.Close()
|
||||
}
|
||||
cc.pickerWrapper.close()
|
||||
cc.balancerWrapper.close()
|
||||
|
||||
<-cc.resolverWrapper.serializer.Done()
|
||||
<-cc.balancerWrapper.serializer.Done()
|
||||
|
||||
for ac := range conns {
|
||||
ac.tearDown(ErrClientConnClosing)
|
||||
@ -1307,7 +1186,7 @@ type addrConn struct {
|
||||
|
||||
cc *ClientConn
|
||||
dopts dialOptions
|
||||
acbw balancer.SubConn
|
||||
acbw *acBalancerWrapper
|
||||
scopts balancer.NewSubConnOptions
|
||||
|
||||
// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
|
||||
@ -1345,7 +1224,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
|
||||
} else {
|
||||
channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
|
||||
}
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
|
||||
ac.acbw.updateState(s, lastErr)
|
||||
}
|
||||
|
||||
// adjustParams updates parameters used to create transports upon
|
||||
@ -1849,7 +1728,7 @@ func (cc *ClientConn) parseTargetAndFindResolver() error {
|
||||
if err != nil {
|
||||
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
|
||||
} else {
|
||||
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
|
||||
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %#v", parsedTarget)
|
||||
rb = cc.getResolver(parsedTarget.URL.Scheme)
|
||||
if rb != nil {
|
||||
cc.parsedTarget = parsedTarget
|
||||
@ -2007,32 +1886,3 @@ func (cc *ClientConn) determineAuthority() error {
|
||||
channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
|
||||
return nil
|
||||
}
|
||||
|
||||
// initResolverWrapper creates a ccResolverWrapper, which builds the name
|
||||
// resolver. This method grabs the lock to assign the newly built resolver
|
||||
// wrapper to the cc.resolverWrapper field.
|
||||
func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error {
|
||||
rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{
|
||||
target: cc.parsedTarget,
|
||||
builder: cc.resolverBuilder,
|
||||
bOpts: resolver.BuildOptions{
|
||||
DisableServiceConfig: cc.dopts.disableServiceConfig,
|
||||
DialCreds: creds,
|
||||
CredsBundle: cc.dopts.copts.CredsBundle,
|
||||
Dialer: cc.dopts.copts.Dialer,
|
||||
},
|
||||
channelzID: cc.channelzID,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build resolver: %v", err)
|
||||
}
|
||||
// Resolver implementations may report state update or error inline when
|
||||
// built (or right after), and this is handled in cc.updateResolverState.
|
||||
// Also, an error from the resolver might lead to a re-resolution request
|
||||
// from the balancer, which is handled in resolveNow() where
|
||||
// `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here.
|
||||
cc.mu.Lock()
|
||||
cc.resolverWrapper = rw
|
||||
cc.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user