rebase: Bump google.golang.org/grpc from 1.57.0 to 1.58.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.57.0 to 1.58.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.57.0...v1.58.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-09-11 20:46:57 +00:00
committed by mergify[bot]
parent 28d2a865fa
commit 5090c4171b
64 changed files with 998 additions and 826 deletions

View File

@ -200,8 +200,8 @@ func (gsb *Balancer) ExitIdle() {
}
}
// UpdateSubConnState forwards the update to the appropriate child.
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
// updateSubConnState forwards the update to the appropriate child.
func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
gsb.currentMu.Lock()
defer gsb.currentMu.Unlock()
gsb.mu.Lock()
@ -214,13 +214,26 @@ func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubC
} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
balToUpdate = gsb.balancerPending
}
gsb.mu.Unlock()
if balToUpdate == nil {
// SubConn belonged to a stale lb policy that has not yet fully closed,
// or the balancer was already closed.
gsb.mu.Unlock()
return
}
balToUpdate.UpdateSubConnState(sc, state)
if state.ConnectivityState == connectivity.Shutdown {
delete(balToUpdate.subconns, sc)
}
gsb.mu.Unlock()
if cb != nil {
cb(state)
} else {
balToUpdate.UpdateSubConnState(sc, state)
}
}
// UpdateSubConnState forwards the update to the appropriate child.
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
gsb.updateSubConnState(sc, state, nil)
}
// Close closes any active child balancers.
@ -242,7 +255,7 @@ func (gsb *Balancer) Close() {
//
// It implements the balancer.ClientConn interface and is passed down in that
// capacity to the wrapped balancer. It maintains a set of subConns created by
// the wrapped balancer and calls from the latter to create/update/remove
// the wrapped balancer and calls from the latter to create/update/shutdown
// SubConns update this set before being forwarded to the parent ClientConn.
// State updates from the wrapped balancer can result in invocation of the
// graceful switch logic.
@ -254,21 +267,10 @@ type balancerWrapper struct {
subconns map[balancer.SubConn]bool // subconns created by this balancer
}
func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if state.ConnectivityState == connectivity.Shutdown {
bw.gsb.mu.Lock()
delete(bw.subconns, sc)
bw.gsb.mu.Unlock()
}
// There is no need to protect this read with a mutex, as the write to the
// Balancer field happens in SwitchTo, which completes before this can be
// called.
bw.Balancer.UpdateSubConnState(sc, state)
}
// Close closes the underlying LB policy and removes the subconns it created. bw
// must not be referenced via balancerCurrent or balancerPending in gsb when
// called. gsb.mu must not be held. Does not panic with a nil receiver.
// Close closes the underlying LB policy and shuts down the subconns it
// created. bw must not be referenced via balancerCurrent or balancerPending in
// gsb when called. gsb.mu must not be held. Does not panic with a nil
// receiver.
func (bw *balancerWrapper) Close() {
// before Close is called.
if bw == nil {
@ -281,7 +283,7 @@ func (bw *balancerWrapper) Close() {
bw.Balancer.Close()
bw.gsb.mu.Lock()
for sc := range bw.subconns {
bw.gsb.cc.RemoveSubConn(sc)
sc.Shutdown()
}
bw.gsb.mu.Unlock()
}
@ -335,13 +337,16 @@ func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.Ne
}
bw.gsb.mu.Unlock()
var sc balancer.SubConn
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) }
sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
bw.gsb.mu.Lock()
if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
bw.gsb.cc.RemoveSubConn(sc)
sc.Shutdown()
bw.gsb.mu.Unlock()
return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
}
@ -360,13 +365,9 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
}
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
bw.gsb.mu.Lock()
if !bw.gsb.balancerCurrentOrPending(bw) {
bw.gsb.mu.Unlock()
return
}
bw.gsb.mu.Unlock()
bw.gsb.cc.RemoveSubConn(sc)
// Note: existing third party balancers may call this, so it must remain
// until RemoveSubConn is fully removed.
sc.Shutdown()
}
func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {

View File

@ -25,7 +25,7 @@ import (
// Parser converts loads from metadata into a concrete type.
type Parser interface {
// Parse parses loads from metadata.
Parse(md metadata.MD) interface{}
Parse(md metadata.MD) any
}
var parser Parser
@ -38,7 +38,7 @@ func SetParser(lr Parser) {
}
// Parse calls parser.Read().
func Parse(md metadata.MD) interface{} {
func Parse(md metadata.MD) any {
if parser == nil {
return nil
}

View File

@ -230,7 +230,7 @@ type ClientMessage struct {
OnClientSide bool
// Message can be a proto.Message or []byte. Other messages formats are not
// supported.
Message interface{}
Message any
}
func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
@ -270,7 +270,7 @@ type ServerMessage struct {
OnClientSide bool
// Message can be a proto.Message or []byte. Other messages formats are not
// supported.
Message interface{}
Message any
}
func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {

View File

@ -28,25 +28,25 @@ import "sync"
// the underlying mutex used for synchronization.
//
// Unbounded supports values of any type to be stored in it by using a channel
// of `interface{}`. This means that a call to Put() incurs an extra memory
// allocation, and also that users need a type assertion while reading. For
// performance critical code paths, using Unbounded is strongly discouraged and
// defining a new type specific implementation of this buffer is preferred. See
// of `any`. This means that a call to Put() incurs an extra memory allocation,
// and also that users need a type assertion while reading. For performance
// critical code paths, using Unbounded is strongly discouraged and defining a
// new type specific implementation of this buffer is preferred. See
// internal/transport/transport.go for an example of this.
type Unbounded struct {
c chan interface{}
c chan any
closed bool
mu sync.Mutex
backlog []interface{}
backlog []any
}
// NewUnbounded returns a new instance of Unbounded.
func NewUnbounded() *Unbounded {
return &Unbounded{c: make(chan interface{}, 1)}
return &Unbounded{c: make(chan any, 1)}
}
// Put adds t to the unbounded buffer.
func (b *Unbounded) Put(t interface{}) {
func (b *Unbounded) Put(t any) {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
@ -89,7 +89,7 @@ func (b *Unbounded) Load() {
//
// If the unbounded buffer is closed, the read channel returned by this method
// is closed.
func (b *Unbounded) Get() <-chan interface{} {
func (b *Unbounded) Get() <-chan any {
return b.c
}

View File

@ -24,9 +24,7 @@
package channelz
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
@ -40,8 +38,11 @@ const (
)
var (
db dbWrapper
idGen idGenerator
// IDGen is the global channelz entity ID generator. It should not be used
// outside this package except by tests.
IDGen IDGenerator
db dbWrapper
// EntryPerPage defines the number of channelz entries to be shown on a web page.
EntryPerPage = int64(50)
curState int32
@ -52,14 +53,14 @@ var (
func TurnOn() {
if !IsOn() {
db.set(newChannelMap())
idGen.reset()
IDGen.Reset()
atomic.StoreInt32(&curState, 1)
}
}
// IsOn returns whether channelz data collection is on.
func IsOn() bool {
return atomic.CompareAndSwapInt32(&curState, 1, 1)
return atomic.LoadInt32(&curState) == 1
}
// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
@ -97,43 +98,6 @@ func (d *dbWrapper) get() *channelMap {
return d.DB
}
// NewChannelzStorageForTesting initializes channelz data storage and id
// generator for testing purposes.
//
// Returns a cleanup function to be invoked by the test, which waits for up to
// 10s for all channelz state to be reset by the grpc goroutines when those
// entities get closed. This cleanup function helps with ensuring that tests
// don't mess up each other.
func NewChannelzStorageForTesting() (cleanup func() error) {
db.set(newChannelMap())
idGen.reset()
return func() error {
cm := db.get()
if cm == nil {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
cm.mu.RLock()
topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)
cm.mu.RUnlock()
if err := ctx.Err(); err != nil {
return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets)
}
if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 {
return nil
}
<-ticker.C
}
}
}
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
// boolean indicating whether there's more top channels to be queried for.
//
@ -193,7 +157,7 @@ func GetServer(id int64) *ServerMetric {
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
id := idGen.genID()
id := IDGen.genID()
var parent int64
isTopChannel := true
if pid != nil {
@ -229,7 +193,7 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er
if pid == nil {
return nil, errors.New("a SubChannel's parent id cannot be nil")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefSubChannel, id, pid), nil
}
@ -251,7 +215,7 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterServer(s Server, ref string) *Identifier {
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefServer, id, nil)
}
@ -277,7 +241,7 @@ func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, e
if pid == nil {
return nil, errors.New("a ListenSocket's parent id cannot be 0")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefListenSocket, id, pid), nil
}
@ -297,7 +261,7 @@ func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, e
if pid == nil {
return nil, errors.New("a NormalSocket's parent id cannot be 0")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefNormalSocket, id, pid), nil
}
@ -776,14 +740,17 @@ func (c *channelMap) GetServer(id int64) *ServerMetric {
return sm
}
type idGenerator struct {
// IDGenerator is an incrementing atomic that tracks IDs for channelz entities.
type IDGenerator struct {
id int64
}
func (i *idGenerator) reset() {
// Reset resets the generated ID back to zero. Should only be used at
// initialization or by tests sensitive to the ID number.
func (i *IDGenerator) Reset() {
atomic.StoreInt64(&i.id, 0)
}
func (i *idGenerator) genID() int64 {
func (i *IDGenerator) genID() int64 {
return atomic.AddInt64(&i.id, 1)
}

View File

@ -31,7 +31,7 @@ func withParens(id *Identifier) string {
}
// Info logs and adds a trace event if channelz is on.
func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtInfo,
@ -39,7 +39,7 @@ func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
}
// Infof logs and adds a trace event if channelz is on.
func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprintf(format, args...),
Severity: CtInfo,
@ -47,7 +47,7 @@ func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...inter
}
// Warning logs and adds a trace event if channelz is on.
func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtWarning,
@ -55,7 +55,7 @@ func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
}
// Warningf logs and adds a trace event if channelz is on.
func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprintf(format, args...),
Severity: CtWarning,
@ -63,7 +63,7 @@ func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...in
}
// Error logs and adds a trace event if channelz is on.
func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtError,
@ -71,7 +71,7 @@ func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
}
// Errorf logs and adds a trace event if channelz is on.
func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprintf(format, args...),
Severity: CtError,

View File

@ -628,6 +628,7 @@ type tracedChannel interface {
type channelTrace struct {
cm *channelMap
clearCalled bool
createdTime time.Time
eventCount int64
mu sync.Mutex
@ -656,6 +657,10 @@ func (c *channelTrace) append(e *TraceEvent) {
}
func (c *channelTrace) clear() {
if c.clearCalled {
return
}
c.clearCalled = true
c.mu.Lock()
for _, e := range c.events {
if e.RefID != 0 {

View File

@ -23,7 +23,7 @@ import (
)
// GetSocketOption gets the socket option info of the conn.
func GetSocketOption(socket interface{}) *SocketOptionData {
func GetSocketOption(socket any) *SocketOptionData {
c, ok := socket.(syscall.Conn)
if !ok {
return nil

View File

@ -22,6 +22,6 @@
package channelz
// GetSocketOption gets the socket option info of the conn.
func GetSocketOption(c interface{}) *SocketOptionData {
func GetSocketOption(c any) *SocketOptionData {
return nil
}

View File

@ -25,12 +25,12 @@ import (
type requestInfoKey struct{}
// NewRequestInfoContext creates a context with ri.
func NewRequestInfoContext(ctx context.Context, ri interface{}) context.Context {
func NewRequestInfoContext(ctx context.Context, ri any) context.Context {
return context.WithValue(ctx, requestInfoKey{}, ri)
}
// RequestInfoFromContext extracts the RequestInfo from ctx.
func RequestInfoFromContext(ctx context.Context) interface{} {
func RequestInfoFromContext(ctx context.Context) any {
return ctx.Value(requestInfoKey{})
}
@ -39,11 +39,11 @@ func RequestInfoFromContext(ctx context.Context) interface{} {
type clientHandshakeInfoKey struct{}
// ClientHandshakeInfoFromContext extracts the ClientHandshakeInfo from ctx.
func ClientHandshakeInfoFromContext(ctx context.Context) interface{} {
func ClientHandshakeInfoFromContext(ctx context.Context) any {
return ctx.Value(clientHandshakeInfoKey{})
}
// NewClientHandshakeInfoContext creates a context with chi.
func NewClientHandshakeInfoContext(ctx context.Context, chi interface{}) context.Context {
func NewClientHandshakeInfoContext(ctx context.Context, chi any) context.Context {
return context.WithValue(ctx, clientHandshakeInfoKey{}, chi)
}

View File

@ -37,9 +37,12 @@ var (
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
// PickFirstLBConfig is set if we should support configuration of the
// pick_first LB policy, which can be enabled by setting the environment
// variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true".
PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false)
// pick_first LB policy.
PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", true)
// LeastRequestLB is set if we should support the least_request_experimental
// LB policy, which can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true".
LeastRequestLB = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST", false)
// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS
// handshakes that can be performed.
ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)

View File

@ -30,7 +30,7 @@ var Logger LoggerV2
var DepthLogger DepthLoggerV2
// InfoDepth logs to the INFO log at the specified depth.
func InfoDepth(depth int, args ...interface{}) {
func InfoDepth(depth int, args ...any) {
if DepthLogger != nil {
DepthLogger.InfoDepth(depth, args...)
} else {
@ -39,7 +39,7 @@ func InfoDepth(depth int, args ...interface{}) {
}
// WarningDepth logs to the WARNING log at the specified depth.
func WarningDepth(depth int, args ...interface{}) {
func WarningDepth(depth int, args ...any) {
if DepthLogger != nil {
DepthLogger.WarningDepth(depth, args...)
} else {
@ -48,7 +48,7 @@ func WarningDepth(depth int, args ...interface{}) {
}
// ErrorDepth logs to the ERROR log at the specified depth.
func ErrorDepth(depth int, args ...interface{}) {
func ErrorDepth(depth int, args ...any) {
if DepthLogger != nil {
DepthLogger.ErrorDepth(depth, args...)
} else {
@ -57,7 +57,7 @@ func ErrorDepth(depth int, args ...interface{}) {
}
// FatalDepth logs to the FATAL log at the specified depth.
func FatalDepth(depth int, args ...interface{}) {
func FatalDepth(depth int, args ...any) {
if DepthLogger != nil {
DepthLogger.FatalDepth(depth, args...)
} else {
@ -71,35 +71,35 @@ func FatalDepth(depth int, args ...interface{}) {
// is defined here to avoid a circular dependency.
type LoggerV2 interface {
// Info logs to INFO log. Arguments are handled in the manner of fmt.Print.
Info(args ...interface{})
Info(args ...any)
// Infoln logs to INFO log. Arguments are handled in the manner of fmt.Println.
Infoln(args ...interface{})
Infoln(args ...any)
// Infof logs to INFO log. Arguments are handled in the manner of fmt.Printf.
Infof(format string, args ...interface{})
Infof(format string, args ...any)
// Warning logs to WARNING log. Arguments are handled in the manner of fmt.Print.
Warning(args ...interface{})
Warning(args ...any)
// Warningln logs to WARNING log. Arguments are handled in the manner of fmt.Println.
Warningln(args ...interface{})
Warningln(args ...any)
// Warningf logs to WARNING log. Arguments are handled in the manner of fmt.Printf.
Warningf(format string, args ...interface{})
Warningf(format string, args ...any)
// Error logs to ERROR log. Arguments are handled in the manner of fmt.Print.
Error(args ...interface{})
Error(args ...any)
// Errorln logs to ERROR log. Arguments are handled in the manner of fmt.Println.
Errorln(args ...interface{})
Errorln(args ...any)
// Errorf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
Errorf(format string, args ...interface{})
Errorf(format string, args ...any)
// Fatal logs to ERROR log. Arguments are handled in the manner of fmt.Print.
// gRPC ensures that all Fatal logs will exit with os.Exit(1).
// Implementations may also call os.Exit() with a non-zero exit code.
Fatal(args ...interface{})
Fatal(args ...any)
// Fatalln logs to ERROR log. Arguments are handled in the manner of fmt.Println.
// gRPC ensures that all Fatal logs will exit with os.Exit(1).
// Implementations may also call os.Exit() with a non-zero exit code.
Fatalln(args ...interface{})
Fatalln(args ...any)
// Fatalf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
// gRPC ensures that all Fatal logs will exit with os.Exit(1).
// Implementations may also call os.Exit() with a non-zero exit code.
Fatalf(format string, args ...interface{})
Fatalf(format string, args ...any)
// V reports whether verbosity level l is at least the requested verbose level.
V(l int) bool
}
@ -116,11 +116,11 @@ type LoggerV2 interface {
// later release.
type DepthLoggerV2 interface {
// InfoDepth logs to INFO log at the specified depth. Arguments are handled in the manner of fmt.Println.
InfoDepth(depth int, args ...interface{})
InfoDepth(depth int, args ...any)
// WarningDepth logs to WARNING log at the specified depth. Arguments are handled in the manner of fmt.Println.
WarningDepth(depth int, args ...interface{})
WarningDepth(depth int, args ...any)
// ErrorDepth logs to ERROR log at the specified depth. Arguments are handled in the manner of fmt.Println.
ErrorDepth(depth int, args ...interface{})
ErrorDepth(depth int, args ...any)
// FatalDepth logs to FATAL log at the specified depth. Arguments are handled in the manner of fmt.Println.
FatalDepth(depth int, args ...interface{})
FatalDepth(depth int, args ...any)
}

View File

@ -31,7 +31,7 @@ type PrefixLogger struct {
}
// Infof does info logging.
func (pl *PrefixLogger) Infof(format string, args ...interface{}) {
func (pl *PrefixLogger) Infof(format string, args ...any) {
if pl != nil {
// Handle nil, so the tests can pass in a nil logger.
format = pl.prefix + format
@ -42,7 +42,7 @@ func (pl *PrefixLogger) Infof(format string, args ...interface{}) {
}
// Warningf does warning logging.
func (pl *PrefixLogger) Warningf(format string, args ...interface{}) {
func (pl *PrefixLogger) Warningf(format string, args ...any) {
if pl != nil {
format = pl.prefix + format
pl.logger.WarningDepth(1, fmt.Sprintf(format, args...))
@ -52,7 +52,7 @@ func (pl *PrefixLogger) Warningf(format string, args ...interface{}) {
}
// Errorf does error logging.
func (pl *PrefixLogger) Errorf(format string, args ...interface{}) {
func (pl *PrefixLogger) Errorf(format string, args ...any) {
if pl != nil {
format = pl.prefix + format
pl.logger.ErrorDepth(1, fmt.Sprintf(format, args...))
@ -62,7 +62,7 @@ func (pl *PrefixLogger) Errorf(format string, args ...interface{}) {
}
// Debugf does info logging at verbose level 2.
func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
func (pl *PrefixLogger) Debugf(format string, args ...any) {
// TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
// rewrite PrefixLogger a little to ensure that we don't use the global
// `Logger` here, and instead use the `logger` field.

View File

@ -32,10 +32,10 @@ import (
//
// This type is safe for concurrent access.
type CallbackSerializer struct {
// Done is closed once the serializer is shut down completely, i.e all
// done is closed once the serializer is shut down completely, i.e all
// scheduled callbacks are executed and the serializer has deallocated all
// its resources.
Done chan struct{}
done chan struct{}
callbacks *buffer.Unbounded
closedMu sync.Mutex
@ -48,12 +48,12 @@ type CallbackSerializer struct {
// callbacks will be added once this context is canceled, and any pending un-run
// callbacks will be executed before the serializer is shut down.
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
t := &CallbackSerializer{
Done: make(chan struct{}),
cs := &CallbackSerializer{
done: make(chan struct{}),
callbacks: buffer.NewUnbounded(),
}
go t.run(ctx)
return t
go cs.run(ctx)
return cs
}
// Schedule adds a callback to be scheduled after existing callbacks are run.
@ -64,56 +64,62 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
// Return value indicates if the callback was successfully added to the list of
// callbacks to be executed by the serializer. It is not possible to add
// callbacks once the context passed to NewCallbackSerializer is cancelled.
func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
t.closedMu.Lock()
defer t.closedMu.Unlock()
func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
cs.closedMu.Lock()
defer cs.closedMu.Unlock()
if t.closed {
if cs.closed {
return false
}
t.callbacks.Put(f)
cs.callbacks.Put(f)
return true
}
func (t *CallbackSerializer) run(ctx context.Context) {
func (cs *CallbackSerializer) run(ctx context.Context) {
var backlog []func(context.Context)
defer close(t.Done)
defer close(cs.done)
for ctx.Err() == nil {
select {
case <-ctx.Done():
// Do nothing here. Next iteration of the for loop will not happen,
// since ctx.Err() would be non-nil.
case callback, ok := <-t.callbacks.Get():
case callback, ok := <-cs.callbacks.Get():
if !ok {
return
}
t.callbacks.Load()
cs.callbacks.Load()
callback.(func(ctx context.Context))(ctx)
}
}
// Fetch pending callbacks if any, and execute them before returning from
// this method and closing t.Done.
t.closedMu.Lock()
t.closed = true
backlog = t.fetchPendingCallbacks()
t.callbacks.Close()
t.closedMu.Unlock()
// this method and closing cs.done.
cs.closedMu.Lock()
cs.closed = true
backlog = cs.fetchPendingCallbacks()
cs.callbacks.Close()
cs.closedMu.Unlock()
for _, b := range backlog {
b(ctx)
}
}
func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
func (cs *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
var backlog []func(context.Context)
for {
select {
case b := <-t.callbacks.Get():
case b := <-cs.callbacks.Get():
backlog = append(backlog, b.(func(context.Context)))
t.callbacks.Load()
cs.callbacks.Load()
default:
return backlog
}
}
}
// Done returns a channel that is closed after the context passed to
// NewCallbackSerializer is canceled and all callbacks have been executed.
func (cs *CallbackSerializer) Done() <-chan struct{} {
return cs.done
}

View File

@ -29,7 +29,7 @@ import (
type Subscriber interface {
// OnMessage is invoked when a new message is published. Implementations
// must not block in this method.
OnMessage(msg interface{})
OnMessage(msg any)
}
// PubSub is a simple one-to-many publish-subscribe system that supports
@ -40,25 +40,23 @@ type Subscriber interface {
// subscribers interested in receiving these messages register a callback
// via the Subscribe() method.
//
// Once a PubSub is stopped, no more messages can be published, and
// it is guaranteed that no more subscriber callback will be invoked.
// Once a PubSub is stopped, no more messages can be published, but any pending
// published messages will be delivered to the subscribers. Done may be used
// to determine when all published messages have been delivered.
type PubSub struct {
cs *CallbackSerializer
cancel context.CancelFunc
cs *CallbackSerializer
// Access to the below fields are guarded by this mutex.
mu sync.Mutex
msg interface{}
msg any
subscribers map[Subscriber]bool
stopped bool
}
// NewPubSub returns a new PubSub instance.
func NewPubSub() *PubSub {
ctx, cancel := context.WithCancel(context.Background())
// NewPubSub returns a new PubSub instance. Users should cancel the
// provided context to shutdown the PubSub.
func NewPubSub(ctx context.Context) *PubSub {
return &PubSub{
cs: NewCallbackSerializer(ctx),
cancel: cancel,
subscribers: map[Subscriber]bool{},
}
}
@ -75,10 +73,6 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
ps.mu.Lock()
defer ps.mu.Unlock()
if ps.stopped {
return func() {}
}
ps.subscribers[sub] = true
if ps.msg != nil {
@ -102,14 +96,10 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
// Publish publishes the provided message to the PubSub, and invokes
// callbacks registered by subscribers asynchronously.
func (ps *PubSub) Publish(msg interface{}) {
func (ps *PubSub) Publish(msg any) {
ps.mu.Lock()
defer ps.mu.Unlock()
if ps.stopped {
return
}
ps.msg = msg
for sub := range ps.subscribers {
s := sub
@ -124,13 +114,8 @@ func (ps *PubSub) Publish(msg interface{}) {
}
}
// Stop shuts down the PubSub and releases any resources allocated by it.
// It is guaranteed that no subscriber callbacks would be invoked once this
// method returns.
func (ps *PubSub) Stop() {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.stopped = true
ps.cancel()
// Done returns a channel that is closed after the context passed to NewPubSub
// is canceled and all updates have been sent to subscribers.
func (ps *PubSub) Done() <-chan struct{} {
return ps.cs.Done()
}

301
vendor/google.golang.org/grpc/internal/idle/idle.go generated vendored Normal file
View File

@ -0,0 +1,301 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package idle contains a component for managing idleness (entering and exiting)
// based on RPC activity.
package idle
import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/grpclog"
)
// For overriding in unit tests.
var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
return time.AfterFunc(d, f)
}
// Enforcer is the functionality provided by grpc.ClientConn to enter
// and exit from idle mode.
type Enforcer interface {
ExitIdleMode() error
EnterIdleMode() error
}
// Manager defines the functionality required to track RPC activity on a
// channel.
type Manager interface {
OnCallBegin() error
OnCallEnd()
Close()
}
type noopManager struct{}
func (noopManager) OnCallBegin() error { return nil }
func (noopManager) OnCallEnd() {}
func (noopManager) Close() {}
// manager implements the Manager interface. It uses atomic operations to
// synchronize access to shared state and a mutex to guarantee mutual exclusion
// in a critical section.
type manager struct {
// State accessed atomically.
lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
closed int32 // Boolean; True when the manager is closed.
// Can be accessed without atomics or mutex since these are set at creation
// time and read-only after that.
enforcer Enforcer // Functionality provided by grpc.ClientConn.
timeout int64 // Idle timeout duration nanos stored as an int64.
logger grpclog.LoggerV2
// idleMu is used to guarantee mutual exclusion in two scenarios:
// - Opposing intentions:
// - a: Idle timeout has fired and handleIdleTimeout() is trying to put
// the channel in idle mode because the channel has been inactive.
// - b: At the same time an RPC is made on the channel, and OnCallBegin()
// is trying to prevent the channel from going idle.
// - Competing intentions:
// - The channel is in idle mode and there are multiple RPCs starting at
// the same time, all trying to move the channel out of idle. Only one
// of them should succeed in doing so, while the other RPCs should
// piggyback on the first one and be successfully handled.
idleMu sync.RWMutex
actuallyIdle bool
timer *time.Timer
}
// ManagerOptions is a collection of options used by
// NewManager.
type ManagerOptions struct {
Enforcer Enforcer
Timeout time.Duration
Logger grpclog.LoggerV2
}
// NewManager creates a new idleness manager implementation for the
// given idle timeout.
func NewManager(opts ManagerOptions) Manager {
if opts.Timeout == 0 {
return noopManager{}
}
m := &manager{
enforcer: opts.Enforcer,
timeout: int64(opts.Timeout),
logger: opts.Logger,
}
m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout)
return m
}
// resetIdleTimer resets the idle timer to the given duration. This method
// should only be called from the timer callback.
func (m *manager) resetIdleTimer(d time.Duration) {
m.idleMu.Lock()
defer m.idleMu.Unlock()
if m.timer == nil {
// Only close sets timer to nil. We are done.
return
}
// It is safe to ignore the return value from Reset() because this method is
// only ever called from the timer callback, which means the timer has
// already fired.
m.timer.Reset(d)
}
// handleIdleTimeout is the timer callback that is invoked upon expiry of the
// configured idle timeout. The channel is considered inactive if there are no
// ongoing calls and no RPC activity since the last time the timer fired.
func (m *manager) handleIdleTimeout() {
if m.isClosed() {
return
}
if atomic.LoadInt32(&m.activeCallsCount) > 0 {
m.resetIdleTimer(time.Duration(m.timeout))
return
}
// There has been activity on the channel since we last got here. Reset the
// timer and return.
if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
// Set the timer to fire after a duration of idle timeout, calculated
// from the time the most recent RPC completed.
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0)
m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano()))
return
}
// This CAS operation is extremely likely to succeed given that there has
// been no activity since the last time we were here. Setting the
// activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the
// channel is either in idle mode or is trying to get there.
if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
// This CAS operation can fail if an RPC started after we checked for
// activity at the top of this method, or one was ongoing from before
// the last time we were here. In both case, reset the timer and return.
m.resetIdleTimer(time.Duration(m.timeout))
return
}
// Now that we've set the active calls count to -math.MaxInt32, it's time to
// actually move to idle mode.
if m.tryEnterIdleMode() {
// Successfully entered idle mode. No timer needed until we exit idle.
return
}
// Failed to enter idle mode due to a concurrent RPC that kept the channel
// active, or because of an error from the channel. Undo the attempt to
// enter idle, and reset the timer to try again later.
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
m.resetIdleTimer(time.Duration(m.timeout))
}
// tryEnterIdleMode instructs the channel to enter idle mode. But before
// that, it performs a last minute check to ensure that no new RPC has come in,
// making the channel active.
//
// Return value indicates whether or not the channel moved to idle mode.
//
// Holds idleMu which ensures mutual exclusion with exitIdleMode.
func (m *manager) tryEnterIdleMode() bool {
m.idleMu.Lock()
defer m.idleMu.Unlock()
if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
// We raced and lost to a new RPC. Very rare, but stop entering idle.
return false
}
if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
// An very short RPC could have come in (and also finished) after we
// checked for calls count and activity in handleIdleTimeout(), but
// before the CAS operation. So, we need to check for activity again.
return false
}
// No new RPCs have come in since we last set the active calls count value
// -math.MaxInt32 in the timer callback. And since we have the lock, it is
// safe to enter idle mode now.
if err := m.enforcer.EnterIdleMode(); err != nil {
m.logger.Errorf("Failed to enter idle mode: %v", err)
return false
}
// Successfully entered idle mode.
m.actuallyIdle = true
return true
}
// OnCallBegin is invoked at the start of every RPC.
func (m *manager) OnCallBegin() error {
if m.isClosed() {
return nil
}
if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
// Channel is not idle now. Set the activity bit and allow the call.
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
return nil
}
// Channel is either in idle mode or is in the process of moving to idle
// mode. Attempt to exit idle mode to allow this RPC.
if err := m.exitIdleMode(); err != nil {
// Undo the increment to calls count, and return an error causing the
// RPC to fail.
atomic.AddInt32(&m.activeCallsCount, -1)
return err
}
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
return nil
}
// exitIdleMode instructs the channel to exit idle mode.
//
// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
func (m *manager) exitIdleMode() error {
m.idleMu.Lock()
defer m.idleMu.Unlock()
if !m.actuallyIdle {
// This can happen in two scenarios:
// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
// tryEnterIdleMode(). But before the latter could grab the lock, an RPC
// came in and OnCallBegin() noticed that the calls count is negative.
// - Channel is in idle mode, and multiple new RPCs come in at the same
// time, all of them notice a negative calls count in OnCallBegin and get
// here. The first one to get the lock would got the channel to exit idle.
//
// Either way, nothing to do here.
return nil
}
if err := m.enforcer.ExitIdleMode(); err != nil {
return fmt.Errorf("channel failed to exit idle mode: %v", err)
}
// Undo the idle entry process. This also respects any new RPC attempts.
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
m.actuallyIdle = false
// Start a new timer to fire after the configured idle timeout.
m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout)
return nil
}
// OnCallEnd is invoked at the end of every RPC.
func (m *manager) OnCallEnd() {
if m.isClosed() {
return
}
// Record the time at which the most recent call finished.
atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano())
// Decrement the active calls count. This count can temporarily go negative
// when the timer callback is in the process of moving the channel to idle
// mode, but one or more RPCs come in and complete before the timer callback
// can get done with the process of moving to idle mode.
atomic.AddInt32(&m.activeCallsCount, -1)
}
func (m *manager) isClosed() bool {
return atomic.LoadInt32(&m.closed) == 1
}
func (m *manager) Close() {
atomic.StoreInt32(&m.closed, 1)
m.idleMu.Lock()
m.timer.Stop()
m.timer = nil
m.idleMu.Unlock()
}

View File

@ -30,7 +30,7 @@ import (
var (
// WithHealthCheckFunc is set by dialoptions.go
WithHealthCheckFunc interface{} // func (HealthChecker) DialOption
WithHealthCheckFunc any // func (HealthChecker) DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc HealthChecker
// BalancerUnregister is exported by package balancer to unregister a balancer.
@ -38,8 +38,12 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
// KeepaliveMinServerPingTime is the minimum ping interval for servers.
// This must be 1s by default, but tests may wish to set it lower for
// convenience.
KeepaliveMinServerPingTime = time.Second
// ParseServiceConfig parses a JSON representation of the service config.
ParseServiceConfig interface{} // func(string) *serviceconfig.ParseResult
ParseServiceConfig any // func(string) *serviceconfig.ParseResult
// EqualServiceConfigForTesting is for testing service config generation and
// parsing. Both a and b should be returned by ParseServiceConfig.
// This function compares the config without rawJSON stripped, in case the
@ -49,33 +53,33 @@ var (
// given name. This is set by package certprovider for use from xDS
// bootstrap code while parsing certificate provider configs in the
// bootstrap file.
GetCertificateProviderBuilder interface{} // func(string) certprovider.Builder
GetCertificateProviderBuilder any // func(string) certprovider.Builder
// GetXDSHandshakeInfoForTesting returns a pointer to the xds.HandshakeInfo
// stored in the passed in attributes. This is set by
// credentials/xds/xds.go.
GetXDSHandshakeInfoForTesting interface{} // func (*attributes.Attributes) *xds.HandshakeInfo
GetXDSHandshakeInfoForTesting any // func (*attributes.Attributes) *xds.HandshakeInfo
// GetServerCredentials returns the transport credentials configured on a
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
GetServerCredentials any // func (*grpc.Server) credentials.TransportCredentials
// CanonicalString returns the canonical string of the code defined here:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
CanonicalString interface{} // func (codes.Code) string
CanonicalString any // func (codes.Code) string
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
DrainServerTransports any // func(*grpc.Server, string)
// AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
AddGlobalServerOptions interface{} // func(opt ...ServerOption)
AddGlobalServerOptions any // func(opt ...ServerOption)
// ClearGlobalServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking.
//
@ -88,14 +92,14 @@ var (
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
AddGlobalDialOptions interface{} // func(opt ...DialOption)
AddGlobalDialOptions any // func(opt ...DialOption)
// DisableGlobalDialOptions returns a DialOption that prevents the
// ClientConn from applying the global DialOptions (set via
// AddGlobalDialOptions).
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
DisableGlobalDialOptions interface{} // func() grpc.DialOption
DisableGlobalDialOptions any // func() grpc.DialOption
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
//
@ -104,23 +108,26 @@ var (
ClearGlobalDialOptions func()
// JoinDialOptions combines the dial options passed as arguments into a
// single dial option.
JoinDialOptions interface{} // func(...grpc.DialOption) grpc.DialOption
JoinDialOptions any // func(...grpc.DialOption) grpc.DialOption
// JoinServerOptions combines the server options passed as arguments into a
// single server option.
JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption
JoinServerOptions any // func(...grpc.ServerOption) grpc.ServerOption
// WithBinaryLogger returns a DialOption that specifies the binary logger
// for a ClientConn.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption
WithBinaryLogger any // func(binarylog.Logger) grpc.DialOption
// BinaryLogger returns a ServerOption that can set the binary logger for a
// server.
//
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption
BinaryLogger any // func(binarylog.Logger) grpc.ServerOption
// SubscribeToConnectivityStateChanges adds a grpcsync.Subscriber to a provided grpc.ClientConn
SubscribeToConnectivityStateChanges any // func(*grpc.ClientConn, grpcsync.Subscriber)
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from
@ -131,7 +138,7 @@ var (
//
// This function should ONLY be used for testing and may not work with some
// other features, including the CSDS service.
NewXDSResolverWithConfigForTesting interface{} // func([]byte) (resolver.Builder, error)
NewXDSResolverWithConfigForTesting any // func([]byte) (resolver.Builder, error)
// RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster
// Specifier Plugin for testing purposes, regardless of the XDSRLS environment
@ -163,7 +170,11 @@ var (
UnregisterRBACHTTPFilterForTesting func()
// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
ORCAAllowAnyMinReportingInterval any // func(so *orca.ServiceOptions)
// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra
// metadata to RPCs.
GRPCResolverSchemeExtraMetadata string = "xds"
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
@ -174,7 +185,7 @@ var (
//
// The health checking protocol is defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), serviceName string) error
type HealthChecker func(ctx context.Context, newStream func(string) (any, error), setConnectivityState func(connectivity.State, error), serviceName string) error
const (
// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode.

View File

@ -35,7 +35,7 @@ const mdKey = mdKeyType("grpc.internal.address.metadata")
type mdValue metadata.MD
func (m mdValue) Equal(o interface{}) bool {
func (m mdValue) Equal(o any) bool {
om, ok := o.(mdValue)
if !ok {
return false

View File

@ -35,7 +35,7 @@ const jsonIndent = " "
// ToJSON marshals the input into a json string.
//
// If marshal fails, it falls back to fmt.Sprintf("%+v").
func ToJSON(e interface{}) string {
func ToJSON(e any) string {
switch ee := e.(type) {
case protov1.Message:
mm := jsonpb.Marshaler{Indent: jsonIndent}

View File

@ -92,7 +92,7 @@ type ClientStream interface {
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
SendMsg(m interface{}) error
SendMsg(m any) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
@ -101,7 +101,7 @@ type ClientStream interface {
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m interface{}) error
RecvMsg(m any) error
}
// ClientInterceptor is an interceptor for gRPC client streams.

View File

@ -49,7 +49,7 @@ func New(c codes.Code, msg string) *Status {
}
// Newf returns New(c, fmt.Sprintf(format, a...)).
func Newf(c codes.Code, format string, a ...interface{}) *Status {
func Newf(c codes.Code, format string, a ...any) *Status {
return New(c, fmt.Sprintf(format, a...))
}
@ -64,7 +64,7 @@ func Err(c codes.Code, msg string) error {
}
// Errorf returns Error(c, fmt.Sprintf(format, a...)).
func Errorf(c codes.Code, format string, a ...interface{}) error {
func Errorf(c codes.Code, format string, a ...any) error {
return Err(c, fmt.Sprintf(format, a...))
}
@ -120,11 +120,11 @@ func (s *Status) WithDetails(details ...proto.Message) (*Status, error) {
// Details returns a slice of details messages attached to the status.
// If a detail cannot be decoded, the error is returned in place of the detail.
func (s *Status) Details() []interface{} {
func (s *Status) Details() []any {
if s == nil || s.s == nil {
return nil
}
details := make([]interface{}, 0, len(s.s.Details))
details := make([]any, 0, len(s.s.Details))
for _, any := range s.s.Details {
detail := &ptypes.DynamicAny{}
if err := ptypes.UnmarshalAny(any, detail); err != nil {

View File

@ -40,7 +40,7 @@ var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
}
type itemNode struct {
it interface{}
it any
next *itemNode
}
@ -49,7 +49,7 @@ type itemList struct {
tail *itemNode
}
func (il *itemList) enqueue(i interface{}) {
func (il *itemList) enqueue(i any) {
n := &itemNode{it: i}
if il.tail == nil {
il.head, il.tail = n, n
@ -61,11 +61,11 @@ func (il *itemList) enqueue(i interface{}) {
// peek returns the first item in the list without removing it from the
// list.
func (il *itemList) peek() interface{} {
func (il *itemList) peek() any {
return il.head.it
}
func (il *itemList) dequeue() interface{} {
func (il *itemList) dequeue() any {
if il.head == nil {
return nil
}
@ -336,7 +336,7 @@ func (c *controlBuffer) put(it cbItem) error {
return err
}
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@ -373,7 +373,7 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (b
}
// Note argument f should never be nil.
func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
@ -387,7 +387,7 @@ func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bo
return true, nil
}
func (c *controlBuffer) get(block bool) (interface{}, error) {
func (c *controlBuffer) get(block bool) (any, error) {
for {
c.mu.Lock()
if c.err != nil {
@ -830,7 +830,7 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}
func (l *loopyWriter) handle(i interface{}) error {
func (l *loopyWriter) handle(i any) error {
switch i := i.(type) {
case *incomingWindowUpdate:
l.incomingWindowUpdateHandler(i)

View File

@ -330,7 +330,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
activeStreams: make(map[uint32]*Stream),
@ -762,7 +762,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
firstTry := true
var ch chan struct{}
transportDrainRequired := false
checkForStreamQuota := func(it interface{}) bool {
checkForStreamQuota := func(it any) bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
t.waitingStreams++
@ -800,7 +800,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
var hdrListSizeErr error
checkForHeaderListSize := func(it interface{}) bool {
checkForHeaderListSize := func(it any) bool {
if t.maxSendHeaderListSize == nil {
return true
}
@ -815,7 +815,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
for {
success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
success, err := t.controlBuf.executeAndPut(func(it any) bool {
return checkForHeaderListSize(it) && checkForStreamQuota(it)
}, hdr)
if err != nil {
@ -927,7 +927,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
rst: rst,
rstCode: rstCode,
}
addBackStreamQuota := func(interface{}) bool {
addBackStreamQuota := func(any) bool {
t.streamQuota++
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
@ -1080,7 +1080,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
updateIWS := func(interface{}) bool {
updateIWS := func(any) bool {
t.initialWindowSize = int32(n)
t.mu.Lock()
for _, s := range t.activeStreams {
@ -1233,7 +1233,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
}
updateFuncs = append(updateFuncs, updateStreamQuota)
}
t.controlBuf.executeAndPut(func(interface{}) bool {
t.controlBuf.executeAndPut(func(any) bool {
for _, f := range updateFuncs {
f()
}
@ -1505,14 +1505,15 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}
isHeader := false
// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.headerValid = true
if !endStream {
// HEADERS frame block carries a Response-Headers.
isHeader = true
// For headers, set them in s.header and close headerChan. For trailers or
// trailers-only, closeStream will set the trailers and close headerChan as
// needed.
if !endStream {
// If headerChan hasn't been closed yet (expected, given we checked it
// above, but something else could have potentially closed the whole
// stream).
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.headerValid = true
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
@ -1520,15 +1521,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(mdata) > 0 {
s.header = mdata
}
} else {
// HEADERS frame block carries a Trailers-Only.
s.noHeaders = true
close(s.headerChan)
}
close(s.headerChan)
}
for _, sh := range t.statsHandlers {
if isHeader {
if !endStream {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
@ -1554,9 +1552,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
statusGen = status.New(rawStatusCode, grpcMessage)
}
// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
// If client received END_STREAM from server while stream was still active,
// send RST_STREAM.
rstStream := s.getState() == streamActive
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)
}
// readServerPreface reads and handles the initial settings frame from the

View File

@ -165,7 +165,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
if config.MaxHeaderListSize != nil {
maxHeaderListSize = *config.MaxHeaderListSize
}
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
// Send initial settings as connection preface to client.
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
@ -855,7 +855,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
return nil
})
t.controlBuf.executeAndPut(func(interface{}) bool {
t.controlBuf.executeAndPut(func(any) bool {
for _, f := range updateFuncs {
f()
}
@ -939,7 +939,7 @@ func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD)
return headerFields
}
func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
func (t *http2Server) checkForHeaderListSize(it any) bool {
if t.maxSendHeaderListSize == nil {
return true
}

View File

@ -30,6 +30,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"
@ -309,6 +310,7 @@ func decodeGrpcMessageUnchecked(msg string) string {
}
type bufWriter struct {
pool *sync.Pool
buf []byte
offset int
batchSize int
@ -316,12 +318,17 @@ type bufWriter struct {
err error
}
func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
return &bufWriter{
buf: make([]byte, batchSize*2),
func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
w := &bufWriter{
batchSize: batchSize,
conn: conn,
pool: pool,
}
// this indicates that we should use non shared buf
if pool == nil {
w.buf = make([]byte, batchSize)
}
return w
}
func (w *bufWriter) Write(b []byte) (n int, err error) {
@ -332,19 +339,34 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
n, err = w.conn.Write(b)
return n, toIOError(err)
}
if w.buf == nil {
b := w.pool.Get().(*[]byte)
w.buf = *b
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
w.offset += nn
n += nn
if w.offset >= w.batchSize {
err = w.Flush()
err = w.flushKeepBuffer()
}
}
return n, err
}
func (w *bufWriter) Flush() error {
err := w.flushKeepBuffer()
// Only release the buffer if we are in a "shared" mode
if w.buf != nil && w.pool != nil {
b := w.buf
w.pool.Put(&b)
w.buf = nil
}
return err
}
func (w *bufWriter) flushKeepBuffer() error {
if w.err != nil {
return w.err
}
@ -381,7 +403,10 @@ type framer struct {
fr *http2.Framer
}
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer {
var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool)
var writeBufferMutex sync.Mutex
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
if writeBufferSize < 0 {
writeBufferSize = 0
}
@ -389,7 +414,11 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
if readBufferSize > 0 {
r = bufio.NewReaderSize(r, readBufferSize)
}
w := newBufWriter(conn, writeBufferSize)
var pool *sync.Pool
if sharedWriteBuffer {
pool = getWriteBufferPool(writeBufferSize)
}
w := newBufWriter(conn, writeBufferSize, pool)
f := &framer{
writer: w,
fr: http2.NewFramer(w, r),
@ -403,6 +432,24 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
return f
}
func getWriteBufferPool(writeBufferSize int) *sync.Pool {
writeBufferMutex.Lock()
defer writeBufferMutex.Unlock()
size := writeBufferSize * 2
pool, ok := writeBufferPoolMap[size]
if ok {
return pool
}
pool = &sync.Pool{
New: func() any {
b := make([]byte, size)
return &b
},
}
writeBufferPoolMap[size] = pool
return pool
}
// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
net := "tcp"

View File

@ -43,10 +43,6 @@ import (
"google.golang.org/grpc/tap"
)
// ErrNoHeaders is used as a signal that a trailers only response was received,
// and is not a real error.
var ErrNoHeaders = errors.New("stream has no headers")
const logLevel = 2
type bufferPool struct {
@ -56,7 +52,7 @@ type bufferPool struct {
func newBufferPool() *bufferPool {
return &bufferPool{
pool: sync.Pool{
New: func() interface{} {
New: func() any {
return new(bytes.Buffer)
},
},
@ -390,14 +386,10 @@ func (s *Stream) Header() (metadata.MD, error) {
}
s.waitOnHeader()
if !s.headerValid {
if !s.headerValid || s.noHeaders {
return nil, s.status.Err()
}
if s.noHeaders {
return nil, ErrNoHeaders
}
return s.header.Copy(), nil
}
@ -559,6 +551,7 @@ type ServerConfig struct {
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
SharedWriteBuffer bool
ChannelzParentID *channelz.Identifier
MaxHeaderListSize *uint32
HeaderTableSize *uint32
@ -592,6 +585,8 @@ type ConnectOptions struct {
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
// SharedWriteBuffer indicates whether connections should reuse write buffer
SharedWriteBuffer bool
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID *channelz.Identifier
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
@ -736,7 +731,7 @@ type ServerTransport interface {
}
// connectionErrorf creates an ConnectionError with the specified error description.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
return ConnectionError{
Desc: fmt.Sprintf(format, a...),
temp: temp,