rebase: bump google.golang.org/grpc from 1.65.0 to 1.66.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.65.0 to 1.66.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.65.0...v1.66.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2024-09-02 20:06:42 +00:00
committed by mergify[bot]
parent 89da94cfd0
commit 56cf915dff
59 changed files with 2807 additions and 1294 deletions

View File

@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/internal/serviceconfig"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@ -359,7 +360,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
cs.attempt = a
return nil
}
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
return nil, err
}
@ -517,7 +518,7 @@ func (a *csAttempt) newStream() error {
}
a.s = s
a.ctx = s.Context()
a.p = &parser{r: s, recvBufferPool: a.cs.cc.dopts.recvBufferPool}
a.p = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
return nil
}
@ -566,10 +567,15 @@ type clientStream struct {
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry?
onCommit func()
buffer []func(a *csAttempt) error // operations to replay on retry
bufferSize int // current size of buffer
committed bool // active attempt committed for retry?
onCommit func()
replayBuffer []replayOp // operations to replay on retry
replayBufferSize int // current size of replayBuffer
}
type replayOp struct {
op func(a *csAttempt) error
cleanup func()
}
// csAttempt implements a single transport stream attempt within a
@ -607,7 +613,12 @@ func (cs *clientStream) commitAttemptLocked() {
cs.onCommit()
}
cs.committed = true
cs.buffer = nil
for _, op := range cs.replayBuffer {
if op.cleanup != nil {
op.cleanup()
}
}
cs.replayBuffer = nil
}
func (cs *clientStream) commitAttempt() {
@ -732,7 +743,7 @@ func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
// the stream is canceled.
return err
}
// Note that the first op in the replay buffer always sets cs.attempt
// Note that the first op in replayBuffer always sets cs.attempt
// if it is able to pick a transport and create a stream.
if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil
@ -761,7 +772,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
// already be status errors.
return toRPCErr(op(cs.attempt))
}
if len(cs.buffer) == 0 {
if len(cs.replayBuffer) == 0 {
// For the first op, which controls creation of the stream and
// assigns cs.attempt, we need to create a new attempt inline
// before executing the first op. On subsequent ops, the attempt
@ -851,25 +862,26 @@ func (cs *clientStream) Trailer() metadata.MD {
}
func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
for _, f := range cs.buffer {
if err := f(attempt); err != nil {
for _, f := range cs.replayBuffer {
if err := f.op(attempt); err != nil {
return err
}
}
return nil
}
func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
// Note: we still will buffer if retry is disabled (for transparent retries).
if cs.committed {
return
}
cs.bufferSize += sz
if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
cs.replayBufferSize += sz
if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
cs.commitAttemptLocked()
cleanup()
return
}
cs.buffer = append(cs.buffer, op)
cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
}
func (cs *clientStream) SendMsg(m any) (err error) {
@ -891,23 +903,50 @@ func (cs *clientStream) SendMsg(m any) (err error) {
}
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.cc.dopts.copts.BufferPool)
if err != nil {
return err
}
defer func() {
data.Free()
// only free payload if compression was made, and therefore it is a different set
// of buffers from data.
if pf.isCompressed() {
payload.Free()
}
}()
dataLen := data.Len()
payloadLen := payload.Len()
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
if payloadLen > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
}
// always take an extra ref in case data == payload (i.e. when the data isn't
// compressed). The original ref will always be freed by the deferred free above.
payload.Ref()
op := func(a *csAttempt) error {
return a.sendMsg(m, hdr, payload, data)
return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
}
// onSuccess is invoked when the op is captured for a subsequent retry. If the
// stream was established by a previous message and therefore retries are
// disabled, onSuccess will not be invoked, and payloadRef can be freed
// immediately.
onSuccessCalled := false
err = cs.withRetry(op, func() {
cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
onSuccessCalled = true
})
if !onSuccessCalled {
payload.Free()
}
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if len(cs.binlogs) != 0 && err == nil {
cm := &binarylog.ClientMessage{
OnClientSide: true,
Message: data,
Message: data.Materialize(),
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, cm)
@ -924,6 +963,7 @@ func (cs *clientStream) RecvMsg(m any) error {
var recvInfo *payloadInfo
if len(cs.binlogs) != 0 {
recvInfo = &payloadInfo{}
defer recvInfo.free()
}
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
@ -931,7 +971,7 @@ func (cs *clientStream) RecvMsg(m any) error {
if len(cs.binlogs) != 0 && err == nil {
sm := &binarylog.ServerMessage{
OnClientSide: true,
Message: recvInfo.uncompressedBytes,
Message: recvInfo.uncompressedBytes.Materialize(),
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, sm)
@ -958,7 +998,7 @@ func (cs *clientStream) CloseSend() error {
// RecvMsg. This also matches historical behavior.
return nil
}
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
if len(cs.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{
OnClientSide: true,
@ -1034,7 +1074,7 @@ func (cs *clientStream) finish(err error) {
cs.cancel()
}
func (a *csAttempt) sendMsg(m any, hdr, payld, data []byte) error {
func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
cs := a.cs
if a.trInfo != nil {
a.mu.Lock()
@ -1052,8 +1092,10 @@ func (a *csAttempt) sendMsg(m any, hdr, payld, data []byte) error {
}
return io.EOF
}
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
if len(a.statsHandlers) != 0 {
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
}
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@ -1065,6 +1107,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
cs := a.cs
if len(a.statsHandlers) != 0 && payInfo == nil {
payInfo = &payloadInfo{}
defer payInfo.free()
}
if !a.decompSet {
@ -1083,8 +1126,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
// Only initialize this state once per stream.
a.decompSet = true
}
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
if err != nil {
if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp, false); err != nil {
if err == io.EOF {
if statusErr := a.s.Status().Err(); statusErr != nil {
return statusErr
@ -1103,14 +1145,12 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
}
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
Data: payInfo.uncompressedBytes,
Client: true,
RecvTime: time.Now(),
Payload: m,
WireLength: payInfo.compressedLength + headerLen,
CompressedLength: payInfo.compressedLength,
Length: len(payInfo.uncompressedBytes),
Length: payInfo.uncompressedBytes.Len(),
})
}
if channelz.IsOn() {
@ -1122,14 +1162,12 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
}
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
if err == io.EOF {
if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp, false); err == io.EOF {
return a.s.Status().Err() // non-server streaming Recv returns nil on success
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(err)
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
func (a *csAttempt) finish(err error) {
@ -1185,12 +1223,12 @@ func (a *csAttempt) finish(err error) {
a.mu.Unlock()
}
// newClientStream creates a ClientStream with the specified transport, on the
// newNonRetryClientStream creates a ClientStream with the specified transport, on the
// given addrConn.
//
// It's expected that the given transport is either the same one in addrConn, or
// is already closed. To avoid race, transport is specified separately, instead
// of using ac.transpot.
// of using ac.transport.
//
// Main difference between this and ClientConn.NewStream:
// - no retry
@ -1276,7 +1314,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
return nil, err
}
as.s = s
as.p = &parser{r: s, recvBufferPool: ac.dopts.recvBufferPool}
as.p = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
// Listen on stream context to cleanup when the stream context is
@ -1373,17 +1411,26 @@ func (as *addrConnStream) SendMsg(m any) (err error) {
}
// load hdr, payload, data
hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.cp, as.comp, as.ac.dopts.copts.BufferPool)
if err != nil {
return err
}
defer func() {
data.Free()
// only free payload if compression was made, and therefore it is a different set
// of buffers from data.
if pf.isCompressed() {
payload.Free()
}
}()
// TODO(dfawley): should we be checking len(data) instead?
if len(payld) > *as.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
if payload.Len() > *as.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
}
if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
if err := as.t.Write(as.s, hdr, payload, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
if !as.desc.ClientStreams {
// For non-client-streaming RPCs, we return nil instead of EOF on error
// because the generated code requires it. finish is not called; RecvMsg()
@ -1423,8 +1470,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
// Only initialize this state once per stream.
as.decompSet = true
}
err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
if err != nil {
if err := recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, false); err != nil {
if err == io.EOF {
if statusErr := as.s.Status().Err(); statusErr != nil {
return statusErr
@ -1444,14 +1490,12 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
if err == io.EOF {
if err := recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, false); err == io.EOF {
return as.s.Status().Err() // non-server streaming Recv returns nil on success
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(err)
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
func (as *addrConnStream) finish(err error) {
@ -1645,18 +1689,31 @@ func (ss *serverStream) SendMsg(m any) (err error) {
}
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.cp, ss.comp, ss.p.bufferPool)
if err != nil {
return err
}
defer func() {
data.Free()
// only free payload if compression was made, and therefore it is a different set
// of buffers from data.
if pf.isCompressed() {
payload.Free()
}
}()
dataLen := data.Len()
payloadLen := payload.Len()
// TODO(dfawley): should we be checking len(data) instead?
if len(payload) > ss.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
if payloadLen > ss.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
}
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
if len(ss.binlogs) != 0 {
if !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
@ -1669,7 +1726,7 @@ func (ss *serverStream) SendMsg(m any) (err error) {
}
}
sm := &binarylog.ServerMessage{
Message: data,
Message: data.Materialize(),
}
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, sm)
@ -1677,7 +1734,7 @@ func (ss *serverStream) SendMsg(m any) (err error) {
}
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
sh.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
}
}
return nil
@ -1714,8 +1771,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
var payInfo *payloadInfo
if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
payInfo = &payloadInfo{}
defer payInfo.free()
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp, true); err != nil {
if err == io.EOF {
if len(ss.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{}
@ -1733,11 +1791,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
Data: payInfo.uncompressedBytes,
Length: len(payInfo.uncompressedBytes),
RecvTime: time.Now(),
Payload: m,
Length: payInfo.uncompressedBytes.Len(),
WireLength: payInfo.compressedLength + headerLen,
CompressedLength: payInfo.compressedLength,
})
@ -1745,7 +1801,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
}
if len(ss.binlogs) != 0 {
cm := &binarylog.ClientMessage{
Message: payInfo.uncompressedBytes,
Message: payInfo.uncompressedBytes.Materialize(),
}
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, cm)
@ -1760,23 +1816,26 @@ func MethodFromServerStream(stream ServerStream) (string, bool) {
return Method(stream.Context())
}
// prepareMsg returns the hdr, payload and data
// using the compressors passed or using the
// passed preparedmsg
func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
// prepareMsg returns the hdr, payload and data using the compressors passed or
// using the passed preparedmsg. The returned boolean indicates whether
// compression was made and therefore whether the payload needs to be freed in
// addition to the returned data. Freeing the payload if the returned boolean is
// false can lead to undefined behavior.
func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
if preparedMsg, ok := m.(*PreparedMsg); ok {
return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
}
// The input interface is not a prepared msg.
// Marshal and Compress the data at this point
data, err = encode(codec, m)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, 0, err
}
compData, err := compress(data, cp, comp)
compData, pf, err := compress(data, cp, comp, pool)
if err != nil {
return nil, nil, nil, err
data.Free()
return nil, nil, nil, 0, err
}
hdr, payload = msgHeader(data, compData)
return hdr, payload, data, nil
hdr, payload = msgHeader(data, compData, pf)
return hdr, data, payload, pf, nil
}