rebase: use latest google.golang.org/grpc version

google.golang.org/grpc was listed in go.mod:replace and updating the
dependency was not possible. There should not be a reason to prevent
updating, commit 5aca04d540 added it but without explanation.

Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos
2021-08-31 14:38:09 +02:00
committed by mergify[bot]
parent bf77ab57dc
commit c12b155940
44 changed files with 1285 additions and 814 deletions

View File

@ -52,14 +52,20 @@ import (
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
// StreamDesc represents a streaming RPC service's method specification.
// StreamDesc represents a streaming RPC service's method specification. Used
// on the server when registering services and on the client when initiating
// new streams.
type StreamDesc struct {
StreamName string
Handler StreamHandler
// StreamName and Handler are only used when registering handlers on a
// server.
StreamName string // the name of the method excluding the service
Handler StreamHandler // the handler called for the method
// At least one of these is true.
ServerStreams bool
ClientStreams bool
// ServerStreams and ClientStreams are used for registering handlers on a
// server as well as defining RPC behavior when passed to NewClientStream
// and ClientConn.NewStream. At least one must be true.
ServerStreams bool // indicates the server can perform streaming sends
ClientStreams bool // indicates the client can perform streaming sends
}
// Stream defines the common interface a client or server stream has to satisfy.
@ -166,7 +172,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
c := defaultCallInfo()
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil {
@ -175,18 +180,40 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
var mc serviceconfig.MethodConfig
var onCommit func()
rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
if err != nil {
return nil, status.Convert(err).Err()
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
return nil, toRPCErr(err)
}
if rpcConfig != nil {
if rpcConfig.Context != nil {
ctx = rpcConfig.Context
}
mc = rpcConfig.MethodConfig
onCommit = rpcConfig.OnCommitted
if rpcConfig.Interceptor != nil {
rpcInfo.Context = nil
ns := newStream
newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
if err != nil {
return nil, toRPCErr(err)
}
return cs, nil
}
}
}
return newStream(ctx, func() {})
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
c := defaultCallInfo()
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}
@ -223,6 +250,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
DoneFunc: doneFunc,
}
// Set our outgoing compression according to the UseCompressor CallOption, if
@ -267,9 +295,11 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
beginTime = time.Now()
begin := &stats.Begin{
Client: true,
BeginTime: beginTime,
FailFast: c.failFast,
Client: true,
BeginTime: beginTime,
FailFast: c.failFast,
IsClientStream: desc.ClientStreams,
IsServerStream: desc.ServerStreams,
}
sh.HandleRPC(ctx, begin)
}
@ -391,12 +421,9 @@ func (a *csAttempt) newStream() error {
cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
if err != nil {
if _, ok := err.(transport.PerformedIOError); ok {
// Return without converting to an RPC error so retry code can
// inspect.
return err
}
return toRPCErr(err)
// Return without converting to an RPC error so retry code can
// inspect.
return err
}
cs.attempt.s = s
cs.attempt.p = &parser{r: s}
@ -495,19 +522,28 @@ func (cs *clientStream) commitAttempt() {
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
unprocessed := false
if cs.attempt.s == nil {
pioErr, ok := err.(transport.PerformedIOError)
if ok {
// Unwrap error.
err = toRPCErr(pioErr.Err)
} else {
unprocessed = true
// Error from NewClientStream.
nse, ok := err.(*transport.NewStreamError)
if !ok {
// Unexpected, but assume no I/O was performed and the RPC is not
// fatal, so retry indefinitely.
return nil
}
if !ok && !cs.callInfo.failFast {
// In the event of a non-IO operation error from NewStream, we
// never attempted to write anything to the wire, so we can retry
// indefinitely for non-fail-fast RPCs.
// Unwrap and convert error.
err = toRPCErr(nse.Err)
// Never retry DoNotRetry errors, which indicate the RPC should not be
// retried due to max header list size violation, etc.
if nse.DoNotRetry {
return err
}
// In the event of a non-IO operation error from NewStream, we never
// attempted to write anything to the wire, so we can retry
// indefinitely.
if !nse.PerformedIO {
return nil
}
}
@ -516,6 +552,7 @@ func (cs *clientStream) shouldRetry(err error) error {
return err
}
// Wait for the trailers.
unprocessed := false
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
unprocessed = cs.attempt.s.Unprocessed()
@ -604,7 +641,7 @@ func (cs *clientStream) shouldRetry(err error) error {
// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
for {
cs.attempt.finish(lastErr)
cs.attempt.finish(toRPCErr(lastErr))
if err := cs.shouldRetry(lastErr); err != nil {
cs.commitAttemptLocked()
return err
@ -631,7 +668,11 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
for {
if cs.committed {
cs.mu.Unlock()
return op(cs.attempt)
// toRPCErr is used in case the error from the attempt comes from
// NewClientStream, which intentionally doesn't return a status
// error to allow for further inspection; all other errors should
// already be status errors.
return toRPCErr(op(cs.attempt))
}
a := cs.attempt
cs.mu.Unlock()