rebase: Bump google.golang.org/grpc from 1.53.0 to 1.54.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.53.0 to 1.54.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.53.0...v1.54.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Commit d8e6c37743 (parent 304194a0c0)
Authored by dependabot[bot] 2023-04-21 06:49:40 +00:00, committed by mergify[bot]
29 changed files with 557 additions and 203 deletions

go.mod

@@ -30,7 +30,7 @@ require (
 	golang.org/x/crypto v0.6.0
 	golang.org/x/net v0.8.0
 	golang.org/x/sys v0.7.0
-	google.golang.org/grpc v1.53.0
+	google.golang.org/grpc v1.54.0
 	google.golang.org/protobuf v1.28.1
 	//
 	// when updating k8s.io/kubernetes, make sure to update the replace section too

go.sum

@@ -1659,8 +1659,8 @@
 google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ5...
 google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
 google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
 google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
-google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
-google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
+google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
+google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=


@@ -10,7 +10,7 @@ allowHostNetwork: true
 # This need to be set to true as we use HostPath
 allowHostDirVolumePlugin: true
 priority:
-# SYS_ADMIN is needed for rbd to execture rbd map command
+# SYS_ADMIN is needed for rbd to execute rbd map command
 allowedCapabilities: ["SYS_ADMIN"]
 # Needed as we run liveness container on daemonset pods
 allowHostPorts: true


@@ -20,6 +20,19 @@ How to get your contributions merged smoothly and quickly.
   both author's & review's time is wasted. Create more PRs to address different
   concerns and everyone will be happy.
+- For speculative changes, consider opening an issue and discussing it first. If
+  you are suggesting a behavioral or API change, consider starting with a [gRFC
+  proposal](https://github.com/grpc/proposal).
+- If you are searching for features to work on, issues labeled [Status: Help
+  Wanted](https://github.com/grpc/grpc-go/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3A%22Status%3A+Help+Wanted%22)
+  is a great place to start. These issues are well-documented and usually can be
+  resolved with a single pull request.
+- If you are adding a new file, make sure it has the copyright message template
+  at the top as a comment. You can copy over the message from an existing file
+  and update the year.
 - The grpc package should only depend on standard Go packages and a small number
   of exceptions. If your contribution introduces new dependencies which are NOT
   in the [list](https://godoc.org/google.golang.org/grpc?imports), you need a
@@ -32,14 +45,18 @@ How to get your contributions merged smoothly and quickly.
 - Provide a good **PR description** as a record of **what** change is being made
   and **why** it was made. Link to a github issue if it exists.
-- Don't fix code style and formatting unless you are already changing that line
-  to address an issue. PRs with irrelevant changes won't be merged. If you do
-  want to fix formatting or style, do that in a separate PR.
+- If you want to fix formatting or style, consider whether your changes are an
+  obvious improvement or might be considered a personal preference. If a style
+  change is based on preference, it likely will not be accepted. If it corrects
+  widely agreed-upon anti-patterns, then please do create a PR and explain the
+  benefits of the change.
 - Unless your PR is trivial, you should expect there will be reviewer comments
-  that you'll need to address before merging. We expect you to be reasonably
-  responsive to those comments, otherwise the PR will be closed after 2-3 weeks
-  of inactivity.
+  that you'll need to address before merging. We'll mark it as `Status: Requires
+  Reporter Clarification` if we expect you to respond to these comments in a
+  timely manner. If the PR remains inactive for 6 days, it will be marked as
+  `stale` and automatically close 7 days after that if we don't hear back from
+  you.
 - Maintain **clean commit history** and use **meaningful commit messages**. PRs
   with messy commit history are difficult to review and won't be merged. Use


@@ -19,7 +19,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 // protoc-gen-go v1.28.1
-// protoc v3.14.0
+// protoc v4.22.0
 // source: grpc/binlog/v1/binarylog.proto

 package grpc_binarylog_v1


@@ -146,8 +146,18 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
 	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
 	cc.ctx, cc.cancel = context.WithCancel(context.Background())

-	for _, opt := range extraDialOptions {
-		opt.apply(&cc.dopts)
+	disableGlobalOpts := false
+	for _, opt := range opts {
+		if _, ok := opt.(*disableGlobalDialOptions); ok {
+			disableGlobalOpts = true
+			break
+		}
+	}
+
+	if !disableGlobalOpts {
+		for _, opt := range globalDialOptions {
+			opt.apply(&cc.dopts)
+		}
 	}

 	for _, opt := range opts {
@@ -1103,7 +1113,11 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
 		return
 	}
 	ac.state = s
-	channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
+	if lastErr == nil {
+		channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
+	} else {
+		channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
+	}
 	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
 }
@@ -1527,6 +1541,9 @@ func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
 // referenced by users.
 var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")

+// getResolver finds the scheme in the cc's resolvers or the global registry.
+// scheme should always be lowercase (typically by virtue of url.Parse()
+// performing proper RFC3986 behavior).
 func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
 	for _, rb := range cc.dopts.resolvers {
 		if scheme == rb.Scheme() {


@@ -18,7 +18,15 @@

 package codes

-import "strconv"
+import (
+	"strconv"
+
+	"google.golang.org/grpc/internal"
+)
+
+func init() {
+	internal.CanonicalString = canonicalString
+}

 func (c Code) String() string {
 	switch c {
@@ -60,3 +68,44 @@ func (c Code) String() string {
 		return "Code(" + strconv.FormatInt(int64(c), 10) + ")"
 	}
 }
+
+func canonicalString(c Code) string {
+	switch c {
+	case OK:
+		return "OK"
+	case Canceled:
+		return "CANCELLED"
+	case Unknown:
+		return "UNKNOWN"
+	case InvalidArgument:
+		return "INVALID_ARGUMENT"
+	case DeadlineExceeded:
+		return "DEADLINE_EXCEEDED"
+	case NotFound:
+		return "NOT_FOUND"
+	case AlreadyExists:
+		return "ALREADY_EXISTS"
+	case PermissionDenied:
+		return "PERMISSION_DENIED"
+	case ResourceExhausted:
+		return "RESOURCE_EXHAUSTED"
+	case FailedPrecondition:
+		return "FAILED_PRECONDITION"
+	case Aborted:
+		return "ABORTED"
+	case OutOfRange:
+		return "OUT_OF_RANGE"
+	case Unimplemented:
+		return "UNIMPLEMENTED"
+	case Internal:
+		return "INTERNAL"
+	case Unavailable:
+		return "UNAVAILABLE"
+	case DataLoss:
+		return "DATA_LOSS"
+	case Unauthenticated:
+		return "UNAUTHENTICATED"
+	default:
+		return "CODE(" + strconv.FormatInt(int64(c), 10) + ")"
+	}
+}
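The added canonicalString uses the upper-snake-case names from the cross-language gRPC status-code doc, which differ from Code.String()'s Go-style names (note "CANCELLED" vs "Canceled"). A minimal standalone sketch of the same mapping; the local type and constant names here are illustrative stand-ins, not the real codes package:

```go
package main

import (
	"fmt"
	"strconv"
)

// code mirrors a handful of gRPC status codes; the numeric values match
// the status-code spec (OK=0, CANCELLED=1, DEADLINE_EXCEEDED=4).
type code uint32

const (
	ok               code = 0
	canceled         code = 1
	deadlineExceeded code = 4
)

// canonical returns the SCREAMING_SNAKE_CASE form, falling back to
// CODE(n) for values it does not know about.
func canonical(c code) string {
	switch c {
	case ok:
		return "OK"
	case canceled:
		return "CANCELLED" // note the double L, unlike Go's Canceled
	case deadlineExceeded:
		return "DEADLINE_EXCEEDED"
	default:
		return "CODE(" + strconv.FormatInt(int64(c), 10) + ")"
	}
}

func main() {
	fmt.Println(canonical(canceled))         // CANCELLED
	fmt.Println(canonical(deadlineExceeded)) // DEADLINE_EXCEEDED
	fmt.Println(canonical(code(42)))         // CODE(42)
}
```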


@@ -38,13 +38,14 @@ import (

 func init() {
 	internal.AddGlobalDialOptions = func(opt ...DialOption) {
-		extraDialOptions = append(extraDialOptions, opt...)
+		globalDialOptions = append(globalDialOptions, opt...)
 	}
 	internal.ClearGlobalDialOptions = func() {
-		extraDialOptions = nil
+		globalDialOptions = nil
 	}
 	internal.WithBinaryLogger = withBinaryLogger
 	internal.JoinDialOptions = newJoinDialOption
+	internal.DisableGlobalDialOptions = newDisableGlobalDialOptions
 }

 // dialOptions configure a Dial call. dialOptions are set by the DialOption
@@ -83,7 +84,7 @@ type DialOption interface {
 	apply(*dialOptions)
 }

-var extraDialOptions []DialOption
+var globalDialOptions []DialOption

 // EmptyDialOption does not alter the dial configuration. It can be embedded in
 // another structure to build custom dial options.
@@ -96,6 +97,16 @@ type EmptyDialOption struct{}

 func (EmptyDialOption) apply(*dialOptions) {}

+type disableGlobalDialOptions struct{}
+
+func (disableGlobalDialOptions) apply(*dialOptions) {}
+
+// newDisableGlobalDialOptions returns a DialOption that prevents the ClientConn
+// from applying the global DialOptions (set via AddGlobalDialOptions).
+func newDisableGlobalDialOptions() DialOption {
+	return &disableGlobalDialOptions{}
+}
+
 // funcDialOption wraps a function that modifies dialOptions into an
 // implementation of the DialOption interface.
 type funcDialOption struct {
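The pattern introduced here, a package-level option list plus a sentinel option whose mere presence suppresses that list, can be sketched in isolation. All names below (option, globalOpts, dial) are invented for illustration and do not match the real grpc API:

```go
package main

import "fmt"

// config stands in for dialOptions; option stands in for DialOption.
type config struct{ vals []string }

type option interface{ apply(*config) }

type funcOption func(*config)

func (f funcOption) apply(c *config) { f(c) }

// disableGlobals is the sentinel: applying it is a no-op, but its
// presence in the option slice tells dial to skip the global list.
type disableGlobals struct{}

func (disableGlobals) apply(*config) {}

var globalOpts []option

// dial applies globals first (unless disabled), then per-call options,
// so per-call options win on conflicts.
func dial(opts ...option) *config {
	c := &config{}
	skip := false
	for _, o := range opts {
		if _, ok := o.(disableGlobals); ok {
			skip = true
			break
		}
	}
	if !skip {
		for _, o := range globalOpts {
			o.apply(c)
		}
	}
	for _, o := range opts {
		o.apply(c)
	}
	return c
}

func main() {
	globalOpts = []option{funcOption(func(c *config) { c.vals = append(c.vals, "global") })}
	local := funcOption(func(c *config) { c.vals = append(c.vals, "local") })
	fmt.Println(dial(local).vals)                   // [global local]
	fmt.Println(dial(local, disableGlobals{}).vals) // [local]
}
```

The sentinel is detected by type assertion before any option is applied, which is why it works no matter where it appears in the argument list.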


@@ -18,7 +18,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 // protoc-gen-go v1.28.1
-// protoc v3.14.0
+// protoc v4.22.0
 // source: grpc/health/v1/health.proto

 package grpc_health_v1


@@ -17,8 +17,8 @@
 // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
 // versions:
-// - protoc-gen-go-grpc v1.2.0
-// - protoc v3.14.0
+// - protoc-gen-go-grpc v1.3.0
+// - protoc v4.22.0
 // source: grpc/health/v1/health.proto

 package grpc_health_v1
@@ -35,6 +35,11 @@ import (
 // Requires gRPC-Go v1.32.0 or later.
 const _ = grpc.SupportPackageIsVersion7

+const (
+	Health_Check_FullMethodName = "/grpc.health.v1.Health/Check"
+	Health_Watch_FullMethodName = "/grpc.health.v1.Health/Watch"
+)
+
 // HealthClient is the client API for Health service.
 //
 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
@@ -70,7 +75,7 @@ func NewHealthClient(cc grpc.ClientConnInterface) HealthClient {

 func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
 	out := new(HealthCheckResponse)
-	err := c.cc.Invoke(ctx, "/grpc.health.v1.Health/Check", in, out, opts...)
+	err := c.cc.Invoke(ctx, Health_Check_FullMethodName, in, out, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -78,7 +83,7 @@ func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts .
 }

 func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) {
-	stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], "/grpc.health.v1.Health/Watch", opts...)
+	stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], Health_Watch_FullMethodName, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -166,7 +171,7 @@ func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interf
 	}
 	info := &grpc.UnaryServerInfo{
 		Server: srv,
-		FullMethod: "/grpc.health.v1.Health/Check",
+		FullMethod: Health_Check_FullMethodName,
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
 		return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest))
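The generated FullMethodName constants let callers and interceptors refer to a method without repeating the "/package.Service/Method" string literal. A small sketch of that usage; the constant value is copied from the diff above, while the helper functions are illustrative, not part of the generated code:

```go
package main

import (
	"fmt"
	"strings"
)

// Health_Check_FullMethodName mirrors the constant emitted by
// protoc-gen-go-grpc v1.3.0 for the health service.
const Health_Check_FullMethodName = "/grpc.health.v1.Health/Check"

// isHealthCheck is a toy predicate an interceptor might use, e.g. to
// skip request logging for health-check traffic.
func isHealthCheck(fullMethod string) bool {
	return fullMethod == Health_Check_FullMethodName
}

// serviceOf extracts the service portion of a full method name.
func serviceOf(fullMethod string) string {
	s := strings.TrimPrefix(fullMethod, "/")
	if i := strings.LastIndex(s, "/"); i >= 0 {
		return s[:i]
	}
	return s
}

func main() {
	fmt.Println(isHealthCheck("/grpc.health.v1.Health/Check")) // true
	fmt.Println(serviceOf(Health_Check_FullMethodName))        // grpc.health.v1.Health
}
```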


@@ -28,8 +28,10 @@ import (
 	"google.golang.org/grpc/internal/grpcutil"
 )

-// Logger is the global binary logger. It can be used to get binary logger for
-// each method.
+var grpclogLogger = grpclog.Component("binarylog")
+
+// Logger specifies MethodLoggers for method names with a Log call that
+// takes a context.
 type Logger interface {
 	GetMethodLogger(methodName string) MethodLogger
 }
@@ -40,8 +42,6 @@ type Logger interface {
 // It is used to get a MethodLogger for each individual method.
 var binLogger Logger

-var grpclogLogger = grpclog.Component("binarylog")
-
 // SetLogger sets the binary logger.
 //
 // Only call this at init time.


@@ -19,6 +19,7 @@
 package binarylog

 import (
+	"context"
 	"net"
 	"strings"
 	"sync/atomic"
@@ -49,7 +50,7 @@ var idGen callIDGenerator

 // MethodLogger is the sub-logger for each method.
 type MethodLogger interface {
-	Log(LogEntryConfig)
+	Log(context.Context, LogEntryConfig)
 }

 // TruncatingMethodLogger is a method logger that truncates headers and messages
@@ -98,7 +99,7 @@ func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry
 }

 // Log creates a proto binary log entry, and logs it to the sink.
-func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
+func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) {
 	ml.sink.Write(ml.Build(c))
 }


@@ -63,6 +63,9 @@ func (pl *PrefixLogger) Errorf(format string, args ...interface{}) {

 // Debugf does info logging at verbose level 2.
 func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
+	// TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
+	// rewrite PrefixLogger a little to ensure that we don't use the global
+	// `Logger` here, and instead use the `logger` field.
 	if !Logger.V(2) {
 		return
 	}
@@ -73,6 +76,15 @@ func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
 		return
 	}
 	InfoDepth(1, fmt.Sprintf(format, args...))
+}
+
+// V reports whether verbosity level l is at least the requested verbose level.
+func (pl *PrefixLogger) V(l int) bool {
+	// TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
+	// rewrite PrefixLogger a little to ensure that we don't use the global
+	// `Logger` here, and instead use the `logger` field.
+	return Logger.V(l)
 }

 // NewPrefixLogger creates a prefix logger with the given prefix.


@@ -58,6 +58,9 @@ var (
 	// gRPC server. An xDS-enabled server needs to know what type of credentials
 	// is configured on the underlying gRPC server. This is set by server.go.
 	GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
+	// CanonicalString returns the canonical string of the code defined here:
+	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
+	CanonicalString interface{} // func (codes.Code) string
 	// DrainServerTransports initiates a graceful close of existing connections
 	// on a gRPC server accepted on the provided listener address. An
 	// xDS-enabled server invokes this method on a grpc.Server when a particular
@@ -74,6 +77,10 @@ var (
 	// globally for newly created client channels. The priority will be: 1.
 	// user-provided; 2. this method; 3. default values.
 	AddGlobalDialOptions interface{} // func(opt ...DialOption)
+	// DisableGlobalDialOptions returns a DialOption that prevents the
+	// ClientConn from applying the global DialOptions (set via
+	// AddGlobalDialOptions).
+	DisableGlobalDialOptions interface{} // func() grpc.DialOption
 	// ClearGlobalDialOptions clears the array of extra DialOption. This
 	// method is useful in testing and benchmarking.
 	ClearGlobalDialOptions func()
@@ -130,6 +137,9 @@ var (
 	//
 	// TODO: Remove this function once the RBAC env var is removed.
 	UnregisterRBACHTTPFilterForTesting func()
+	// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
+	ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
 )

 // HealthChecker defines the signature of the client-side LB channel health checking function.


@@ -76,33 +76,11 @@ func Set(addr resolver.Address, md metadata.MD) resolver.Address {
 	return addr
 }

-// Validate returns an error if the input md contains invalid keys or values.
-//
-// If the header is not a pseudo-header, the following items are checked:
-// - header names must contain one or more characters from this set [0-9 a-z _ - .].
-// - if the header-name ends with a "-bin" suffix, no validation of the header value is performed.
-// - otherwise, the header value must contain one or more characters from the set [%x20-%x7E].
+// Validate validates every pair in md with ValidatePair.
 func Validate(md metadata.MD) error {
 	for k, vals := range md {
-		// pseudo-header will be ignored
-		if k[0] == ':' {
-			continue
-		}
-		// check key, for i that saving a conversion if not using for range
-		for i := 0; i < len(k); i++ {
-			r := k[i]
-			if !(r >= 'a' && r <= 'z') && !(r >= '0' && r <= '9') && r != '.' && r != '-' && r != '_' {
-				return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", k)
-			}
-		}
-		if strings.HasSuffix(k, "-bin") {
-			continue
-		}
-		// check value
-		for _, val := range vals {
-			if hasNotPrintable(val) {
-				return fmt.Errorf("header key %q contains value with non-printable ASCII characters", k)
-			}
+		if err := ValidatePair(k, vals...); err != nil {
+			return err
 		}
 	}
 	return nil
@@ -118,3 +96,37 @@ func hasNotPrintable(msg string) bool {
 	}
 	return false
 }
+
+// ValidatePair validate a key-value pair with the following rules (the pseudo-header will be skipped) :
+//
+// - key must contain one or more characters.
+// - the characters in the key must be contained in [0-9 a-z _ - .].
+// - if the key ends with a "-bin" suffix, no validation of the corresponding value is performed.
+// - the characters in the every value must be printable (in [%x20-%x7E]).
+func ValidatePair(key string, vals ...string) error {
+	// key should not be empty
+	if key == "" {
+		return fmt.Errorf("there is an empty key in the header")
+	}
+	// pseudo-header will be ignored
+	if key[0] == ':' {
+		return nil
+	}
+	// check key, for i that saving a conversion if not using for range
+	for i := 0; i < len(key); i++ {
+		r := key[i]
+		if !(r >= 'a' && r <= 'z') && !(r >= '0' && r <= '9') && r != '.' && r != '-' && r != '_' {
+			return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", key)
+		}
+	}
+	if strings.HasSuffix(key, "-bin") {
+		return nil
+	}
+	// check value
+	for _, val := range vals {
+		if hasNotPrintable(val) {
+			return fmt.Errorf("header key %q contains value with non-printable ASCII characters", key)
+		}
+	}
+	return nil
+}
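The rules ValidatePair documents (pseudo-headers skipped, lowercase key charset, "-bin" values exempt, printable ASCII values otherwise) can be exercised in a self-contained sketch. This re-implements the documented behavior outside the internal package, so the function name and printable check are local stand-ins:

```go
package main

import (
	"fmt"
	"strings"
)

// validatePair re-implements the documented rules as a standalone
// sketch: pseudo-headers are skipped, keys are restricted to
// [0-9a-z-_.], "-bin" values are exempt, and other values must be
// printable ASCII (0x20-0x7E).
func validatePair(key string, vals ...string) error {
	if key == "" {
		return fmt.Errorf("there is an empty key in the header")
	}
	if key[0] == ':' { // pseudo-header: skipped entirely
		return nil
	}
	for i := 0; i < len(key); i++ {
		r := key[i]
		if !(r >= 'a' && r <= 'z') && !(r >= '0' && r <= '9') && r != '.' && r != '-' && r != '_' {
			return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", key)
		}
	}
	if strings.HasSuffix(key, "-bin") { // binary values are not checked
		return nil
	}
	for _, val := range vals {
		for _, c := range val {
			if c < 0x20 || c > 0x7e {
				return fmt.Errorf("header key %q contains value with non-printable ASCII characters", key)
			}
		}
	}
	return nil
}

func main() {
	fmt.Println(validatePair("content-type", "application/grpc")) // <nil>
	fmt.Println(validatePair(":authority", "any\x00thing"))       // <nil> (pseudo-header)
	fmt.Println(validatePair("Upper-Case", "x") != nil)           // true (uppercase key)
}
```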


@@ -22,6 +22,7 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
+	"net"
 	"runtime"
 	"strconv"
 	"sync"
@@ -486,12 +487,13 @@ type loopyWriter struct {
 	hEnc *hpack.Encoder // HPACK encoder.
 	bdpEst *bdpEstimator
 	draining bool
+	conn net.Conn

 	// Side-specific handlers
 	ssGoAwayHandler func(*goAway) (bool, error)
 }

-func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
+func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn) *loopyWriter {
 	var buf bytes.Buffer
 	l := &loopyWriter{
 		side: s,
@@ -504,6 +506,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
 		hBuf: &buf,
 		hEnc: hpack.NewEncoder(&buf),
 		bdpEst: bdpEst,
+		conn: conn,
 	}
 	return l
 }
@@ -521,15 +524,27 @@ const minBatchSize = 1000
 // 2. Stream level flow control quota available.
 //
 // In each iteration of run loop, other than processing the incoming control
-// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
-// This results in writing of HTTP2 frames into an underlying write buffer.
-// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
-// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
-// if the batch size is too low to give stream goroutines a chance to fill it up.
+// frame, loopy calls processData, which processes one node from the
+// activeStreams linked-list. This results in writing of HTTP2 frames into an
+// underlying write buffer. When there's no more control frames to read from
+// controlBuf, loopy flushes the write buffer. As an optimization, to increase
+// the batch size for each flush, loopy yields the processor, once if the batch
+// size is too low to give stream goroutines a chance to fill it up.
+//
+// Upon exiting, if the error causing the exit is not an I/O error, run()
+// flushes and closes the underlying connection. Otherwise, the connection is
+// left open to allow the I/O error to be encountered by the reader instead.
 func (l *loopyWriter) run() (err error) {
-	// Always flush the writer before exiting in case there are pending frames
-	// to be sent.
-	defer l.framer.writer.Flush()
+	defer func() {
+		if logger.V(logLevel) {
+			logger.Infof("transport: loopyWriter exiting with error: %v", err)
+		}
+		if !isIOError(err) {
+			l.framer.writer.Flush()
+			l.conn.Close()
+		}
+		l.cbuf.finish()
+	}()
 	for {
 		it, err := l.cbuf.get(true)
 		if err != nil {
@@ -581,11 +596,11 @@ func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error
 	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
 }

-func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
+func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
 	// Otherwise update the quota.
 	if w.streamID == 0 {
 		l.sendQuota += w.increment
-		return nil
+		return
 	}
 	// Find the stream and update it.
 	if str, ok := l.estdStreams[w.streamID]; ok {
@@ -593,10 +608,9 @@ func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
 		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
 			str.state = active
 			l.activeStreams.enqueue(str)
-			return nil
+			return
 		}
 	}
-	return nil
 }

 func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
@@ -604,13 +618,11 @@ func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
 }

 func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
-	if err := l.applySettings(s.ss); err != nil {
-		return err
-	}
+	l.applySettings(s.ss)
 	return l.framer.fr.WriteSettingsAck()
 }

-func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
+func (l *loopyWriter) registerStreamHandler(h *registerStream) {
 	str := &outStream{
 		id: h.streamID,
 		state: empty,
@@ -618,7 +630,6 @@ func (l *loopyWriter) registerStreamHandler(h *registerStream) {
 		wq: h.wq,
 	}
 	l.estdStreams[h.streamID] = str
-	return nil
 }

 func (l *loopyWriter) headerHandler(h *headerFrame) error {
@@ -720,10 +731,10 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
 	return nil
 }

-func (l *loopyWriter) preprocessData(df *dataFrame) error {
+func (l *loopyWriter) preprocessData(df *dataFrame) {
 	str, ok := l.estdStreams[df.streamID]
 	if !ok {
-		return nil
+		return
 	}
 	// If we got data for a stream it means that
 	// stream was originated and the headers were sent out.
@@ -732,7 +743,6 @@ func (l *loopyWriter) preprocessData(df *dataFrame) {
 		str.state = active
 		l.activeStreams.enqueue(str)
 	}
-	return nil
 }

 func (l *loopyWriter) pingHandler(p *ping) error {
@@ -743,9 +753,8 @@ func (l *loopyWriter) pingHandler(p *ping) error {
 }

-func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
+func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
 	o.resp <- l.sendQuota
-	return nil
 }

 func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
@@ -763,6 +772,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
 		}
 	}
 	if l.draining && len(l.estdStreams) == 0 {
+		// Flush and close the connection; we are done with it.
 		return errors.New("finished processing active streams while in draining mode")
 	}
 	return nil
@@ -798,6 +808,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
 	if l.side == clientSide {
 		l.draining = true
 		if len(l.estdStreams) == 0 {
+			// Flush and close the connection; we are done with it.
 			return errors.New("received GOAWAY with no active streams")
 		}
 	}
@@ -816,17 +827,10 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
 	return nil
 }

-func (l *loopyWriter) closeConnectionHandler() error {
-	// Exit loopyWriter entirely by returning an error here. This will lead to
-	// the transport closing the connection, and, ultimately, transport
-	// closure.
-	return ErrConnClosing
-}
-
 func (l *loopyWriter) handle(i interface{}) error {
 	switch i := i.(type) {
 	case *incomingWindowUpdate:
-		return l.incomingWindowUpdateHandler(i)
+		l.incomingWindowUpdateHandler(i)
 	case *outgoingWindowUpdate:
 		return l.outgoingWindowUpdateHandler(i)
 	case *incomingSettings:
@ -836,7 +840,7 @@ func (l *loopyWriter) handle(i interface{}) error {
case *headerFrame: case *headerFrame:
return l.headerHandler(i) return l.headerHandler(i)
case *registerStream: case *registerStream:
return l.registerStreamHandler(i) l.registerStreamHandler(i)
case *cleanupStream: case *cleanupStream:
return l.cleanupStreamHandler(i) return l.cleanupStreamHandler(i)
case *earlyAbortStream: case *earlyAbortStream:
@ -844,21 +848,24 @@ func (l *loopyWriter) handle(i interface{}) error {
case *incomingGoAway: case *incomingGoAway:
return l.incomingGoAwayHandler(i) return l.incomingGoAwayHandler(i)
case *dataFrame: case *dataFrame:
return l.preprocessData(i) l.preprocessData(i)
case *ping: case *ping:
return l.pingHandler(i) return l.pingHandler(i)
case *goAway: case *goAway:
return l.goAwayHandler(i) return l.goAwayHandler(i)
case *outFlowControlSizeRequest: case *outFlowControlSizeRequest:
return l.outFlowControlSizeRequestHandler(i) l.outFlowControlSizeRequestHandler(i)
case closeConnection: case closeConnection:
return l.closeConnectionHandler() // Just return a non-I/O error and run() will flush and close the
// connection.
return ErrConnClosing
default: default:
return fmt.Errorf("transport: unknown control message type %T", i) return fmt.Errorf("transport: unknown control message type %T", i)
} }
return nil
} }
-func (l *loopyWriter) applySettings(ss []http2.Setting) error {
+func (l *loopyWriter) applySettings(ss []http2.Setting) {
 	for _, s := range ss {
 		switch s.ID {
 		case http2.SettingInitialWindowSize:
@ -877,7 +884,6 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
 			updateHeaderTblSize(l.hEnc, s.Val)
 		}
 	}
-	return nil
 }
// processData removes the first stream from active streams, writes out at most 16KB
@ -911,7 +917,7 @@ func (l *loopyWriter) processData() (bool, error) {
 			return false, err
 		}
 		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
-			return false, nil
+			return false, err
 		}
 	} else {
 		l.activeStreams.enqueue(str)


@ -444,15 +444,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
 		return nil, err
 	}
 	go func() {
-		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
-		err := t.loopy.run()
-		if logger.V(logLevel) {
-			logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
-		}
-		// Do not close the transport. Let reader goroutine handle it since
-		// there might be data in the buffers.
-		t.conn.Close()
-		t.controlBuf.finish()
+		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
+		t.loopy.run()
 		close(t.writerDone)
 	}()
 	return t, nil
@ -1264,10 +1257,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
 		t.mu.Unlock()
 		return
 	}
-	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
-		if logger.V(logLevel) {
-			logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
-		}
+	if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
+		// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
+		// data equal to ASCII "too_many_pings", it should log the occurrence at a log level that is
+		// enabled by default and double the configure KEEPALIVE_TIME used for new connections
+		// on that channel.
+		logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
 	}
 	id := f.LastStreamID
 	if id > 0 && id%2 == 0 {


@ -331,14 +331,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
 	t.handleSettings(sf)
 	go func() {
-		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
+		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
 		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
-		err := t.loopy.run()
-		if logger.V(logLevel) {
-			logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
-		}
-		t.conn.Close()
-		t.controlBuf.finish()
+		t.loopy.run()
 		close(t.writerDone)
 	}()
 	go t.keepalive()
@ -383,7 +378,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
 		// if false, content-type was missing or invalid
 		isGRPC      = false
 		contentType = ""
-		mdata       = make(map[string][]string)
+		mdata       = make(metadata.MD, len(frame.Fields))
 		httpMethod  string
 		// these are set if an error is encountered while parsing the headers
 		protocolError bool
@ -404,6 +399,17 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
 			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
 			s.contentSubtype = contentSubtype
 			isGRPC = true
+		case "grpc-accept-encoding":
+			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
+			if hf.Value == "" {
+				continue
+			}
+			compressors := hf.Value
+			if s.clientAdvertisedCompressors != "" {
+				compressors = s.clientAdvertisedCompressors + "," + compressors
+			}
+			s.clientAdvertisedCompressors = compressors
 		case "grpc-encoding":
 			s.recvCompress = hf.Value
 		case ":method":
@ -595,7 +601,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
 			LocalAddr:   t.localAddr,
 			Compression: s.recvCompress,
 			WireLength:  int(frame.Header().Length),
-			Header:      metadata.MD(mdata).Copy(),
+			Header:      mdata.Copy(),
 		}
 		sh.HandleRPC(s.ctx, inHeader)
 	}
@ -1344,9 +1350,6 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
 		return false, err
 	}
 	if retErr != nil {
-		// Abruptly close the connection following the GoAway (via
-		// loopywriter). But flush out what's inside the buffer first.
-		t.framer.writer.Flush()
 		return false, retErr
 	}
 	return true, nil


@ -21,6 +21,7 @@ package transport
 import (
 	"bufio"
 	"encoding/base64"
+	"errors"
 	"fmt"
 	"io"
 	"math"
@ -330,7 +331,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
 		return 0, w.err
 	}
 	if w.batchSize == 0 { // Buffer has been disabled.
-		return w.conn.Write(b)
+		n, err = w.conn.Write(b)
+		return n, toIOError(err)
 	}
 	for len(b) > 0 {
 		nn := copy(w.buf[w.offset:], b)
@ -352,10 +354,30 @@ func (w *bufWriter) Flush() error {
 		return nil
 	}
 	_, w.err = w.conn.Write(w.buf[:w.offset])
+	w.err = toIOError(w.err)
 	w.offset = 0
 	return w.err
 }
type ioError struct {
error
}
func (i ioError) Unwrap() error {
return i.error
}
func isIOError(err error) bool {
return errors.As(err, &ioError{})
}
func toIOError(err error) error {
if err == nil {
return nil
}
return ioError{error: err}
}
type framer struct { type framer struct {
writer *bufWriter writer *bufWriter
fr *http2.Framer fr *http2.Framer


@ -257,6 +257,9 @@ type Stream struct {
 	fc *inFlow
 	wq *writeQuota
+	// Holds compressor names passed in grpc-accept-encoding metadata from the
+	// client. This is empty for the client side stream.
+	clientAdvertisedCompressors string
 	// Callback to state application's intentions to read data. This
 	// is used to adjust flow control, if needed.
 	requestRead func(int)
@ -345,8 +348,24 @@ func (s *Stream) RecvCompress() string {
 }

 // SetSendCompress sets the compression algorithm to the stream.
-func (s *Stream) SetSendCompress(str string) {
-	s.sendCompress = str
+func (s *Stream) SetSendCompress(name string) error {
+	if s.isHeaderSent() || s.getState() == streamDone {
+		return errors.New("transport: set send compressor called after headers sent or stream done")
+	}
+	s.sendCompress = name
+	return nil
+}
+
+// SendCompress returns the send compressor name.
+func (s *Stream) SendCompress() string {
+	return s.sendCompress
+}
+
+// ClientAdvertisedCompressors returns the compressor names advertised by the
+// client via grpc-accept-encoding header.
+func (s *Stream) ClientAdvertisedCompressors() string {
+	return s.clientAdvertisedCompressors
 }
// Done returns a channel which is closed when it receives the final status // Done returns a channel which is closed when it receives the final status


@ -91,7 +91,11 @@ func (md MD) Len() int {
 // Copy returns a copy of md.
 func (md MD) Copy() MD {
-	return Join(md)
+	out := make(MD, len(md))
+	for k, v := range md {
+		out[k] = copyOf(v)
+	}
+	return out
 }

 // Get obtains the values for a given key.
@ -171,8 +175,11 @@ func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
 	md, _ := ctx.Value(mdOutgoingKey{}).(rawMD)
 	added := make([][]string, len(md.added)+1)
 	copy(added, md.added)
-	added[len(added)-1] = make([]string, len(kv))
-	copy(added[len(added)-1], kv)
+	kvCopy := make([]string, 0, len(kv))
+	for i := 0; i < len(kv); i += 2 {
+		kvCopy = append(kvCopy, strings.ToLower(kv[i]), kv[i+1])
+	}
+	added[len(added)-1] = kvCopy
 	return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: added})
 }
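Two behavioral details in this metadata hunk are worth seeing in isolation: `Copy` now deep-copies the value slices directly instead of round-tripping through `Join`, and `AppendToOutgoingContext` lower-cases keys at append time. A standalone sketch of both behaviors (the `MD` type and `appendPairs` here are illustrative stand-ins mirroring the shape of `metadata.MD`, not the real package):

```go
package main

import (
	"fmt"
	"strings"
)

// MD mirrors the shape of gRPC's metadata.MD for illustration.
type MD map[string][]string

// Copy returns a deep copy: mutating the copy's value slices must not
// affect the original map.
func (md MD) Copy() MD {
	out := make(MD, len(md))
	for k, v := range md {
		vc := make([]string, len(v))
		copy(vc, v)
		out[k] = vc
	}
	return out
}

// appendPairs lower-cases keys as it copies key/value pairs, the way
// AppendToOutgoingContext does in this diff.
func appendPairs(kv ...string) []string {
	kvCopy := make([]string, 0, len(kv))
	for i := 0; i < len(kv); i += 2 {
		kvCopy = append(kvCopy, strings.ToLower(kv[i]), kv[i+1])
	}
	return kvCopy
}

func main() {
	orig := MD{"k": {"a"}}
	cp := orig.Copy()
	cp["k"][0] = "changed"
	fmt.Println(orig["k"][0], appendPairs("X-Trace-ID", "123"))
}
```

The deep copy matters because HTTP/2 header keys must be lowercase and callers may mutate metadata after handing it to the library; copying the slices up front keeps later mutation from leaking across call boundaries.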


@ -41,8 +41,9 @@ var (
 // TODO(bar) install dns resolver in init(){}.

-// Register registers the resolver builder to the resolver map. b.Scheme will be
-// used as the scheme registered with this builder.
+// Register registers the resolver builder to the resolver map. b.Scheme will
+// be used as the scheme registered with this builder. The registry is case
+// sensitive, and schemes should not contain any uppercase characters.
 //
 // NOTE: this function must only be called during initialization time (i.e. in
 // an init() function), and is not thread-safe. If multiple Resolvers are
@ -203,6 +204,15 @@ type State struct {
 // gRPC to add new methods to this interface.
 type ClientConn interface {
 	// UpdateState updates the state of the ClientConn appropriately.
+	//
+	// If an error is returned, the resolver should try to resolve the
+	// target again. The resolver should use a backoff timer to prevent
+	// overloading the server with requests. If a resolver is certain that
+	// reresolving will not change the result, e.g. because it is
+	// a watch-based resolver, returned errors can be ignored.
+	//
+	// If the resolved State is the same as the last reported one, calling
+	// UpdateState can be omitted.
 	UpdateState(State) error
 	// ReportError notifies the ClientConn that the Resolver encountered an
 	// error. The ClientConn will notify the load balancer and begin calling
@ -280,8 +290,10 @@ type Builder interface {
 	// gRPC dial calls Build synchronously, and fails if the returned error is
 	// not nil.
 	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
-	// Scheme returns the scheme supported by this resolver.
-	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
+	// Scheme returns the scheme supported by this resolver. Scheme is defined
+	// at https://github.com/grpc/grpc/blob/master/doc/naming.md. The returned
+	// string should not contain uppercase characters, as they will not match
+	// the parsed target's scheme as defined in RFC 3986.
 	Scheme() string
 }


@ -159,6 +159,7 @@ type callInfo struct {
 	contentSubtype        string
 	codec                 baseCodec
 	maxRetryRPCBufferSize int
+	onFinish              []func(err error)
 }

 func defaultCallInfo() *callInfo {
@ -295,6 +296,41 @@ func (o FailFastCallOption) before(c *callInfo) error {
 }

 func (o FailFastCallOption) after(c *callInfo, attempt *csAttempt) {}
// OnFinish returns a CallOption that configures a callback to be called when
// the call completes. The error passed to the callback is the status of the
// RPC, and may be nil. The onFinish callback provided will only be called once
// by gRPC. This is mainly used to be used by streaming interceptors, to be
// notified when the RPC completes along with information about the status of
// the RPC.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func OnFinish(onFinish func(err error)) CallOption {
return OnFinishCallOption{
OnFinish: onFinish,
}
}
// OnFinishCallOption is CallOption that indicates a callback to be called when
// the call completes.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type OnFinishCallOption struct {
OnFinish func(error)
}
func (o OnFinishCallOption) before(c *callInfo) error {
c.onFinish = append(c.onFinish, o.OnFinish)
return nil
}
func (o OnFinishCallOption) after(c *callInfo, attempt *csAttempt) {}
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can receive. If this is not set, gRPC uses the default // in bytes the client can receive. If this is not set, gRPC uses the default
// 4MB. // 4MB.
@ -658,12 +694,13 @@ func msgHeader(data, compData []byte) (hdr []byte, payload []byte) {
 func outPayload(client bool, msg interface{}, data, payload []byte, t time.Time) *stats.OutPayload {
 	return &stats.OutPayload{
-		Client:     client,
-		Payload:    msg,
-		Data:       data,
-		Length:     len(data),
-		WireLength: len(payload) + headerLen,
-		SentTime:   t,
+		Client:           client,
+		Payload:          msg,
+		Data:             data,
+		Length:           len(data),
+		WireLength:       len(payload) + headerLen,
+		CompressedLength: len(payload),
+		SentTime:         t,
 	}
 }

@ -684,7 +721,7 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool
 }

 type payloadInfo struct {
-	wireLength        int // The compressed length got from wire.
+	compressedLength  int // The compressed length got from wire.
 	uncompressedBytes []byte
 }

@ -694,7 +731,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
 		return nil, err
 	}
 	if payInfo != nil {
-		payInfo.wireLength = len(d)
+		payInfo.compressedLength = len(d)
 	}
 	if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {


@ -45,6 +45,7 @@ import (
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcrand"
 	"google.golang.org/grpc/internal/grpcsync"
+	"google.golang.org/grpc/internal/grpcutil"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/metadata"
@ -74,10 +75,10 @@ func init() {
 		srv.drainServerTransports(addr)
 	}
 	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
-		extraServerOptions = append(extraServerOptions, opt...)
+		globalServerOptions = append(globalServerOptions, opt...)
 	}
 	internal.ClearGlobalServerOptions = func() {
-		extraServerOptions = nil
+		globalServerOptions = nil
 	}
 	internal.BinaryLogger = binaryLogger
 	internal.JoinServerOptions = newJoinServerOption
@ -183,7 +184,7 @@ var defaultServerOptions = serverOptions{
 	writeBufferSize: defaultWriteBufSize,
 	readBufferSize:  defaultReadBufSize,
 }
-var extraServerOptions []ServerOption
+var globalServerOptions []ServerOption

 // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
 type ServerOption interface {
@ -600,7 +601,7 @@ func (s *Server) stopServerWorkers() {
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
 	opts := defaultServerOptions
-	for _, o := range extraServerOptions {
+	for _, o := range globalServerOptions {
 		o.apply(&opts)
 	}
 	for _, o := range opt {
@ -1252,7 +1253,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 			logEntry.PeerAddr = peer.Addr
 		}
 		for _, binlog := range binlogs {
-			binlog.Log(logEntry)
+			binlog.Log(ctx, logEntry)
 		}
 	}

@ -1263,6 +1264,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 	var comp, decomp encoding.Compressor
 	var cp Compressor
 	var dc Decompressor
+	var sendCompressorName string

 	// If dc is set and matches the stream's compression, use it. Otherwise, try
 	// to find a matching registered compressor for decomp.
@ -1283,12 +1285,18 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
 	if s.opts.cp != nil {
 		cp = s.opts.cp
-		stream.SetSendCompress(cp.Type())
+		sendCompressorName = cp.Type()
 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
 		// Legacy compressor not specified; attempt to respond with same encoding.
 		comp = encoding.GetCompressor(rc)
 		if comp != nil {
-			stream.SetSendCompress(rc)
+			sendCompressorName = comp.Name()
+		}
+	}
+
+	if sendCompressorName != "" {
+		if err := stream.SetSendCompress(sendCompressorName); err != nil {
+			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
 		}
 	}
@ -1312,11 +1320,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 	}
 	for _, sh := range shs {
 		sh.HandleRPC(stream.Context(), &stats.InPayload{
-			RecvTime:   time.Now(),
-			Payload:    v,
-			WireLength: payInfo.wireLength + headerLen,
-			Data:       d,
-			Length:     len(d),
+			RecvTime:         time.Now(),
+			Payload:          v,
+			Length:           len(d),
+			WireLength:       payInfo.compressedLength + headerLen,
+			CompressedLength: payInfo.compressedLength,
+			Data:             d,
 		})
 	}
 	if len(binlogs) != 0 {
@ -1324,7 +1333,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 			Message: d,
 		}
 		for _, binlog := range binlogs {
-			binlog.Log(cm)
+			binlog.Log(stream.Context(), cm)
 		}
 	}
 	if trInfo != nil {
@ -1357,7 +1366,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 				Header: h,
 			}
 			for _, binlog := range binlogs {
-				binlog.Log(sh)
+				binlog.Log(stream.Context(), sh)
 			}
 		}
 		st := &binarylog.ServerTrailer{
@ -1365,7 +1374,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 			Err:     appErr,
 		}
 		for _, binlog := range binlogs {
-			binlog.Log(st)
+			binlog.Log(stream.Context(), st)
 		}
 	}
 	return appErr
@ -1375,6 +1384,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 	}
 	opts := &transport.Options{Last: true}

+	// Server handler could have set new compressor by calling SetSendCompressor.
+	// In case it is set, we need to use it for compressing outbound message.
+	if stream.SendCompress() != sendCompressorName {
+		comp = encoding.GetCompressor(stream.SendCompress())
+	}
 	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
 		if err == io.EOF {
 			// The entire stream is done (for unary RPC only).
@ -1402,8 +1416,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 				Err:     appErr,
 			}
 			for _, binlog := range binlogs {
-				binlog.Log(sh)
-				binlog.Log(st)
+				binlog.Log(stream.Context(), sh)
+				binlog.Log(stream.Context(), st)
 			}
 		}
 		return err
@ -1417,8 +1431,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 			Message: reply,
 		}
 		for _, binlog := range binlogs {
-			binlog.Log(sh)
-			binlog.Log(sm)
+			binlog.Log(stream.Context(), sh)
+			binlog.Log(stream.Context(), sm)
 		}
 	}
 	if channelz.IsOn() {
@ -1430,17 +1444,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 	// TODO: Should we be logging if writing status failed here, like above?
 	// Should the logging be in WriteStatus? Should we ignore the WriteStatus
 	// error or allow the stats handler to see it?
-	err = t.WriteStatus(stream, statusOK)
 	if len(binlogs) != 0 {
 		st := &binarylog.ServerTrailer{
 			Trailer: stream.Trailer(),
 			Err:     appErr,
 		}
 		for _, binlog := range binlogs {
-			binlog.Log(st)
+			binlog.Log(stream.Context(), st)
 		}
 	}
-	return err
+	return t.WriteStatus(stream, statusOK)
 }
 // chainStreamServerInterceptors chains all stream server interceptors into one.
@ -1574,7 +1587,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
 			logEntry.PeerAddr = peer.Addr
 		}
 		for _, binlog := range ss.binlogs {
-			binlog.Log(logEntry)
+			binlog.Log(stream.Context(), logEntry)
 		}
 	}

@ -1597,12 +1610,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
 	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
 	if s.opts.cp != nil {
 		ss.cp = s.opts.cp
-		stream.SetSendCompress(s.opts.cp.Type())
+		ss.sendCompressorName = s.opts.cp.Type()
 	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
 		// Legacy compressor not specified; attempt to respond with same encoding.
 		ss.comp = encoding.GetCompressor(rc)
 		if ss.comp != nil {
-			stream.SetSendCompress(rc)
+			ss.sendCompressorName = rc
+		}
+	}
+
+	if ss.sendCompressorName != "" {
+		if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
+			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
 		}
 	}
@ -1640,16 +1659,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
 			ss.trInfo.tr.SetError()
 			ss.mu.Unlock()
 		}
-		t.WriteStatus(ss.s, appStatus)
 		if len(ss.binlogs) != 0 {
 			st := &binarylog.ServerTrailer{
 				Trailer: ss.s.Trailer(),
 				Err:     appErr,
 			}
 			for _, binlog := range ss.binlogs {
-				binlog.Log(st)
+				binlog.Log(stream.Context(), st)
 			}
 		}
+		t.WriteStatus(ss.s, appStatus)
 		// TODO: Should we log an error from WriteStatus here and below?
 		return appErr
 	}
@ -1658,17 +1677,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
 		ss.trInfo.tr.LazyLog(stringer("OK"), false)
 		ss.mu.Unlock()
 	}
-	err = t.WriteStatus(ss.s, statusOK)
 	if len(ss.binlogs) != 0 {
 		st := &binarylog.ServerTrailer{
 			Trailer: ss.s.Trailer(),
 			Err:     appErr,
 		}
 		for _, binlog := range ss.binlogs {
-			binlog.Log(st)
+			binlog.Log(stream.Context(), st)
 		}
 	}
-	return err
+	return t.WriteStatus(ss.s, statusOK)
 }
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
@ -1935,6 +1953,60 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
 	return nil
 }
// SetSendCompressor sets a compressor for outbound messages from the server.
// It must not be called after any event that causes headers to be sent
// (see ServerStream.SetHeader for the complete list). Provided compressor is
// used when below conditions are met:
//
// - compressor is registered via encoding.RegisterCompressor
// - compressor name must exist in the client advertised compressor names
// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
// get client supported compressor names.
//
// The context provided must be the context passed to the server's handler.
// It must be noted that compressor name encoding.Identity disables the
// outbound compression.
// By default, server messages will be sent using the same compressor with
// which request messages were sent.
//
// It is not safe to call SetSendCompressor concurrently with SendHeader and
// SendMsg.
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetSendCompressor(ctx context.Context, name string) error {
stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
if !ok || stream == nil {
return fmt.Errorf("failed to fetch the stream from the given context")
}
if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
return fmt.Errorf("unable to set send compressor: %w", err)
}
return stream.SetSendCompress(name)
}
// ClientSupportedCompressors returns compressor names advertised by the client
// via grpc-accept-encoding header.
//
// The context provided must be the context passed to the server's handler.
//
// # Experimental
//
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
// later release.
func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
if !ok || stream == nil {
return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
}
return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
}
 // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
 // When called more than once, all the provided metadata will be merged.
 //
@ -1969,3 +2041,22 @@ type channelzServer struct {
 func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
 	return c.s.channelzMetric()
 }
// validateSendCompressor returns an error when given compressor name cannot be
// handled by the server or the client based on the advertised compressors.
func validateSendCompressor(name, clientCompressors string) error {
if name == encoding.Identity {
return nil
}
if !grpcutil.IsCompressorNameRegistered(name) {
return fmt.Errorf("compressor not registered %q", name)
}
for _, c := range strings.Split(clientCompressors, ",") {
if c == name {
return nil // found match
}
}
return fmt.Errorf("client does not support compressor %q", name)
}
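The validation above combines a local registry check with a scan of the client's comma-joined `grpc-accept-encoding` list. A standalone sketch of the same logic, with the registry replaced by a plain set (the real code consults `grpcutil.IsCompressorNameRegistered`; the `registered` map here is an assumption for illustration):

```go
package main

import (
	"fmt"
	"strings"
)

// identity is the reserved no-compression name, always acceptable.
const identity = "identity"

// registered stands in for the server's compressor registry.
var registered = map[string]bool{"gzip": true}

// validateSendCompressor mirrors the server-side check: identity is always
// fine; otherwise the name must be registered locally and advertised by the
// client in grpc-accept-encoding.
func validateSendCompressor(name, clientCompressors string) error {
	if name == identity {
		return nil
	}
	if !registered[name] {
		return fmt.Errorf("compressor not registered %q", name)
	}
	for _, c := range strings.Split(clientCompressors, ",") {
		if c == name {
			return nil // client advertised support
		}
	}
	return fmt.Errorf("client does not support compressor %q", name)
}

func main() {
	fmt.Println(validateSendCompressor("gzip", "gzip,snappy"))
	fmt.Println(validateSendCompressor("gzip", "snappy"))
}
```

Checking both sides is the point: a compressor the server knows but the client never advertised would produce responses the client cannot decode, so the handler-facing `SetSendCompressor` rejects it up front.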


@ -67,10 +67,18 @@ type InPayload struct {
Payload interface{} Payload interface{}
// Data is the serialized message payload. // Data is the serialized message payload.
Data []byte Data []byte
// Length is the length of uncompressed data.
// Length is the size of the uncompressed payload data. Does not include any
// framing (gRPC or HTTP/2).
Length int Length int
// WireLength is the length of data on wire (compressed, signed, encrypted). // CompressedLength is the size of the compressed payload data. Does not
// include any framing (gRPC or HTTP/2). Same as Length if compression not
// enabled.
CompressedLength int
// WireLength is the size of the compressed payload data plus gRPC framing.
// Does not include HTTP/2 framing.
WireLength int WireLength int
// RecvTime is the time when the payload is received. // RecvTime is the time when the payload is received.
RecvTime time.Time RecvTime time.Time
} }
@ -129,9 +137,15 @@ type OutPayload struct {
Payload interface{} Payload interface{}
// Data is the serialized message payload. // Data is the serialized message payload.
Data []byte Data []byte
// Length is the length of uncompressed data. // Length is the size of the uncompressed payload data. Does not include any
// framing (gRPC or HTTP/2).
Length int Length int
// WireLength is the length of data on wire (compressed, signed, encrypted). // CompressedLength is the size of the compressed payload data. Does not
// include any framing (gRPC or HTTP/2). Same as Length if compression not
// enabled.
CompressedLength int
// WireLength is the size of the compressed payload data plus gRPC framing.
// Does not include HTTP/2 framing.
WireLength int WireLength int
// SentTime is the time when the payload is sent. // SentTime is the time when the payload is sent.
SentTime time.Time SentTime time.Time
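The reworked comments distinguish three sizes per message. A minimal sketch of how the fields relate, assuming gzip compression and the 5-byte gRPC message prefix (1 compressed-flag byte plus a 4-byte length); payloadSizes is a hypothetical helper for illustration, not a grpc API:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// headerLen matches gRPC's per-message framing: 1 compressed-flag byte
// followed by a 4-byte big-endian message length. HTTP/2 framing is not
// counted in any of the stats fields.
const headerLen = 5

// payloadSizes reports the three stats fields for one gzip-compressed
// message: Length (uncompressed payload), CompressedLength, and
// WireLength (compressed payload plus gRPC framing).
func payloadSizes(msg []byte) (length, compressedLength, wireLength int) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write(msg)
	zw.Close()
	return len(msg), buf.Len(), buf.Len() + headerLen
}

func main() {
	msg := bytes.Repeat([]byte("hello "), 100)
	l, c, w := payloadSizes(msg)
	fmt.Println(l, c, w)
}
```

With compression disabled, CompressedLength would simply equal Length, as the new comments state.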

View File
vendor/google.golang.org/grpc/stream.go generated vendored

@ -168,10 +168,19 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
} }
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
if md, _, ok := metadata.FromOutgoingContextRaw(ctx); ok { if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
// validate md
if err := imetadata.Validate(md); err != nil { if err := imetadata.Validate(md); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
// validate added
for _, kvs := range added {
for i := 0; i < len(kvs); i += 2 {
if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
}
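The new loop validates the metadata appended via AppendToOutgoingContext, which FromOutgoingContextRaw hands back as slices of flattened key1, val1, key2, val2, ... pairs. The sketch below shows the shape of that validation; validatePair here is a simplified stand-in that only approximates grpc's internal metadata checks:

```go
package main

import (
	"fmt"
	"strings"
)

// validatePair approximates grpc's internal metadata validation:
// keys must be lowercase alphanumerics plus '-', '_' and '.', and
// non-"-bin" values must be printable ASCII.
func validatePair(key, val string) error {
	if key == "" {
		return fmt.Errorf("empty key")
	}
	for _, r := range key {
		if !(r >= 'a' && r <= 'z' || r >= '0' && r <= '9' ||
			r == '-' || r == '_' || r == '.') {
			return fmt.Errorf("illegal key %q", key)
		}
	}
	if !strings.HasSuffix(key, "-bin") {
		for _, r := range val {
			if r < 0x20 || r > 0x7e {
				return fmt.Errorf("illegal value in %q", key)
			}
		}
	}
	return nil
}

// validateAdded walks the flattened pairs the same way the new loop in
// newClientStream does (pairs are assumed to be complete, as upstream does).
func validateAdded(added [][]string) error {
	for _, kvs := range added {
		for i := 0; i < len(kvs); i += 2 {
			if err := validatePair(kvs[i], kvs[i+1]); err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	fmt.Println(validateAdded([][]string{{"trace-id", "abc123"}}))
	fmt.Println(validateAdded([][]string{{"Bad Key", "x"}}))
}
```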
} }
if channelz.IsOn() { if channelz.IsOn() {
cc.incrCallsStarted() cc.incrCallsStarted()
@ -352,7 +361,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
} }
} }
for _, binlog := range cs.binlogs { for _, binlog := range cs.binlogs {
binlog.Log(logEntry) binlog.Log(cs.ctx, logEntry)
} }
} }
@ -800,7 +809,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
} }
cs.serverHeaderBinlogged = true cs.serverHeaderBinlogged = true
for _, binlog := range cs.binlogs { for _, binlog := range cs.binlogs {
binlog.Log(logEntry) binlog.Log(cs.ctx, logEntry)
} }
} }
return m, nil return m, nil
@ -881,7 +890,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
Message: data, Message: data,
} }
for _, binlog := range cs.binlogs { for _, binlog := range cs.binlogs {
binlog.Log(cm) binlog.Log(cs.ctx, cm)
} }
} }
return err return err
@ -905,7 +914,7 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
Message: recvInfo.uncompressedBytes, Message: recvInfo.uncompressedBytes,
} }
for _, binlog := range cs.binlogs { for _, binlog := range cs.binlogs {
binlog.Log(sm) binlog.Log(cs.ctx, sm)
} }
} }
if err != nil || !cs.desc.ServerStreams { if err != nil || !cs.desc.ServerStreams {
@ -926,7 +935,7 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
logEntry.PeerAddr = peer.Addr logEntry.PeerAddr = peer.Addr
} }
for _, binlog := range cs.binlogs { for _, binlog := range cs.binlogs {
binlog.Log(logEntry) binlog.Log(cs.ctx, logEntry)
} }
} }
} }
@ -953,7 +962,7 @@ func (cs *clientStream) CloseSend() error {
OnClientSide: true, OnClientSide: true,
} }
for _, binlog := range cs.binlogs { for _, binlog := range cs.binlogs {
binlog.Log(chc) binlog.Log(cs.ctx, chc)
} }
} }
// We never returned an error here for reasons. // We never returned an error here for reasons.
@ -971,6 +980,9 @@ func (cs *clientStream) finish(err error) {
return return
} }
cs.finished = true cs.finished = true
for _, onFinish := range cs.callInfo.onFinish {
onFinish(err)
}
cs.commitAttemptLocked() cs.commitAttemptLocked()
if cs.attempt != nil { if cs.attempt != nil {
cs.attempt.finish(err) cs.attempt.finish(err)
@ -992,7 +1004,7 @@ func (cs *clientStream) finish(err error) {
OnClientSide: true, OnClientSide: true,
} }
for _, binlog := range cs.binlogs { for _, binlog := range cs.binlogs {
binlog.Log(c) binlog.Log(cs.ctx, c)
} }
} }
if err == nil { if err == nil {
@ -1081,9 +1093,10 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
RecvTime: time.Now(), RecvTime: time.Now(),
Payload: m, Payload: m,
// TODO truncate large payload. // TODO truncate large payload.
Data: payInfo.uncompressedBytes, Data: payInfo.uncompressedBytes,
WireLength: payInfo.wireLength + headerLen, WireLength: payInfo.compressedLength + headerLen,
Length: len(payInfo.uncompressedBytes), CompressedLength: payInfo.compressedLength,
Length: len(payInfo.uncompressedBytes),
}) })
} }
if channelz.IsOn() { if channelz.IsOn() {
@ -1511,6 +1524,8 @@ type serverStream struct {
comp encoding.Compressor comp encoding.Compressor
decomp encoding.Compressor decomp encoding.Compressor
sendCompressorName string
maxReceiveMessageSize int maxReceiveMessageSize int
maxSendMessageSize int maxSendMessageSize int
trInfo *traceInfo trInfo *traceInfo
@ -1558,7 +1573,7 @@ func (ss *serverStream) SendHeader(md metadata.MD) error {
} }
ss.serverHeaderBinlogged = true ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs { for _, binlog := range ss.binlogs {
binlog.Log(sh) binlog.Log(ss.ctx, sh)
} }
} }
return err return err
@ -1603,6 +1618,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
} }
}() }()
// Server handler could have set new compressor by calling SetSendCompressor.
// In case it is set, we need to use it for compressing outbound message.
if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
ss.comp = encoding.GetCompressor(sendCompressorsName)
ss.sendCompressorName = sendCompressorsName
}
// load hdr, payload, data // load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
if err != nil { if err != nil {
@ -1624,14 +1646,14 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
} }
ss.serverHeaderBinlogged = true ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs { for _, binlog := range ss.binlogs {
binlog.Log(sh) binlog.Log(ss.ctx, sh)
} }
} }
sm := &binarylog.ServerMessage{ sm := &binarylog.ServerMessage{
Message: data, Message: data,
} }
for _, binlog := range ss.binlogs { for _, binlog := range ss.binlogs {
binlog.Log(sm) binlog.Log(ss.ctx, sm)
} }
} }
if len(ss.statsHandler) != 0 { if len(ss.statsHandler) != 0 {
@ -1679,7 +1701,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
if len(ss.binlogs) != 0 { if len(ss.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{} chc := &binarylog.ClientHalfClose{}
for _, binlog := range ss.binlogs { for _, binlog := range ss.binlogs {
binlog.Log(chc) binlog.Log(ss.ctx, chc)
} }
} }
return err return err
@ -1695,9 +1717,10 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
RecvTime: time.Now(), RecvTime: time.Now(),
Payload: m, Payload: m,
// TODO truncate large payload. // TODO truncate large payload.
Data: payInfo.uncompressedBytes, Data: payInfo.uncompressedBytes,
WireLength: payInfo.wireLength + headerLen, Length: len(payInfo.uncompressedBytes),
Length: len(payInfo.uncompressedBytes), WireLength: payInfo.compressedLength + headerLen,
CompressedLength: payInfo.compressedLength,
}) })
} }
} }
@ -1706,7 +1729,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
Message: payInfo.uncompressedBytes, Message: payInfo.uncompressedBytes,
} }
for _, binlog := range ss.binlogs { for _, binlog := range ss.binlogs {
binlog.Log(cm) binlog.Log(ss.ctx, cm)
} }
} }
return nil return nil
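The new sendCompressorName field on serverStream lets a handler change the send compressor between messages (e.g. via SetSendCompressor), with SendMsg picking up the change before compressing the next outbound message. A heavily simplified sketch of that refresh step, using trimmed stand-ins for encoding.Compressor and the compressor registry:

```go
package main

import "fmt"

// Compressor is a trimmed stand-in for grpc's encoding.Compressor.
type Compressor interface{ Name() string }

type gzipComp struct{}

func (gzipComp) Name() string { return "gzip" }

// registry stands in for the lookup behind encoding.GetCompressor.
var registry = map[string]Compressor{"gzip": gzipComp{}}

type serverStream struct {
	negotiatedName     string // what ss.s.SendCompress() would report
	comp               Compressor
	sendCompressorName string
}

// refreshCompressor mirrors the per-message check added to SendMsg:
// when the negotiated name no longer matches the cached one, swap in
// the newly selected compressor and remember its name.
func (ss *serverStream) refreshCompressor() {
	if name := ss.negotiatedName; name != ss.sendCompressorName {
		ss.comp = registry[name]
		ss.sendCompressorName = name
	}
}

func main() {
	ss := &serverStream{negotiatedName: "gzip"}
	ss.refreshCompressor()
	fmt.Println(ss.sendCompressorName)
}
```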

View File
vendor/google.golang.org/grpc/version.go generated vendored

@ -19,4 +19,4 @@
package grpc package grpc
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.53.0" const Version = "1.54.0"

15
vendor/google.golang.org/grpc/vet.sh generated vendored
View File

@ -41,16 +41,8 @@ if [[ "$1" = "-install" ]]; then
github.com/client9/misspell/cmd/misspell github.com/client9/misspell/cmd/misspell
popd popd
if [[ -z "${VET_SKIP_PROTO}" ]]; then if [[ -z "${VET_SKIP_PROTO}" ]]; then
if [[ "${TRAVIS}" = "true" ]]; then if [[ "${GITHUB_ACTIONS}" = "true" ]]; then
PROTOBUF_VERSION=3.14.0 PROTOBUF_VERSION=22.0 # a.k.a v4.22.0 in pb.go files.
PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
pushd /home/travis
wget https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/${PROTOC_FILENAME}
unzip ${PROTOC_FILENAME}
bin/protoc --version
popd
elif [[ "${GITHUB_ACTIONS}" = "true" ]]; then
PROTOBUF_VERSION=3.14.0
PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip
pushd /home/runner/go pushd /home/runner/go
wget https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/${PROTOC_FILENAME} wget https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/${PROTOC_FILENAME}
@ -68,8 +60,7 @@ fi
# - Check that generated proto files are up to date. # - Check that generated proto files are up to date.
if [[ -z "${VET_SKIP_PROTO}" ]]; then if [[ -z "${VET_SKIP_PROTO}" ]]; then
PATH="/home/travis/bin:${PATH}" make proto && \ make proto && git status --porcelain 2>&1 | fail_on_output || \
git status --porcelain 2>&1 | fail_on_output || \
(git status; git --no-pager diff; exit 1) (git status; git --no-pager diff; exit 1)
fi fi

2
vendor/modules.txt vendored
View File

@ -634,7 +634,7 @@ google.golang.org/genproto/googleapis/api/httpbody
google.golang.org/genproto/googleapis/rpc/errdetails google.golang.org/genproto/googleapis/rpc/errdetails
google.golang.org/genproto/googleapis/rpc/status google.golang.org/genproto/googleapis/rpc/status
google.golang.org/genproto/protobuf/field_mask google.golang.org/genproto/protobuf/field_mask
# google.golang.org/grpc v1.53.0 # google.golang.org/grpc v1.54.0
## explicit; go 1.17 ## explicit; go 1.17
google.golang.org/grpc google.golang.org/grpc
google.golang.org/grpc/attributes google.golang.org/grpc/attributes