rebase: bump google.golang.org/grpc from 1.40.0 to 1.41.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.40.0 to 1.41.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.40.0...v1.41.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot] 2021-09-28 09:47:59 +00:00 committed by mergify[bot]
parent 3480cb2c25
commit b85076365c
44 changed files with 657 additions and 517 deletions

2
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/stretchr/testify v1.7.0
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2
google.golang.org/grpc v1.40.0
google.golang.org/grpc v1.41.0
k8s.io/api v0.22.2
k8s.io/apimachinery v0.22.2
k8s.io/client-go v12.0.0+incompatible

8
go.sum
View File

@ -183,7 +183,7 @@ github.com/cloudfoundry/gofileutils v0.0.0-20170111115228-4d0c80011a0f/go.mod h1
github.com/clusterhq/flocker-go v0.0.0-20160920122132-2b8b7259d313/go.mod h1:P1wt9Z3DP8O6W3rvwCt0REIlshg1InHImaLW0t3ObY0=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
@ -279,7 +279,7 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/euank/go-kmsg-parser v2.0.0+incompatible/go.mod h1:MhmAMZ8V4CYH4ybgdRwPr2TU5ThnS43puaKEMpja1uw=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
@ -1454,8 +1454,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.41.0 h1:f+PlOh7QV4iIJkPrx5NQ7qaNGFQ3OTse67yaDHfju4E=
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=

View File

