ceph-csi/e2e/vendor/google.golang.org/grpc/stream.go
Niels de Vos f87d06ed85 build: move e2e dependencies into e2e/go.mod
Several packages are only used while running the e2e suite. These
packages are less important to update, as the they can not influence the
final executable that is part of the Ceph-CSI container-image.

By moving these dependencies out of the main Ceph-CSI go.mod, it is
easier to identify if a reported CVE affects Ceph-CSI, or only the
testing (like most of the Kubernetes CVEs).

Signed-off-by: Niels de Vos <ndevos@ibm.com>
2025-03-04 17:43:49 +01:00

1824 lines
55 KiB
Go

/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"context"
"errors"
"io"
"math"
rand "math/rand/v2"
"strconv"
"sync"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/serviceconfig"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC.
//
// If a StreamHandler returns an error, it should either be produced by the
// status package, or be one of the context errors. Otherwise, gRPC will use
// codes.Unknown as the status code and err.Error() as the status message of the
// RPC.
type StreamHandler func(srv any, stream ServerStream) error
// StreamDesc represents a streaming RPC service's method specification. Used
// on the server when registering services and on the client when initiating
// new streams.
type StreamDesc struct {
// StreamName and Handler are only used when registering handlers on a
// server.
StreamName string // the name of the method excluding the service
Handler StreamHandler // the handler called for the method
// ServerStreams and ClientStreams are used for registering handlers on a
// server as well as defining RPC behavior when passed to NewClientStream
// and ClientConn.NewStream. At least one must be true.
ServerStreams bool // indicates the server can perform streaming sends
ClientStreams bool // indicates the client can perform streaming sends
}
// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
// Deprecated: See ClientStream and ServerStream documentation instead.
Context() context.Context
// Deprecated: See ClientStream and ServerStream documentation instead.
SendMsg(m any) error
// Deprecated: See ClientStream and ServerStream documentation instead.
RecvMsg(m any) error
}
// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read. If the metadata
// is nil and the error is also nil, then the stream was terminated without
// headers, and the status can be discovered by calling RecvMsg.
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend() error
// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
Context() context.Context
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg. For unary or server-streaming
// RPCs (StreamDesc.ClientStreams is false), a nil error is returned
// unconditionally.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
//
// It is not safe to modify the message after calling SendMsg. Tracing
// libraries and stats handlers may use the message lazily.
SendMsg(m any) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m any) error
}
// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
// 1. Call Close on the ClientConn.
// 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
// client-streaming RPC, for instance, might use the helper function
// CloseAndRecv (note that CloseSend does not Recv, therefore is not
// guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
}
return newClientStream(ctx, desc, cc, method, opts...)
}
// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
return cc.NewStream(ctx, desc, method, opts...)
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
// Start tracking the RPC for idleness purposes. This is where a stream is
// created for both streaming and unary RPCs, and hence is a good place to
// track active RPC count.
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return nil, err
}
// Add a calloption, to decrement the active call count, that gets executed
// when the RPC completes.
opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
// validate md
if err := imetadata.Validate(md); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// validate added
for _, kvs := range added {
for i := 0; i < len(kvs); i += 2 {
if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
}
}
if channelz.IsOn() {
cc.incrCallsStarted()
defer func() {
if err != nil {
cc.incrCallsFailed()
}
}()
}
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil {
return nil, err
}
var mc serviceconfig.MethodConfig
var onCommit func()
newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
if st, ok := status.FromError(err); ok {
// Restrict the code to the list allowed by gRFC A54.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
}
return nil, err
}
return nil, toRPCErr(err)
}
if rpcConfig != nil {
if rpcConfig.Context != nil {
ctx = rpcConfig.Context
}
mc = rpcConfig.MethodConfig
onCommit = rpcConfig.OnCommitted
if rpcConfig.Interceptor != nil {
rpcInfo.Context = nil
ns := newStream
newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
if err != nil {
return nil, toRPCErr(err)
}
return cs, nil
}
}
}
return newStream(ctx, func() {})
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
c := defaultCallInfo()
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}
// Possible context leak:
// The cancel function for the child context we create will only be called
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
// an error is generated by SendMsg.
// https://github.com/grpc/grpc-go/issues/1818.
var cancel context.CancelFunc
if mc.Timeout != nil && *mc.Timeout >= 0 {
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer func() {
if err != nil {
cancel()
}
}()
for _, o := range opts {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
}
}
c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
if err := setCallInfoCodec(c); err != nil {
return nil, err
}
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
DoneFunc: doneFunc,
}
// Set our outgoing compression according to the UseCompressor CallOption, if
// set. In that case, also find the compressor from the encoding package.
// Otherwise, use the compressor configured by the WithCompressor DialOption,
// if set.
var cp Compressor
var comp encoding.Compressor
if ct := c.compressorType; ct != "" {
callHdr.SendCompress = ct
if ct != encoding.Identity {
comp = encoding.GetCompressor(ct)
if comp == nil {
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
}
}
} else if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
cp = cc.dopts.cp
}
if c.creds != nil {
callHdr.Creds = c.creds
}
cs := &clientStream{
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: c,
cc: cc,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
cancel: cancel,
firstAttempt: true,
onCommit: onCommit,
}
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
}
if ml := binarylog.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
}
if cc.dopts.binaryLogger != nil {
if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
}
}
// Pick the transport to use and create a new stream on the transport.
// Assign cs.attempt upon success.
op := func(a *csAttempt) error {
if err := a.getTransport(); err != nil {
return err
}
if err := a.newStream(); err != nil {
return err
}
// Because this operation is always called either here (while creating
// the clientStream) or by the retry code while locked when replaying
// the operation, it is safe to access cs.attempt directly.
cs.attempt = a
return nil
}
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
return nil, err
}
if len(cs.binlogs) != 0 {
md, _ := metadata.FromOutgoingContext(ctx)
logEntry := &binarylog.ClientHeader{
OnClientSide: true,
Header: md,
MethodName: method,
Authority: cs.cc.authority,
}
if deadline, ok := ctx.Deadline(); ok {
logEntry.Timeout = time.Until(deadline)
if logEntry.Timeout < 0 {
logEntry.Timeout = 0
}
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, logEntry)
}
}
if desc != unaryStreamDesc {
// Listen on cc and stream contexts to cleanup when the user closes the
// ClientConn or cancels the stream context. In all other cases, an error
// should already be injected into the recv buffer by the transport, which
// the client will eventually receive, and then we will cancel the stream's
// context in clientStream.finish.
go func() {
select {
case <-cc.ctx.Done():
cs.finish(ErrClientConnClosing)
case <-ctx.Done():
cs.finish(toRPCErr(ctx.Err()))
}
}()
}
return cs, nil
}
// newAttemptLocked creates a new csAttempt without a transport or stream.
func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
if err := cs.ctx.Err(); err != nil {
return nil, toRPCErr(err)
}
if err := cs.cc.ctx.Err(); err != nil {
return nil, ErrClientConnClosing
}
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
var beginTime time.Time
shs := cs.cc.dopts.copts.StatsHandlers
for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
beginTime = time.Now()
begin := &stats.Begin{
Client: true,
BeginTime: beginTime,
FailFast: cs.callInfo.failFast,
IsClientStream: cs.desc.ClientStreams,
IsServerStream: cs.desc.ServerStreams,
IsTransparentRetryAttempt: isTransparent,
}
sh.HandleRPC(ctx, begin)
}
var trInfo *traceInfo
if EnableTracing {
trInfo = &traceInfo{
tr: newTrace("grpc.Sent."+methodFamily(method), method),
firstLine: firstLine{
client: true,
},
}
if deadline, ok := ctx.Deadline(); ok {
trInfo.firstLine.deadline = time.Until(deadline)
}
trInfo.tr.LazyLog(&trInfo.firstLine, false)
ctx = newTraceContext(ctx, trInfo.tr)
}
if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
// Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them.
ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
))
}
return &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandlers: shs,
trInfo: trInfo,
}, nil
}
func (a *csAttempt) getTransport() error {
cs := a.cs
var err error
a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
a.drop = true
}
return err
}
if a.trInfo != nil {
a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
}
return nil
}
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
// Merge metadata stored in PickResult, if any, with existing call metadata.
// It is safe to overwrite the csAttempt's context here, since all state
// maintained in it are local to the attempt. When the attempt has to be
// retried, a new instance of csAttempt will be created.
if a.pickResult.Metadata != nil {
// We currently do not have a function it the metadata package which
// merges given metadata with existing metadata in a context. Existing
// function `AppendToOutgoingContext()` takes a variadic argument of key
// value pairs.
//
// TODO: Make it possible to retrieve key value pairs from metadata.MD
// in a form passable to AppendToOutgoingContext(), or create a version
// of AppendToOutgoingContext() that accepts a metadata.MD.
md, _ := metadata.FromOutgoingContext(a.ctx)
md = metadata.Join(md, a.pickResult.Metadata)
a.ctx = metadata.NewOutgoingContext(a.ctx, md)
}
s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
nse, ok := err.(*transport.NewStreamError)
if !ok {
// Unexpected.
return err
}
if nse.AllowTransparentRetry {
a.allowTransparentRetry = true
}
// Unwrap and convert error.
return toRPCErr(nse.Err)
}
a.s = s
a.ctx = s.Context()
a.p = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
return nil
}
// clientStream implements a client side Stream.
type clientStream struct {
callHdr *transport.CallHdr
opts []CallOption
callInfo *callInfo
cc *ClientConn
desc *StreamDesc
codec baseCodec
cp Compressor
comp encoding.Compressor
cancel context.CancelFunc // cancels all attempts
sentLast bool // sent an end stream
methodConfig *MethodConfig
ctx context.Context // the application's context, wrapped by stats/tracing
retryThrottler *retryThrottler // The throttler active when the RPC began.
binlogs []binarylog.MethodLogger
// serverHeaderBinlogged is a boolean for whether server header has been
// logged. Server header will be logged when the first time one of those
// happens: stream.Header(), stream.Recv().
//
// It's only read and used by Recv() and Header(), so it doesn't need to be
// synchronized.
serverHeaderBinlogged bool
mu sync.Mutex
firstAttempt bool // if true, transparent retry is valid
numRetries int // exclusive of transparent retry attempt(s)
numRetriesSincePushback int // retries since pushback; to reset backoff
finished bool // TODO: replace with atomic cmpxchg or sync.Once?
// attempt is the active client stream attempt.
// The only place where it is written is the newAttemptLocked method and this method never writes nil.
// So, attempt can be nil only inside newClientStream function when clientStream is first created.
// One of the first things done after clientStream's creation, is to call newAttemptLocked which either
// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry?
onCommit func()
replayBuffer []replayOp // operations to replay on retry
replayBufferSize int // current size of replayBuffer
}
type replayOp struct {
op func(a *csAttempt) error
cleanup func()
}
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
ctx context.Context
cs *clientStream
t transport.ClientTransport
s *transport.ClientStream
p *parser
pickResult balancer.PickResult
finished bool
dc Decompressor
decomp encoding.Compressor
decompSet bool
mu sync.Mutex // guards trInfo.tr
// trInfo may be nil (if EnableTracing is false).
// trInfo.tr is set when created (if EnableTracing is true),
// and cleared when the finish method is called.
trInfo *traceInfo
statsHandlers []stats.Handler
beginTime time.Time
// set for newStream errors that may be transparently retried
allowTransparentRetry bool
// set for pick errors that are returned as a status
drop bool
}
func (cs *clientStream) commitAttemptLocked() {
if !cs.committed && cs.onCommit != nil {
cs.onCommit()
}
cs.committed = true
for _, op := range cs.replayBuffer {
if op.cleanup != nil {
op.cleanup()
}
}
cs.replayBuffer = nil
}
func (cs *clientStream) commitAttempt() {
cs.mu.Lock()
cs.commitAttemptLocked()
cs.mu.Unlock()
}
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
func (a *csAttempt) shouldRetry(err error) (bool, error) {
cs := a.cs
if cs.finished || cs.committed || a.drop {
// RPC is finished or committed or was dropped by the picker; cannot retry.
return false, err
}
if a.s == nil && a.allowTransparentRetry {
return true, nil
}
// Wait for the trailers.
unprocessed := false
if a.s != nil {
<-a.s.Done()
unprocessed = a.s.Unprocessed()
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
return true, nil
}
if cs.cc.dopts.disableRetry {
return false, err
}
pushback := 0
hasPushback := false
if a.s != nil {
if !a.s.TrailersOnly() {
return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
// before considering this a failure for throttling.
sps := a.s.Trailer()["grpc-retry-pushback-ms"]
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])
cs.retryThrottler.throttle() // This counts as a failure for throttling.
return false, err
}
hasPushback = true
} else if len(sps) > 1 {
channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)
cs.retryThrottler.throttle() // This counts as a failure for throttling.
return false, err
}
}
var code codes.Code
if a.s != nil {
code = a.s.Status().Code()
} else {
code = status.Code(err)
}
rp := cs.methodConfig.RetryPolicy
if rp == nil || !rp.RetryableStatusCodes[code] {
return false, err
}
// Note: the ordering here is important; we count this as a failure
// only if the code matched a retryable code.
if cs.retryThrottler.throttle() {
return false, err
}
if cs.numRetries+1 >= rp.MaxAttempts {
return false, err
}
var dur time.Duration
if hasPushback {
dur = time.Millisecond * time.Duration(pushback)
cs.numRetriesSincePushback = 0
} else {
fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
// Apply jitter by multiplying with a random factor between 0.8 and 1.2
cur *= 0.8 + 0.4*rand.Float64()
dur = time.Duration(int64(cur))
cs.numRetriesSincePushback++
}
// TODO(dfawley): we could eagerly fail here if dur puts us past the
// deadline, but unsure if it is worth doing.
t := time.NewTimer(dur)
select {
case <-t.C:
cs.numRetries++
return false, nil
case <-cs.ctx.Done():
t.Stop()
return false, status.FromContextError(cs.ctx.Err()).Err()
}
}
// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
for {
attempt.finish(toRPCErr(lastErr))
isTransparent, err := attempt.shouldRetry(lastErr)
if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
attempt, err = cs.newAttemptLocked(isTransparent)
if err != nil {
// Only returns error if the clientconn is closed or the context of
// the stream is canceled.
return err
}
// Note that the first op in replayBuffer always sets cs.attempt
// if it is able to pick a transport and create a stream.
if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil
}
}
}
func (cs *clientStream) Context() context.Context {
cs.commitAttempt()
// No need to lock before using attempt, since we know it is committed and
// cannot change.
if cs.attempt.s != nil {
return cs.attempt.s.Context()
}
return cs.ctx
}
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
cs.mu.Lock()
for {
if cs.committed {
cs.mu.Unlock()
// toRPCErr is used in case the error from the attempt comes from
// NewClientStream, which intentionally doesn't return a status
// error to allow for further inspection; all other errors should
// already be status errors.
return toRPCErr(op(cs.attempt))
}
if len(cs.replayBuffer) == 0 {
// For the first op, which controls creation of the stream and
// assigns cs.attempt, we need to create a new attempt inline
// before executing the first op. On subsequent ops, the attempt
// is created immediately before replaying the ops.
var err error
if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.mu.Unlock()
cs.finish(err)
return err
}
}
a := cs.attempt
cs.mu.Unlock()
err := op(a)
cs.mu.Lock()
if a != cs.attempt {
// We started another attempt already.
continue
}
if err == io.EOF {
<-a.s.Done()
}
if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
onSuccess()
cs.mu.Unlock()
return err
}
if err := cs.retryLocked(a, err); err != nil {
cs.mu.Unlock()
return err
}
}
}
func (cs *clientStream) Header() (metadata.MD, error) {
var m metadata.MD
err := cs.withRetry(func(a *csAttempt) error {
var err error
m, err = a.s.Header()
return toRPCErr(err)
}, cs.commitAttemptLocked)
if m == nil && err == nil {
// The stream ended with success. Finish the clientStream.
err = io.EOF
}
if err != nil {
cs.finish(err)
// Do not return the error. The user should get it by calling Recv().
return nil, nil
}
if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
// Only log if binary log is on and header has not been logged, and
// there is actually headers to log.
logEntry := &binarylog.ServerHeader{
OnClientSide: true,
Header: m,
PeerAddr: nil,
}
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
cs.serverHeaderBinlogged = true
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, logEntry)
}
}
return m, nil
}
func (cs *clientStream) Trailer() metadata.MD {
// On RPC failure, we never need to retry, because usage requires that
// RecvMsg() returned a non-nil error before calling this function is valid.
// We would have retried earlier if necessary.
//
// Commit the attempt anyway, just in case users are not following those
// directions -- it will prevent races and should not meaningfully impact
// performance.
cs.commitAttempt()
if cs.attempt.s == nil {
return nil
}
return cs.attempt.s.Trailer()
}
func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
for _, f := range cs.replayBuffer {
if err := f.op(attempt); err != nil {
return err
}
}
return nil
}
func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
// Note: we still will buffer if retry is disabled (for transparent retries).
if cs.committed {
return
}
cs.replayBufferSize += sz
if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
cs.commitAttemptLocked()
cleanup()
return
}
cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
}
func (cs *clientStream) SendMsg(m any) (err error) {
defer func() {
if err != nil && err != io.EOF {
// Call finish on the client stream for errors generated by this SendMsg
// call, as these indicate problems created by this client. (Transport
// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
// error will be returned from RecvMsg eventually in that case, or be
// retried.)
cs.finish(err)
}
}()
if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
if !cs.desc.ClientStreams {
cs.sentLast = true
}
// load hdr, payload, data
hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.cc.dopts.copts.BufferPool)
if err != nil {
return err
}
defer func() {
data.Free()
// only free payload if compression was made, and therefore it is a different set
// of buffers from data.
if pf.isCompressed() {
payload.Free()
}
}()
dataLen := data.Len()
payloadLen := payload.Len()
// TODO(dfawley): should we be checking len(data) instead?
if payloadLen > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
}
// always take an extra ref in case data == payload (i.e. when the data isn't
// compressed). The original ref will always be freed by the deferred free above.
payload.Ref()
op := func(a *csAttempt) error {
return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
}
// onSuccess is invoked when the op is captured for a subsequent retry. If the
// stream was established by a previous message and therefore retries are
// disabled, onSuccess will not be invoked, and payloadRef can be freed
// immediately.
onSuccessCalled := false
err = cs.withRetry(op, func() {
cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
onSuccessCalled = true
})
if !onSuccessCalled {
payload.Free()
}
if len(cs.binlogs) != 0 && err == nil {
cm := &binarylog.ClientMessage{
OnClientSide: true,
Message: data.Materialize(),
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, cm)
}
}
return err
}
func (cs *clientStream) RecvMsg(m any) error {
if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
// Call Header() to binary log header if it's not already logged.
cs.Header()
}
var recvInfo *payloadInfo
if len(cs.binlogs) != 0 {
recvInfo = &payloadInfo{}
defer recvInfo.free()
}
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
if len(cs.binlogs) != 0 && err == nil {
sm := &binarylog.ServerMessage{
OnClientSide: true,
Message: recvInfo.uncompressedBytes.Materialize(),
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, sm)
}
}
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)
}
return err
}
func (cs *clientStream) CloseSend() error {
if cs.sentLast {
// TODO: return an error and finish the stream instead, due to API misuse?
return nil
}
cs.sentLast = true
op := func(a *csAttempt) error {
a.s.Write(nil, nil, &transport.WriteOptions{Last: true})
// Always return nil; io.EOF is the only error that might make sense
// instead, but there is no need to signal the client to call RecvMsg
// as the only use left for the stream after CloseSend is to call
// RecvMsg. This also matches historical behavior.
return nil
}
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
if len(cs.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{
OnClientSide: true,
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, chc)
}
}
// We never returned an error here for reasons.
return nil
}
func (cs *clientStream) finish(err error) {
if err == io.EOF {
// Ending a stream with EOF indicates a success.
err = nil
}
cs.mu.Lock()
if cs.finished {
cs.mu.Unlock()
return
}
cs.finished = true
for _, onFinish := range cs.callInfo.onFinish {
onFinish(err)
}
cs.commitAttemptLocked()
if cs.attempt != nil {
cs.attempt.finish(err)
// after functions all rely upon having a stream.
if cs.attempt.s != nil {
for _, o := range cs.opts {
o.after(cs.callInfo, cs.attempt)
}
}
}
cs.mu.Unlock()
// Only one of cancel or trailer needs to be logged.
if len(cs.binlogs) != 0 {
switch err {
case errContextCanceled, errContextDeadline, ErrClientConnClosing:
c := &binarylog.Cancel{
OnClientSide: true,
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, c)
}
default:
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
Trailer: cs.Trailer(),
Err: err,
}
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, logEntry)
}
}
}
if err == nil {
cs.retryThrottler.successfulRPC()
}
if channelz.IsOn() {
if err != nil {
cs.cc.incrCallsFailed()
} else {
cs.cc.incrCallsSucceeded()
}
}
cs.cancel()
}
func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
cs := a.cs
if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
}
a.mu.Unlock()
}
if err := a.s.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
if !cs.desc.ClientStreams {
// For non-client-streaming RPCs, we return nil instead of EOF on error
// because the generated code requires it. finish is not called; RecvMsg()
// will call it with the stream's status independently.
return nil
}
return io.EOF
}
if len(a.statsHandlers) != 0 {
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
}
}
return nil
}
func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
cs := a.cs
if len(a.statsHandlers) != 0 && payInfo == nil {
payInfo = &payloadInfo{}
defer payInfo.free()
}
if !a.decompSet {
// Block until we receive headers containing received message encoding.
if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
if a.dc == nil || a.dc.Type() != ct {
// No configured decompressor, or it does not match the incoming
// message encoding; attempt to find a registered compressor that does.
a.dc = nil
a.decomp = encoding.GetCompressor(ct)
}
} else {
// No compression is used; disable our decompressor.
a.dc = nil
}
// Only initialize this state once per stream.
a.decompSet = true
}
if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp, false); err != nil {
if err == io.EOF {
if statusErr := a.s.Status().Err(); statusErr != nil {
return statusErr
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
}
a.mu.Unlock()
}
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
WireLength: payInfo.compressedLength + headerLen,
CompressedLength: payInfo.compressedLength,
Length: payInfo.uncompressedBytes.Len(),
})
}
if cs.desc.ServerStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
return nil
}
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp, false); err == io.EOF {
return a.s.Status().Err() // non-server streaming Recv returns nil on success
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
func (a *csAttempt) finish(err error) {
a.mu.Lock()
if a.finished {
a.mu.Unlock()
return
}
a.finished = true
if err == io.EOF {
// Ending a stream with EOF indicates a success.
err = nil
}
var tr metadata.MD
if a.s != nil {
a.s.Close(err)
tr = a.s.Trailer()
}
if a.pickResult.Done != nil {
br := false
if a.s != nil {
br = a.s.BytesReceived()
}
a.pickResult.Done(balancer.DoneInfo{
Err: err,
Trailer: tr,
BytesSent: a.s != nil,
BytesReceived: br,
ServerLoad: balancerload.Parse(tr),
})
}
for _, sh := range a.statsHandlers {
end := &stats.End{
Client: true,
BeginTime: a.beginTime,
EndTime: time.Now(),
Trailer: tr,
Error: err,
}
sh.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {
a.trInfo.tr.LazyPrintf("RPC: [OK]")
} else {
a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
a.trInfo.tr.SetError()
}
a.trInfo.tr.Finish()
a.trInfo.tr = nil
}
a.mu.Unlock()
}
// newNonRetryClientStream creates a ClientStream with the specified transport, on the
// given addrConn.
//
// It's expected that the given transport is either the same one in addrConn, or
// is already closed. To avoid race, transport is specified separately, instead
// of using ac.transport.
//
// Main difference between this and ClientConn.NewStream:
// - no retry
// - no service config (or wait for service config)
// - no tracing or stats
func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
if t == nil {
// TODO: return RPC error here?
return nil, errors.New("transport provided is nil")
}
// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
c := &callInfo{}
// Possible context leak:
// The cancel function for the child context we create will only be called
// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
// an error is generated by SendMsg.
// https://github.com/grpc/grpc-go/issues/1818.
ctx, cancel := context.WithCancel(ctx)
defer func() {
if err != nil {
cancel()
}
}()
for _, o := range opts {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
}
}
c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
if err := setCallInfoCodec(c); err != nil {
return nil, err
}
callHdr := &transport.CallHdr{
Host: ac.cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
}
// Set our outgoing compression according to the UseCompressor CallOption, if
// set. In that case, also find the compressor from the encoding package.
// Otherwise, use the compressor configured by the WithCompressor DialOption,
// if set.
var cp Compressor
var comp encoding.Compressor
if ct := c.compressorType; ct != "" {
callHdr.SendCompress = ct
if ct != encoding.Identity {
comp = encoding.GetCompressor(ct)
if comp == nil {
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
}
}
} else if ac.cc.dopts.cp != nil {
callHdr.SendCompress = ac.cc.dopts.cp.Type()
cp = ac.cc.dopts.cp
}
if c.creds != nil {
callHdr.Creds = c.creds
}
// Use a special addrConnStream to avoid retry.
as := &addrConnStream{
callHdr: callHdr,
ac: ac,
ctx: ctx,
cancel: cancel,
opts: opts,
callInfo: c,
desc: desc,
codec: c.codec,
cp: cp,
comp: comp,
t: t,
}
s, err := as.t.NewStream(as.ctx, as.callHdr)
if err != nil {
err = toRPCErr(err)
return nil, err
}
as.s = s
as.p = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
// Listen on stream context to cleanup when the stream context is
// canceled. Also listen for the addrConn's context in case the
// addrConn is closed or reconnects to a different address. In all
// other cases, an error should already be injected into the recv
// buffer by the transport, which the client will eventually receive,
// and then we will cancel the stream's context in
// addrConnStream.finish.
go func() {
ac.mu.Lock()
acCtx := ac.ctx
ac.mu.Unlock()
select {
case <-acCtx.Done():
as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
case <-ctx.Done():
as.finish(toRPCErr(ctx.Err()))
}
}()
}
return as, nil
}
type addrConnStream struct {
s *transport.ClientStream
ac *addrConn
callHdr *transport.CallHdr
cancel context.CancelFunc
opts []CallOption
callInfo *callInfo
t transport.ClientTransport
ctx context.Context
sentLast bool
desc *StreamDesc
codec baseCodec
cp Compressor
comp encoding.Compressor
decompSet bool
dc Decompressor
decomp encoding.Compressor
p *parser
mu sync.Mutex
finished bool
}
func (as *addrConnStream) Header() (metadata.MD, error) {
m, err := as.s.Header()
if err != nil {
as.finish(toRPCErr(err))
}
return m, err
}
func (as *addrConnStream) Trailer() metadata.MD {
return as.s.Trailer()
}
func (as *addrConnStream) CloseSend() error {
if as.sentLast {
// TODO: return an error and finish the stream instead, due to API misuse?
return nil
}
as.sentLast = true
as.s.Write(nil, nil, &transport.WriteOptions{Last: true})
// Always return nil; io.EOF is the only error that might make sense
// instead, but there is no need to signal the client to call RecvMsg
// as the only use left for the stream after CloseSend is to call
// RecvMsg. This also matches historical behavior.
return nil
}
func (as *addrConnStream) Context() context.Context {
return as.s.Context()
}
func (as *addrConnStream) SendMsg(m any) (err error) {
defer func() {
if err != nil && err != io.EOF {
// Call finish on the client stream for errors generated by this SendMsg
// call, as these indicate problems created by this client. (Transport
// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
// error will be returned from RecvMsg eventually in that case, or be
// retried.)
as.finish(err)
}
}()
if as.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
if !as.desc.ClientStreams {
as.sentLast = true
}
// load hdr, payload, data
hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.cp, as.comp, as.ac.dopts.copts.BufferPool)
if err != nil {
return err
}
defer func() {
data.Free()
// only free payload if compression was made, and therefore it is a different set
// of buffers from data.
if pf.isCompressed() {
payload.Free()
}
}()
// TODO(dfawley): should we be checking len(data) instead?
if payload.Len() > *as.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
}
if err := as.s.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
if !as.desc.ClientStreams {
// For non-client-streaming RPCs, we return nil instead of EOF on error
// because the generated code requires it. finish is not called; RecvMsg()
// will call it with the stream's status independently.
return nil
}
return io.EOF
}
return nil
}
func (as *addrConnStream) RecvMsg(m any) (err error) {
defer func() {
if err != nil || !as.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
as.finish(err)
}
}()
if !as.decompSet {
// Block until we receive headers containing received message encoding.
if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
if as.dc == nil || as.dc.Type() != ct {
// No configured decompressor, or it does not match the incoming
// message encoding; attempt to find a registered compressor that does.
as.dc = nil
as.decomp = encoding.GetCompressor(ct)
}
} else {
// No compression is used; disable our decompressor.
as.dc = nil
}
// Only initialize this state once per stream.
as.decompSet = true
}
if err := recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, false); err != nil {
if err == io.EOF {
if statusErr := as.s.Status().Err(); statusErr != nil {
return statusErr
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
if as.desc.ServerStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
return nil
}
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
if err := recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, false); err == io.EOF {
return as.s.Status().Err() // non-server streaming Recv returns nil on success
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
func (as *addrConnStream) finish(err error) {
as.mu.Lock()
if as.finished {
as.mu.Unlock()
return
}
as.finished = true
if err == io.EOF {
// Ending a stream with EOF indicates a success.
err = nil
}
if as.s != nil {
as.s.Close(err)
}
if err != nil {
as.ac.incrCallsFailed()
} else {
as.ac.incrCallsSucceeded()
}
as.cancel()
as.mu.Unlock()
}
// ServerStream defines the server-side behavior of a streaming RPC.
//
// Errors returned from ServerStream methods are compatible with the status
// package. However, the status code will often not match the RPC status as
// seen by the client application, and therefore, should not be relied upon for
// this purpose.
type ServerStream interface {
// SetHeader sets the header metadata. It may be called multiple times.
// When call multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens:
// - ServerStream.SendHeader() is called;
// - The first response is sent out;
// - An RPC status is sent out (error or success).
SetHeader(metadata.MD) error
// SendHeader sends the header metadata.
// The provided md and headers set by SetHeader() will be sent.
// It fails if called multiple times.
SendHeader(metadata.MD) error
// SetTrailer sets the trailer metadata which will be sent with the RPC status.
// When called more than once, all the provided metadata will be merged.
SetTrailer(metadata.MD)
// Context returns the context for this stream.
Context() context.Context
// SendMsg sends a message. On error, SendMsg aborts the stream and the
// error is returned directly.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the client. An
// untimely stream closure may result in lost messages.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
//
// It is not safe to modify the message after calling SendMsg. Tracing
// libraries and stats handlers may use the message lazily.
SendMsg(m any) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the client has performed a CloseSend. On
// any non-EOF error, the stream is aborted and the error contains the
// RPC status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m any) error
}
// serverStream implements a server side Stream.
type serverStream struct {
ctx context.Context
s *transport.ServerStream
p *parser
codec baseCodec
cp Compressor
dc Decompressor
comp encoding.Compressor
decomp encoding.Compressor
sendCompressorName string
maxReceiveMessageSize int
maxSendMessageSize int
trInfo *traceInfo
statsHandler []stats.Handler
binlogs []binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It
// will happen when one of the following two happens: stream.SendHeader(),
// stream.Send().
//
// It's only checked in send and sendHeader, doesn't need to be
// synchronized.
serverHeaderBinlogged bool
mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
func (ss *serverStream) Context() context.Context {
return ss.ctx
}
func (ss *serverStream) SetHeader(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
err := imetadata.Validate(md)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
return ss.s.SetHeader(md)
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
err := imetadata.Validate(md)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
err = ss.s.SendHeader(md)
if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
sh := &binarylog.ServerHeader{
Header: h,
}
ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, sh)
}
}
return err
}
func (ss *serverStream) SetTrailer(md metadata.MD) {
if md.Len() == 0 {
return
}
if err := imetadata.Validate(md); err != nil {
logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
}
ss.s.SetTrailer(md)
}
func (ss *serverStream) SendMsg(m any) (err error) {
defer func() {
if ss.trInfo != nil {
ss.mu.Lock()
if ss.trInfo.tr != nil {
if err == nil {
ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
} else {
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
ss.trInfo.tr.SetError()
}
}
ss.mu.Unlock()
}
if err != nil && err != io.EOF {
st, _ := status.FromError(toRPCErr(err))
ss.s.WriteStatus(st)
// Non-user specified status was sent out. This should be an error
// case (as a server side Cancel maybe).
//
// This is not handled specifically now. User will return a final
// status from the service handler, we will log that error instead.
// This behavior is similar to an interceptor.
}
}()
// Server handler could have set new compressor by calling SetSendCompressor.
// In case it is set, we need to use it for compressing outbound message.
if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
ss.comp = encoding.GetCompressor(sendCompressorsName)
ss.sendCompressorName = sendCompressorsName
}
// load hdr, payload, data
hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.cp, ss.comp, ss.p.bufferPool)
if err != nil {
return err
}
defer func() {
data.Free()
// only free payload if compression was made, and therefore it is a different set
// of buffers from data.
if pf.isCompressed() {
payload.Free()
}
}()
dataLen := data.Len()
payloadLen := payload.Len()
// TODO(dfawley): should we be checking len(data) instead?
if payloadLen > ss.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
}
if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {
return toRPCErr(err)
}
if len(ss.binlogs) != 0 {
if !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
sh := &binarylog.ServerHeader{
Header: h,
}
ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, sh)
}
}
sm := &binarylog.ServerMessage{
Message: data.Materialize(),
}
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, sm)
}
}
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
}
}
return nil
}
func (ss *serverStream) RecvMsg(m any) (err error) {
defer func() {
if ss.trInfo != nil {
ss.mu.Lock()
if ss.trInfo.tr != nil {
if err == nil {
ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
} else if err != io.EOF {
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
ss.trInfo.tr.SetError()
}
}
ss.mu.Unlock()
}
if err != nil && err != io.EOF {
st, _ := status.FromError(toRPCErr(err))
ss.s.WriteStatus(st)
// Non-user specified status was sent out. This should be an error
// case (as a server side Cancel maybe).
//
// This is not handled specifically now. User will return a final
// status from the service handler, we will log that error instead.
// This behavior is similar to an interceptor.
}
}()
var payInfo *payloadInfo
if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
payInfo = &payloadInfo{}
defer payInfo.free()
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp, true); err != nil {
if err == io.EOF {
if len(ss.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{}
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, chc)
}
}
return err
}
if err == io.ErrUnexpectedEOF {
err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr(err)
}
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: m,
Length: payInfo.uncompressedBytes.Len(),
WireLength: payInfo.compressedLength + headerLen,
CompressedLength: payInfo.compressedLength,
})
}
}
if len(ss.binlogs) != 0 {
cm := &binarylog.ClientMessage{
Message: payInfo.uncompressedBytes.Materialize(),
}
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, cm)
}
}
return nil
}
// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
return Method(stream.Context())
}
// prepareMsg returns the hdr, payload and data using the compressors passed or
// using the passed preparedmsg. The returned boolean indicates whether
// compression was made and therefore whether the payload needs to be freed in
// addition to the returned data. Freeing the payload if the returned boolean is
// false can lead to undefined behavior.
func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
if preparedMsg, ok := m.(*PreparedMsg); ok {
return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
}
// The input interface is not a prepared msg.
// Marshal and Compress the data at this point
data, err = encode(codec, m)
if err != nil {
return nil, nil, nil, 0, err
}
compData, pf, err := compress(data, cp, comp, pool)
if err != nil {
data.Free()
return nil, nil, nil, 0, err
}
hdr, payload = msgHeader(data, compData, pf)
return hdr, data, payload, pf, nil
}