rebase: bump google.golang.org/grpc from 1.67.1 to 1.68.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.67.1 to 1.68.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.67.1...v1.68.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
dependabot[bot]
2024-11-11 20:04:05 +00:00
committed by mergify[bot]
parent 925ea1970c
commit d651011026
34 changed files with 1207 additions and 314 deletions

View File

@ -86,9 +86,9 @@ type http2Client struct {
writerDone chan struct{} // sync point to enable testing.
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
framer *framer
goAway chan struct{}
keepaliveDone chan struct{} // Closed when the keepalive goroutine exits.
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
// Do not access controlBuf with mu held.
@ -335,6 +335,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
keepaliveDone: make(chan struct{}),
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@ -527,8 +528,9 @@ func (t *http2Client) getPeer() *peer.Peer {
// to be the last frame loopy writes to the transport.
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil {
maxStreamID := t.nextID - 2
t.mu.Unlock()
if err := t.framer.fr.WriteGoAway(maxStreamID, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
return false, g.closeConn
@ -1008,6 +1010,9 @@ func (t *http2Client) Close(err error) {
// should unblock it so that the goroutine eventually exits.
t.kpDormancyCond.Signal()
}
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
goAwayDebugMessage := t.goAwayDebugMessage
t.mu.Unlock()
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
@ -1025,11 +1030,13 @@ func (t *http2Client) Close(err error) {
}
t.cancel()
t.conn.Close()
// Waits for the reader and keepalive goroutines to exit before returning to
// ensure all resources are cleaned up before Close can return.
<-t.readerDone
if t.keepaliveEnabled {
<-t.keepaliveDone
}
channelz.RemoveEntry(t.channelz.ID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
var st *status.Status
if len(goAwayDebugMessage) > 0 {
st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
@ -1316,11 +1323,11 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
t.controlBuf.put(pingAck)
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return
return nil
}
if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
@ -1332,8 +1339,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id))
return
return connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id)
}
// A client can receive multiple GoAways from the server (see
// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
@ -1350,8 +1356,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
return connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)
}
default:
t.setGoAwayReason(f)
@ -1375,8 +1380,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
return
return connectionErrorf(true, nil, "received goaway and there are no active streams")
}
streamsToClose := make([]*Stream, 0)
@ -1393,6 +1397,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
for _, stream := range streamsToClose {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
return nil
}
// setGoAwayReason sets the value of t.goAwayReason based
@ -1628,7 +1633,13 @@ func (t *http2Client) readServerPreface() error {
// network connection. If the server preface is not read successfully, an
// error is pushed to errCh; otherwise errCh is closed with no error.
func (t *http2Client) reader(errCh chan<- error) {
defer close(t.readerDone)
var errClose error
defer func() {
close(t.readerDone)
if errClose != nil {
t.Close(errClose)
}
}()
if err := t.readServerPreface(); err != nil {
errCh <- err
@ -1669,7 +1680,7 @@ func (t *http2Client) reader(errCh chan<- error) {
continue
}
// Transport error.
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
errClose = connectionErrorf(true, err, "error reading from server: %v", err)
return
}
switch frame := frame.(type) {
@ -1684,7 +1695,7 @@ func (t *http2Client) reader(errCh chan<- error) {
case *http2.PingFrame:
t.handlePing(frame)
case *http2.GoAwayFrame:
t.handleGoAway(frame)
errClose = t.handleGoAway(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
default:
@ -1697,6 +1708,13 @@ func (t *http2Client) reader(errCh chan<- error) {
// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
var err error
defer func() {
close(t.keepaliveDone)
if err != nil {
t.Close(err)
}
}()
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false
@ -1720,7 +1738,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
err = connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")
return
}
t.mu.Lock()

View File

@ -547,6 +547,15 @@ func (s *Stream) write(m recvMsg) {
s.buf.put(m)
}
// ReadHeader reads data into the provided header slice from the stream. It
// first checks if there was an error during a previous read operation and
// returns it if present. It then requests a read operation for the length of
// the header. It continues to read from the stream until the entire header
// slice is filled or an error occurs. If an `io.EOF` error is encountered
// with partially read data, it is converted to `io.ErrUnexpectedEOF` to
// indicate an unexpected end of the stream. The method returns any error
// encountered during the read process or nil if the header was successfully
// read.
func (s *Stream) ReadHeader(header []byte) (err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.er; er != nil {