mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 18:43:34 +00:00
update vendor to latest kubernetes 1.14.0
some of the kubernetes independent packages are moved out of the tree to new projects. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
committed by
mergify[bot]
parent
3f35bfd4d7
commit
f60a07ae82
492
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
492
vendor/google.golang.org/grpc/clientconn.go
generated
vendored
@ -36,7 +36,6 @@ import (
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/backoff"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/envconfig"
|
||||
@ -87,7 +86,7 @@ var (
|
||||
// with other individual Transport Credentials.
|
||||
errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
|
||||
// errTransportCredentialsMissing indicates that users want to transmit security
|
||||
// information (e.g., oauth2 token) which requires secure connection on an insecure
|
||||
// information (e.g., OAuth2 token) which requires secure connection on an insecure
|
||||
// connection.
|
||||
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
|
||||
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
|
||||
@ -238,9 +237,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
||||
grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
|
||||
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
|
||||
if cc.dopts.resolverBuilder == nil {
|
||||
// If resolver builder is still nil, the parse target's scheme is
|
||||
// If resolver builder is still nil, the parsed target's scheme is
|
||||
// not registered. Fallback to default resolver and set Endpoint to
|
||||
// the original unparsed target.
|
||||
// the original target.
|
||||
grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
|
||||
cc.parsedTarget = resolver.Target{
|
||||
Scheme: resolver.GetDefaultScheme(),
|
||||
@ -437,7 +436,7 @@ func (cc *ClientConn) scWatcher() {
|
||||
}
|
||||
cc.mu.Lock()
|
||||
// TODO: load balance policy runtime change is ignored.
|
||||
// We may revist this decision in the future.
|
||||
// We may revisit this decision in the future.
|
||||
cc.sc = sc
|
||||
cc.scRaw = ""
|
||||
cc.mu.Unlock()
|
||||
@ -592,13 +591,12 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
|
||||
// Caller needs to make sure len(addrs) > 0.
|
||||
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
|
||||
ac := &addrConn{
|
||||
cc: cc,
|
||||
addrs: addrs,
|
||||
scopts: opts,
|
||||
dopts: cc.dopts,
|
||||
czData: new(channelzData),
|
||||
successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
|
||||
resetBackoff: make(chan struct{}),
|
||||
cc: cc,
|
||||
addrs: addrs,
|
||||
scopts: opts,
|
||||
dopts: cc.dopts,
|
||||
czData: new(channelzData),
|
||||
resetBackoff: make(chan struct{}),
|
||||
}
|
||||
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
|
||||
// Track ac in cc. This needs to be done before any getTransport(...) is called.
|
||||
@ -680,11 +678,10 @@ func (ac *addrConn) connect() error {
|
||||
return nil
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.mu.Unlock()
|
||||
|
||||
// Start a goroutine connecting to the server asynchronously.
|
||||
go ac.resetTransport(false)
|
||||
go ac.resetTransport()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -703,6 +700,12 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Unless we're busy reconnecting already, let's reconnect from the top of
|
||||
// the list.
|
||||
if ac.state != connectivity.Ready {
|
||||
return false
|
||||
}
|
||||
|
||||
var curAddrFound bool
|
||||
for _, a := range addrs {
|
||||
if reflect.DeepEqual(ac.curAddr, a) {
|
||||
@ -713,7 +716,6 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
|
||||
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
|
||||
if curAddrFound {
|
||||
ac.addrs = addrs
|
||||
ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
|
||||
}
|
||||
|
||||
return curAddrFound
|
||||
@ -913,7 +915,6 @@ type addrConn struct {
|
||||
transport transport.ClientTransport // The current transport.
|
||||
|
||||
mu sync.Mutex
|
||||
addrIdx int // The index in addrs list to start reconnecting from.
|
||||
curAddr resolver.Address // The current address.
|
||||
addrs []resolver.Address // All addresses that the resolver resolved to.
|
||||
|
||||
@ -922,33 +923,28 @@ type addrConn struct {
|
||||
|
||||
tearDownErr error // The reason this addrConn is torn down.
|
||||
|
||||
backoffIdx int
|
||||
// backoffDeadline is the time until which resetTransport needs to
|
||||
// wait before increasing backoffIdx count.
|
||||
backoffDeadline time.Time
|
||||
// connectDeadline is the time by which all connection
|
||||
// negotiations must complete.
|
||||
connectDeadline time.Time
|
||||
|
||||
backoffIdx int // Needs to be stateful for resetConnectBackoff.
|
||||
resetBackoff chan struct{}
|
||||
|
||||
channelzID int64 // channelz unique identification number
|
||||
channelzID int64 // channelz unique identification number.
|
||||
czData *channelzData
|
||||
|
||||
successfulHandshake bool
|
||||
|
||||
healthCheckEnabled bool
|
||||
}
|
||||
|
||||
// Note: this requires a lock on ac.mu.
|
||||
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
|
||||
if ac.state == s {
|
||||
return
|
||||
}
|
||||
|
||||
updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
|
||||
ac.state = s
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
|
||||
Desc: updateMsg,
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, s)
|
||||
}
|
||||
|
||||
// adjustParams updates parameters used to create transports upon
|
||||
@ -965,173 +961,218 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
|
||||
}
|
||||
}
|
||||
|
||||
// resetTransport makes sure that a healthy ac.transport exists.
|
||||
//
|
||||
// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
|
||||
// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
|
||||
// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
|
||||
// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
|
||||
// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
|
||||
// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
|
||||
//
|
||||
// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
|
||||
func (ac *addrConn) resetTransport(resolveNow bool) {
|
||||
for {
|
||||
// If this is the first in a line of resets, we want to resolve immediately. The only other time we
|
||||
// want to reset is if we have tried all the addresses handed to us.
|
||||
if resolveNow {
|
||||
ac.mu.Lock()
|
||||
func (ac *addrConn) resetTransport() {
|
||||
for i := 0; ; i++ {
|
||||
tryNextAddrFromStart := grpcsync.NewEvent()
|
||||
|
||||
ac.mu.Lock()
|
||||
if i > 0 {
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// The transport that was used before is no longer viable.
|
||||
ac.transport = nil
|
||||
// If the connection is READY, a failure must have occurred.
|
||||
// Otherwise, we'll consider this is a transient failure when:
|
||||
// We've exhausted all addresses
|
||||
// We're in CONNECTING
|
||||
// And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
|
||||
if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
}
|
||||
ac.transport = nil
|
||||
ac.mu.Unlock()
|
||||
|
||||
if err := ac.nextAddr(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
backoffIdx := ac.backoffIdx
|
||||
backoffFor := ac.dopts.bs.Backoff(backoffIdx)
|
||||
addrs := ac.addrs
|
||||
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
|
||||
|
||||
// This will be the duration that dial gets to finish.
|
||||
dialDuration := getMinConnectTimeout()
|
||||
if backoffFor > dialDuration {
|
||||
if dialDuration < backoffFor {
|
||||
// Give dial more time as we keep failing to connect.
|
||||
dialDuration = backoffFor
|
||||
}
|
||||
start := time.Now()
|
||||
connectDeadline := start.Add(dialDuration)
|
||||
ac.backoffDeadline = start.Add(backoffFor)
|
||||
ac.connectDeadline = connectDeadline
|
||||
|
||||
connectDeadline := time.Now().Add(dialDuration)
|
||||
ac.mu.Unlock()
|
||||
|
||||
ac.cc.mu.RLock()
|
||||
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||
ac.cc.mu.RUnlock()
|
||||
addrLoop:
|
||||
for _, addr := range addrs {
|
||||
ac.mu.Lock()
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.transport = nil
|
||||
|
||||
ac.cc.mu.RLock()
|
||||
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||
ac.cc.mu.RUnlock()
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
copts := ac.dopts.copts
|
||||
if ac.scopts.CredsBundle != nil {
|
||||
copts.CredsBundle = ac.scopts.CredsBundle
|
||||
}
|
||||
hctx, hcancel := context.WithCancel(ac.ctx)
|
||||
defer hcancel()
|
||||
ac.mu.Unlock()
|
||||
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
|
||||
reconnect := grpcsync.NewEvent()
|
||||
prefaceReceived := make(chan struct{})
|
||||
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
|
||||
if err == nil {
|
||||
ac.mu.Lock()
|
||||
ac.curAddr = addr
|
||||
ac.transport = newTr
|
||||
ac.mu.Unlock()
|
||||
|
||||
healthCheckConfig := ac.cc.healthCheckConfig()
|
||||
// LB channel health checking is only enabled when all the four requirements below are met:
|
||||
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
|
||||
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
|
||||
// 3. a service config with non-empty healthCheckConfig field is provided,
|
||||
// 4. the current load balancer allows it.
|
||||
healthcheckManagingState := false
|
||||
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
|
||||
if ac.cc.dopts.healthCheckFunc == nil {
|
||||
// TODO: add a link to the health check doc in the error message.
|
||||
grpclog.Error("the client side LB channel health check function has not been set.")
|
||||
} else {
|
||||
// TODO(deklerk) refactor to just return transport
|
||||
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
|
||||
healthcheckManagingState = true
|
||||
}
|
||||
}
|
||||
if !healthcheckManagingState {
|
||||
ac.mu.Lock()
|
||||
ac.updateConnectivityState(connectivity.Ready)
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
} else {
|
||||
hcancel()
|
||||
if err == errConnClosing {
|
||||
return
|
||||
}
|
||||
|
||||
if tryNextAddrFromStart.HasFired() {
|
||||
break addrLoop
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
backoffFor = 0
|
||||
ac.mu.Lock()
|
||||
reqHandshake := ac.dopts.reqHandshake
|
||||
ac.mu.Unlock()
|
||||
|
||||
<-reconnect.Done()
|
||||
hcancel()
|
||||
|
||||
if reqHandshake == envconfig.RequireHandshakeHybrid {
|
||||
// In RequireHandshakeHybrid mode, we must check to see whether
|
||||
// server preface has arrived yet to decide whether to start
|
||||
// reconnecting at the top of the list (server preface received)
|
||||
// or continue with the next addr in the list as if the
|
||||
// connection were not successful (server preface not received).
|
||||
select {
|
||||
case <-prefaceReceived:
|
||||
// We received a server preface - huzzah! We consider this
|
||||
// a success and restart from the top of the addr list.
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx = 0
|
||||
ac.mu.Unlock()
|
||||
break addrLoop
|
||||
default:
|
||||
// Despite having set state to READY, in hybrid mode we
|
||||
// consider this a failure and continue connecting at the
|
||||
// next addr in the list.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.mu.Unlock()
|
||||
|
||||
if tryNextAddrFromStart.HasFired() {
|
||||
break addrLoop
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// In RequireHandshakeOn mode, we would have already waited for
|
||||
// the server preface, so we consider this a success and restart
|
||||
// from the top of the addr list. In RequireHandshakeOff mode,
|
||||
// we don't care to wait for the server preface before
|
||||
// considering this a success, so we also restart from the top
|
||||
// of the addr list.
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx = 0
|
||||
ac.mu.Unlock()
|
||||
break addrLoop
|
||||
}
|
||||
}
|
||||
|
||||
// After exhausting all addresses, or after need to reconnect after a
|
||||
// READY, the addrConn enters TRANSIENT_FAILURE.
|
||||
ac.mu.Lock()
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
|
||||
if ac.state != connectivity.Connecting {
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
}
|
||||
|
||||
addr := ac.addrs[ac.addrIdx]
|
||||
copts := ac.dopts.copts
|
||||
if ac.scopts.CredsBundle != nil {
|
||||
copts.CredsBundle = ac.scopts.CredsBundle
|
||||
}
|
||||
// Backoff.
|
||||
b := ac.resetBackoff
|
||||
timer := time.NewTimer(backoffFor)
|
||||
acctx := ac.ctx
|
||||
ac.mu.Unlock()
|
||||
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
select {
|
||||
case <-timer.C:
|
||||
ac.mu.Lock()
|
||||
ac.backoffIdx++
|
||||
ac.mu.Unlock()
|
||||
case <-b:
|
||||
timer.Stop()
|
||||
case <-acctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
|
||||
if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// createTransport creates a connection to one of the backends in addrs.
|
||||
func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
|
||||
oneReset := sync.Once{}
|
||||
skipReset := make(chan struct{})
|
||||
allowedToReset := make(chan struct{})
|
||||
prefaceReceived := make(chan struct{})
|
||||
// createTransport creates a connection to one of the backends in addrs. It
|
||||
// sets ac.transport in the success case, or it returns an error if it was
|
||||
// unable to successfully create a transport.
|
||||
//
|
||||
// If waitForHandshake is enabled, it blocks until server preface arrives.
|
||||
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
|
||||
onCloseCalled := make(chan struct{})
|
||||
|
||||
var prefaceMu sync.Mutex
|
||||
var serverPrefaceReceived bool
|
||||
var clientPrefaceWrote bool
|
||||
|
||||
hcCtx, hcCancel := context.WithCancel(ac.ctx)
|
||||
|
||||
onGoAway := func(r transport.GoAwayReason) {
|
||||
hcCancel()
|
||||
ac.mu.Lock()
|
||||
ac.adjustParams(r)
|
||||
ac.mu.Unlock()
|
||||
select {
|
||||
case <-skipReset: // The outer resetTransport loop will handle reconnection.
|
||||
return
|
||||
case <-allowedToReset: // We're in the clear to reset.
|
||||
go oneReset.Do(func() { ac.resetTransport(false) })
|
||||
}
|
||||
}
|
||||
|
||||
prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
|
||||
|
||||
onClose := func() {
|
||||
hcCancel()
|
||||
close(onCloseCalled)
|
||||
prefaceTimer.Stop()
|
||||
|
||||
select {
|
||||
case <-skipReset: // The outer resetTransport loop will handle reconnection.
|
||||
return
|
||||
case <-allowedToReset: // We're in the clear to reset.
|
||||
oneReset.Do(func() { ac.resetTransport(false) })
|
||||
}
|
||||
}
|
||||
|
||||
target := transport.TargetInfo{
|
||||
Addr: addr.Addr,
|
||||
Metadata: addr.Metadata,
|
||||
Authority: ac.cc.authority,
|
||||
}
|
||||
|
||||
prefaceTimer := time.NewTimer(time.Until(connectDeadline))
|
||||
|
||||
onGoAway := func(r transport.GoAwayReason) {
|
||||
ac.mu.Lock()
|
||||
ac.adjustParams(r)
|
||||
ac.mu.Unlock()
|
||||
reconnect.Fire()
|
||||
}
|
||||
|
||||
onClose := func() {
|
||||
close(onCloseCalled)
|
||||
prefaceTimer.Stop()
|
||||
reconnect.Fire()
|
||||
}
|
||||
|
||||
onPrefaceReceipt := func() {
|
||||
close(prefaceReceived)
|
||||
prefaceTimer.Stop()
|
||||
|
||||
// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
|
||||
ac.mu.Lock()
|
||||
|
||||
prefaceMu.Lock()
|
||||
serverPrefaceReceived = true
|
||||
if clientPrefaceWrote {
|
||||
ac.successfulHandshake = true
|
||||
}
|
||||
prefaceMu.Unlock()
|
||||
|
||||
ac.mu.Unlock()
|
||||
}
|
||||
|
||||
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
|
||||
@ -1143,13 +1184,6 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
|
||||
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
|
||||
|
||||
if err == nil {
|
||||
prefaceMu.Lock()
|
||||
clientPrefaceWrote = true
|
||||
if serverPrefaceReceived || ac.dopts.reqHandshake == envconfig.RequireHandshakeOff {
|
||||
ac.successfulHandshake = true
|
||||
}
|
||||
prefaceMu.Unlock()
|
||||
|
||||
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
|
||||
select {
|
||||
case <-prefaceTimer.C:
|
||||
@ -1160,8 +1194,7 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
|
||||
// We got the preface - huzzah! things are good.
|
||||
case <-onCloseCalled:
|
||||
// The transport has already closed - noop.
|
||||
close(allowedToReset)
|
||||
return nil
|
||||
return nil, errors.New("connection closed")
|
||||
}
|
||||
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
|
||||
go func() {
|
||||
@ -1186,70 +1219,32 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
|
||||
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
|
||||
// in resetTransport take care of reconnecting.
|
||||
close(skipReset)
|
||||
|
||||
return errConnClosing
|
||||
return nil, errConnClosing
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
|
||||
|
||||
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
|
||||
// in resetTransport take care of reconnecting.
|
||||
close(skipReset)
|
||||
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
|
||||
ac.mu.Lock()
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
close(skipReset)
|
||||
newTr.Close()
|
||||
return nil
|
||||
return nil, errConnClosing
|
||||
}
|
||||
ac.transport = newTr
|
||||
ac.mu.Unlock()
|
||||
|
||||
healthCheckConfig := ac.cc.healthCheckConfig()
|
||||
// LB channel health checking is only enabled when all the four requirements below are met:
|
||||
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
|
||||
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
|
||||
// 3. a service config with non-empty healthCheckConfig field is provided,
|
||||
// 4. the current load balancer allows it.
|
||||
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
|
||||
if internal.HealthCheckFunc != nil {
|
||||
go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
|
||||
close(allowedToReset)
|
||||
return nil
|
||||
}
|
||||
// TODO: add a link to the health check doc in the error message.
|
||||
grpclog.Error("the client side LB channel health check function has not been set.")
|
||||
}
|
||||
|
||||
// No LB channel health check case
|
||||
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
|
||||
ac.mu.Lock()
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
|
||||
// unblock onGoAway/onClose callback.
|
||||
close(skipReset)
|
||||
return errConnClosing
|
||||
newTr.Close()
|
||||
return nil, errConnClosing
|
||||
}
|
||||
|
||||
ac.updateConnectivityState(connectivity.Ready)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.curAddr = addr
|
||||
|
||||
ac.mu.Unlock()
|
||||
|
||||
// Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
|
||||
// goroutine failing races with all the code in this method that sets the connection to "ready".
|
||||
close(allowedToReset)
|
||||
return nil
|
||||
return newTr, nil
|
||||
}
|
||||
|
||||
func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
|
||||
@ -1269,19 +1264,12 @@ func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.Client
|
||||
firstReady = false
|
||||
ac.curAddr = addr
|
||||
}
|
||||
if ac.state != connectivity.Ready {
|
||||
ac.updateConnectivityState(connectivity.Ready)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.Ready)
|
||||
} else {
|
||||
if ac.state != connectivity.TransientFailure {
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
}
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
}
|
||||
}
|
||||
|
||||
err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName)
|
||||
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
|
||||
if err != nil {
|
||||
if status.Code(err) == codes.Unimplemented {
|
||||
if channelz.IsOn() {
|
||||
@ -1297,55 +1285,6 @@ func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.Client
|
||||
}
|
||||
}
|
||||
|
||||
// nextAddr increments the addrIdx if there are more addresses to try. If
|
||||
// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
|
||||
// increment the backoffIdx.
|
||||
//
|
||||
// nextAddr must be called without ac.mu being held.
|
||||
func (ac *addrConn) nextAddr() error {
|
||||
ac.mu.Lock()
|
||||
|
||||
// If a handshake has been observed, we want the next usage to start at
|
||||
// index 0 immediately.
|
||||
if ac.successfulHandshake {
|
||||
ac.successfulHandshake = false
|
||||
ac.backoffDeadline = time.Time{}
|
||||
ac.connectDeadline = time.Time{}
|
||||
ac.addrIdx = 0
|
||||
ac.backoffIdx = 0
|
||||
ac.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
if ac.addrIdx < len(ac.addrs)-1 {
|
||||
ac.addrIdx++
|
||||
ac.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
ac.addrIdx = 0
|
||||
ac.backoffIdx++
|
||||
|
||||
if ac.state == connectivity.Shutdown {
|
||||
ac.mu.Unlock()
|
||||
return errConnClosing
|
||||
}
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
backoffDeadline := ac.backoffDeadline
|
||||
b := ac.resetBackoff
|
||||
ac.mu.Unlock()
|
||||
timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-b:
|
||||
timer.Stop()
|
||||
case <-ac.ctx.Done():
|
||||
timer.Stop()
|
||||
return ac.ctx.Err()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ac *addrConn) resetConnectBackoff() {
|
||||
ac.mu.Lock()
|
||||
close(ac.resetBackoff)
|
||||
@ -1394,7 +1333,6 @@ func (ac *addrConn) tearDown(err error) {
|
||||
ac.updateConnectivityState(connectivity.Shutdown)
|
||||
ac.cancel()
|
||||
ac.tearDownErr = err
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.curAddr = resolver.Address{}
|
||||
if err == errConnDrain && curTr != nil {
|
||||
// GracefulClose(...) may be executed multiple times when
|
||||
|
Reference in New Issue
Block a user