Mirror of https://github.com/ceph/ceph-csi.git (synced 2025-06-14 18:53:35 +00:00)
rebase: bump golang.org/x/net from 0.35.0 to 0.36.0 in /e2e
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.35.0 to 0.36.0.

- [Commits](https://github.com/golang/net/compare/v0.35.0...v0.36.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Committed by: mergify[bot]
Parent: a81dc2cbad
Commit: 76b4f53897
Changed file: e2e/vendor/google.golang.org/grpc/stream.go (generated, vendored), 294 changes
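The commit title places the bump in /e2e, so the version change lands in that module's go.mod (and go.sum), with e2e/vendor/ regenerated to match, which is presumably why a vendored gRPC file appears in the diff below. For orientation, a hypothetical go.mod fragment (the actual hunk is not shown on this page):

// e2e/go.mod (hypothetical fragment; only the bumped requirement shown)
require (
	golang.org/x/net v0.36.0 // indirect (was v0.35.0)
)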
--- a/e2e/vendor/google.golang.org/grpc/stream.go
+++ b/e2e/vendor/google.golang.org/grpc/stream.go
@@ -258,9 +258,9 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 }
 
 func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
-	c := defaultCallInfo()
+	callInfo := defaultCallInfo()
 	if mc.WaitForReady != nil {
-		c.failFast = !*mc.WaitForReady
+		callInfo.failFast = !*mc.WaitForReady
 	}
 
 	// Possible context leak:
@@ -281,20 +281,20 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
 	}()
 
 	for _, o := range opts {
-		if err := o.before(c); err != nil {
+		if err := o.before(callInfo); err != nil {
 			return nil, toRPCErr(err)
 		}
 	}
-	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
-	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
-	if err := setCallInfoCodec(c); err != nil {
+	callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
+	callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
+	if err := setCallInfoCodec(callInfo); err != nil {
 		return nil, err
 	}
 
 	callHdr := &transport.CallHdr{
 		Host:           cc.authority,
 		Method:         method,
-		ContentSubtype: c.contentSubtype,
+		ContentSubtype: callInfo.contentSubtype,
 		DoneFunc:       doneFunc,
 	}
 
@@ -302,22 +302,22 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
 	// set. In that case, also find the compressor from the encoding package.
 	// Otherwise, use the compressor configured by the WithCompressor DialOption,
 	// if set.
-	var cp Compressor
-	var comp encoding.Compressor
-	if ct := c.compressorType; ct != "" {
+	var compressorV0 Compressor
+	var compressorV1 encoding.Compressor
+	if ct := callInfo.compressorName; ct != "" {
 		callHdr.SendCompress = ct
 		if ct != encoding.Identity {
-			comp = encoding.GetCompressor(ct)
-			if comp == nil {
+			compressorV1 = encoding.GetCompressor(ct)
+			if compressorV1 == nil {
 				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
 			}
 		}
-	} else if cc.dopts.cp != nil {
-		callHdr.SendCompress = cc.dopts.cp.Type()
-		cp = cc.dopts.cp
+	} else if cc.dopts.compressorV0 != nil {
+		callHdr.SendCompress = cc.dopts.compressorV0.Type()
+		compressorV0 = cc.dopts.compressorV0
 	}
-	if c.creds != nil {
-		callHdr.Creds = c.creds
+	if callInfo.creds != nil {
+		callHdr.Creds = callInfo.creds
 	}
 
 	cs := &clientStream{
@@ -325,12 +325,12 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
 		ctx:          ctx,
 		methodConfig: &mc,
 		opts:         opts,
-		callInfo:     c,
+		callInfo:     callInfo,
 		cc:           cc,
 		desc:         desc,
-		codec:        c.codec,
-		cp:           cp,
-		comp:         comp,
+		codec:        callInfo.codec,
+		compressorV0: compressorV0,
+		compressorV1: compressorV1,
 		cancel:       cancel,
 		firstAttempt: true,
 		onCommit:     onCommit,
@@ -412,7 +412,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
 		return nil, ErrClientConnClosing
 	}
 
-	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
+	ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
 	method := cs.callHdr.Method
 	var beginTime time.Time
 	shs := cs.cc.dopts.copts.StatsHandlers
@@ -454,12 +454,12 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
 	}
 
 	return &csAttempt{
-		ctx:           ctx,
-		beginTime:     beginTime,
-		cs:            cs,
-		dc:            cs.cc.dopts.dc,
-		statsHandlers: shs,
-		trInfo:        trInfo,
+		ctx:            ctx,
+		beginTime:      beginTime,
+		cs:             cs,
+		decompressorV0: cs.cc.dopts.dc,
+		statsHandlers:  shs,
+		trInfo:         trInfo,
 	}, nil
 }
 
@@ -467,7 +467,7 @@ func (a *csAttempt) getTransport() error {
 	cs := a.cs
 
 	var err error
-	a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
+	a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
 	if err != nil {
 		if de, ok := err.(dropError); ok {
 			err = de.error
@@ -476,7 +476,7 @@ func (a *csAttempt) getTransport() error {
 		return err
 	}
 	if a.trInfo != nil {
-		a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
+		a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
 	}
 	return nil
 }
@@ -503,7 +503,7 @@ func (a *csAttempt) newStream() error {
 		a.ctx = metadata.NewOutgoingContext(a.ctx, md)
 	}
 
-	s, err := a.t.NewStream(a.ctx, cs.callHdr)
+	s, err := a.transport.NewStream(a.ctx, cs.callHdr)
 	if err != nil {
 		nse, ok := err.(*transport.NewStreamError)
 		if !ok {
@@ -518,9 +518,9 @@ func (a *csAttempt) newStream() error {
 		// Unwrap and convert error.
 		return toRPCErr(nse.Err)
 	}
-	a.s = s
+	a.transportStream = s
 	a.ctx = s.Context()
-	a.p = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
+	a.parser = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
 	return nil
 }
 
@@ -532,9 +532,9 @@ type clientStream struct {
 	cc       *ClientConn
 	desc     *StreamDesc
 
-	codec baseCodec
-	cp    Compressor
-	comp  encoding.Compressor
+	codec        baseCodec
+	compressorV0 Compressor
+	compressorV1 encoding.Compressor
 
 	cancel context.CancelFunc // cancels all attempts
 
@@ -583,17 +583,17 @@ type replayOp struct {
 // csAttempt implements a single transport stream attempt within a
 // clientStream.
 type csAttempt struct {
-	ctx        context.Context
-	cs         *clientStream
-	t          transport.ClientTransport
-	s          *transport.ClientStream
-	p          *parser
-	pickResult balancer.PickResult
+	ctx             context.Context
+	cs              *clientStream
+	transport       transport.ClientTransport
+	transportStream *transport.ClientStream
+	parser          *parser
+	pickResult      balancer.PickResult
 
-	finished  bool
-	dc        Decompressor
-	decomp    encoding.Compressor
-	decompSet bool
+	finished        bool
+	decompressorV0  Decompressor
+	decompressorV1  encoding.Compressor
+	decompressorSet bool
 
 	mu sync.Mutex // guards trInfo.tr
 	// trInfo may be nil (if EnableTracing is false).
@@ -639,14 +639,14 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {
 		// RPC is finished or committed or was dropped by the picker; cannot retry.
 		return false, err
 	}
-	if a.s == nil && a.allowTransparentRetry {
+	if a.transportStream == nil && a.allowTransparentRetry {
 		return true, nil
 	}
 	// Wait for the trailers.
 	unprocessed := false
-	if a.s != nil {
-		<-a.s.Done()
-		unprocessed = a.s.Unprocessed()
+	if a.transportStream != nil {
+		<-a.transportStream.Done()
+		unprocessed = a.transportStream.Unprocessed()
 	}
 	if cs.firstAttempt && unprocessed {
 		// First attempt, stream unprocessed: transparently retry.
@@ -658,14 +658,14 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {
 
 	pushback := 0
 	hasPushback := false
-	if a.s != nil {
-		if !a.s.TrailersOnly() {
+	if a.transportStream != nil {
+		if !a.transportStream.TrailersOnly() {
 			return false, err
 		}
 
 		// TODO(retry): Move down if the spec changes to not check server pushback
 		// before considering this a failure for throttling.
-		sps := a.s.Trailer()["grpc-retry-pushback-ms"]
+		sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
 		if len(sps) == 1 {
 			var e error
 			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
@@ -682,8 +682,8 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {
 	}
 
 	var code codes.Code
-	if a.s != nil {
-		code = a.s.Status().Code()
+	if a.transportStream != nil {
+		code = a.transportStream.Status().Code()
 	} else {
 		code = status.Code(err)
 	}
@@ -756,8 +756,8 @@ func (cs *clientStream) Context() context.Context {
 	cs.commitAttempt()
 	// No need to lock before using attempt, since we know it is committed and
 	// cannot change.
-	if cs.attempt.s != nil {
-		return cs.attempt.s.Context()
+	if cs.attempt.transportStream != nil {
+		return cs.attempt.transportStream.Context()
 	}
 	return cs.ctx
 }
@@ -794,9 +794,9 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
 				continue
 			}
 			if err == io.EOF {
-				<-a.s.Done()
+				<-a.transportStream.Done()
 			}
-			if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
+			if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
 				onSuccess()
 				cs.mu.Unlock()
 				return err
@@ -812,7 +812,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
 	var m metadata.MD
 	err := cs.withRetry(func(a *csAttempt) error {
 		var err error
-		m, err = a.s.Header()
+		m, err = a.transportStream.Header()
 		return toRPCErr(err)
 	}, cs.commitAttemptLocked)
 
@@ -856,10 +856,10 @@ func (cs *clientStream) Trailer() metadata.MD {
 	// directions -- it will prevent races and should not meaningfully impact
 	// performance.
 	cs.commitAttempt()
-	if cs.attempt.s == nil {
+	if cs.attempt.transportStream == nil {
 		return nil
 	}
-	return cs.attempt.s.Trailer()
+	return cs.attempt.transportStream.Trailer()
 }
 
 func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
@@ -904,7 +904,7 @@ func (cs *clientStream) SendMsg(m any) (err error) {
 	}
 
 	// load hdr, payload, data
-	hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.cc.dopts.copts.BufferPool)
+	hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
 	if err != nil {
 		return err
 	}
@@ -992,7 +992,7 @@ func (cs *clientStream) CloseSend() error {
 	}
 	cs.sentLast = true
 	op := func(a *csAttempt) error {
-		a.s.Write(nil, nil, &transport.WriteOptions{Last: true})
+		a.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
 		// Always return nil; io.EOF is the only error that might make sense
 		// instead, but there is no need to signal the client to call RecvMsg
 		// as the only use left for the stream after CloseSend is to call
@@ -1030,7 +1030,7 @@ func (cs *clientStream) finish(err error) {
 	if cs.attempt != nil {
 		cs.attempt.finish(err)
 		// after functions all rely upon having a stream.
-		if cs.attempt.s != nil {
+		if cs.attempt.transportStream != nil {
 			for _, o := range cs.opts {
 				o.after(cs.callInfo, cs.attempt)
 			}
@@ -1084,7 +1084,7 @@ func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength
 		}
 		a.mu.Unlock()
 	}
-	if err := a.s.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
+	if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {
 		if !cs.desc.ClientStreams {
 			// For non-client-streaming RPCs, we return nil instead of EOF on error
 			// because the generated code requires it. finish is not called; RecvMsg()
@@ -1108,25 +1108,25 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
 		defer payInfo.free()
 	}
 
-	if !a.decompSet {
+	if !a.decompressorSet {
 		// Block until we receive headers containing received message encoding.
-		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
-			if a.dc == nil || a.dc.Type() != ct {
+		if ct := a.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
+			if a.decompressorV0 == nil || a.decompressorV0.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
-				a.dc = nil
-				a.decomp = encoding.GetCompressor(ct)
+				a.decompressorV0 = nil
+				a.decompressorV1 = encoding.GetCompressor(ct)
 			}
 		} else {
 			// No compression is used; disable our decompressor.
-			a.dc = nil
+			a.decompressorV0 = nil
 		}
 		// Only initialize this state once per stream.
-		a.decompSet = true
+		a.decompressorSet = true
 	}
-	if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp, false); err != nil {
+	if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decompressorV1, false); err != nil {
 		if err == io.EOF {
-			if statusErr := a.s.Status().Err(); statusErr != nil {
+			if statusErr := a.transportStream.Status().Err(); statusErr != nil {
 				return statusErr
 			}
 			return io.EOF // indicates successful end of stream.
@@ -1157,8 +1157,8 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
 	}
 	// Special handling for non-server-stream rpcs.
 	// This recv expects EOF or errors, so we don't collect inPayload.
-	if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp, false); err == io.EOF {
-		return a.s.Status().Err() // non-server streaming Recv returns nil on success
+	if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
+		return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
 	} else if err != nil {
 		return toRPCErr(err)
 	}
@@ -1177,20 +1177,20 @@ func (a *csAttempt) finish(err error) {
 		err = nil
 	}
 	var tr metadata.MD
-	if a.s != nil {
-		a.s.Close(err)
-		tr = a.s.Trailer()
+	if a.transportStream != nil {
+		a.transportStream.Close(err)
+		tr = a.transportStream.Trailer()
 	}
 
 	if a.pickResult.Done != nil {
 		br := false
-		if a.s != nil {
-			br = a.s.BytesReceived()
+		if a.transportStream != nil {
+			br = a.transportStream.BytesReceived()
 		}
 		a.pickResult.Done(balancer.DoneInfo{
 			Err:           err,
 			Trailer:       tr,
-			BytesSent:     a.s != nil,
+			BytesSent:     a.transportStream != nil,
 			BytesReceived: br,
 			ServerLoad:    balancerload.Parse(tr),
 		})
@@ -1272,7 +1272,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
 	// if set.
 	var cp Compressor
 	var comp encoding.Compressor
-	if ct := c.compressorType; ct != "" {
+	if ct := c.compressorName; ct != "" {
 		callHdr.SendCompress = ct
 		if ct != encoding.Identity {
 			comp = encoding.GetCompressor(ct)
@@ -1280,9 +1280,9 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
 				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
 			}
 		}
-	} else if ac.cc.dopts.cp != nil {
-		callHdr.SendCompress = ac.cc.dopts.cp.Type()
-		cp = ac.cc.dopts.cp
+	} else if ac.cc.dopts.compressorV0 != nil {
+		callHdr.SendCompress = ac.cc.dopts.compressorV0.Type()
+		cp = ac.cc.dopts.compressorV0
 	}
 	if c.creds != nil {
 		callHdr.Creds = c.creds
@@ -1290,26 +1290,26 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
 
 	// Use a special addrConnStream to avoid retry.
 	as := &addrConnStream{
-		callHdr:  callHdr,
-		ac:       ac,
-		ctx:      ctx,
-		cancel:   cancel,
-		opts:     opts,
-		callInfo: c,
-		desc:     desc,
-		codec:    c.codec,
-		cp:       cp,
-		comp:     comp,
-		t:        t,
+		callHdr:          callHdr,
+		ac:               ac,
+		ctx:              ctx,
+		cancel:           cancel,
+		opts:             opts,
+		callInfo:         c,
+		desc:             desc,
+		codec:            c.codec,
+		sendCompressorV0: cp,
+		sendCompressorV1: comp,
+		transport:        t,
 	}
 
-	s, err := as.t.NewStream(as.ctx, as.callHdr)
+	s, err := as.transport.NewStream(as.ctx, as.callHdr)
 	if err != nil {
 		err = toRPCErr(err)
 		return nil, err
 	}
-	as.s = s
-	as.p = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
+	as.transportStream = s
+	as.parser = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
 	ac.incrCallsStarted()
 	if desc != unaryStreamDesc {
 		// Listen on stream context to cleanup when the stream context is
@@ -1335,29 +1335,31 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
 }
 
 type addrConnStream struct {
-	s         *transport.ClientStream
-	ac        *addrConn
-	callHdr   *transport.CallHdr
-	cancel    context.CancelFunc
-	opts      []CallOption
-	callInfo  *callInfo
-	t         transport.ClientTransport
-	ctx       context.Context
-	sentLast  bool
-	desc      *StreamDesc
-	codec     baseCodec
-	cp        Compressor
-	comp      encoding.Compressor
-	decompSet bool
-	dc        Decompressor
-	decomp    encoding.Compressor
-	p         *parser
-	mu        sync.Mutex
-	finished  bool
+	transportStream  *transport.ClientStream
+	ac               *addrConn
+	callHdr          *transport.CallHdr
+	cancel           context.CancelFunc
+	opts             []CallOption
+	callInfo         *callInfo
+	transport        transport.ClientTransport
+	ctx              context.Context
+	sentLast         bool
+	desc             *StreamDesc
+	codec            baseCodec
+	sendCompressorV0 Compressor
+	sendCompressorV1 encoding.Compressor
+	decompressorSet  bool
+	decompressorV0   Decompressor
+	decompressorV1   encoding.Compressor
+	parser           *parser
+
+	// mu guards finished and is held for the entire finish method.
+	mu       sync.Mutex
+	finished bool
 }
 
 func (as *addrConnStream) Header() (metadata.MD, error) {
-	m, err := as.s.Header()
+	m, err := as.transportStream.Header()
 	if err != nil {
 		as.finish(toRPCErr(err))
 	}
@@ -1365,7 +1367,7 @@ func (as *addrConnStream) Header() (metadata.MD, error) {
 }
 
 func (as *addrConnStream) Trailer() metadata.MD {
-	return as.s.Trailer()
+	return as.transportStream.Trailer()
 }
 
 func (as *addrConnStream) CloseSend() error {
@@ -1375,7 +1377,7 @@ func (as *addrConnStream) CloseSend() error {
 	}
 	as.sentLast = true
 
-	as.s.Write(nil, nil, &transport.WriteOptions{Last: true})
+	as.transportStream.Write(nil, nil, &transport.WriteOptions{Last: true})
 	// Always return nil; io.EOF is the only error that might make sense
 	// instead, but there is no need to signal the client to call RecvMsg
 	// as the only use left for the stream after CloseSend is to call
@@ -1384,7 +1386,7 @@ func (as *addrConnStream) CloseSend() error {
 }
 
 func (as *addrConnStream) Context() context.Context {
-	return as.s.Context()
+	return as.transportStream.Context()
 }
 
 func (as *addrConnStream) SendMsg(m any) (err error) {
@@ -1406,7 +1408,7 @@ func (as *addrConnStream) SendMsg(m any) (err error) {
 	}
 
 	// load hdr, payload, data
-	hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.cp, as.comp, as.ac.dopts.copts.BufferPool)
+	hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
 	if err != nil {
 		return err
 	}
@@ -1425,7 +1427,7 @@ func (as *addrConnStream) SendMsg(m any) (err error) {
 		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
 	}
 
-	if err := as.s.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
+	if err := as.transportStream.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {
 		if !as.desc.ClientStreams {
 			// For non-client-streaming RPCs, we return nil instead of EOF on error
 			// because the generated code requires it. finish is not called; RecvMsg()
@@ -1446,25 +1448,25 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
 		}
 	}()
 
-	if !as.decompSet {
+	if !as.decompressorSet {
 		// Block until we receive headers containing received message encoding.
-		if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
-			if as.dc == nil || as.dc.Type() != ct {
+		if ct := as.transportStream.RecvCompress(); ct != "" && ct != encoding.Identity {
+			if as.decompressorV0 == nil || as.decompressorV0.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
-				as.dc = nil
-				as.decomp = encoding.GetCompressor(ct)
+				as.decompressorV0 = nil
+				as.decompressorV1 = encoding.GetCompressor(ct)
 			}
 		} else {
 			// No compression is used; disable our decompressor.
-			as.dc = nil
+			as.decompressorV0 = nil
 		}
 		// Only initialize this state once per stream.
-		as.decompSet = true
+		as.decompressorSet = true
 	}
-	if err := recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, false); err != nil {
+	if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err != nil {
 		if err == io.EOF {
-			if statusErr := as.s.Status().Err(); statusErr != nil {
+			if statusErr := as.transportStream.Status().Err(); statusErr != nil {
 				return statusErr
 			}
 			return io.EOF // indicates successful end of stream.
@@ -1479,8 +1481,8 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
 
 	// Special handling for non-server-stream rpcs.
 	// This recv expects EOF or errors, so we don't collect inPayload.
-	if err := recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, false); err == io.EOF {
-		return as.s.Status().Err() // non-server streaming Recv returns nil on success
+	if err := recv(as.parser, as.codec, as.transportStream, as.decompressorV0, m, *as.callInfo.maxReceiveMessageSize, nil, as.decompressorV1, false); err == io.EOF {
+		return as.transportStream.Status().Err() // non-server streaming Recv returns nil on success
 	} else if err != nil {
 		return toRPCErr(err)
 	}
@@ -1498,8 +1500,8 @@ func (as *addrConnStream) finish(err error) {
 		// Ending a stream with EOF indicates a success.
 		err = nil
 	}
-	if as.s != nil {
-		as.s.Close(err)
+	if as.transportStream != nil {
+		as.transportStream.Close(err)
 	}
 
 	if err != nil {
@@ -1570,10 +1572,10 @@ type serverStream struct {
 	p     *parser
 	codec baseCodec
 
-	cp     Compressor
-	dc     Decompressor
-	comp   encoding.Compressor
-	decomp encoding.Compressor
+	compressorV0   Compressor
+	compressorV1   encoding.Compressor
+	decompressorV0 Decompressor
+	decompressorV1 encoding.Compressor
 
 	sendCompressorName string
 
@@ -1669,12 +1671,12 @@ func (ss *serverStream) SendMsg(m any) (err error) {
 	// Server handler could have set new compressor by calling SetSendCompressor.
 	// In case it is set, we need to use it for compressing outbound message.
 	if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
-		ss.comp = encoding.GetCompressor(sendCompressorsName)
+		ss.compressorV1 = encoding.GetCompressor(sendCompressorsName)
 		ss.sendCompressorName = sendCompressorsName
 	}
 
 	// load hdr, payload, data
-	hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.cp, ss.comp, ss.p.bufferPool)
+	hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
 	if err != nil {
 		return err
 	}
@@ -1755,7 +1757,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
 		payInfo = &payloadInfo{}
 		defer payInfo.free()
 	}
-	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp, true); err != nil {
+	if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != nil {
 		if err == io.EOF {
 			if len(ss.binlogs) != 0 {
 				chc := &binarylog.ClientHalfClose{}
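Aside from the dependency bump itself, the vendored change above is a mechanical rename in gRPC's stream.go: terse fields (c, cp, comp, dc, decomp, t, s, p) become descriptive ones (callInfo, compressorV0/V1, decompressorV0/V1, transport, transportStream, parser), with gofmt realigning the surrounding struct literals; behavior does not change. The V0/V1 suffixes distinguish gRPC's two compression APIs: the legacy grpc.Compressor/Decompressor pair configured per connection, and the registry-based encoding.Compressor looked up by the grpc-encoding name. A minimal, self-contained Go sketch of the V1 lookup that the diff's selection logic uses (pickCompressor is a hypothetical helper written for illustration, not part of gRPC):

package main

import (
	"fmt"

	"google.golang.org/grpc/encoding"
	_ "google.golang.org/grpc/encoding/gzip" // registers "gzip" as a V1 compressor
)

// pickCompressor mirrors the shape of the diff's logic: an empty or
// identity encoding means no compression; otherwise the name must
// resolve to a compressor registered with the encoding package.
func pickCompressor(name string) (encoding.Compressor, error) {
	if name == "" || name == encoding.Identity {
		return nil, nil // no compression requested
	}
	if c := encoding.GetCompressor(name); c != nil {
		return c, nil
	}
	return nil, fmt.Errorf("compressor not installed for grpc-encoding %q", name)
}

func main() {
	c, err := pickCompressor("gzip")
	fmt.Println(c != nil, err) // true <nil>
}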