rebase: Bump google.golang.org/grpc from 1.58.3 to 1.59.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.58.3 to 1.59.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.58.3...v1.59.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-10-25 10:07:57 +00:00
committed by mergify[bot]
parent 852b829fa9
commit c0085e5f9b
25 changed files with 277 additions and 177 deletions

View File

@ -983,7 +983,7 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
f := func() {
defer streamQuota.release()
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
s.handleStream(st, stream)
}
if s.opts.numServerWorkers > 0 {
@ -995,12 +995,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
}
}
go f()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}
@ -1049,30 +1043,6 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.serveStreams(st)
}
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
if !EnableTracing {
return nil
}
tr, ok := trace.FromContext(stream.Context())
if !ok {
return nil
}
trInfo = &traceInfo{
tr: tr,
firstLine: firstLine{
client: false,
remoteAddr: st.RemoteAddr(),
},
}
if dl, ok := stream.Context().Deadline(); ok {
trInfo.firstLine.deadline = time.Until(dl)
}
return trInfo
}
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
@ -1133,7 +1103,7 @@ func (s *Server) incrCallsFailed() {
atomic.AddInt64(&s.czData.callsFailed, 1)
}
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
if err != nil {
channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
@ -1152,7 +1122,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
err = t.Write(stream, hdr, payload, opts)
if err == nil {
for _, sh := range s.opts.statsHandlers {
sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
sh.HandleRPC(ctx, outPayload(false, msg, data, payload, time.Now()))
}
}
return err
@ -1194,7 +1164,7 @@ func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info
}
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
shs := s.opts.statsHandlers
if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
if channelz.IsOn() {
@ -1208,7 +1178,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
IsClientStream: false,
IsServerStream: false,
}
sh.HandleRPC(stream.Context(), statsBegin)
sh.HandleRPC(ctx, statsBegin)
}
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
@ -1240,7 +1210,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
sh.HandleRPC(ctx, end)
}
if channelz.IsOn() {
@ -1262,7 +1232,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}
if len(binlogs) != 0 {
ctx := stream.Context()
md, _ := metadata.FromIncomingContext(ctx)
logEntry := &binarylog.ClientHeader{
Header: md,
@ -1348,7 +1317,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
for _, sh := range shs {
sh.HandleRPC(stream.Context(), &stats.InPayload{
sh.HandleRPC(ctx, &stats.InPayload{
RecvTime: time.Now(),
Payload: v,
Length: len(d),
@ -1362,7 +1331,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Message: d,
}
for _, binlog := range binlogs {
binlog.Log(stream.Context(), cm)
binlog.Log(ctx, cm)
}
}
if trInfo != nil {
@ -1370,7 +1339,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return nil
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
ctx = NewContextWithServerTransportStream(ctx, stream)
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
if appErr != nil {
appStatus, ok := status.FromError(appErr)
@ -1395,7 +1364,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Header: h,
}
for _, binlog := range binlogs {
binlog.Log(stream.Context(), sh)
binlog.Log(ctx, sh)
}
}
st := &binarylog.ServerTrailer{
@ -1403,7 +1372,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
binlog.Log(stream.Context(), st)
binlog.Log(ctx, st)
}
}
return appErr
@ -1418,7 +1387,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if stream.SendCompress() != sendCompressorName {
comp = encoding.GetCompressor(stream.SendCompress())
}
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err := s.sendResponse(ctx, t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
@ -1445,8 +1414,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
binlog.Log(stream.Context(), sh)
binlog.Log(stream.Context(), st)
binlog.Log(ctx, sh)
binlog.Log(ctx, st)
}
}
return err
@ -1460,8 +1429,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Message: reply,
}
for _, binlog := range binlogs {
binlog.Log(stream.Context(), sh)
binlog.Log(stream.Context(), sm)
binlog.Log(ctx, sh)
binlog.Log(ctx, sm)
}
}
if channelz.IsOn() {
@ -1479,7 +1448,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
binlog.Log(stream.Context(), st)
binlog.Log(ctx, st)
}
}
return t.WriteStatus(stream, statusOK)
@ -1521,7 +1490,7 @@ func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, inf
}
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
if channelz.IsOn() {
s.incrCallsStarted()
}
@ -1535,10 +1504,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
IsServerStream: sd.ServerStreams,
}
for _, sh := range shs {
sh.HandleRPC(stream.Context(), statsBegin)
sh.HandleRPC(ctx, statsBegin)
}
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
ctx = NewContextWithServerTransportStream(ctx, stream)
ss := &serverStream{
ctx: ctx,
t: t,
@ -1574,7 +1543,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
end.Error = toRPCErr(err)
}
for _, sh := range shs {
sh.HandleRPC(stream.Context(), end)
sh.HandleRPC(ctx, end)
}
}
@ -1616,7 +1585,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range ss.binlogs {
binlog.Log(stream.Context(), logEntry)
binlog.Log(ctx, logEntry)
}
}
@ -1694,7 +1663,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
Err: appErr,
}
for _, binlog := range ss.binlogs {
binlog.Log(stream.Context(), st)
binlog.Log(ctx, st)
}
}
t.WriteStatus(ss.s, appStatus)
@ -1712,33 +1681,50 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
Err: appErr,
}
for _, binlog := range ss.binlogs {
binlog.Log(stream.Context(), st)
binlog.Log(ctx, st)
}
}
return t.WriteStatus(ss.s, statusOK)
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
ctx := stream.Context()
var ti *traceInfo
if EnableTracing {
tr := trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
ctx = trace.NewContext(ctx, tr)
ti = &traceInfo{
tr: tr,
firstLine: firstLine{
client: false,
remoteAddr: t.RemoteAddr(),
},
}
if dl, ok := ctx.Deadline(); ok {
ti.firstLine.deadline = time.Until(dl)
}
}
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
pos := strings.LastIndex(sm, "/")
if pos == -1 {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
trInfo.tr.SetError()
if ti != nil {
ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
ti.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
trInfo.tr.SetError()
if ti != nil {
ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
ti.tr.SetError()
}
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
if ti != nil {
ti.tr.Finish()
}
return
}
@ -1748,17 +1734,17 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
s.processUnaryRPC(ctx, t, stream, srv, md, ti)
return
}
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
s.processStreamingRPC(ctx, t, stream, srv, sd, ti)
return
}
}
// Unknown service, or known server unknown method.
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti)
return
}
var errDesc string
@ -1767,19 +1753,19 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
} else {
errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
}
if trInfo != nil {
trInfo.tr.LazyPrintf("%s", errDesc)
trInfo.tr.SetError()
if ti != nil {
ti.tr.LazyPrintf("%s", errDesc)
ti.tr.SetError()
}
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
trInfo.tr.SetError()
if ti != nil {
ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
ti.tr.SetError()
}
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
if ti != nil {
ti.tr.Finish()
}
}