rebase: bump google.golang.org/grpc from 1.49.0 to 1.50.1

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.49.0 to 1.50.1.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.49.0...v1.50.1)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2022-11-08 08:23:05 +00:00
committed by mergify[bot]
parent fe13fff9fa
commit 076816d26e
24 changed files with 425 additions and 179 deletions

View File

@ -37,7 +37,7 @@ type Logger interface {
// binLogger is the global binary logger for the binary. One of this should be
// built at init time from the configuration (environment variable or flags).
//
// It is used to get a methodLogger for each individual method.
// It is used to get a MethodLogger for each individual method.
var binLogger Logger
var grpclogLogger = grpclog.Component("binarylog")
@ -56,11 +56,11 @@ func GetLogger() Logger {
return binLogger
}
// GetMethodLogger returns the methodLogger for the given methodName.
// GetMethodLogger returns the MethodLogger for the given methodName.
//
// methodName should be in the format of "/service/method".
//
// Each methodLogger returned by this method is a new instance. This is to
// Each MethodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
func GetMethodLogger(methodName string) MethodLogger {
if binLogger == nil {
@ -117,7 +117,7 @@ func (l *logger) setDefaultMethodLogger(ml *MethodLoggerConfig) error {
// Set method logger for "service/*".
//
// New methodLogger with same service overrides the old one.
// New MethodLogger with same service overrides the old one.
func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig) error {
if _, ok := l.config.Services[service]; ok {
return fmt.Errorf("conflicting service rules for service %v found", service)
@ -131,7 +131,7 @@ func (l *logger) setServiceMethodLogger(service string, ml *MethodLoggerConfig)
// Set method logger for "service/method".
//
// New methodLogger with same method overrides the old one.
// New MethodLogger with same method overrides the old one.
func (l *logger) setMethodMethodLogger(method string, ml *MethodLoggerConfig) error {
if _, ok := l.config.Blacklist[method]; ok {
return fmt.Errorf("conflicting blacklist rules for method %v found", method)
@ -161,11 +161,11 @@ func (l *logger) setBlacklist(method string) error {
return nil
}
// getMethodLogger returns the methodLogger for the given methodName.
// getMethodLogger returns the MethodLogger for the given methodName.
//
// methodName should be in the format of "/service/method".
//
// Each methodLogger returned by this method is a new instance. This is to
// Each MethodLogger returned by this method is a new instance. This is to
// generate sequence id within the call.
func (l *logger) GetMethodLogger(methodName string) MethodLogger {
s, m, err := grpcutil.ParseMethod(methodName)
@ -174,16 +174,16 @@ func (l *logger) GetMethodLogger(methodName string) MethodLogger {
return nil
}
if ml, ok := l.config.Methods[s+"/"+m]; ok {
return newMethodLogger(ml.Header, ml.Message)
return NewTruncatingMethodLogger(ml.Header, ml.Message)
}
if _, ok := l.config.Blacklist[s+"/"+m]; ok {
return nil
}
if ml, ok := l.config.Services[s]; ok {
return newMethodLogger(ml.Header, ml.Message)
return NewTruncatingMethodLogger(ml.Header, ml.Message)
}
if l.config.All == nil {
return nil
}
return newMethodLogger(l.config.All.Header, l.config.All.Message)
return NewTruncatingMethodLogger(l.config.All.Header, l.config.All.Message)
}

View File

@ -57,7 +57,7 @@ func NewLoggerFromConfigString(s string) Logger {
return l
}
// fillMethodLoggerWithConfigString parses config, creates methodLogger and adds
// fillMethodLoggerWithConfigString parses config, creates TruncatingMethodLogger and adds
// it to the right map in the logger.
func (l *logger) fillMethodLoggerWithConfigString(config string) error {
// "" is invalid.

View File

@ -52,7 +52,9 @@ type MethodLogger interface {
Log(LogEntryConfig)
}
type methodLogger struct {
// TruncatingMethodLogger is a method logger that truncates headers and messages
// based on configured fields.
type TruncatingMethodLogger struct {
headerMaxLen, messageMaxLen uint64
callID uint64
@ -61,8 +63,9 @@ type methodLogger struct {
sink Sink // TODO(blog): make this plugable.
}
func newMethodLogger(h, m uint64) *methodLogger {
return &methodLogger{
// NewTruncatingMethodLogger returns a new truncating method logger.
func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
return &TruncatingMethodLogger{
headerMaxLen: h,
messageMaxLen: m,
@ -75,8 +78,8 @@ func newMethodLogger(h, m uint64) *methodLogger {
// Build is an internal only method for building the proto message out of the
// input event. It's made public to enable other library to reuse as much logic
// in methodLogger as possible.
func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
// in TruncatingMethodLogger as possible.
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
m := c.toProto()
timestamp, _ := ptypes.TimestampProto(time.Now())
m.Timestamp = timestamp
@ -95,11 +98,11 @@ func (ml *methodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
}
// Log creates a proto binary log entry, and logs it to the sink.
func (ml *methodLogger) Log(c LogEntryConfig) {
func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
ml.sink.Write(ml.Build(c))
}
func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
if ml.headerMaxLen == maxUInt {
return false
}
@ -129,7 +132,7 @@ func (ml *methodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) {
return truncated
}
func (ml *methodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) {
if ml.messageMaxLen == maxUInt {
return false
}

View File

@ -0,0 +1,36 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package envconfig
import "os"
const (
envObservabilityConfig = "GRPC_GCP_OBSERVABILITY_CONFIG"
envObservabilityConfigFile = "GRPC_GCP_OBSERVABILITY_CONFIG_FILE"
)
var (
// ObservabilityConfig is the json configuration for the gcp/observability
// package specified directly in the envObservabilityConfig env var.
ObservabilityConfig = os.Getenv(envObservabilityConfig)
// ObservabilityConfigFile is the json configuration for the
// gcp/observability specified in a file with the location specified in
// envObservabilityConfigFile env var.
ObservabilityConfigFile = os.Getenv(envObservabilityConfigFile)
)

View File

@ -41,6 +41,7 @@ const (
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
rbacSupportEnv = "GRPC_XDS_EXPERIMENTAL_RBAC"
outlierDetectionSupportEnv = "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION"
federationEnv = "GRPC_EXPERIMENTAL_XDS_FEDERATION"
rlsInXDSEnv = "GRPC_EXPERIMENTAL_XDS_RLS_LB"
@ -83,9 +84,9 @@ var (
// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
XDSRBAC = !strings.EqualFold(os.Getenv(rbacSupportEnv), "false")
// XDSOutlierDetection indicates whether outlier detection support is
// enabled, which can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "true".
XDSOutlierDetection = false
// enabled, which can be disabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false".
XDSOutlierDetection = !strings.EqualFold(os.Getenv(outlierDetectionSupportEnv), "false")
// XDSFederation indicates whether federation support is enabled.
XDSFederation = strings.EqualFold(os.Getenv(federationEnv), "true")

View File

@ -52,6 +52,13 @@ func Intn(n int) int {
return r.Intn(n)
}
// Int31n implements rand.Int31n on the grpcrand global source.
func Int31n(n int32) int32 {
mu.Lock()
defer mu.Unlock()
return r.Int31n(n)
}
// Float64 implements rand.Float64 on the grpcrand global source.
func Float64() float64 {
mu.Lock()

View File

@ -63,20 +63,30 @@ var (
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
// AddExtraServerOptions adds an array of ServerOption that will be
// AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraServerOptions interface{} // func(opt ...ServerOption)
// ClearExtraServerOptions clears the array of extra ServerOption. This
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
// ClearGlobalServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking.
ClearExtraServerOptions func()
// AddExtraDialOptions adds an array of DialOption that will be effective
ClearGlobalServerOptions func()
// AddGlobalDialOptions adds an array of DialOption that will be effective
// globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraDialOptions interface{} // func(opt ...DialOption)
// ClearExtraDialOptions clears the array of extra DialOption. This
AddGlobalDialOptions interface{} // func(opt ...DialOption)
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
ClearExtraDialOptions func()
ClearGlobalDialOptions func()
// JoinServerOptions combines the server options passed as arguments into a
// single server option.
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption
// WithBinaryLogger returns a DialOption that specifies the binary logger
// for a ClientConn.
WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption
// BinaryLogger returns a ServerOption that can set the binary logger for a
// server.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from
@ -117,22 +127,6 @@ var (
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()
// RegisterOutlierDetectionBalancerForTesting registers the Outlier
// Detection Balancer for testing purposes, regardless of the Outlier
// Detection environment variable.
//
// TODO: Remove this function once the Outlier Detection env var is removed.
RegisterOutlierDetectionBalancerForTesting func()
// UnregisterOutlierDetectionBalancerForTesting unregisters the Outlier
// Detection Balancer for testing purposes. This is needed because there is
// no way to unregister the Outlier Detection Balancer after registering it
// solely for testing purposes using
// RegisterOutlierDetectionBalancerForTesting().
//
// TODO: Remove this function once the Outlier Detection env var is removed.
UnregisterOutlierDetectionBalancerForTesting func()
)
// HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@ -49,8 +49,9 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolv
}
addr := resolver.Address{Addr: endpoint}
if b.scheme == unixAbstractScheme {
// prepend "\x00" to address for unix-abstract
addr.Addr = "\x00" + addr.Addr
// We can not prepend \0 as c++ gRPC does, as in Golang '@' is used to signify we do
// not want trailing \0 in address.
addr.Addr = "@" + addr.Addr
}
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(addr, "unix")}})
return &nopResolver{}, nil

View File

@ -886,9 +886,9 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
// maximum possilbe HTTP2 frame size.
// maximum possible HTTP2 frame size.
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true

View File

@ -326,6 +326,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
}
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
@ -469,7 +471,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
func (t *http2Client) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo,
AuthInfo: t.authInfo, // Can be nil
}
}
@ -1230,18 +1232,29 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
}
t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
return
}
streamsToClose := make([]*Stream, 0)
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
atomic.StoreUint32(&stream.unprocessed, 1)
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
if streamID > id && streamID <= upperLimit {
atomic.StoreUint32(&stream.unprocessed, 1)
streamsToClose = append(streamsToClose, stream)
}
}
}
t.prevGoAwayID = id
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
// Called outside t.mu because closeStream can take controlBuf's mu, which
// could induce deadlock and is not allowed.
for _, stream := range streamsToClose {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
}

View File

@ -265,6 +265,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
@ -485,14 +488,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
s.ctx = peer.NewContext(s.ctx, pr)
// Attach the received metadata to the context.
if len(mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
@ -1416,6 +1412,13 @@ func (t *http2Server) getOutFlowWindow() int64 {
}
}
func (t *http2Server) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
}
}
func getJitter(v time.Duration) time.Duration {
if v == infinity {
return 0

View File

@ -20,7 +20,6 @@ package transport
import (
"bufio"
"bytes"
"encoding/base64"
"fmt"
"io"
@ -45,7 +44,7 @@ import (
const (
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
// https://httpwg.org/specs/rfc7540.html#SettingValues
http2InitHeaderTableSize = 4096
)
@ -251,13 +250,13 @@ func encodeGrpcMessage(msg string) string {
}
func encodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
var sb strings.Builder
for len(msg) > 0 {
r, size := utf8.DecodeRuneInString(msg)
for _, b := range []byte(string(r)) {
if size > 1 {
// If size > 1, r is not ascii. Always do percent encoding.
buf.WriteString(fmt.Sprintf("%%%02X", b))
fmt.Fprintf(&sb, "%%%02X", b)
continue
}
@ -266,14 +265,14 @@ func encodeGrpcMessageUnchecked(msg string) string {
//
// fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
if b >= spaceByte && b <= tildeByte && b != percentByte {
buf.WriteByte(b)
sb.WriteByte(b)
} else {
buf.WriteString(fmt.Sprintf("%%%02X", b))
fmt.Fprintf(&sb, "%%%02X", b)
}
}
msg = msg[size:]
}
return buf.String()
return sb.String()
}
// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
@ -291,23 +290,23 @@ func decodeGrpcMessage(msg string) string {
}
func decodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
var sb strings.Builder
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c == percentByte && i+2 < lenMsg {
parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
if err != nil {
buf.WriteByte(c)
sb.WriteByte(c)
} else {
buf.WriteByte(byte(parsed))
sb.WriteByte(byte(parsed))
i += 2
}
} else {
buf.WriteByte(c)
sb.WriteByte(c)
}
}
return buf.String()
return sb.String()
}
type bufWriter struct {