rebase: add kube-storage/replication to go.mod

add dependent kube-storage/replication package
to the vendor.

update grpc to latest release v1.35.0.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2021-02-10 12:21:30 +05:30
committed by mergify[bot]
parent c642637cec
commit 342d282780
97 changed files with 6215 additions and 3551 deletions

View File

@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"math"
"net"
"reflect"
"strings"
"sync"
@ -35,10 +34,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
@ -48,6 +48,7 @@ import (
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
)
const (
@ -68,8 +69,6 @@ var (
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors.New("grpc: balancer is closed")
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
// service config.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
@ -106,6 +105,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
type defaultConfigSelector struct {
sc *ServiceConfig
}
func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
return &iresolver.RPCConfig{
Context: rpcInfo.Context,
MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
}, nil
}
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
@ -151,20 +161,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
Severity: channelz.CtINFO,
Severity: channelz.CtInfo,
},
})
} else {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
})
channelz.Info(logger, cc.channelzID, "Channel Created")
}
cc.csMgr.channelzID = cc.channelzID
}
@ -196,15 +203,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams
if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
network, addr := parseDialTarget(addr)
return (&net.Dialer{}).DialContext(ctx, network, addr)
},
)
}
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
@ -219,7 +217,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
switch {
case ctx.Err() == err:
conn = nil
case err == nil || !cc.dopts.returnLastError:
conn, err = nil, ctx.Err()
default:
conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
}
default:
}
}()
@ -231,6 +236,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
scSet = true
}
default:
@ -239,30 +245,35 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if cc.dopts.bs == nil {
cc.dopts.bs = backoff.DefaultExponential
}
if cc.dopts.resolverBuilder == nil {
// Only try to parse target when resolver builder is not already set.
cc.parsedTarget = parseTarget(cc.target)
grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
if cc.dopts.resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
// Determine the resolver to use.
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
}
} else {
cc.parsedTarget = resolver.Target{Endpoint: target}
}
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.authority != "" {
cc.authority = cc.dopts.authority
} else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
cc.authority = "localhost"
} else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
cc.authority = "localhost" + cc.parsedTarget.Endpoint
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
@ -275,6 +286,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
}
case <-ctx.Done():
return nil, ctx.Err()
@ -292,19 +304,20 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc)
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.mu.Unlock()
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
@ -312,7 +325,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if s == connectivity.Ready {
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.blockingpicker.connectionError(); err != nil {
if err = cc.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
@ -323,6 +336,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
return nil, err
}
return nil, ctx.Err()
}
}
@ -415,12 +431,7 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
return
}
csm.state = state
if channelz.IsOn() {
channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
Severity: channelz.CtINFO,
})
}
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
close(csm.notifyChan)
@ -443,6 +454,20 @@ func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
return csm.notifyChan
}
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
// Invoke performs a unary RPC and returns after the response is received
// into reply.
Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
// NewStream begins a streaming RPC.
NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
// Assert *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
@ -468,6 +493,8 @@ type ClientConn struct {
balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
@ -482,11 +509,18 @@ type ClientConn struct {
channelzID int64 // channelz unique identification number
czData *channelzData
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
}
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
ch := cc.csMgr.getNotifyChan()
if cc.csMgr.getState() != sourceState {
@ -501,7 +535,11 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connec
}
// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}
@ -517,6 +555,7 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revisit this decision in the future.
cc.sc = &sc
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
cc.mu.Unlock()
case <-cc.ctx.Done():
return
@ -555,13 +594,13 @@ func init() {
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
if cc.sc != nil {
cc.applyServiceConfigAndBalancer(cc.sc, addrs)
cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
return
}
if cc.dopts.defaultServiceConfig != nil {
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
} else {
cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
}
}
@ -598,7 +637,15 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
// default, per the error handling design?
} else {
if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
cc.applyServiceConfigAndBalancer(sc, s.Addresses)
configSelector := iresolver.GetConfigSelector(s)
if configSelector != nil {
if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
}
} else {
configSelector = &defaultConfigSelector{sc}
}
cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
@ -608,6 +655,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
@ -656,9 +704,9 @@ func (cc *ClientConn) switchBalancer(name string) {
return
}
grpclog.Infof("ClientConn switching balancer to %q", name)
channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
if cc.dopts.balancerBuilder != nil {
grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
return
}
if cc.balancerWrapper != nil {
@ -666,22 +714,12 @@ func (cc *ClientConn) switchBalancer(name string) {
}
builder := balancer.Get(name)
if channelz.IsOn() {
if builder == nil {
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
Severity: channelz.CtWarning,
})
} else {
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
Severity: channelz.CtINFO,
})
}
}
if builder == nil {
grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
}
cc.curBalancerName = builder.Name()
@ -705,6 +743,7 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: addrs,
scopts: opts,
@ -721,12 +760,12 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
}
if channelz.IsOn() {
ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel Created",
Severity: channelz.CtINFO,
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
Severity: channelz.CtINFO,
Severity: channelz.CtInfo,
},
})
}
@ -760,7 +799,11 @@ func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
}
// Target returns the target string of the ClientConn.
// This is an EXPERIMENTAL API.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) Target() string {
return cc.target
}
@ -819,7 +862,7 @@ func (ac *addrConn) connect() error {
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
ac.mu.Lock()
defer ac.mu.Unlock()
grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
if ac.state == connectivity.Shutdown ||
ac.state == connectivity.TransientFailure ||
ac.state == connectivity.Idle {
@ -839,7 +882,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
break
}
}
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
}
@ -847,26 +890,33 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return curAddrFound
}
func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
if sc == nil {
return MethodConfig{}
}
if m, ok := sc.Methods[method]; ok {
return m
}
i := strings.LastIndex(method, "/")
if m, ok := sc.Methods[method[:i+1]]; ok {
return m
}
return sc.Methods[""]
}
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
// the service, we return it.
// If there isn't an exact match for the input method, we look for the service's default
// config under the service (i.e /service/) and then for the default for all services (empty string).
//
// If there is a default MethodConfig for the service, we return it.
// Otherwise, we return an empty MethodConfig.
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
if cc.sc == nil {
return MethodConfig{}
}
m, ok := cc.sc.Methods[method]
if !ok {
i := strings.LastIndex(method, "/")
m = cc.sc.Methods[method[:i+1]]
}
return m
return getMethodConfig(cc.sc, method)
}
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
@ -889,12 +939,15 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
return t, done, nil
}
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
if sc == nil {
// should never reach here.
return
}
cc.sc = sc
if configSelector != nil {
cc.safeConfigSelector.UpdateConfigSelector(configSelector)
}
if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
@ -958,7 +1011,10 @@ func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) ResetConnectBackoff() {
cc.mu.Lock()
conns := cc.conns
@ -1002,15 +1058,15 @@ func (cc *ClientConn) Close() error {
if channelz.IsOn() {
ted := &channelz.TraceEventDesc{
Desc: "Channel Deleted",
Severity: channelz.CtINFO,
Severity: channelz.CtInfo,
}
if cc.dopts.channelzParentID != 0 {
ted.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
Severity: channelz.CtINFO,
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(cc.channelzID, ted)
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity being deleted, and thus prevent it from being deleted right away.
channelz.RemoveEntry(cc.channelzID)
@ -1053,15 +1109,8 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
if ac.state == s {
return
}
updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
ac.state = s
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: updateMsg,
Severity: channelz.CtINFO,
})
}
channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}
@ -1198,12 +1247,7 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
}
ac.mu.Unlock()
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
Severity: channelz.CtINFO,
})
}
channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
if err == nil {
@ -1212,7 +1256,7 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
if firstConnErr == nil {
firstConnErr = err
}
ac.cc.blockingpicker.updateConnectionError(err)
ac.cc.updateConnectionError(err)
}
// Couldn't connect to any address.
@ -1227,16 +1271,9 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
onCloseCalled := make(chan struct{})
reconnect := grpcsync.NewEvent()
authority := ac.cc.authority
// addr.ServerName takes precedent over ClientConn authority, if present.
if addr.ServerName != "" {
authority = addr.ServerName
}
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: authority,
if addr.ServerName == "" {
addr.ServerName = ac.cc.authority
}
once := sync.Once{}
@ -1282,10 +1319,10 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
copts.ChannelzParentID = ac.channelzID
}
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
return nil, nil, err
}
@ -1293,7 +1330,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
newTr.Close()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
@ -1310,7 +1347,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
@ -1340,7 +1377,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
// The health package is not imported to set health check function.
//
// TODO: add a link to the health check doc in the error message.
grpclog.Error("Health check is requested but health check function is not set.")
channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
return
}
@ -1370,15 +1407,9 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
Severity: channelz.CtError,
})
}
grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
} else {
grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
}
}
}()
@ -1443,12 +1474,12 @@ func (ac *addrConn) tearDown(err error) {
ac.mu.Lock()
}
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel Deleted",
Severity: channelz.CtINFO,
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
Severity: channelz.CtINFO,
Severity: channelz.CtInfo,
},
})
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
@ -1542,3 +1573,24 @@ func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
for _, rb := range cc.dopts.resolvers {
if scheme == rb.Scheme() {
return rb
}
}
return resolver.Get(scheme)
}
func (cc *ClientConn) updateConnectionError(err error) {
cc.lceMu.Lock()
cc.lastConnectionError = err
cc.lceMu.Unlock()
}
func (cc *ClientConn) connectionError() error {
cc.lceMu.Lock()
defer cc.lceMu.Unlock()
return cc.lastConnectionError
}