rebase: bump google.golang.org/grpc from 1.46.0 to 1.46.2

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.46.0 to 1.46.2.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.46.0...v1.46.2)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2022-05-16 20:15:22 +00:00
committed by mergify[bot]
parent 0b043d3c21
commit eea957b6b3
8 changed files with 127 additions and 114 deletions

View File

@ -303,14 +303,28 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs.binlog = binarylog.GetMethodLogger(method)
if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
if err != nil {
cs.finish(err)
return nil, err
}
op := func(a *csAttempt) error { return a.newStream() }
// Pick the transport to use and create a new stream on the transport.
// Assign cs.attempt upon success.
op := func(a *csAttempt) error {
if err := a.getTransport(); err != nil {
return err
}
if err := a.newStream(); err != nil {
return err
}
// Because this operation is always called either here (while creating
// the clientStream) or by the retry code while locked when replaying
// the operation, it is safe to access cs.attempt directly.
cs.attempt = a
return nil
}
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
cs.finish(err)
return nil, err
}
@ -349,9 +363,15 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
return cs, nil
}
// newAttemptLocked creates a new attempt with a transport.
// If it succeeds, then it replaces clientStream's attempt with this new attempt.
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
// newAttemptLocked creates a new csAttempt without a transport or stream.
func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
if err := cs.ctx.Err(); err != nil {
return nil, toRPCErr(err)
}
if err := cs.cc.ctx.Err(); err != nil {
return nil, ErrClientConnClosing
}
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
sh := cs.cc.dopts.copts.StatsHandler
@ -385,27 +405,6 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
ctx = trace.NewContext(ctx, trInfo.tr)
}
newAttempt := &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}
defer func() {
if retErr != nil {
// This attempt is not set in the clientStream, so it's finish won't
// be called. Call it here for stats and trace in case they are not
// nil.
newAttempt.finish(retErr)
}
}()
if err := ctx.Err(); err != nil {
return toRPCErr(err)
}
if cs.cc.parsedTarget.Scheme == "xds" {
// Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them.
@ -413,16 +412,32 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
))
}
t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
return &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
}, nil
}
func (a *csAttempt) getTransport() error {
cs := a.cs
var err error
a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
a.drop = true
}
return err
}
if trInfo != nil {
trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
if a.trInfo != nil {
a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
}
newAttempt.t = t
newAttempt.done = done
cs.attempt = newAttempt
return nil
}
@ -431,12 +446,21 @@ func (a *csAttempt) newStream() error {
cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
// Return without converting to an RPC error so retry code can
// inspect.
return err
nse, ok := err.(*transport.NewStreamError)
if !ok {
// Unexpected.
return err
}
if nse.AllowTransparentRetry {
a.allowTransparentRetry = true
}
// Unwrap and convert error.
return toRPCErr(nse.Err)
}
cs.attempt.s = s
cs.attempt.p = &parser{r: s}
a.s = s
a.p = &parser{r: s}
return nil
}
@ -514,6 +538,11 @@ type csAttempt struct {
statsHandler stats.Handler
beginTime time.Time
// set for newStream errors that may be transparently retried
allowTransparentRetry bool
// set for pick errors that are returned as a status
drop bool
}
func (cs *clientStream) commitAttemptLocked() {
@ -533,41 +562,21 @@ func (cs *clientStream) commitAttempt() {
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
func (cs *clientStream) shouldRetry(err error) (bool, error) {
if cs.attempt.s == nil {
// Error from NewClientStream.
nse, ok := err.(*transport.NewStreamError)
if !ok {
// Unexpected, but assume no I/O was performed and the RPC is not
// fatal, so retry indefinitely.
return true, nil
}
func (a *csAttempt) shouldRetry(err error) (bool, error) {
cs := a.cs
// Unwrap and convert error.
err = toRPCErr(nse.Err)
// Never retry DoNotRetry errors, which indicate the RPC should not be
// retried due to max header list size violation, etc.
if nse.DoNotRetry {
return false, err
}
// In the event of a non-IO operation error from NewStream, we never
// attempted to write anything to the wire, so we can retry
// indefinitely.
if !nse.DoNotTransparentRetry {
return true, nil
}
}
if cs.finished || cs.committed {
// RPC is finished or committed; cannot retry.
if cs.finished || cs.committed || a.drop {
// RPC is finished or committed or was dropped by the picker; cannot retry.
return false, err
}
if a.s == nil && a.allowTransparentRetry {
return true, nil
}
// Wait for the trailers.
unprocessed := false
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
unprocessed = cs.attempt.s.Unprocessed()
if a.s != nil {
<-a.s.Done()
unprocessed = a.s.Unprocessed()
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
@ -579,14 +588,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
pushback := 0
hasPushback := false
if cs.attempt.s != nil {
if !cs.attempt.s.TrailersOnly() {
if a.s != nil {
if !a.s.TrailersOnly() {
return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
// before considering this a failure for throttling.
sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
sps := a.s.Trailer()["grpc-retry-pushback-ms"]
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
@ -603,10 +612,10 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
}
var code codes.Code
if cs.attempt.s != nil {
code = cs.attempt.s.Status().Code()
if a.s != nil {
code = a.s.Status().Code()
} else {
code = status.Convert(err).Code()
code = status.Code(err)
}
rp := cs.methodConfig.RetryPolicy
@ -651,19 +660,24 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
}
// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
for {
cs.attempt.finish(toRPCErr(lastErr))
isTransparent, err := cs.shouldRetry(lastErr)
attempt.finish(toRPCErr(lastErr))
isTransparent, err := attempt.shouldRetry(lastErr)
if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
if err := cs.newAttemptLocked(isTransparent); err != nil {
attempt, err = cs.newAttemptLocked(isTransparent)
if err != nil {
// Only returns error if the clientconn is closed or the context of
// the stream is canceled.
return err
}
if lastErr = cs.replayBufferLocked(); lastErr == nil {
// Note that the first op in the replay buffer always sets cs.attempt
// if it is able to pick a transport and create a stream.
if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil
}
}
@ -673,7 +687,10 @@ func (cs *clientStream) Context() context.Context {
cs.commitAttempt()
// No need to lock before using attempt, since we know it is committed and
// cannot change.
return cs.attempt.s.Context()
if cs.attempt.s != nil {
return cs.attempt.s.Context()
}
return cs.ctx
}
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
@ -703,7 +720,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
cs.mu.Unlock()
return err
}
if err := cs.retryLocked(err); err != nil {
if err := cs.retryLocked(a, err); err != nil {
cs.mu.Unlock()
return err
}
@ -734,7 +751,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
cs.binlog.Log(logEntry)
cs.serverHeaderBinlogged = true
}
return m, err
return m, nil
}
func (cs *clientStream) Trailer() metadata.MD {
@ -752,10 +769,9 @@ func (cs *clientStream) Trailer() metadata.MD {
return cs.attempt.s.Trailer()
}
func (cs *clientStream) replayBufferLocked() error {
a := cs.attempt
func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
for _, f := range cs.buffer {
if err := f(a); err != nil {
if err := f(attempt); err != nil {
return err
}
}
@ -803,22 +819,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
}
msgBytes := data // Store the pointer before setting to nil. For binary logging.
op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
return a.sendMsg(m, hdr, payload, data)
}
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
OnClientSide: true,
Message: msgBytes,
Message: data,
})
}
return
return err
}
func (cs *clientStream) RecvMsg(m interface{}) error {