mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 18:43:34 +00:00
vendor update for E2E framework
Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
611
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
611
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
@ -42,7 +42,6 @@ import (
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/internal/transport"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/resolver"
|
||||
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
|
||||
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
|
||||
@ -69,11 +68,9 @@ var (
|
||||
errConnClosing = errors.New("grpc: the connection is closing")
|
||||
// errBalancerClosed indicates that the balancer is closed.
|
||||
errBalancerClosed = errors.New("grpc: balancer is closed")
|
||||
// We use an accessor so that minConnectTimeout can be
|
||||
// atomically read and updated while testing.
|
||||
getMinConnectTimeout = func() time.Duration {
|
||||
return minConnectTimeout
|
||||
}
|
||||
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
|
||||
// service config.
|
||||
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
|
||||
)
|
||||
|
||||
// The following errors are returned from Dial and DialContext
|
||||
@ -140,6 +137,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
opt.apply(&cc.dopts)
|
||||
}
|
||||
|
||||
chainUnaryClientInterceptors(cc)
|
||||
chainStreamClientInterceptors(cc)
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
cc.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
if channelz.IsOn() {
|
||||
if cc.dopts.channelzParentID != 0 {
|
||||
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
|
||||
@ -179,6 +185,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
}
|
||||
}
|
||||
|
||||
if cc.dopts.defaultServiceConfigRawJSON != nil {
|
||||
sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
|
||||
}
|
||||
cc.dopts.defaultServiceConfig = sc
|
||||
}
|
||||
cc.mkp = cc.dopts.copts.KeepaliveParams
|
||||
|
||||
if cc.dopts.copts.Dialer == nil {
|
||||
@ -201,17 +214,12 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
conn, err = nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
cc.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
scSet := false
|
||||
@ -220,7 +228,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
select {
|
||||
case sc, ok := <-cc.dopts.scChan:
|
||||
if ok {
|
||||
cc.sc = sc
|
||||
cc.sc = &sc
|
||||
scSet = true
|
||||
}
|
||||
default:
|
||||
@ -266,7 +274,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
select {
|
||||
case sc, ok := <-cc.dopts.scChan:
|
||||
if ok {
|
||||
cc.sc = sc
|
||||
cc.sc = &sc
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
@ -285,6 +293,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
CredsBundle: cc.dopts.copts.CredsBundle,
|
||||
Dialer: cc.dopts.copts.Dialer,
|
||||
ChannelzParentID: cc.channelzID,
|
||||
Target: cc.parsedTarget,
|
||||
}
|
||||
|
||||
// Build the resolver.
|
||||
@ -322,6 +331,68 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
// chainUnaryClientInterceptors chains all unary client interceptors into one.
|
||||
func chainUnaryClientInterceptors(cc *ClientConn) {
|
||||
interceptors := cc.dopts.chainUnaryInts
|
||||
// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
|
||||
// be executed before any other chained interceptors.
|
||||
if cc.dopts.unaryInt != nil {
|
||||
interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
|
||||
}
|
||||
var chainedInt UnaryClientInterceptor
|
||||
if len(interceptors) == 0 {
|
||||
chainedInt = nil
|
||||
} else if len(interceptors) == 1 {
|
||||
chainedInt = interceptors[0]
|
||||
} else {
|
||||
chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
|
||||
return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
|
||||
}
|
||||
}
|
||||
cc.dopts.unaryInt = chainedInt
|
||||
}
|
||||
|
||||
// getChainUnaryInvoker recursively generate the chained unary invoker.
|
||||
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
|
||||
if curr == len(interceptors)-1 {
|
||||
return finalInvoker
|
||||
}
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
|
||||
return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
|
||||
}
|
||||
}
|
||||
|
||||
// chainStreamClientInterceptors chains all stream client interceptors into one.
|
||||
func chainStreamClientInterceptors(cc *ClientConn) {
|
||||
interceptors := cc.dopts.chainStreamInts
|
||||
// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
|
||||
// be executed before any other chained interceptors.
|
||||
if cc.dopts.streamInt != nil {
|
||||
interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
|
||||
}
|
||||
var chainedInt StreamClientInterceptor
|
||||
if len(interceptors) == 0 {
|
||||
chainedInt = nil
|
||||
} else if len(interceptors) == 1 {
|
||||
chainedInt = interceptors[0]
|
||||
} else {
|
||||
chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
|
||||
return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
|
||||
}
|
||||
}
|
||||
cc.dopts.streamInt = chainedInt
|
||||
}
|
||||
|
||||
// getChainStreamer recursively generate the chained client stream constructor.
|
||||
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
|
||||
if curr == len(interceptors)-1 {
|
||||
return finalStreamer
|
||||
}
|
||||
return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
|
||||
return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
|
||||
}
|
||||
}
|
||||
|
||||
// connectivityStateManager keeps the connectivity.State of ClientConn.
|
||||
// This struct will eventually be exported so the balancers can access it.
|
||||
type connectivityStateManager struct {
|
||||
@ -388,14 +459,11 @@ type ClientConn struct {
|
||||
|
||||
mu sync.RWMutex
|
||||
resolverWrapper *ccResolverWrapper
|
||||
sc ServiceConfig
|
||||
scRaw string
|
||||
sc *ServiceConfig
|
||||
conns map[*addrConn]struct{}
|
||||
// Keepalive parameter can be updated if a GoAway is received.
|
||||
mkp keepalive.ClientParameters
|
||||
curBalancerName string
|
||||
preBalancerName string // previous balancer name.
|
||||
curAddresses []resolver.Address
|
||||
balancerWrapper *ccBalancerWrapper
|
||||
retryThrottler atomic.Value
|
||||
|
||||
@ -437,8 +505,7 @@ func (cc *ClientConn) scWatcher() {
|
||||
cc.mu.Lock()
|
||||
// TODO: load balance policy runtime change is ignored.
|
||||
// We may revisit this decision in the future.
|
||||
cc.sc = sc
|
||||
cc.scRaw = ""
|
||||
cc.sc = &sc
|
||||
cc.mu.Unlock()
|
||||
case <-cc.ctx.Done():
|
||||
return
|
||||
@ -465,59 +532,84 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
|
||||
// gRPC should resort to default service config when:
|
||||
// * resolver service config is disabled
|
||||
// * or, resolver does not return a service config or returns an invalid one.
|
||||
func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
|
||||
if cc.dopts.disableServiceConfig {
|
||||
return true
|
||||
}
|
||||
// The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type.
|
||||
// Right now, we assume that empty service config string means resolver does not return a config.
|
||||
if sc == "" {
|
||||
return true
|
||||
}
|
||||
// TODO: the logic below is temporary. Once we finish the logic to validate service config
|
||||
// in resolver, we will replace the logic below.
|
||||
_, err := parseServiceConfig(sc)
|
||||
return err != nil
|
||||
}
|
||||
|
||||
func (cc *ClientConn) updateResolverState(s resolver.State) error {
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
// Check if the ClientConn is already closed. Some fields (e.g.
|
||||
// balancerWrapper) are set to nil when closing the ClientConn, and could
|
||||
// cause nil pointer panic if we don't have this check.
|
||||
if cc.conns == nil {
|
||||
// cc was closed.
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if reflect.DeepEqual(cc.curAddresses, addrs) {
|
||||
return
|
||||
if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) {
|
||||
if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
|
||||
cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
|
||||
}
|
||||
} else {
|
||||
// TODO: the parsing logic below will be moved inside resolver.
|
||||
sc, err := parseServiceConfig(s.ServiceConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig {
|
||||
cc.applyServiceConfig(sc)
|
||||
}
|
||||
}
|
||||
|
||||
cc.curAddresses = addrs
|
||||
cc.firstResolveEvent.Fire()
|
||||
// update the service config that will be sent to balancer.
|
||||
if cc.sc != nil {
|
||||
s.ServiceConfig = cc.sc.rawJSONString
|
||||
}
|
||||
|
||||
if cc.dopts.balancerBuilder == nil {
|
||||
// Only look at balancer types and switch balancer if balancer dial
|
||||
// option is not set.
|
||||
var isGRPCLB bool
|
||||
for _, a := range addrs {
|
||||
for _, a := range s.Addresses {
|
||||
if a.Type == resolver.GRPCLB {
|
||||
isGRPCLB = true
|
||||
break
|
||||
}
|
||||
}
|
||||
var newBalancerName string
|
||||
// TODO: use new loadBalancerConfig field with appropriate priority.
|
||||
if isGRPCLB {
|
||||
newBalancerName = grpclbName
|
||||
} else if cc.sc != nil && cc.sc.LB != nil {
|
||||
newBalancerName = *cc.sc.LB
|
||||
} else {
|
||||
// Address list doesn't contain grpclb address. Try to pick a
|
||||
// non-grpclb balancer.
|
||||
newBalancerName = cc.curBalancerName
|
||||
// If current balancer is grpclb, switch to the previous one.
|
||||
if newBalancerName == grpclbName {
|
||||
newBalancerName = cc.preBalancerName
|
||||
}
|
||||
// The following could be true in two cases:
|
||||
// - the first time handling resolved addresses
|
||||
// (curBalancerName="")
|
||||
// - the first time handling non-grpclb addresses
|
||||
// (curBalancerName="grpclb", preBalancerName="")
|
||||
if newBalancerName == "" {
|
||||
newBalancerName = PickFirstBalancerName
|
||||
}
|
||||
newBalancerName = PickFirstBalancerName
|
||||
}
|
||||
cc.switchBalancer(newBalancerName)
|
||||
} else if cc.balancerWrapper == nil {
|
||||
// Balancer dial option was set, and this is the first time handling
|
||||
// resolved addresses. Build a balancer with dopts.balancerBuilder.
|
||||
cc.curBalancerName = cc.dopts.balancerBuilder.Name()
|
||||
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
|
||||
}
|
||||
|
||||
cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
|
||||
cc.balancerWrapper.updateResolverState(s)
|
||||
cc.firstResolveEvent.Fire()
|
||||
return nil
|
||||
}
|
||||
|
||||
// switchBalancer starts the switching from current balancer to the balancer
|
||||
@ -529,10 +621,6 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
|
||||
//
|
||||
// Caller must hold cc.mu.
|
||||
func (cc *ClientConn) switchBalancer(name string) {
|
||||
if cc.conns == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
|
||||
return
|
||||
}
|
||||
@ -542,15 +630,11 @@ func (cc *ClientConn) switchBalancer(name string) {
|
||||
grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
|
||||
return
|
||||
}
|
||||
// TODO(bar switching) change this to two steps: drain and close.
|
||||
// Keep track of sc in wrapper.
|
||||
if cc.balancerWrapper != nil {
|
||||
cc.balancerWrapper.close()
|
||||
}
|
||||
|
||||
builder := balancer.Get(name)
|
||||
// TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
|
||||
// we reuse previous one?
|
||||
if channelz.IsOn() {
|
||||
if builder == nil {
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
@ -569,7 +653,6 @@ func (cc *ClientConn) switchBalancer(name string) {
|
||||
builder = newPickfirstBuilder()
|
||||
}
|
||||
|
||||
cc.preBalancerName = cc.curBalancerName
|
||||
cc.curBalancerName = builder.Name()
|
||||
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
|
||||
}
|
||||
@ -732,6 +815,9 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
|
||||
// TODO: Avoid the locking here.
|
||||
cc.mu.RLock()
|
||||
defer cc.mu.RUnlock()
|
||||
if cc.sc == nil {
|
||||
return MethodConfig{}
|
||||
}
|
||||
m, ok := cc.sc.Methods[method]
|
||||
if !ok {
|
||||
i := strings.LastIndex(method, "/")
|
||||
@ -743,14 +829,15 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
|
||||
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
|
||||
cc.mu.RLock()
|
||||
defer cc.mu.RUnlock()
|
||||
if cc.sc == nil {
|
||||
return nil
|
||||
}
|
||||
return cc.sc.healthCheckConfig
|
||||
}
|
||||
|
||||
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
|
||||
hdr, _ := metadata.FromOutgoingContext(ctx)
|
||||
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
|
||||
FullMethodName: method,
|
||||
Header: hdr,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, toRPCErr(err)
|
||||
@ -758,65 +845,25 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
|
||||
return t, done, nil
|
||||
}
|
||||
|
||||
// handleServiceConfig parses the service config string in JSON format to Go native
|
||||
// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
|
||||
func (cc *ClientConn) handleServiceConfig(js string) error {
|
||||
if cc.dopts.disableServiceConfig {
|
||||
return nil
|
||||
func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
|
||||
if sc == nil {
|
||||
// should never reach here.
|
||||
return fmt.Errorf("got nil pointer for service config")
|
||||
}
|
||||
if cc.scRaw == js {
|
||||
return nil
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
// The special formatting of \"%s\" instead of %q is to provide nice printing of service config
|
||||
// for human consumption.
|
||||
Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
sc, err := parseServiceConfig(js)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cc.mu.Lock()
|
||||
// Check if the ClientConn is already closed. Some fields (e.g.
|
||||
// balancerWrapper) are set to nil when closing the ClientConn, and could
|
||||
// cause nil pointer panic if we don't have this check.
|
||||
if cc.conns == nil {
|
||||
cc.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
cc.scRaw = js
|
||||
cc.sc = sc
|
||||
|
||||
if sc.retryThrottling != nil {
|
||||
if cc.sc.retryThrottling != nil {
|
||||
newThrottler := &retryThrottler{
|
||||
tokens: sc.retryThrottling.MaxTokens,
|
||||
max: sc.retryThrottling.MaxTokens,
|
||||
thresh: sc.retryThrottling.MaxTokens / 2,
|
||||
ratio: sc.retryThrottling.TokenRatio,
|
||||
tokens: cc.sc.retryThrottling.MaxTokens,
|
||||
max: cc.sc.retryThrottling.MaxTokens,
|
||||
thresh: cc.sc.retryThrottling.MaxTokens / 2,
|
||||
ratio: cc.sc.retryThrottling.TokenRatio,
|
||||
}
|
||||
cc.retryThrottler.Store(newThrottler)
|
||||
} else {
|
||||
cc.retryThrottler.Store((*retryThrottler)(nil))
|
||||
}
|
||||
|
||||
if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
|
||||
if cc.curBalancerName == grpclbName {
|
||||
// If current balancer is grpclb, there's at least one grpclb
|
||||
// balancer address in the resolved list. Don't switch the balancer,
|
||||
// but change the previous balancer name, so if a new resolved
|
||||
// address list doesn't contain grpclb address, balancer will be
|
||||
// switched to *sc.LB.
|
||||
cc.preBalancerName = *sc.LB
|
||||
} else {
|
||||
cc.switchBalancer(*sc.LB)
|
||||
cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
|
||||
}
|
||||
}
|
||||
|
||||
cc.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -892,7 +939,7 @@ func (cc *ClientConn) Close() error {
|
||||
}
|
||||
channelz.AddTraceEvent(cc.channelzID, ted)
|
||||
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
|
||||
// the entity beng deleted, and thus prevent it from being deleted right away.
|
||||
// the entity being deleted, and thus prevent it from being deleted right away.
|
||||
channelz.RemoveEntry(cc.channelzID)
|
||||
}
|
||||
return nil
|
||||
@ -921,8 +968,6 @@ type addrConn struct {
|
||||
// Use updateConnectivityState for updating addrConn's connectivity state.
|
||||
state connectivity.State
|
||||
|
||||
tearDownErr error // The reason this addrConn is torn down.
|
||||
|
||||
backoffIdx int // Needs to be stateful for resetConnectBackoff.
|
||||
resetBackoff chan struct{}
|
||||
|
||||
@ -963,191 +1008,169 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
|
||||
|
||||
func (ac *addrConn) resetTransport() {
|
||||
for i := 0; ; i++ {
|
||||
tryNextAddrFromStart := grpcsync.NewEvent()
|
||||
|
||||
ac.mu.Lock()
|
||||
if i > 0 {
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
addrs := ac.addrs
|
||||
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
|
||||
|
||||
// This will be the duration that dial gets to finish.
|
||||
dialDuration := getMinConnectTimeout()
|
||||
dialDuration := minConnectTimeout
|
||||
if ac.dopts.minConnectTimeout != nil {
|
||||
dialDuration = ac.dopts.minConnectTimeout()
|
||||
}
|
||||
|
||||
if dialDuration < backoffFor {
|
||||
// Give dial more time as we keep failing to connect.
|
||||
dialDuration = backoffFor
|
||||
}
|
||||
// We can potentially spend all the time trying the first address, and
|
||||
// if the server accepts the connection and then hangs, the following
|
||||
// addresses will never be tried.
|
||||
//
|
||||
// The spec doesn't mention what should be done for multiple addresses.
|
||||
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
|
||||
connectDeadline := time.Now().Add(dialDuration)
|
||||
ac.mu.Unlock()
|
||||
|
||||
addrLoop:
|
||||
for _, addr := range addrs {
|
||||
newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
|
||||
if err != nil {
|
||||
// After exhausting all addresses, the addrConn enters
|
||||
// TRANSIENT_FAILURE.
|
||||
ac.mu.Lock()
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.transport = nil
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
|
||||
ac.cc.mu.RLock()
|
||||
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||
ac.cc.mu.RUnlock()
|
||||
// Backoff.
|
||||
b := ac.resetBackoff
|
||||
ac.mu.Unlock()
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
timer := time.NewTimer(backoffFor)
|
||||
select {
|
||||
case <-timer.C:
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx++
|
||||
ac.mu.Unlock()
|
||||
case <-b:
|
||||
timer.Stop()
|
||||
case <-ac.ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
|
||||
copts := ac.dopts.copts
|
||||
if ac.scopts.CredsBundle != nil {
|
||||
copts.CredsBundle = ac.scopts.CredsBundle
|
||||
}
|
||||
hctx, hcancel := context.WithCancel(ac.ctx)
|
||||
defer hcancel()
|
||||
ac.mu.Unlock()
|
||||
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
|
||||
reconnect := grpcsync.NewEvent()
|
||||
prefaceReceived := make(chan struct{})
|
||||
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
|
||||
if err == nil {
|
||||
ac.mu.Lock()
|
||||
ac.curAddr = addr
|
||||
ac.transport = newTr
|
||||
ac.mu.Unlock()
|
||||
|
||||
healthCheckConfig := ac.cc.healthCheckConfig()
|
||||
// LB channel health checking is only enabled when all the four requirements below are met:
|
||||
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
|
||||
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
|
||||
// 3. a service config with non-empty healthCheckConfig field is provided,
|
||||
// 4. the current load balancer allows it.
|
||||
healthcheckManagingState := false
|
||||
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
|
||||
if ac.cc.dopts.healthCheckFunc == nil {
|
||||
// TODO: add a link to the health check doc in the error message.
|
||||
grpclog.Error("the client side LB channel health check function has not been set.")
|
||||
} else {
|
||||
// TODO(deklerk) refactor to just return transport
|
||||
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
|
||||
healthcheckManagingState = true
|
||||
}
|
||||
}
|
||||
if !healthcheckManagingState {
|
||||
ac.mu.Lock()
|
||||
ac.updateConnectivityState(connectivity.Ready)
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
} else {
|
||||
hcancel()
|
||||
if err == errConnClosing {
|
||||
return
|
||||
}
|
||||
|
||||
if tryNextAddrFromStart.HasFired() {
|
||||
break addrLoop
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
backoffFor = 0
|
||||
ac.mu.Lock()
|
||||
reqHandshake := ac.dopts.reqHandshake
|
||||
ac.mu.Unlock()
|
||||
|
||||
<-reconnect.Done()
|
||||
hcancel()
|
||||
|
||||
if reqHandshake == envconfig.RequireHandshakeHybrid {
|
||||
// In RequireHandshakeHybrid mode, we must check to see whether
|
||||
// server preface has arrived yet to decide whether to start
|
||||
// reconnecting at the top of the list (server preface received)
|
||||
// or continue with the next addr in the list as if the
|
||||
// connection were not successful (server preface not received).
|
||||
select {
|
||||
case <-prefaceReceived:
|
||||
// We received a server preface - huzzah! We consider this
|
||||
// a success and restart from the top of the addr list.
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx = 0
|
||||
ac.mu.Unlock()
|
||||
break addrLoop
|
||||
default:
|
||||
// Despite having set state to READY, in hybrid mode we
|
||||
// consider this a failure and continue connecting at the
|
||||
// next addr in the list.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.mu.Unlock()
|
||||
|
||||
if tryNextAddrFromStart.HasFired() {
|
||||
break addrLoop
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// In RequireHandshakeOn mode, we would have already waited for
|
||||
// the server preface, so we consider this a success and restart
|
||||
// from the top of the addr list. In RequireHandshakeOff mode,
|
||||
// we don't care to wait for the server preface before
|
||||
// considering this a success, so we also restart from the top
|
||||
// of the addr list.
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx = 0
|
||||
ac.mu.Unlock()
|
||||
break addrLoop
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// After exhausting all addresses, or after need to reconnect after a
|
||||
// READY, the addrConn enters TRANSIENT_FAILURE.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
newTr.Close()
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ac.curAddr = addr
|
||||
ac.transport = newTr
|
||||
ac.backoffIdx = 0
|
||||
|
||||
healthCheckConfig := ac.cc.healthCheckConfig()
|
||||
// LB channel health checking is only enabled when all the four requirements below are met:
|
||||
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
|
||||
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
|
||||
// 3. a service config with non-empty healthCheckConfig field is provided,
|
||||
// 4. the current load balancer allows it.
|
||||
hctx, hcancel := context.WithCancel(ac.ctx)
|
||||
healthcheckManagingState := false
|
||||
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
|
||||
if ac.cc.dopts.healthCheckFunc == nil {
|
||||
// TODO: add a link to the health check doc in the error message.
|
||||
grpclog.Error("the client side LB channel health check function has not been set.")
|
||||
} else {
|
||||
// TODO(deklerk) refactor to just return transport
|
||||
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
|
||||
healthcheckManagingState = true
|
||||
}
|
||||
}
|
||||
if !healthcheckManagingState {
|
||||
ac.updateConnectivityState(connectivity.Ready)
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
|
||||
// Block until the created transport is down. And when this happens,
|
||||
// we restart from the top of the addr list.
|
||||
<-reconnect.Done()
|
||||
hcancel()
|
||||
|
||||
// Need to reconnect after a READY, the addrConn enters
|
||||
// TRANSIENT_FAILURE.
|
||||
//
|
||||
// This will set addrConn to TRANSIENT_FAILURE for a very short period
|
||||
// of time, and turns CONNECTING. It seems reasonable to skip this, but
|
||||
// READY-CONNECTING is not a valid transition.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
|
||||
// Backoff.
|
||||
b := ac.resetBackoff
|
||||
timer := time.NewTimer(backoffFor)
|
||||
acctx := ac.ctx
|
||||
ac.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx++
|
||||
ac.mu.Unlock()
|
||||
case <-b:
|
||||
timer.Stop()
|
||||
case <-acctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createTransport creates a connection to one of the backends in addrs. It
|
||||
// sets ac.transport in the success case, or it returns an error if it was
|
||||
// unable to successfully create a transport.
|
||||
//
|
||||
// If waitForHandshake is enabled, it blocks until server preface arrives.
|
||||
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
|
||||
// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
|
||||
// first successful one. It returns the transport, the address and a Event in
|
||||
// the successful case. The Event fires when the returned transport disconnects.
|
||||
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
|
||||
for _, addr := range addrs {
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return nil, resolver.Address{}, nil, errConnClosing
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.transport = nil
|
||||
|
||||
ac.cc.mu.RLock()
|
||||
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||
ac.cc.mu.RUnlock()
|
||||
|
||||
copts := ac.dopts.copts
|
||||
if ac.scopts.CredsBundle != nil {
|
||||
copts.CredsBundle = ac.scopts.CredsBundle
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
|
||||
newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
|
||||
if err == nil {
|
||||
return newTr, addr, reconnect, nil
|
||||
}
|
||||
ac.cc.blockingpicker.updateConnectionError(err)
|
||||
}
|
||||
|
||||
// Couldn't connect to any address.
|
||||
return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
|
||||
}
|
||||
|
||||
// createTransport creates a connection to addr. It returns the transport and a
|
||||
// Event in the successful case. The Event fires when the returned transport
|
||||
// disconnects.
|
||||
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
|
||||
prefaceReceived := make(chan struct{})
|
||||
onCloseCalled := make(chan struct{})
|
||||
reconnect := grpcsync.NewEvent()
|
||||
|
||||
target := transport.TargetInfo{
|
||||
Addr: addr.Addr,
|
||||
@ -1155,8 +1178,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
|
||||
Authority: ac.cc.authority,
|
||||
}
|
||||
|
||||
prefaceTimer := time.NewTimer(time.Until(connectDeadline))
|
||||
|
||||
onGoAway := func(r transport.GoAwayReason) {
|
||||
ac.mu.Lock()
|
||||
ac.adjustParams(r)
|
||||
@ -1166,13 +1187,11 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
|
||||
|
||||
onClose := func() {
|
||||
close(onCloseCalled)
|
||||
prefaceTimer.Stop()
|
||||
reconnect.Fire()
|
||||
}
|
||||
|
||||
onPrefaceReceipt := func() {
|
||||
close(prefaceReceived)
|
||||
prefaceTimer.Stop()
|
||||
}
|
||||
|
||||
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
|
||||
@ -1182,69 +1201,28 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
|
||||
}
|
||||
|
||||
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
|
||||
|
||||
if err == nil {
|
||||
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
|
||||
select {
|
||||
case <-prefaceTimer.C:
|
||||
// We didn't get the preface in time.
|
||||
newTr.Close()
|
||||
err = errors.New("timed out waiting for server handshake")
|
||||
case <-prefaceReceived:
|
||||
// We got the preface - huzzah! things are good.
|
||||
case <-onCloseCalled:
|
||||
// The transport has already closed - noop.
|
||||
return nil, errors.New("connection closed")
|
||||
}
|
||||
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
|
||||
go func() {
|
||||
select {
|
||||
case <-prefaceTimer.C:
|
||||
// We didn't get the preface in time.
|
||||
newTr.Close()
|
||||
case <-prefaceReceived:
|
||||
// We got the preface just in the nick of time - huzzah!
|
||||
case <-onCloseCalled:
|
||||
// The transport has already closed - noop.
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// newTr is either nil, or closed.
|
||||
ac.cc.blockingpicker.updateConnectionError(err)
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
|
||||
return nil, errConnClosing
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
newTr.Close()
|
||||
return nil, errConnClosing
|
||||
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
|
||||
select {
|
||||
case <-time.After(connectDeadline.Sub(time.Now())):
|
||||
// We didn't get the preface in time.
|
||||
newTr.Close()
|
||||
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
|
||||
return nil, nil, errors.New("timed out waiting for server handshake")
|
||||
case <-prefaceReceived:
|
||||
// We got the preface - huzzah! things are good.
|
||||
case <-onCloseCalled:
|
||||
// The transport has already closed - noop.
|
||||
return nil, nil, errors.New("connection closed")
|
||||
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
|
||||
}
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
|
||||
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
newTr.Close()
|
||||
return nil, errConnClosing
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
|
||||
return newTr, nil
|
||||
return newTr, reconnect, nil
|
||||
}
|
||||
|
||||
func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
|
||||
@ -1332,7 +1310,6 @@ func (ac *addrConn) tearDown(err error) {
|
||||
// between setting the state and logic that waits on context cancelation / etc.
|
||||
ac.updateConnectivityState(connectivity.Shutdown)
|
||||
ac.cancel()
|
||||
ac.tearDownErr = err
|
||||
ac.curAddr = resolver.Address{}
|
||||
if err == errConnDrain && curTr != nil {
|
||||
// GracefulClose(...) may be executed multiple times when
|
||||
|
Reference in New Issue
Block a user