Mirror of https://github.com/ceph/ceph-csi.git (synced 2025-06-13 10:33:35 +00:00)

500  vendor/google.golang.org/grpc/stream.go  (generated, vendored)
@@ -19,6 +19,7 @@
package grpc

import (
    "context"
    "errors"
    "io"
    "math"
@@ -26,16 +27,18 @@ import (
    "sync"
    "time"

    "golang.org/x/net/context"
    "golang.org/x/net/trace"
    "google.golang.org/grpc/balancer"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/connectivity"
    "google.golang.org/grpc/encoding"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/internal/binarylog"
    "google.golang.org/grpc/internal/channelz"
    "google.golang.org/grpc/internal/grpcrand"
    "google.golang.org/grpc/internal/transport"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/peer"
    "google.golang.org/grpc/stats"
    "google.golang.org/grpc/status"
)
@@ -82,7 +85,8 @@ type ClientStream interface {
    // stream.Recv has returned a non-nil error (including io.EOF).
    Trailer() metadata.MD
    // CloseSend closes the send direction of the stream. It closes the stream
    // when non-nil error is met.
    // when non-nil error is met. It is also not safe to call CloseSend
    // concurrently with SendMsg.
    CloseSend() error
    // Context returns the context for this stream.
    //
@@ -105,7 +109,8 @@ type ClientStream interface {
    //
    // It is safe to have a goroutine calling SendMsg and another goroutine
    // calling RecvMsg on the same stream at the same time, but it is not safe
    // to call SendMsg on the same stream in different goroutines.
    // to call SendMsg on the same stream in different goroutines. It is also
    // not safe to call CloseSend concurrently with SendMsg.
    SendMsg(m interface{}) error
    // RecvMsg blocks until it receives a message into m or the stream is
    // done. It returns io.EOF when the stream completes successfully. On
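
The comments above tighten the stream's concurrency contract: SendMsg and RecvMsg may run in parallel, but SendMsg must stay on one goroutine and CloseSend must not race with it. A minimal sketch of the safe pattern, written against the exported grpc.ClientStream interface (the request slice and reply constructor are placeholders, not part of this diff):

package sketch

import (
    "io"
    "log"

    "google.golang.org/grpc"
)

// consume drives a bidirectional stream the way the contract above requires:
// one goroutine owns SendMsg and CloseSend, another owns RecvMsg.
func consume(stream grpc.ClientStream, requests []interface{}, newReply func() interface{}) error {
    go func() {
        for _, m := range requests {
            if err := stream.SendMsg(m); err != nil {
                return // the definitive error will surface from RecvMsg
            }
        }
        // Called from the sending goroutine, never concurrently with SendMsg.
        if err := stream.CloseSend(); err != nil {
            log.Printf("CloseSend: %v", err)
        }
    }()
    for {
        reply := newReply()
        if err := stream.RecvMsg(reply); err != nil {
            if err == io.EOF {
                return nil // stream completed successfully
            }
            return err
        }
    }
}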
@@ -160,6 +165,11 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
        }()
    }
    c := defaultCallInfo()
    // Provide an opportunity for the first RPC to see the first service config
    // provided by the resolver.
    if err := cc.waitForResolvedAddrs(ctx); err != nil {
        return nil, err
    }
    mc := cc.GetMethodConfig(method)
    if mc.WaitForReady != nil {
        c.failFast = !*mc.WaitForReady
@@ -262,6 +272,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
    if !cc.dopts.disableRetry {
        cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
    }
    cs.binlog = binarylog.GetMethodLogger(method)

    cs.callInfo.stream = cs
    // Only this initial attempt has stats/tracing.
@@ -277,6 +288,23 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
        return nil, err
    }

    if cs.binlog != nil {
        md, _ := metadata.FromOutgoingContext(ctx)
        logEntry := &binarylog.ClientHeader{
            OnClientSide: true,
            Header:       md,
            MethodName:   method,
            Authority:    cs.cc.authority,
        }
        if deadline, ok := ctx.Deadline(); ok {
            logEntry.Timeout = deadline.Sub(time.Now())
            if logEntry.Timeout < 0 {
                logEntry.Timeout = 0
            }
        }
        cs.binlog.Log(logEntry)
    }

    if desc != unaryStreamDesc {
        // Listen on cc and stream contexts to cleanup when the user closes the
        // ClientConn or cancels the stream context. In all other cases, an error
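
The deadline handling above clamps an already-expired deadline to a zero timeout before logging it. A standalone sketch of the same computation (time.Until(d) is equivalent to d.Sub(time.Now())):

package main

import (
    "context"
    "fmt"
    "time"
)

// timeoutFromContext mirrors the clamping done when building the binary-log
// ClientHeader entry: an already-expired deadline is reported as 0, and a
// context without a deadline is reported as no timeout at all.
func timeoutFromContext(ctx context.Context) (time.Duration, bool) {
    deadline, ok := ctx.Deadline()
    if !ok {
        return 0, false
    }
    timeout := time.Until(deadline)
    if timeout < 0 {
        timeout = 0
    }
    return timeout, true
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    if d, ok := timeoutFromContext(ctx); ok {
        fmt.Println("timeout:", d)
    }
}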
@@ -350,6 +378,15 @@ type clientStream struct {

    retryThrottler *retryThrottler // The throttler active when the RPC began.

    binlog *binarylog.MethodLogger // Binary logger, can be nil.
    // serverHeaderBinlogged is a boolean for whether server header has been
    // logged. Server header will be logged when the first time one of those
    // happens: stream.Header(), stream.Recv().
    //
    // It's only read and used by Recv() and Header(), so it doesn't need to be
    // synchronized.
    serverHeaderBinlogged bool

    mu sync.Mutex
    firstAttempt bool // if true, transparent retry is valid
    numRetries int // exclusive of transparent retry attempt(s)
@@ -561,6 +598,20 @@ func (cs *clientStream) Header() (metadata.MD, error) {
    }, cs.commitAttemptLocked)
    if err != nil {
        cs.finish(err)
        return nil, err
    }
    if cs.binlog != nil && !cs.serverHeaderBinlogged {
        // Only log if binary log is on and header has not been logged.
        logEntry := &binarylog.ServerHeader{
            OnClientSide: true,
            Header:       m,
            PeerAddr:     nil,
        }
        if peer, ok := peer.FromContext(cs.Context()); ok {
            logEntry.PeerAddr = peer.Addr
        }
        cs.binlog.Log(logEntry)
        cs.serverHeaderBinlogged = true
    }
    return m, err
}
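
peer.FromContext, used above to attach the remote address to the ServerHeader entry, is part of the public google.golang.org/grpc/peer API; it only succeeds on contexts the transport has populated (server handler contexts, or the stream context as here). A small illustrative helper:

package main

import (
    "context"
    "log"

    "google.golang.org/grpc/peer"
)

// remoteAddr mirrors the lookup done when building the ServerHeader log
// entry: the peer is only present on contexts that gRPC has populated.
func remoteAddr(ctx context.Context) string {
    if p, ok := peer.FromContext(ctx); ok && p.Addr != nil {
        return p.Addr.String()
    }
    return "unknown"
}

func main() {
    // A bare context has no peer attached, so this prints "unknown".
    log.Println(remoteAddr(context.Background()))
}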
@@ -633,6 +684,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
    if len(payload) > *cs.callInfo.maxSendMessageSize {
        return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
    }
    msgBytes := data // Store the pointer before setting to nil. For binary logging.
    op := func(a *csAttempt) error {
        err := a.sendMsg(m, hdr, payload, data)
        // nil out the message and uncomp when replaying; they are only needed for
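
The ResourceExhausted check above compares the wire payload against the per-call maxSendMessageSize. Callers raise that limit (and the receive limit checked in RecvMsg) through standard call options; a hedged sketch, with a placeholder target and placeholder credentials:

package main

import (
    "log"

    "google.golang.org/grpc"
)

func main() {
    // grpc.MaxCallSendMsgSize and grpc.MaxCallRecvMsgSize feed the
    // maxSendMessageSize/maxReceiveMessageSize fields checked above.
    conn, err := grpc.Dial("example.invalid:443",
        grpc.WithInsecure(), // placeholder credentials for the sketch
        grpc.WithDefaultCallOptions(
            grpc.MaxCallSendMsgSize(16*1024*1024),
            grpc.MaxCallRecvMsgSize(16*1024*1024),
        ),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
}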
@@ -640,16 +692,53 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
        m, data = nil, nil
        return err
    }
    return cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
    err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
    if cs.binlog != nil && err == nil {
        cs.binlog.Log(&binarylog.ClientMessage{
            OnClientSide: true,
            Message:      msgBytes,
        })
    }
    return
}

func (cs *clientStream) RecvMsg(m interface{}) error {
    if cs.binlog != nil && !cs.serverHeaderBinlogged {
        // Call Header() to binary log header if it's not already logged.
        cs.Header()
    }
    var recvInfo *payloadInfo
    if cs.binlog != nil {
        recvInfo = &payloadInfo{}
    }
    err := cs.withRetry(func(a *csAttempt) error {
        return a.recvMsg(m)
        return a.recvMsg(m, recvInfo)
    }, cs.commitAttemptLocked)
    if cs.binlog != nil && err == nil {
        cs.binlog.Log(&binarylog.ServerMessage{
            OnClientSide: true,
            Message:      recvInfo.uncompressedBytes,
        })
    }
    if err != nil || !cs.desc.ServerStreams {
        // err != nil or non-server-streaming indicates end of stream.
        cs.finish(err)

        if cs.binlog != nil {
            // finish will not log Trailer. Log Trailer here.
            logEntry := &binarylog.ServerTrailer{
                OnClientSide: true,
                Trailer:      cs.Trailer(),
                Err:          err,
            }
            if logEntry.Err == io.EOF {
                logEntry.Err = nil
            }
            if peer, ok := peer.FromContext(cs.Context()); ok {
                logEntry.PeerAddr = peer.Addr
            }
            cs.binlog.Log(logEntry)
        }
    }
    return err
}
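
The trailer and status that feed the ServerTrailer log entry above are equally visible to callers once RecvMsg has reported the end of the stream; a sketch against the exported grpc.ClientStream interface (the message constructor is a placeholder):

package sketch

import (
    "io"

    "google.golang.org/grpc"
    "google.golang.org/grpc/status"
)

// recvAll drains a stream and then inspects the same end-of-stream state the
// binary logger records: the trailer metadata and the final status.
func recvAll(stream grpc.ClientStream, newMsg func() interface{}) error {
    for {
        m := newMsg()
        if err := stream.RecvMsg(m); err != nil {
            if err == io.EOF {
                break // success; Trailer() is now safe to read
            }
            // A non-EOF error carries the RPC status.
            return status.Convert(err).Err()
        }
    }
    _ = stream.Trailer() // e.g. expose to metrics or logs
    return nil
}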
@@ -669,6 +758,11 @@ func (cs *clientStream) CloseSend() error {
        return nil
    }
    cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
    if cs.binlog != nil {
        cs.binlog.Log(&binarylog.ClientHalfClose{
            OnClientSide: true,
        })
    }
    // We never returned an error here for reasons.
    return nil
}
@@ -686,6 +780,16 @@ func (cs *clientStream) finish(err error) {
    cs.finished = true
    cs.commitAttemptLocked()
    cs.mu.Unlock()
    // For binary logging. only log cancel in finish (could be caused by RPC ctx
    // canceled or ClientConn closed). Trailer will be logged in RecvMsg.
    //
    // Only one of cancel or trailer needs to be logged. In the cases where
    // users don't call RecvMsg, users must have already canceled the RPC.
    if cs.binlog != nil && status.Code(err) == codes.Canceled {
        cs.binlog.Log(&binarylog.Cancel{
            OnClientSide: true,
        })
    }
    if err == nil {
        cs.retryThrottler.successfulRPC()
    }
@@ -735,14 +839,12 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
    return nil
}

func (a *csAttempt) recvMsg(m interface{}) (err error) {
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
    cs := a.cs
    var inPayload *stats.InPayload
    if a.statsHandler != nil {
        inPayload = &stats.InPayload{
            Client: true,
        }
    if a.statsHandler != nil && payInfo == nil {
        payInfo = &payloadInfo{}
    }

    if !a.decompSet {
        // Block until we receive headers containing received message encoding.
        if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
@@ -759,7 +861,7 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) {
        // Only initialize this state once per stream.
        a.decompSet = true
    }
    err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, inPayload, a.decomp)
    err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
    if err != nil {
        if err == io.EOF {
            if statusErr := a.s.Status().Err(); statusErr != nil {
@@ -776,8 +878,15 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) {
        }
        a.mu.Unlock()
    }
    if inPayload != nil {
        a.statsHandler.HandleRPC(cs.ctx, inPayload)
    if a.statsHandler != nil {
        a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
            Client:   true,
            RecvTime: time.Now(),
            Payload:  m,
            // TODO truncate large payload.
            Data:   payInfo.uncompressedBytes,
            Length: len(payInfo.uncompressedBytes),
        })
    }
    if channelz.IsOn() {
        a.t.IncrMsgRecv()
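
The stats.InPayload built above is delivered through the stats.Handler hook that a client installs with grpc.WithStatsHandler. A minimal handler that only counts inbound payload bytes might look like this:

package sketch

import (
    "context"
    "log"

    "google.golang.org/grpc/stats"
)

// byteCounter is a stats.Handler that only cares about inbound payloads,
// i.e. the stats.InPayload values emitted from recvMsg above.
type byteCounter struct{}

var _ stats.Handler = byteCounter{}

func (byteCounter) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context   { return ctx }
func (byteCounter) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx }
func (byteCounter) HandleConn(context.Context, stats.ConnStats)                       {}

func (byteCounter) HandleRPC(_ context.Context, s stats.RPCStats) {
    if in, ok := s.(*stats.InPayload); ok {
        log.Printf("received %d uncompressed bytes", in.Length)
    }
}

It would be wired in with grpc.Dial(target, grpc.WithStatsHandler(byteCounter{})), the standard dial option for this hook.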
@@ -786,7 +895,6 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) {
        // Subsequent messages should be received by subsequent RecvMsg calls.
        return nil
    }

    // Special handling for non-server-stream rpcs.
    // This recv expects EOF or errors, so we don't collect inPayload.
    err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
@@ -850,6 +958,299 @@ func (a *csAttempt) finish(err error) {
    a.mu.Unlock()
}

func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
    ac.mu.Lock()
    if ac.transport != t {
        ac.mu.Unlock()
        return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
    }
    // transition to CONNECTING state when an attempt starts
    if ac.state != connectivity.Connecting {
        ac.updateConnectivityState(connectivity.Connecting)
        ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
    }
    ac.mu.Unlock()

    if t == nil {
        // TODO: return RPC error here?
        return nil, errors.New("transport provided is nil")
    }
    // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
    c := &callInfo{}

    for _, o := range opts {
        if err := o.before(c); err != nil {
            return nil, toRPCErr(err)
        }
    }
    c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
    c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)

    // Possible context leak:
    // The cancel function for the child context we create will only be called
    // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
    // an error is generated by SendMsg.
    // https://github.com/grpc/grpc-go/issues/1818.
    ctx, cancel := context.WithCancel(ctx)
    defer func() {
        if err != nil {
            cancel()
        }
    }()

    if err := setCallInfoCodec(c); err != nil {
        return nil, err
    }

    callHdr := &transport.CallHdr{
        Host:           ac.cc.authority,
        Method:         method,
        ContentSubtype: c.contentSubtype,
    }

    // Set our outgoing compression according to the UseCompressor CallOption, if
    // set. In that case, also find the compressor from the encoding package.
    // Otherwise, use the compressor configured by the WithCompressor DialOption,
    // if set.
    var cp Compressor
    var comp encoding.Compressor
    if ct := c.compressorType; ct != "" {
        callHdr.SendCompress = ct
        if ct != encoding.Identity {
            comp = encoding.GetCompressor(ct)
            if comp == nil {
                return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
            }
        }
    } else if ac.cc.dopts.cp != nil {
        callHdr.SendCompress = ac.cc.dopts.cp.Type()
        cp = ac.cc.dopts.cp
    }
    if c.creds != nil {
        callHdr.Creds = c.creds
    }

    as := &addrConnStream{
        callHdr:  callHdr,
        ac:       ac,
        ctx:      ctx,
        cancel:   cancel,
        opts:     opts,
        callInfo: c,
        desc:     desc,
        codec:    c.codec,
        cp:       cp,
        comp:     comp,
        t:        t,
    }

    as.callInfo.stream = as
    s, err := as.t.NewStream(as.ctx, as.callHdr)
    if err != nil {
        err = toRPCErr(err)
        return nil, err
    }
    as.s = s
    as.p = &parser{r: s}
    ac.incrCallsStarted()
    if desc != unaryStreamDesc {
        // Listen on cc and stream contexts to cleanup when the user closes the
        // ClientConn or cancels the stream context. In all other cases, an error
        // should already be injected into the recv buffer by the transport, which
        // the client will eventually receive, and then we will cancel the stream's
        // context in clientStream.finish.
        go func() {
            select {
            case <-ac.ctx.Done():
                as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
            case <-ctx.Done():
                as.finish(toRPCErr(ctx.Err()))
            }
        }()
    }
    return as, nil
}

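The compressor selection above prefers the per-call UseCompressor option and falls back to the connection-level WithCompressor option, both resolving through the encoding registry. A hedged sketch of enabling gzip for every call on a connection (importing the gzip subpackage registers the codec that GetCompressor finds, on both this send path and the decompression path in RecvMsg); the target and credentials are placeholders:

package main

import (
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/encoding/gzip" // registers the "gzip" compressor
)

func main() {
    conn, err := grpc.Dial("example.invalid:443",
        grpc.WithInsecure(), // placeholder credentials for the sketch
        // Every call on this connection asks for gzip via the UseCompressor
        // CallOption, which is what sets callHdr.SendCompress above.
        grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
}
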
type addrConnStream struct {
    s         *transport.Stream
    ac        *addrConn
    callHdr   *transport.CallHdr
    cancel    context.CancelFunc
    opts      []CallOption
    callInfo  *callInfo
    t         transport.ClientTransport
    ctx       context.Context
    sentLast  bool
    desc      *StreamDesc
    codec     baseCodec
    cp        Compressor
    comp      encoding.Compressor
    decompSet bool
    dc        Decompressor
    decomp    encoding.Compressor
    p         *parser
    done      func(balancer.DoneInfo)
    mu        sync.Mutex
    finished  bool
}

func (as *addrConnStream) Header() (metadata.MD, error) {
    m, err := as.s.Header()
    if err != nil {
        as.finish(toRPCErr(err))
    }
    return m, err
}

func (as *addrConnStream) Trailer() metadata.MD {
    return as.s.Trailer()
}

func (as *addrConnStream) CloseSend() error {
    if as.sentLast {
        // TODO: return an error and finish the stream instead, due to API misuse?
        return nil
    }
    as.sentLast = true

    as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
    // Always return nil; io.EOF is the only error that might make sense
    // instead, but there is no need to signal the client to call RecvMsg
    // as the only use left for the stream after CloseSend is to call
    // RecvMsg. This also matches historical behavior.
    return nil
}

func (as *addrConnStream) Context() context.Context {
    return as.s.Context()
}

func (as *addrConnStream) SendMsg(m interface{}) (err error) {
    defer func() {
        if err != nil && err != io.EOF {
            // Call finish on the client stream for errors generated by this SendMsg
            // call, as these indicate problems created by this client. (Transport
            // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
            // error will be returned from RecvMsg eventually in that case, or be
            // retried.)
            as.finish(err)
        }
    }()
    if as.sentLast {
        return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
    }
    if !as.desc.ClientStreams {
        as.sentLast = true
    }
    data, err := encode(as.codec, m)
    if err != nil {
        return err
    }
    compData, err := compress(data, as.cp, as.comp)
    if err != nil {
        return err
    }
    hdr, payld := msgHeader(data, compData)
    // TODO(dfawley): should we be checking len(data) instead?
    if len(payld) > *as.callInfo.maxSendMessageSize {
        return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
    }

    if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
        if !as.desc.ClientStreams {
            // For non-client-streaming RPCs, we return nil instead of EOF on error
            // because the generated code requires it. finish is not called; RecvMsg()
            // will call it with the stream's status independently.
            return nil
        }
        return io.EOF
    }

    if channelz.IsOn() {
        as.t.IncrMsgSent()
    }
    return nil
}

func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
    defer func() {
        if err != nil || !as.desc.ServerStreams {
            // err != nil or non-server-streaming indicates end of stream.
            as.finish(err)
        }
    }()

    if !as.decompSet {
        // Block until we receive headers containing received message encoding.
        if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
            if as.dc == nil || as.dc.Type() != ct {
                // No configured decompressor, or it does not match the incoming
                // message encoding; attempt to find a registered compressor that does.
                as.dc = nil
                as.decomp = encoding.GetCompressor(ct)
            }
        } else {
            // No compression is used; disable our decompressor.
            as.dc = nil
        }
        // Only initialize this state once per stream.
        as.decompSet = true
    }
    err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
    if err != nil {
        if err == io.EOF {
            if statusErr := as.s.Status().Err(); statusErr != nil {
                return statusErr
            }
            return io.EOF // indicates successful end of stream.
        }
        return toRPCErr(err)
    }

    if channelz.IsOn() {
        as.t.IncrMsgRecv()
    }
    if as.desc.ServerStreams {
        // Subsequent messages should be received by subsequent RecvMsg calls.
        return nil
    }

    // Special handling for non-server-stream rpcs.
    // This recv expects EOF or errors, so we don't collect inPayload.
    err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
    if err == nil {
        return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
    }
    if err == io.EOF {
        return as.s.Status().Err() // non-server streaming Recv returns nil on success
    }
    return toRPCErr(err)
}

func (as *addrConnStream) finish(err error) {
    as.mu.Lock()
    if as.finished {
        as.mu.Unlock()
        return
    }
    as.finished = true
    if err == io.EOF {
        // Ending a stream with EOF indicates a success.
        err = nil
    }
    if as.s != nil {
        as.t.CloseStream(as.s, err)
    }

    if err != nil {
        as.ac.incrCallsFailed()
    } else {
        as.ac.incrCallsSucceeded()
    }
    as.cancel()
    as.mu.Unlock()
}

// ServerStream defines the server-side behavior of a streaming RPC.
//
// All errors returned from ServerStream methods are compatible with the
@@ -916,6 +1317,15 @@ type serverStream struct {

    statsHandler stats.Handler

    binlog *binarylog.MethodLogger
    // serverHeaderBinlogged indicates whether server header has been logged. It
    // will happen when one of the following two happens: stream.SendHeader(),
    // stream.Send().
    //
    // It's only checked in send and sendHeader, doesn't need to be
    // synchronized.
    serverHeaderBinlogged bool

    mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
@@ -931,7 +1341,15 @@ func (ss *serverStream) SetHeader(md metadata.MD) error {
}

func (ss *serverStream) SendHeader(md metadata.MD) error {
    return ss.t.WriteHeader(ss.s, md)
    err := ss.t.WriteHeader(ss.s, md)
    if ss.binlog != nil && !ss.serverHeaderBinlogged {
        h, _ := ss.s.Header()
        ss.binlog.Log(&binarylog.ServerHeader{
            Header: h,
        })
        ss.serverHeaderBinlogged = true
    }
    return err
}

func (ss *serverStream) SetTrailer(md metadata.MD) {
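
SendHeader above is what a handler-facing grpc.ServerStream ultimately reaches; a sketch of the call order it supports, written against the exported interface (the message values are placeholders for generated types):

package sketch

import (
    "strconv"

    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// sendWithHeaderAndTrailer shows the order the serverStream code supports:
// an explicit header first (otherwise the first SendMsg writes it), then
// messages, then a trailer that is flushed when the handler returns.
func sendWithHeaderAndTrailer(ss grpc.ServerStream, msgs []interface{}) error {
    if err := ss.SendHeader(metadata.Pairs("demo-version", "v1")); err != nil {
        return err
    }
    for _, m := range msgs {
        if err := ss.SendMsg(m); err != nil {
            return err
        }
    }
    ss.SetTrailer(metadata.Pairs("demo-count", strconv.Itoa(len(msgs))))
    return nil
}

An explicit SendHeader is optional; if the handler skips it, the first SendMsg writes the headers, which is exactly the case the binlog block in SendMsg below has to cover.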
@@ -958,6 +1376,12 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
        if err != nil && err != io.EOF {
            st, _ := status.FromError(toRPCErr(err))
            ss.t.WriteStatus(ss.s, st)
            // Non-user specified status was sent out. This should be an error
            // case (as a server side Cancel maybe).
            //
            // This is not handled specifically now. User will return a final
            // status from the service handler, we will log that error instead.
            // This behavior is similar to an interceptor.
        }
        if channelz.IsOn() && err == nil {
            ss.t.IncrMsgSent()
@@ -979,6 +1403,18 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
    if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
        return toRPCErr(err)
    }
    if ss.binlog != nil {
        if !ss.serverHeaderBinlogged {
            h, _ := ss.s.Header()
            ss.binlog.Log(&binarylog.ServerHeader{
                Header: h,
            })
            ss.serverHeaderBinlogged = true
        }
        ss.binlog.Log(&binarylog.ServerMessage{
            Message: data,
        })
    }
    if ss.statsHandler != nil {
        ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
    }
@@ -1002,17 +1438,26 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
        if err != nil && err != io.EOF {
            st, _ := status.FromError(toRPCErr(err))
            ss.t.WriteStatus(ss.s, st)
            // Non-user specified status was sent out. This should be an error
            // case (as a server side Cancel maybe).
            //
            // This is not handled specifically now. User will return a final
            // status from the service handler, we will log that error instead.
            // This behavior is similar to an interceptor.
        }
        if channelz.IsOn() && err == nil {
            ss.t.IncrMsgRecv()
        }
    }()
    var inPayload *stats.InPayload
    if ss.statsHandler != nil {
        inPayload = &stats.InPayload{}
    var payInfo *payloadInfo
    if ss.statsHandler != nil || ss.binlog != nil {
        payInfo = &payloadInfo{}
    }
    if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload, ss.decomp); err != nil {
    if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
        if err == io.EOF {
            if ss.binlog != nil {
                ss.binlog.Log(&binarylog.ClientHalfClose{})
            }
            return err
        }
        if err == io.ErrUnexpectedEOF {
@@ -1020,8 +1465,19 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
        }
        return toRPCErr(err)
    }
    if inPayload != nil {
        ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
    if ss.statsHandler != nil {
        ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
            RecvTime: time.Now(),
            Payload:  m,
            // TODO truncate large payload.
            Data:   payInfo.uncompressedBytes,
            Length: len(payInfo.uncompressedBytes),
        })
    }
    if ss.binlog != nil {
        ss.binlog.Log(&binarylog.ClientMessage{
            Message: payInfo.uncompressedBytes,
        })
    }
    return nil
}
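
The io.EOF branch above is the client half-close, which is also the signal a server handler loops on when draining a client stream; a sketch against the exported grpc.ServerStream interface (the message constructor and per-message callback are placeholders):

package sketch

import (
    "io"

    "google.golang.org/grpc"
)

// drain reads a client-streaming request until the client half-closes, which
// is exactly the io.EOF case that RecvMsg above turns into a ClientHalfClose
// binary-log entry.
func drain(ss grpc.ServerStream, newMsg func() interface{}, handle func(interface{}) error) error {
    for {
        m := newMsg()
        if err := ss.RecvMsg(m); err != nil {
            if err == io.EOF {
                return nil // client finished sending
            }
            return err
        }
        if err := handle(m); err != nil {
            return err
        }
    }
}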