@ -8,17 +8,18 @@ See [CONTRIBUTING.md](https://github.com/grpc/grpc-community/blob/master/CONTRIB
for general contribution guidelines.
## Maintainers (in alphabetical order)
- [canguler](https://github.com/canguler), Google LLC
- [cesarghali](https://github.com/cesarghali), Google LLC
- [dfawley](https://github.com/dfawley), Google LLC
- [easwars](https://github.com/easwars), Google LLC
- [jadekler](https://github.com/jadekler), Google LLC
- [menghanl](https://github.com/menghanl), Google LLC
- [srini100](https://github.com/srini100), Google LLC
## Emeritus Maintainers (in alphabetical order)
- [adelez](https://github.com/adelez), Google LLC
- [canguler](https://github.com/canguler), Google LLC
- [iamqizhao](https://github.com/iamqizhao), Google LLC
- [jadekler](https://github.com/jadekler), Google LLC
- [jtattermusch](https://github.com/jtattermusch), Google LLC
- [lyuxuan](https://github.com/lyuxuan), Google LLC
- [makmukhi](https://github.com/makmukhi), Google LLC

View File

@ -41,8 +41,6 @@ vetdeps:
clean \
proto \
test \
testappengine \
testappenginedeps \
testrace \
vet \
vetdeps

13
vendor/google.golang.org/grpc/NOTICE.txt generated vendored Normal file
View File

@ -0,0 +1,13 @@
Copyright 2014 gRPC authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -75,24 +75,26 @@ func Get(name string) Builder {
return nil
}
// SubConn represents a gRPC sub connection.
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
// A SubConn represents a single connection to a gRPC backend service.
//
// The reconnect backoff will be applied on the list, not a single address.
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
// Each SubConn contains a list of addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger
// the connecting, Balancers must call Connect.
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
// All SubConns start in IDLE, and will not try to connect. To trigger the
// connecting, Balancers must call Connect. If a connection re-enters IDLE,
// Balancers must call Connect again to trigger a new connection attempt.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
// gRPC will try to connect to the addresses in sequence, and stop trying the
// remainder once the first connection is successful. If an attempt to connect
// to all addresses encounters an error, the SubConn will enter
// TRANSIENT_FAILURE for a backoff period, and then transition to IDLE.
//
// Once established, if a connection is lost, the SubConn will transition
// directly to IDLE.
//
// This interface is to be implemented by gRPC. Users should not need their own
// implementation of this interface. For situations like testing, any
// implementations should embed this interface. This allows gRPC to add new
// methods to this interface.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
@ -326,6 +328,20 @@ type Balancer interface {
Close()
}
// ExitIdler is an optional interface for balancers to implement. If
// implemented, ExitIdle will be called when ClientConn.Connect is called, if
// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause
// all SubConns to connect.
//
// Notice: it will be required for all balancers to implement this in a future
// release.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}
// SubConnState describes the state of a SubConn.
type SubConnState struct {
// ConnectivityState is the connectivity state of the SubConn.
@ -355,6 +371,8 @@ var ErrBadResolverState = errors.New("bad resolver state")
type ConnectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transient failure state.
numIdle uint64 // Number of addrConns in idle state.
}
// RecordTransition records state change happening in subConn and based on that
@ -362,9 +380,11 @@ type ConnectivityStateEvaluator struct {
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
// - Else if at least one SubConn is TransientFailure, the aggregated state is Transient Failure;
// - Else if at least one SubConn is Idle, the aggregated state is Idle;
// - Else there are no subconns and the aggregated state is Transient Failure
//
// Idle and Shutdown are not considered.
// Shutdown is not considered.
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
@ -374,6 +394,10 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
case connectivity.Idle:
cse.numIdle += updateVal
}
}
@ -384,5 +408,11 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
if cse.numConnecting > 0 {
return connectivity.Connecting
}
if cse.numTransientFailure > 0 {
return connectivity.TransientFailure
}
if cse.numIdle > 0 {
return connectivity.Idle
}
return connectivity.TransientFailure
}

View File

@ -133,6 +133,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
}
b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
b.scStates[sc] = connectivity.Idle
b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
} else {
// Always update the subconn's address in case the attributes
@ -213,10 +214,14 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
}
return
}
if oldS == connectivity.TransientFailure && s == connectivity.Connecting {
// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent
if oldS == connectivity.TransientFailure &&
(s == connectivity.Connecting || s == connectivity.Idle) {
// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or
// CONNECTING transitions to prevent the aggregated state from being
// always CONNECTING when many backends exist but are all down.
if s == connectivity.Idle {
sc.Connect()
}
return
}
b.scStates[sc] = s
@ -242,7 +247,6 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
b.state == connectivity.TransientFailure {
b.regeneratePicker()
}
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
@ -251,6 +255,11 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
func (b *baseBalancer) Close() {
}
// ExitIdle is a nop because the base balancer attempts to stay connected to
// all SubConns at all times.
func (b *baseBalancer) ExitIdle() {
}
// NewErrPicker returns a Picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}

View File

@ -47,11 +47,11 @@ func init() {
type rrPickerBuilder struct{}
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("roundrobinPicker: newPicker called with info: %v", info)
logger.Infof("roundrobinPicker: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var scs []balancer.SubConn
scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
for sc := range info.ReadySCs {
scs = append(scs, sc)
}

View File

@ -37,12 +37,17 @@ type scStateUpdate struct {
err error
}
// exitIdle contains no data and is just a signal sent on the updateCh in
// ccBalancerWrapper to instruct the balancer to exit idle.
type exitIdle struct{}
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
hasExitIdle bool
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event
@ -61,6 +66,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
return ccb
}
@ -86,6 +92,17 @@ func (ccb *ccBalancerWrapper) watcher() {
ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
}
ccb.mu.Unlock()
case exitIdle:
if ccb.cc.GetState() == connectivity.Idle {
if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
// We already checked that the balancer implements
// ExitIdle before pushing the event to updateCh, but
// check conditionally again as defensive programming.
ccb.balancerMu.Lock()
ei.ExitIdle()
ccb.balancerMu.Unlock()
}
}
default:
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
}
@ -118,6 +135,14 @@ func (ccb *ccBalancerWrapper) close() {
<-ccb.done.Done()
}
func (ccb *ccBalancerWrapper) exitIdle() bool {
if !ccb.hasExitIdle {
return false
}
ccb.updateCh.Put(exitIdle{})
return true
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
// When updating addresses for a SubConn, if the address in use is not in
// the new addresses, the old ac will be tearDown() and a new ac will be
@ -144,8 +169,8 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
ccb.balancer.ResolverError(err)
ccb.balancerMu.Unlock()
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
@ -239,17 +264,17 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
return
}
ac, err := cc.newAddrConn(addrs, opts)
newAC, err := cc.newAddrConn(addrs, opts)
if err != nil {
channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
return
}
acbw.ac = ac
ac.mu.Lock()
ac.acbw = acbw
ac.mu.Unlock()
acbw.ac = newAC
newAC.mu.Lock()
newAC.acbw = acbw
newAC.mu.Unlock()
if acState != connectivity.Idle {
ac.connect()
go newAC.connect()
}
}
}
@ -257,7 +282,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
acbw.ac.connect()
go acbw.ac.connect()
}
func (acbw *acBalancerWrapper) getAddrConn() *addrConn {

View File

@ -322,6 +322,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
cc.Connect()
s := cc.GetState()
if s == connectivity.Ready {
break
@ -539,12 +540,31 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connec
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}
// Connect causes all subchannels in the ClientConn to attempt to connect if
// the channel is idle. Does not wait for the connection attempts to begin
// before returning.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
go ac.connect()
}
}
func (cc *ClientConn) scWatcher() {
for {
select {
@ -845,8 +865,7 @@ func (ac *addrConn) connect() error {
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()
// Start a goroutine connecting to the server asynchronously.
go ac.resetTransport()
ac.resetTransport()
return nil
}
@ -883,6 +902,10 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
// ac.state is Ready, try to find the connected address.
var curAddrFound bool
for _, a := range addrs {
// a.ServerName takes precedent over ClientConn authority, if present.
if a.ServerName == "" {
a.ServerName = ac.cc.authority
}
if reflect.DeepEqual(ac.curAddr, a) {
curAddrFound = true
break
@ -1135,11 +1158,6 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
}
func (ac *addrConn) resetTransport() {
for i := 0; ; i++ {
if i > 0 {
ac.cc.resolveNow(resolver.ResolveNowOptions{})
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
@ -1167,11 +1185,10 @@ func (ac *addrConn) resetTransport() {
connectDeadline := time.Now().Add(dialDuration)
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.transport = nil
ac.mu.Unlock()
newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
if err != nil {
if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
ac.cc.resolveNow(resolver.ResolveNowOptions{})
// After exhausting all addresses, the addrConn enters
// TRANSIENT_FAILURE.
ac.mu.Lock()
@ -1197,50 +1214,30 @@ func (ac *addrConn) resetTransport() {
timer.Stop()
return
}
continue
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
if ac.state != connectivity.Shutdown {
ac.updateConnectivityState(connectivity.Idle, err)
}
ac.mu.Unlock()
newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
return
}
ac.curAddr = addr
ac.transport = newTr
// Success; reset backoff.
ac.mu.Lock()
ac.backoffIdx = 0
hctx, hcancel := context.WithCancel(ac.ctx)
ac.startHealthCheck(hctx)
ac.mu.Unlock()
// Block until the created transport is down. And when this happens,
// we restart from the top of the addr list.
<-reconnect.Done()
hcancel()
// restart connecting - the top of the loop will set state to
// CONNECTING. This is against the current connectivity semantics doc,
// however it allows for graceful behavior for RPCs not yet dispatched
// - unfortunate timing would otherwise lead to the RPC failing even
// though the TRANSIENT_FAILURE state (called for by the doc) would be
// instantaneous.
//
// Ideally we should transition to Idle here and block until there is
// RPC activity that leads to the balancer requesting a reconnect of
// the associated SubConn.
}
}
// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
// first successful one. It returns the transport, the address and a Event in
// the successful case. The Event fires when the returned transport disconnects.
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
// tryAllAddrs tries to creates a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
var firstConnErr error
for _, addr := range addrs {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return nil, resolver.Address{}, nil, errConnClosing
return errConnClosing
}
ac.cc.mu.RLock()
@ -1255,9 +1252,9 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
err := ac.createTransport(addr, copts, connectDeadline)
if err == nil {
return newTr, addr, reconnect, nil
return nil
}
if firstConnErr == nil {
firstConnErr = err
@ -1266,57 +1263,54 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
}
// Couldn't connect to any address.
return nil, resolver.Address{}, nil, firstConnErr
return firstConnErr
}
// createTransport creates a connection to addr. It returns the transport and a
// Event in the successful case. The Event fires when the returned transport
// disconnects.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
prefaceReceived := make(chan struct{})
onCloseCalled := make(chan struct{})
reconnect := grpcsync.NewEvent()
// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
// TODO: Delete prefaceReceived and move the logic to wait for it into the
// transport.
prefaceReceived := grpcsync.NewEvent()
connClosed := grpcsync.NewEvent()
// addr.ServerName takes precedent over ClientConn authority, if present.
if addr.ServerName == "" {
addr.ServerName = ac.cc.authority
}
once := sync.Once{}
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
once.Do(func() {
if ac.state == connectivity.Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
ac.updateConnectivityState(connectivity.Connecting, nil)
}
})
ac.mu.Unlock()
reconnect.Fire()
}
hctx, hcancel := context.WithCancel(ac.ctx)
hcStarted := false // protected by ac.mu
onClose := func() {
ac.mu.Lock()
once.Do(func() {
if ac.state == connectivity.Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
defer ac.mu.Unlock()
defer connClosed.Fire()
if !hcStarted || hctx.Err() != nil {
// We didn't start the health check or set the state to READY, so
// no need to do anything else here.
//
// TODO: this should be Idle when grpc-go properly supports it.
ac.updateConnectivityState(connectivity.Connecting, nil)
// OR, we have already cancelled the health check context, meaning
// we have already called onClose once for this transport. In this
// case it would be dangerous to clear the transport and update the
// state, since there may be a new transport in this addrConn.
return
}
hcancel()
ac.transport = nil
// Refresh the name resolver
ac.cc.resolveNow(resolver.ResolveNowOptions{})
if ac.state != connectivity.Shutdown {
ac.updateConnectivityState(connectivity.Idle, nil)
}
})
ac.mu.Unlock()
close(onCloseCalled)
reconnect.Fire()
}
onPrefaceReceipt := func() {
close(prefaceReceived)
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
onClose()
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@ -1325,27 +1319,67 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
copts.ChannelzParentID = ac.channelzID
}
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
return nil, nil, err
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
return err
}
select {
case <-time.After(time.Until(connectDeadline)):
case <-connectCtx.Done():
// We didn't get the preface in time.
newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, nil, errors.New("connection closed")
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
// The error we pass to Close() is immaterial since there are no open
// streams at this point, so no trailers with error details will be sent
// out. We just need to pass a non-nil error.
newTr.Close(transport.ErrConnClosing)
if connectCtx.Err() == context.DeadlineExceeded {
err := errors.New("failed to receive server preface within timeout")
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
return err
}
return nil
case <-prefaceReceived.Done():
// We got the preface - huzzah! things are good.
ac.mu.Lock()
defer ac.mu.Unlock()
if connClosed.HasFired() {
// onClose called first; go idle but do nothing else.
if ac.state != connectivity.Shutdown {
ac.updateConnectivityState(connectivity.Idle, nil)
}
return nil
}
if ac.state == connectivity.Shutdown {
// This can happen if the subConn was removed while in `Connecting`
// state. tearDown() would have set the state to `Shutdown`, but
// would not have closed the transport since ac.transport would not
// been set at that point.
//
// We run this in a goroutine because newTr.Close() calls onClose()
// inline, which requires locking ac.mu.
//
// The error we pass to Close() is immaterial since there are no open
// streams at this point, so no trailers with error details will be sent
// out. We just need to pass a non-nil error.
go newTr.Close(transport.ErrConnClosing)
return nil
}
ac.curAddr = addr
ac.transport = newTr
hcStarted = true
ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
return nil
case <-connClosed.Done():
// The transport has already closed. If we received the preface, too,
// this is not an error.
select {
case <-prefaceReceived.Done():
return nil
default:
return errors.New("connection closed before server preface received")
}
}
return newTr, reconnect, nil
}
// startHealthCheck starts the health checking stream (RPC) to watch the health

View File

@ -18,7 +18,6 @@
// Package connectivity defines connectivity semantics.
// For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md.
// All APIs in this package are experimental.
package connectivity
import (
@ -45,7 +44,7 @@ func (s State) String() string {
return "SHUTDOWN"
default:
logger.Errorf("unknown connectivity state: %d", s)
return "Invalid-State"
return "INVALID_STATE"
}
}
@ -61,3 +60,35 @@ const (
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)
// ServingMode indicates the current mode of operation of the server.
//
// Only xDS enabled gRPC servers currently report their serving mode.
type ServingMode int
const (
// ServingModeStarting indicates that the server is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates that the server contains all required
// configuration and is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required configuration to serve RPCs.
ServingModeNotServing
)
func (s ServingMode) String() string {
switch s {
case ServingModeStarting:
return "STARTING"
case ServingModeServing:
return "SERVING"
case ServingModeNotServing:
return "NOT_SERVING"
default:
logger.Errorf("unknown serving mode: %d", s)
return "INVALID_MODE"
}
}

View File

@ -1,30 +0,0 @@
// +build go1.12
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package credentials
import "crypto/tls"
// This init function adds cipher suite constants only defined in Go 1.12.
func init() {
cipherSuiteLookup[tls.TLS_AES_128_GCM_SHA256] = "TLS_AES_128_GCM_SHA256"
cipherSuiteLookup[tls.TLS_AES_256_GCM_SHA384] = "TLS_AES_256_GCM_SHA384"
cipherSuiteLookup[tls.TLS_CHACHA20_POLY1305_SHA256] = "TLS_CHACHA20_POLY1305_SHA256"
}

View File

@ -230,4 +230,7 @@ var cipherSuiteLookup = map[uint16]string{
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305: "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305: "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
tls.TLS_AES_128_GCM_SHA256: "TLS_AES_128_GCM_SHA256",
tls.TLS_AES_256_GCM_SHA384: "TLS_AES_256_GCM_SHA384",
tls.TLS_CHACHA20_POLY1305_SHA256: "TLS_CHACHA20_POLY1305_SHA256",
}

View File

@ -1,11 +1,11 @@
module google.golang.org/grpc
go 1.11
go 1.14
require (
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.1
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.0

22
vendor/google.golang.org/grpc/go.sum generated vendored
View File

@ -2,27 +2,25 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0 h1:eOI3/cP2VTU6uZLDYAoic+eyzzB9YyGmJ7eIjl8rOPg=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed h1:OZmjad4L3H8ncOIR8rnb5MREYqG8ixi5+WbeUsquF0c=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158 h1:CevA8fI91PAnP8vpnXuB8ZYAZ5wqY86nAbxfgK8tWO4=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0 h1:dulLQAYQFYtG5MTplgNGHWuV2D+OBD+Z8lmDBmbLg+s=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021 h1:fP+fF0up6oPY49OrjPrhIJ8yQfdIM85NXMLkMg1EXVs=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@ -54,11 +52,10 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -122,7 +119,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -1,6 +0,0 @@
#!/bin/bash
TMP=$(mktemp -d /tmp/sdk.XXX) \
&& curl -o $TMP.zip "https://storage.googleapis.com/appengine-sdks/featured/go_appengine_sdk_linux_amd64-1.9.68.zip" \
&& unzip -q $TMP.zip -d $TMP \
&& export PATH="$PATH:$TMP/go_appengine"

View File

@ -630,7 +630,7 @@ func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64)
if count == 0 {
end = true
}
var s []*SocketMetric
s := make([]*SocketMetric, 0, len(sks))
for _, ns := range sks {
sm := &SocketMetric{}
sm.SocketData = ns.s.ChannelzMetric()

View File

@ -1,5 +1,3 @@
// +build !appengine
/*
*
* Copyright 2018 gRPC authors.

View File

@ -1,4 +1,5 @@
// +build !linux appengine
//go:build !linux
// +build !linux
/*
*
@ -37,6 +38,6 @@ type SocketOptionData struct {
// Windows OS doesn't support Socket Option
func (s *SocketOptionData) Getsockopt(fd uintptr) {
once.Do(func() {
logger.Warning("Channelz: socket options are not supported on non-linux os and appengine.")
logger.Warning("Channelz: socket options are not supported on non-linux environments")
})
}

View File

@ -1,5 +1,3 @@
// +build linux,!appengine
/*
*
* Copyright 2018 gRPC authors.

View File

@ -1,4 +1,5 @@
// +build !linux appengine
//go:build !linux
// +build !linux
/*
*

View File

@ -1,5 +1,3 @@
// +build !appengine
/*
*
* Copyright 2020 gRPC authors.

View File

@ -1,31 +0,0 @@
// +build appengine
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package credentials
import (
"crypto/tls"
"net/url"
)
// SPIFFEIDFromState is a no-op for appengine builds.
func SPIFFEIDFromState(state tls.ConnectionState) *url.URL {
return nil
}

View File

@ -1,5 +1,3 @@
// +build !appengine
/*
*
* Copyright 2018 gRPC authors.

View File

@ -1,30 +0,0 @@
// +build appengine
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package credentials
import (
"net"
)
// WrapSyscallConn returns newConn on appengine.
func WrapSyscallConn(rawConn, newConn net.Conn) net.Conn {
return newConn
}

View File

@ -18,7 +18,9 @@
package credentials
import "crypto/tls"
import (
"crypto/tls"
)
const alpnProtoStrH2 = "h2"

View File

@ -22,6 +22,8 @@ package envconfig
import (
"os"
"strings"
xdsenv "google.golang.org/grpc/internal/xds/env"
)
const (
@ -31,8 +33,8 @@ const (
)
var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on" or if XDS retry support is enabled.
Retry = strings.EqualFold(os.Getenv(retryStr), "on") || xdsenv.RetrySupport
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
)

View File

@ -117,9 +117,12 @@ type ClientInterceptor interface {
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
}
// ServerInterceptor is unimplementable; do not use.
// ServerInterceptor is an interceptor for incoming RPC's on gRPC server side.
type ServerInterceptor interface {
notDefined()
// AllowRPC checks if an incoming RPC is allowed to proceed based on
// information about connection RPC was received on, and HTTP Headers. This
// information will be piped into context.
AllowRPC(ctx context.Context) error // TODO: Make this a real interceptor for filters such as rate limiting.
}
type csKeyType string

View File

@ -277,18 +277,13 @@ func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {
return newAddrs, nil
}
var filterError = func(err error) error {
func handleDNSError(err error, lookupType string) error {
if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
// Timeouts and temporary errors should be communicated to gRPC to
// attempt another DNS query (with backoff). Other errors should be
// suppressed (they may represent the absence of a TXT record).
return nil
}
return err
}
func handleDNSError(err error, lookupType string) error {
err = filterError(err)
if err != nil {
err = fmt.Errorf("dns: %v record lookup error: %v", lookupType, err)
logger.Info(err)
@ -323,12 +318,12 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
}
func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {
var newAddrs []resolver.Address
addrs, err := d.resolver.LookupHost(d.ctx, d.host)
if err != nil {
err = handleDNSError(err, "A")
return nil, err
}
newAddrs := make([]resolver.Address, 0, len(addrs))
for _, a := range addrs {
ip, ok := formatIP(a)
if !ok {

View File

@ -1,33 +0,0 @@
// +build go1.13
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package dns
import "net"
func init() {
filterError = func(err error) error {
if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.IsNotFound {
// The name does not exist; not an error.
return nil
}
return err
}
}

View File

@ -78,6 +78,7 @@ func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
return err
}
var names []string
for i, lbcfg := range ir {
if len(lbcfg) != 1 {
return fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg)
@ -92,6 +93,7 @@ func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
for name, jsonCfg = range lbcfg {
}
names = append(names, name)
builder := balancer.Get(name)
if builder == nil {
// If the balancer is not registered, move on to the next config.
@ -120,7 +122,7 @@ func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
// return. This means we had a loadBalancingConfig slice but did not
// encounter a registered policy. The config is considered invalid in this
// case.
return fmt.Errorf("invalid loadBalancingConfig: no supported policies found")
return fmt.Errorf("invalid loadBalancingConfig: no supported policies found in %v", names)
}
// MethodConfig defines the configuration recommended by the service providers for a

View File

@ -1,5 +1,3 @@
// +build !appengine
/*
*
* Copyright 2018 gRPC authors.

View File

@ -1,4 +1,5 @@
// +build !linux appengine
//go:build !linux
// +build !linux
/*
*
@ -35,41 +36,41 @@ var logger = grpclog.Component("core")
func log() {
once.Do(func() {
logger.Info("CPU time info is unavailable on non-linux or appengine environment.")
logger.Info("CPU time info is unavailable on non-linux environments.")
})
}
// GetCPUTime returns the how much CPU time has passed since the start of this process.
// It always returns 0 under non-linux or appengine environment.
// GetCPUTime returns the how much CPU time has passed since the start of this
// process. It always returns 0 under non-linux environments.
func GetCPUTime() int64 {
log()
return 0
}
// Rusage is an empty struct under non-linux or appengine environment.
// Rusage is an empty struct under non-linux environments.
type Rusage struct{}
// GetRusage is a no-op function under non-linux or appengine environment.
// GetRusage is a no-op function under non-linux environments.
func GetRusage() *Rusage {
log()
return nil
}
// CPUTimeDiff returns the differences of user CPU time and system CPU time used
// between two Rusage structs. It a no-op function for non-linux or appengine environment.
// between two Rusage structs. It a no-op function for non-linux environments.
func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) {
log()
return 0, 0
}
// SetTCPUserTimeout is a no-op function under non-linux or appengine environments
// SetTCPUserTimeout is a no-op function under non-linux environments.
func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error {
log()
return nil
}
// GetTCPUserTimeout is a no-op function under non-linux or appengine environments
// a negative return value indicates the operation is not supported
// GetTCPUserTimeout is a no-op function under non-linux environments.
// A negative return value indicates the operation is not supported
func GetTCPUserTimeout(conn net.Conn) (int, error) {
log()
return -1, nil

View File

@ -616,12 +616,22 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
return callAuthData, nil
}
// NewStreamError wraps an error and reports additional information.
// NewStreamError wraps an error and reports additional information. Typically
// NewStream errors result in transparent retry, as they mean nothing went onto
// the wire. However, there are two notable exceptions:
//
// 1. If the stream headers violate the max header list size allowed by the
// server. In this case there is no reason to retry at all, as it is
// assumed the RPC would continue to fail on subsequent attempts.
// 2. If the credentials errored when requesting their headers. In this case,
// it's possible a retry can fix the problem, but indefinitely transparently
// retrying is not appropriate as it is likely the credentials, if they can
// eventually succeed, would need I/O to do so.
type NewStreamError struct {
Err error
DoNotRetry bool
PerformedIO bool
DoNotTransparentRetry bool
}
func (e NewStreamError) Error() string {
@ -631,24 +641,10 @@ func (e NewStreamError) Error() string {
// NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
defer func() {
if err != nil {
nse, ok := err.(*NewStreamError)
if !ok {
nse = &NewStreamError{Err: err}
}
if len(t.perRPCCreds) > 0 || callHdr.Creds != nil {
// We may have performed I/O in the per-RPC creds callback, so do not
// allow transparent retry.
nse.PerformedIO = true
}
err = nse
}
}()
ctx = peer.NewContext(ctx, t.getPeer())
headerFields, err := t.createHeaderFields(ctx, callHdr)
if err != nil {
return nil, err
return nil, &NewStreamError{Err: err, DoNotTransparentRetry: true}
}
s := t.newStream(ctx, callHdr)
cleanup := func(err error) {
@ -748,7 +744,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return true
}, hdr)
if err != nil {
return nil, err
return nil, &NewStreamError{Err: err}
}
if success {
break
@ -759,12 +755,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
firstTry = false
select {
case <-ch:
case <-s.ctx.Done():
return nil, ContextErr(s.ctx.Err())
case <-ctx.Done():
return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
case <-t.goAway:
return nil, errStreamDrain
return nil, &NewStreamError{Err: errStreamDrain}
case <-t.ctx.Done():
return nil, ErrConnClosing
return nil, &NewStreamError{Err: ErrConnClosing}
}
}
if t.statsHandler != nil {

View File

@ -133,6 +133,22 @@ type http2Server struct {
// underlying conn gets closed before the client preface could be read, it
// returns a nil transport and a nil error.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
var authInfo credentials.AuthInfo
rawConn := conn
if config.Credentials != nil {
var err error
conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
if err != nil {
// ErrConnDispatched means that the connection was dispatched away
// from gRPC; those connections should be left open. io.EOF means
// the connection was closed before handshaking completed, which can
// happen naturally from probers. Return these errors directly.
if err == credentials.ErrConnDispatched || err == io.EOF {
return nil, err
}
return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
}
}
writeBufSize := config.WriteBufferSize
readBufSize := config.ReadBufferSize
maxHeaderListSize := defaultServerMaxHeaderListSize
@ -215,14 +231,15 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
done := make(chan struct{})
t := &http2Server{
ctx: context.Background(),
ctx: setConnection(context.Background(), rawConn),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: config.AuthInfo,
authInfo: authInfo,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
@ -1345,3 +1362,18 @@ func getJitter(v time.Duration) time.Duration {
j := grpcrand.Int63n(2*r) - r
return time.Duration(j)
}
type connectionKey struct{}
// GetConnection gets the connection from the context.
func GetConnection(ctx context.Context) net.Conn {
conn, _ := ctx.Value(connectionKey{}).(net.Conn)
return conn
}
// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func setConnection(ctx context.Context, conn net.Conn) context.Context {
return context.WithValue(ctx, connectionKey{}, conn)
}

View File

@ -30,6 +30,7 @@ import (
"net"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@ -518,7 +519,8 @@ const (
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
MaxStreams uint32
AuthInfo credentials.AuthInfo
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters

95
vendor/google.golang.org/grpc/internal/xds/env/env.go generated vendored Normal file
View File

@ -0,0 +1,95 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package env acts a single source of definition for all environment variables
// related to the xDS implementation in gRPC.
package env
import (
"os"
"strings"
)
const (
// BootstrapFileNameEnv is the env variable to set bootstrap file name.
// Do not use this and read from env directly. Its value is read and kept in
// variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP"
// BootstrapFileContentEnv is the env variable to set bootstrapp file
// content. Do not use this and read from env directly. Its value is read
// and kept in variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
retrySupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RBAC"
c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
)
var (
// BootstrapFileName holds the name of the file which contains xDS bootstrap
// configuration. Users can specify the location of the bootstrap file by
// setting the environment variable "GRPC_XDS_BOOTSTRAP".
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileName = os.Getenv(BootstrapFileNameEnv)
// BootstrapFileContent holds the content of the xDS bootstrap
// configuration. Users can specify the bootstrap config by
// setting the environment variable "GRPC_XDS_BOOTSTRAP_CONFIG".
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)
// RingHashSupport indicates whether ring hash support is enabled, which can
// be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
RingHashSupport = !strings.EqualFold(os.Getenv(ringHashSupportEnv), "false")
// ClientSideSecuritySupport is used to control processing of security
// configuration on the client-side.
//
// Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
ClientSideSecuritySupport = !strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "false")
// AggregateAndDNSSupportEnv indicates whether processing of aggregated
// cluster and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
// RetrySupport indicates whether xDS retry is enabled.
RetrySupport = !strings.EqualFold(os.Getenv(retrySupportEnv), "false")
// RBACSupport indicates whether xDS configured RBAC HTTP Filter is enabled.
RBACSupport = strings.EqualFold(os.Getenv(rbacSupportEnv), "true")
// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
)

View File

@ -107,10 +107,12 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S
}
switch s.ConnectivityState {
case connectivity.Ready, connectivity.Idle:
case connectivity.Ready:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})
case connectivity.Idle:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
ConnectivityState: s.ConnectivityState,
@ -122,6 +124,12 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S
func (b *pickfirstBalancer) Close() {
}
func (b *pickfirstBalancer) ExitIdle() {
if b.state == connectivity.Idle {
b.sc.Connect()
}
}
type picker struct {
result balancer.PickResult
err error
@ -131,6 +139,17 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err
}
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
// CONNECTING when Pick is called.
type idlePicker struct {
sc balancer.SubConn
}
func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
i.sc.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
func init() {
balancer.Register(newPickfirstBuilder())
}

View File

@ -710,13 +710,6 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo {
// the server being stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if s.opts.creds == nil {
return rawConn, nil, nil
}
return s.opts.creds.ServerHandshake(rawConn)
}
type listenSocket struct {
net.Listener
channelzID int64
@ -839,35 +832,14 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
return
}
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
// ErrConnDispatched means that the connection was dispatched away from
// gRPC; those connections should be left open.
if err != credentials.ErrConnDispatched {
// In deployments where a gRPC server runs behind a cloud load
// balancer which performs regular TCP level health checks, the
// connection is closed immediately by the latter. Skipping the
// error here will help reduce log clutter.
if err != io.EOF {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
}
rawConn.Close()
}
rawConn.SetDeadline(time.Time{})
return
}
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
st := s.newHTTP2Transport(rawConn)
rawConn.SetDeadline(time.Time{})
if st == nil {
conn.Close()
return
}
rawConn.SetDeadline(time.Time{})
if !s.addConn(lisAddr, st) {
return
}
@ -888,10 +860,11 @@ func (s *Server) drainServerTransports(addr string) {
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo,
ConnectionTimeout: s.opts.connectionTimeout,
Credentials: s.opts.creds,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
KeepaliveParams: s.opts.keepaliveParams,
@ -909,8 +882,17 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
s.mu.Lock()
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
// ErrConnDispatched means that the connection was dispatched away from
// gRPC; those connections should be left open.
if err != credentials.ErrConnDispatched {
c.Close()
}
// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
if err != credentials.ErrConnDispatched {
if err != io.EOF {
channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
}
}
return nil
}

View File

@ -36,12 +36,12 @@ type RPCStats interface {
IsClient() bool
}
// Begin contains stats when an RPC begins.
// Begin contains stats when an RPC attempt begins.
// FailFast is only valid if this Begin is from client side.
type Begin struct {
// Client is true if this Begin is from client side.
Client bool
// BeginTime is the time when the RPC begins.
// BeginTime is the time when the RPC attempt begins.
BeginTime time.Time
// FailFast indicates if this RPC is failfast.
FailFast bool
@ -49,6 +49,9 @@ type Begin struct {
IsClientStream bool
// IsServerStream indicates whether the RPC is a server streaming RPC.
IsServerStream bool
// IsTransparentRetryAttempt indicates whether this attempt was initiated
// due to transparently retrying a previous attempt.
IsTransparentRetryAttempt bool
}
// IsClient indicates if the stats information is from client side.

View File

@ -274,35 +274,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
if c.creds != nil {
callHdr.Creds = c.creds
}
var trInfo *traceInfo
if EnableTracing {
trInfo = &traceInfo{
tr: trace.New("grpc.Sent."+methodFamily(method), method),
firstLine: firstLine{
client: true,
},
}
if deadline, ok := ctx.Deadline(); ok {
trInfo.firstLine.deadline = time.Until(deadline)
}
trInfo.tr.LazyLog(&trInfo.firstLine, false)
ctx = trace.NewContext(ctx, trInfo.tr)
}
ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
sh := cc.dopts.copts.StatsHandler
var beginTime time.Time
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
beginTime = time.Now()
begin := &stats.Begin{
Client: true,
BeginTime: beginTime,
FailFast: c.failFast,
IsClientStream: desc.ClientStreams,
IsServerStream: desc.ServerStreams,
}
sh.HandleRPC(ctx, begin)
}
cs := &clientStream{
callHdr: callHdr,
@ -316,7 +287,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
cp: cp,
comp: comp,
cancel: cancel,
beginTime: beginTime,
firstAttempt: true,
onCommit: onCommit,
}
@ -325,9 +295,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs.binlog = binarylog.GetMethodLogger(method)
// Only this initial attempt has stats/tracing.
// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
if err := cs.newAttemptLocked(sh, trInfo); err != nil {
if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.finish(err)
return nil, err
}
@ -375,8 +343,43 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
// newAttemptLocked creates a new attempt with a transport.
// If it succeeds, then it replaces clientStream's attempt with this new attempt.
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
sh := cs.cc.dopts.copts.StatsHandler
var beginTime time.Time
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
beginTime = time.Now()
begin := &stats.Begin{
Client: true,
BeginTime: beginTime,
FailFast: cs.callInfo.failFast,
IsClientStream: cs.desc.ClientStreams,
IsServerStream: cs.desc.ServerStreams,
IsTransparentRetryAttempt: isTransparent,
}
sh.HandleRPC(ctx, begin)
}
var trInfo *traceInfo
if EnableTracing {
trInfo = &traceInfo{
tr: trace.New("grpc.Sent."+methodFamily(method), method),
firstLine: firstLine{
client: true,
},
}
if deadline, ok := ctx.Deadline(); ok {
trInfo.firstLine.deadline = time.Until(deadline)
}
trInfo.tr.LazyLog(&trInfo.firstLine, false)
ctx = trace.NewContext(ctx, trInfo.tr)
}
newAttempt := &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
@ -391,15 +394,14 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (r
}
}()
if err := cs.ctx.Err(); err != nil {
if err := ctx.Err(); err != nil {
return toRPCErr(err)
}
ctx := cs.ctx
if cs.cc.parsedTarget.Scheme == "xds" {
// Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them.
ctx = grpcutil.WithExtraMetadata(cs.ctx, metadata.Pairs(
ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
))
}
@ -419,7 +421,7 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (r
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
// Return without converting to an RPC error so retry code can
// inspect.
@ -445,7 +447,6 @@ type clientStream struct {
cancel context.CancelFunc // cancels all attempts
sentLast bool // sent an end stream
beginTime time.Time
methodConfig *MethodConfig
@ -485,6 +486,7 @@ type clientStream struct {
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
ctx context.Context
cs *clientStream
t transport.ClientTransport
s *transport.Stream
@ -503,6 +505,7 @@ type csAttempt struct {
trInfo *traceInfo
statsHandler stats.Handler
beginTime time.Time
}
func (cs *clientStream) commitAttemptLocked() {
@ -520,15 +523,16 @@ func (cs *clientStream) commitAttempt() {
}
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
func (cs *clientStream) shouldRetry(err error) (bool, error) {
if cs.attempt.s == nil {
// Error from NewClientStream.
nse, ok := err.(*transport.NewStreamError)
if !ok {
// Unexpected, but assume no I/O was performed and the RPC is not
// fatal, so retry indefinitely.
return nil
return true, nil
}
// Unwrap and convert error.
@ -537,19 +541,19 @@ func (cs *clientStream) shouldRetry(err error) error {
// Never retry DoNotRetry errors, which indicate the RPC should not be
// retried due to max header list size violation, etc.
if nse.DoNotRetry {
return err
return false, err
}
// In the event of a non-IO operation error from NewStream, we never
// attempted to write anything to the wire, so we can retry
// indefinitely.
if !nse.PerformedIO {
return nil
if !nse.DoNotTransparentRetry {
return true, nil
}
}
if cs.finished || cs.committed {
// RPC is finished or committed; cannot retry.
return err
return false, err
}
// Wait for the trailers.
unprocessed := false
@ -559,17 +563,17 @@ func (cs *clientStream) shouldRetry(err error) error {
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
return nil
return true, nil
}
if cs.cc.dopts.disableRetry {
return err
return false, err
}
pushback := 0
hasPushback := false
if cs.attempt.s != nil {
if !cs.attempt.s.TrailersOnly() {
return err
return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
@ -580,13 +584,13 @@ func (cs *clientStream) shouldRetry(err error) error {
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
cs.retryThrottler.throttle() // This counts as a failure for throttling.
return err
return false, err
}
hasPushback = true
} else if len(sps) > 1 {
channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
cs.retryThrottler.throttle() // This counts as a failure for throttling.
return err
return false, err
}
}
@ -599,16 +603,16 @@ func (cs *clientStream) shouldRetry(err error) error {
rp := cs.methodConfig.RetryPolicy
if rp == nil || !rp.RetryableStatusCodes[code] {
return err
return false, err
}
// Note: the ordering here is important; we count this as a failure
// only if the code matched a retryable code.
if cs.retryThrottler.throttle() {
return err
return false, err
}
if cs.numRetries+1 >= rp.MaxAttempts {
return err
return false, err
}
var dur time.Duration
@ -631,10 +635,10 @@ func (cs *clientStream) shouldRetry(err error) error {
select {
case <-t.C:
cs.numRetries++
return nil
return false, nil
case <-cs.ctx.Done():
t.Stop()
return status.FromContextError(cs.ctx.Err()).Err()
return false, status.FromContextError(cs.ctx.Err()).Err()
}
}
@ -642,12 +646,13 @@ func (cs *clientStream) shouldRetry(err error) error {
func (cs *clientStream) retryLocked(lastErr error) error {
for {
cs.attempt.finish(toRPCErr(lastErr))
if err := cs.shouldRetry(lastErr); err != nil {
isTransparent, err := cs.shouldRetry(lastErr)
if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
if err := cs.newAttemptLocked(nil, nil); err != nil {
if err := cs.newAttemptLocked(isTransparent); err != nil {
return err
}
if lastErr = cs.replayBufferLocked(); lastErr == nil {
@ -937,7 +942,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
return io.EOF
}
if a.statsHandler != nil {
a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@ -985,7 +990,7 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
a.mu.Unlock()
}
if a.statsHandler != nil {
a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
@ -1047,12 +1052,12 @@ func (a *csAttempt) finish(err error) {
if a.statsHandler != nil {
end := &stats.End{
Client: true,
BeginTime: a.cs.beginTime,
BeginTime: a.beginTime,
EndTime: time.Now(),
Trailer: tr,
Error: err,
}
a.statsHandler.HandleRPC(a.cs.ctx, end)
a.statsHandler.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {

View File

@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.40.0"
const Version = "1.41.0"

View File

@ -89,10 +89,6 @@ not git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go"
# - Ensure all xds proto imports are renamed to *pb or *grpc.
git grep '"github.com/envoyproxy/go-control-plane/envoy' -- '*.go' ':(exclude)*.pb.go' | not grep -v 'pb "\|grpc "'
# - Check imports that are illegal in appengine (until Go 1.11).
# TODO: Remove when we drop Go 1.10 support
go list -f {{.Dir}} ./... | xargs go run test/go_vet/vet.go
misspell -error .
# - Check that generated proto files are up to date.

3
vendor/modules.txt vendored
View File

@ -424,7 +424,7 @@ google.golang.org/appengine/urlfetch
google.golang.org/genproto/googleapis/api/httpbody
google.golang.org/genproto/googleapis/rpc/status
google.golang.org/genproto/protobuf/field_mask
# google.golang.org/grpc v1.40.0
# google.golang.org/grpc v1.41.0
## explicit
google.golang.org/grpc
google.golang.org/grpc/attributes
@ -463,6 +463,7 @@ google.golang.org/grpc/internal/status
google.golang.org/grpc/internal/syscall
google.golang.org/grpc/internal/transport
google.golang.org/grpc/internal/transport/networktype
google.golang.org/grpc/internal/xds/env
google.golang.org/grpc/keepalive
google.golang.org/grpc/metadata
google.golang.org/grpc/peer