mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 10:33:35 +00:00
rebase: bump google.golang.org/grpc from 1.62.0 to 1.62.1
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.62.0 to 1.62.1. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.62.0...v1.62.1) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
committed by
mergify[bot]
parent
fcaac58a1e
commit
d93c75517e
54
vendor/google.golang.org/grpc/rpc_util.go
generated
vendored
54
vendor/google.golang.org/grpc/rpc_util.go
generated
vendored
@ -744,17 +744,19 @@ type payloadInfo struct {
|
||||
uncompressedBytes []byte
|
||||
}
|
||||
|
||||
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
|
||||
pf, buf, err := p.recvMsg(maxReceiveMessageSize)
|
||||
// recvAndDecompress reads a message from the stream, decompressing it if necessary.
|
||||
//
|
||||
// Cancelling the returned cancel function releases the buffer back to the pool. So the caller should cancel as soon as
|
||||
// the buffer is no longer needed.
|
||||
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor,
|
||||
) (uncompressedBuf []byte, cancel func(), err error) {
|
||||
pf, compressedBuf, err := p.recvMsg(maxReceiveMessageSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if payInfo != nil {
|
||||
payInfo.compressedLength = len(buf)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
|
||||
return nil, st.Err()
|
||||
return nil, nil, st.Err()
|
||||
}
|
||||
|
||||
var size int
|
||||
@ -762,21 +764,35 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
|
||||
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
|
||||
// use this decompressor as the default.
|
||||
if dc != nil {
|
||||
buf, err = dc.Do(bytes.NewReader(buf))
|
||||
size = len(buf)
|
||||
uncompressedBuf, err = dc.Do(bytes.NewReader(compressedBuf))
|
||||
size = len(uncompressedBuf)
|
||||
} else {
|
||||
buf, size, err = decompress(compressor, buf, maxReceiveMessageSize)
|
||||
uncompressedBuf, size, err = decompress(compressor, compressedBuf, maxReceiveMessageSize)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)
|
||||
return nil, nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)
|
||||
}
|
||||
if size > maxReceiveMessageSize {
|
||||
// TODO: Revisit the error code. Currently keep it consistent with java
|
||||
// implementation.
|
||||
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
|
||||
return nil, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
|
||||
}
|
||||
} else {
|
||||
uncompressedBuf = compressedBuf
|
||||
}
|
||||
|
||||
if payInfo != nil {
|
||||
payInfo.compressedLength = len(compressedBuf)
|
||||
payInfo.uncompressedBytes = uncompressedBuf
|
||||
|
||||
cancel = func() {}
|
||||
} else {
|
||||
cancel = func() {
|
||||
p.recvBufferPool.Put(&compressedBuf)
|
||||
}
|
||||
}
|
||||
return buf, nil
|
||||
|
||||
return uncompressedBuf, cancel, nil
|
||||
}
|
||||
|
||||
// Using compressor, decompress d, returning data and size.
|
||||
@ -796,6 +812,9 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize
|
||||
// size is used as an estimate to size the buffer, but we
|
||||
// will read more data if available.
|
||||
// +MinRead so ReadFrom will not reallocate if size is correct.
|
||||
//
|
||||
// TODO: If we ensure that the buffer size is the same as the DecompressedSize,
|
||||
// we can also utilize the recv buffer pool here.
|
||||
buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
|
||||
bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
|
||||
return buf.Bytes(), int(bytesRead), err
|
||||
@ -811,18 +830,15 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize
|
||||
// dc takes precedence over compressor.
|
||||
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
|
||||
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
|
||||
buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
|
||||
buf, cancel, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
if err := c.Unmarshal(buf, m); err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
|
||||
}
|
||||
if payInfo != nil {
|
||||
payInfo.uncompressedBytes = buf
|
||||
} else {
|
||||
p.recvBufferPool.Put(&buf)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user