rebase: bump google.golang.org/grpc from 1.47.0 to 1.48.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.47.0 to 1.48.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.47.0...v1.48.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2022-07-18 21:03:18 +00:00
committed by mergify[bot]
parent 04889e66db
commit 33d4d54dbe
20 changed files with 264 additions and 131 deletions

View File

@ -49,7 +49,7 @@ import (
// NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server
// supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2")
}
@ -138,7 +138,7 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server
contentSubtype string
stats stats.Handler
stats []stats.Handler
}
func (ht *serverHandlerTransport) Close() {
@ -228,10 +228,10 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
})
if err == nil { // transport has not been closed
if ht.stats != nil {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
for _, sh := range ht.stats {
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
@ -314,10 +314,10 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
if err == nil {
if ht.stats != nil {
for _, sh := range ht.stats {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
sh.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(),
Compression: s.sendCompress,
})
@ -369,14 +369,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr)
if ht.stats != nil {
s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
for _, sh := range ht.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: ht.RemoteAddr(),
Compression: s.recvCompress,
}
ht.stats.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},

View File

@ -90,7 +90,7 @@ type http2Client struct {
kp keepalive.ClientParameters
keepaliveEnabled bool
statsHandler stats.Handler
statsHandlers []stats.Handler
initialWindowSize int32
@ -311,7 +311,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
isSecure: isSecure,
perRPCCreds: perRPCCreds,
kp: kp,
statsHandler: opts.StatsHandler,
statsHandlers: opts.StatsHandlers,
initialWindowSize: initialWindowSize,
onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
@ -341,15 +341,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
updateFlowControl: t.updateFlowControl,
}
}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
for _, sh := range t.statsHandlers {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connBegin)
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
@ -773,24 +773,27 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
if t.statsHandler != nil {
if len(t.statsHandlers) != 0 {
header, ok := metadata.FromOutgoingContext(ctx)
if ok {
header.Set("user-agent", t.userAgent)
} else {
header = metadata.Pairs("user-agent", t.userAgent)
}
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
for _, sh := range t.statsHandlers {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
// Note: Creating a new stats object to prevent pollution.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
}
sh.HandleRPC(s.ctx, outHeader)
}
t.statsHandler.HandleRPC(s.ctx, outHeader)
}
return s, nil
}
@ -916,11 +919,11 @@ func (t *http2Client) Close(err error) {
for _, s := range streams {
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
}
if t.statsHandler != nil {
for _, sh := range t.statsHandlers {
connEnd := &stats.ConnEnd{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connEnd)
sh.HandleConn(t.ctx, connEnd)
}
}
@ -1432,7 +1435,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
close(s.headerChan)
}
if t.statsHandler != nil {
for _, sh := range t.statsHandlers {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
@ -1440,14 +1443,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress,
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
sh.HandleRPC(s.ctx, inTrailer)
}
}

View File

@ -82,7 +82,7 @@ type http2Server struct {
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
fc *trInFlow
stats stats.Handler
stats []stats.Handler
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
// Keepalive enforcement policy.
@ -257,7 +257,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*Stream),
stats: config.StatsHandler,
stats: config.StatsHandlers,
kp: kp,
idle: time.Now(),
kep: kep,
@ -272,13 +272,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
for _, sh := range t.stats {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
@ -570,8 +570,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.adjustWindow(s, uint32(n))
}
s.ctx = traceCtx(s.ctx, s.method)
if t.stats != nil {
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
for _, sh := range t.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
@ -580,7 +580,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
}
t.stats.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
@ -996,14 +996,14 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
if t.stats != nil {
for _, sh := range t.stats {
// Note: Headers are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
Header: s.header.Copy(),
Compression: s.sendCompress,
}
t.stats.HandleRPC(s.Context(), outHeader)
sh.HandleRPC(s.Context(), outHeader)
}
return nil
}
@ -1064,10 +1064,10 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
for _, sh := range t.stats {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
@ -1222,9 +1222,9 @@ func (t *http2Server) Close() {
for _, s := range streams {
s.cancel()
}
if t.stats != nil {
for _, sh := range t.stats {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
sh.HandleConn(t.ctx, connEnd)
}
}

View File

@ -322,8 +322,6 @@ type bufWriter struct {
batchSize int
conn net.Conn
err error
onFlush func()
}
func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
@ -360,9 +358,6 @@ func (w *bufWriter) Flush() error {
if w.offset == 0 {
return nil
}
if w.onFlush != nil {
w.onFlush()
}
_, w.err = w.conn.Write(w.buf[:w.offset])
w.offset = 0
return w.err

View File

@ -523,7 +523,7 @@ type ServerConfig struct {
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
StatsHandlers []stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
@ -553,8 +553,8 @@ type ConnectOptions struct {
CredsBundle credentials.Bundle
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
// StatsHandlers stores the handler for stats.
StatsHandlers []stats.Handler
// InitialWindowSize sets the initial window size for a stream.
InitialWindowSize int32
// InitialConnWindowSize sets the initial window size for a connection.