Merge pull request #113 from ceph/devel

Sync devel branch with upstream
This commit is contained in:
OpenShift Merge Robot 2022-07-27 11:11:24 +02:00 committed by GitHub
commit 3db764f0e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 283 additions and 240 deletions

2
go.mod
View File

@ -26,7 +26,7 @@ require (
github.com/stretchr/testify v1.8.0 github.com/stretchr/testify v1.8.0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 golang.org/x/crypto v0.0.0-20220214200702-86341886e292
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 golang.org/x/sys v0.0.0-20220209214540-3681064d5158
google.golang.org/grpc v1.47.0 google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.0 google.golang.org/protobuf v1.28.0
k8s.io/api v0.24.2 k8s.io/api v0.24.2
k8s.io/apimachinery v0.24.2 k8s.io/apimachinery v0.24.2

4
go.sum
View File

@ -1674,8 +1674,8 @@ google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
google.golang.org/grpc v1.47.0 h1:9n77onPX5F3qfFCqjy9dhn8PbNQsIKeVU04J9G7umt8= google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=

View File

@ -42,6 +42,4 @@ var (
ErrMissingImageNameInVolID = errors.New("rbd image name information can not be empty in volID") ErrMissingImageNameInVolID = errors.New("rbd image name information can not be empty in volID")
// ErrDecodeClusterIDFromMonsInVolID is returned when mons hash decoding on migration volID. // ErrDecodeClusterIDFromMonsInVolID is returned when mons hash decoding on migration volID.
ErrDecodeClusterIDFromMonsInVolID = errors.New("failed to get clusterID from monitors hash in volID") ErrDecodeClusterIDFromMonsInVolID = errors.New("failed to get clusterID from monitors hash in volID")
// ErrUnHealthyMirroredImage is returned when mirrored image is not healthy.
ErrUnHealthyMirroredImage = errors.New("mirrored image is not healthy")
) )

View File

@ -581,16 +581,6 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
} }
} }
mirrorStatus, err := rbdVol.getImageMirroringStatus()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
err = checkHealthyPrimary(ctx, rbdVol, mirrorStatus)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
var mode librbd.ImageMirrorMode var mode librbd.ImageMirrorMode
mode, err = getMirroringMode(ctx, req.GetParameters()) mode, err = getMirroringMode(ctx, req.GetParameters())
if err != nil { if err != nil {
@ -620,31 +610,6 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
return &replication.PromoteVolumeResponse{}, nil return &replication.PromoteVolumeResponse{}, nil
} }
// checkHealthyPrimary checks if the image is a healhty primary or not.
// healthy primary image will be in up+stopped state, for states other
// than this it returns an error message.
func checkHealthyPrimary(ctx context.Context, rbdVol *rbdVolume, mirrorStatus *librbd.GlobalMirrorImageStatus) error {
localStatus, err := mirrorStatus.LocalStatus()
if err != nil {
// LocalStatus can fail if the local site status is not found in
// mirroring status. Log complete sites status to debug why getting
// local status failed
log.ErrorLog(ctx, "mirroring status is %+v", mirrorStatus)
return fmt.Errorf("failed to get local status: %w", err)
}
if !localStatus.Up || localStatus.State != librbd.MirrorImageStatusStateStopped {
return fmt.Errorf("%s %w. State is up=%t, state=%q",
rbdVol,
ErrUnHealthyMirroredImage,
localStatus.Up,
localStatus.State)
}
return nil
}
// DemoteVolume extracts the RBD volume information from the // DemoteVolume extracts the RBD volume information from the
// volumeID, If the image is present, mirroring is enabled and the // volumeID, If the image is present, mirroring is enabled and the
// image is in promoted state it will demote the volume as secondary. // image is in promoted state it will demote the volume as secondary.

View File

@ -279,75 +279,3 @@ func TestCheckVolumeResyncStatus(t *testing.T) {
}) })
} }
} }
func Test_checkHealthyPrimary(t *testing.T) {
t.Parallel()
rbdVol := &rbdVolume{
rbdImage: rbdImage{
RbdImageName: "test",
Pool: "test-pool",
},
}
tests := []struct {
name string
ctx context.Context
rbdVol *rbdVolume
mirrorStatus *librbd.GlobalMirrorImageStatus
wantErr bool
}{
{
name: "when image is in up+stopped state",
ctx: context.TODO(),
rbdVol: rbdVol,
mirrorStatus: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "",
State: librbd.MirrorImageStatusStateStopped,
Up: true,
},
},
},
wantErr: false,
},
{
name: "when image is in up+error state",
ctx: context.TODO(),
rbdVol: rbdVol,
mirrorStatus: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "",
State: librbd.MirrorImageStatusStateError,
Up: false,
},
},
},
wantErr: true,
},
{
name: "when image is in up+replaying state",
ctx: context.TODO(),
rbdVol: rbdVol,
mirrorStatus: &librbd.GlobalMirrorImageStatus{
SiteStatuses: []librbd.SiteMirrorImageStatus{
{
MirrorUUID: "",
State: librbd.MirrorImageStatusStateReplaying,
Up: true,
},
},
},
wantErr: true,
},
}
for _, tt := range tests {
ts := tt
t.Run(ts.name, func(t *testing.T) {
t.Parallel()
if err := checkHealthyPrimary(ts.ctx, ts.rbdVol, ts.mirrorStatus); (err != nil) != ts.wantErr {
t.Errorf("checkHealthyPrimary() error = %v, wantErr %v", err, ts.wantErr)
}
})
}
}

View File

@ -86,6 +86,9 @@ function deploy_rook() {
check_ceph_cluster_health check_ceph_cluster_health
fi fi
# Make sure Ceph Mgr is running
check_ceph_mgr
# Check if CephFileSystem is empty # Check if CephFileSystem is empty
if ! kubectl_retry -n rook-ceph get cephfilesystems -oyaml | grep 'items: \[\]' &>/dev/null; then if ! kubectl_retry -n rook-ceph get cephfilesystems -oyaml | grep 'items: \[\]' &>/dev/null; then
check_mds_stat check_mds_stat
@ -166,6 +169,22 @@ function check_ceph_cluster_health() {
echo "" echo ""
} }
function check_ceph_mgr() {
for ((retry = 0; retry <= ROOK_DEPLOY_TIMEOUT; retry = retry + 5)); do
echo "Waiting for Ceph Mgr... ${retry}s" && sleep 5
MGR_POD=$(kubectl_retry -n rook-ceph get pods -l app=rook-ceph-mgr -o jsonpath='{.items[0].metadata.name}')
MGR_POD_STATUS=$(kubectl_retry -n rook-ceph get pod "$MGR_POD" -ojsonpath='{.status.phase}')
[[ "$MGR_POD_STATUS" = "Running" ]] && break
done
if [ "$retry" -gt "$ROOK_DEPLOY_TIMEOUT" ]; then
echo "[Timeout] Ceph Mgr is not running (timeout)"
return 1
fi
echo ""
}
function check_mds_stat() { function check_mds_stat() {
for ((retry = 0; retry <= ROOK_DEPLOY_TIMEOUT; retry = retry + 5)); do for ((retry = 0; retry <= ROOK_DEPLOY_TIMEOUT; retry = retry + 5)); do
FS_NAME=$(kubectl_retry -n rook-ceph get cephfilesystems.ceph.rook.io -ojsonpath='{.items[0].metadata.name}') FS_NAME=$(kubectl_retry -n rook-ceph get cephfilesystems.ceph.rook.io -ojsonpath='{.items[0].metadata.name}')

View File

@ -45,6 +45,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
scStates: make(map[balancer.SubConn]connectivity.State), scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{}, csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config, config: bb.config,
state: connectivity.Connecting,
} }
// Initialize picker to a picker that always returns // Initialize picker to a picker that always returns
// ErrNoSubConnAvailable, because when state of a SubConn changes, we // ErrNoSubConnAvailable, because when state of a SubConn changes, we
@ -134,6 +135,9 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.ResolverError(errors.New("produced zero addresses")) b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState return balancer.ErrBadResolverState
} }
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
return nil return nil
} }

View File

@ -146,6 +146,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background()) cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range extraDialOptions {
opt.apply(&cc.dopts)
}
for _, opt := range opts { for _, opt := range opts {
opt.apply(&cc.dopts) opt.apply(&cc.dopts)
} }

View File

@ -35,6 +35,15 @@ import (
"google.golang.org/grpc/stats" "google.golang.org/grpc/stats"
) )
func init() {
internal.AddExtraDialOptions = func(opt ...DialOption) {
extraDialOptions = append(extraDialOptions, opt...)
}
internal.ClearExtraDialOptions = func() {
extraDialOptions = nil
}
}
// dialOptions configure a Dial call. dialOptions are set by the DialOption // dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial. // values passed to Dial.
type dialOptions struct { type dialOptions struct {
@ -70,6 +79,8 @@ type DialOption interface {
apply(*dialOptions) apply(*dialOptions)
} }
var extraDialOptions []DialOption
// EmptyDialOption does not alter the dial configuration. It can be embedded in // EmptyDialOption does not alter the dial configuration. It can be embedded in
// another structure to build custom dial options. // another structure to build custom dial options.
// //
@ -380,7 +391,7 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
// all the RPCs and underlying network connections in this ClientConn. // all the RPCs and underlying network connections in this ClientConn.
func WithStatsHandler(h stats.Handler) DialOption { func WithStatsHandler(h stats.Handler) DialOption {
return newFuncDialOption(func(o *dialOptions) { return newFuncDialOption(func(o *dialOptions) {
o.copts.StatsHandler = h o.copts.StatsHandlers = append(o.copts.StatsHandlers, h)
}) })
} }

View File

@ -193,6 +193,8 @@ func (gsb *Balancer) ExitIdle() {
ei.ExitIdle() ei.ExitIdle()
return return
} }
gsb.mu.Lock()
defer gsb.mu.Unlock()
for sc := range balToUpdate.subconns { for sc := range balToUpdate.subconns {
sc.Connect() sc.Connect()
} }

View File

@ -42,14 +42,14 @@ var binLogger Logger
var grpclogLogger = grpclog.Component("binarylog") var grpclogLogger = grpclog.Component("binarylog")
// SetLogger sets the binarg logger. // SetLogger sets the binary logger.
// //
// Only call this at init time. // Only call this at init time.
func SetLogger(l Logger) { func SetLogger(l Logger) {
binLogger = l binLogger = l
} }
// GetLogger gets the binarg logger. // GetLogger gets the binary logger.
// //
// Only call this at init time. // Only call this at init time.
func GetLogger() Logger { func GetLogger() Logger {

View File

@ -77,7 +77,7 @@ var (
// environment variable // environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to // "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true". // "true".
XDSAggregateAndDNS = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true") XDSAggregateAndDNS = !strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "false")
// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled, // XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
// which can be disabled by setting the environment variable // which can be disabled by setting the environment variable

View File

@ -63,6 +63,76 @@ var (
// xDS-enabled server invokes this method on a grpc.Server when a particular // xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode. // listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string) DrainServerTransports interface{} // func(*grpc.Server, string)
// AddExtraServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraServerOptions interface{} // func(opt ...ServerOption)
// ClearExtraServerOptions clears the array of extra ServerOption. This
// method is useful in testing and benchmarking.
ClearExtraServerOptions func()
// AddExtraDialOptions adds an array of DialOption that will be effective
// globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddExtraDialOptions interface{} // func(opt ...DialOption)
// ClearExtraDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
ClearExtraDialOptions func()
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from
// the supported environment variables. The resolver.Builder is meant to be
// used in conjunction with the grpc.WithResolvers DialOption.
//
// Testing Only
//
// This function should ONLY be used for testing and may not work with some
// other features, including the CSDS service.
NewXDSResolverWithConfigForTesting interface{} // func([]byte) (resolver.Builder, error)
// RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster
// Specifier Plugin for testing purposes, regardless of the XDSRLS environment
// variable.
//
// TODO: Remove this function once the RLS env var is removed.
RegisterRLSClusterSpecifierPluginForTesting func()
// UnregisterRLSClusterSpecifierPluginForTesting unregisters the RLS Cluster
// Specifier Plugin for testing purposes. This is needed because there is no way
// to unregister the RLS Cluster Specifier Plugin after registering it solely
// for testing purposes using RegisterRLSClusterSpecifierPluginForTesting().
//
// TODO: Remove this function once the RLS env var is removed.
UnregisterRLSClusterSpecifierPluginForTesting func()
// RegisterRBACHTTPFilterForTesting registers the RBAC HTTP Filter for testing
// purposes, regardless of the RBAC environment variable.
//
// TODO: Remove this function once the RBAC env var is removed.
RegisterRBACHTTPFilterForTesting func()
// UnregisterRBACHTTPFilterForTesting unregisters the RBAC HTTP Filter for
// testing purposes. This is needed because there is no way to unregister the
// HTTP Filter after registering it solely for testing purposes using
// RegisterRBACHTTPFilterForTesting().
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()
// RegisterOutlierDetectionBalancerForTesting registers the Outlier
// Detection Balancer for testing purposes, regardless of the Outlier
// Detection environment variable.
//
// TODO: Remove this function once the Outlier Detection env var is removed.
RegisterOutlierDetectionBalancerForTesting func()
// UnregisterOutlierDetectionBalancerForTesting unregisters the Outlier
// Detection Balancer for testing purposes. This is needed because there is
// no way to unregister the Outlier Detection Balancer after registering it
// solely for testing purposes using
// RegisterOutlierDetectionBalancerForTesting().
//
// TODO: Remove this function once the Outlier Detection env var is removed.
UnregisterOutlierDetectionBalancerForTesting func()
) )
// HealthChecker defines the signature of the client-side LB channel health checking function. // HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@ -49,7 +49,7 @@ import (
// NewServerHandlerTransport returns a ServerTransport handling gRPC // NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server // from inside an http.Handler. It requires that the http Server
// supports HTTP/2. // supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) { func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 { if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2") return nil, errors.New("gRPC requires HTTP/2")
} }
@ -138,7 +138,7 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server // TODO make sure this is consistent across handler_server and http2_server
contentSubtype string contentSubtype string
stats stats.Handler stats []stats.Handler
} }
func (ht *serverHandlerTransport) Close() { func (ht *serverHandlerTransport) Close() {
@ -228,10 +228,10 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}) })
if err == nil { // transport has not been closed if err == nil { // transport has not been closed
if ht.stats != nil {
// Note: The trailer fields are compressed with hpack after this call returns. // Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here. // No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{ for _, sh := range ht.stats {
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(), Trailer: s.trailer.Copy(),
}) })
} }
@ -314,10 +314,10 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
}) })
if err == nil { if err == nil {
if ht.stats != nil { for _, sh := range ht.stats {
// Note: The header fields are compressed with hpack after this call returns. // Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here. // No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutHeader{ sh.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(), Header: md.Copy(),
Compression: s.sendCompress, Compression: s.sendCompress,
}) })
@ -369,14 +369,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
} }
ctx = metadata.NewIncomingContext(ctx, ht.headerMD) ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr) s.ctx = peer.NewContext(ctx, pr)
if ht.stats != nil { for _, sh := range ht.stats {
s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{ inHeader := &stats.InHeader{
FullMethod: s.method, FullMethod: s.method,
RemoteAddr: ht.RemoteAddr(), RemoteAddr: ht.RemoteAddr(),
Compression: s.recvCompress, Compression: s.recvCompress,
} }
ht.stats.HandleRPC(s.ctx, inHeader) sh.HandleRPC(s.ctx, inHeader)
} }
s.trReader = &transportReader{ s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}}, reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},

View File

@ -90,7 +90,7 @@ type http2Client struct {
kp keepalive.ClientParameters kp keepalive.ClientParameters
keepaliveEnabled bool keepaliveEnabled bool
statsHandler stats.Handler statsHandlers []stats.Handler
initialWindowSize int32 initialWindowSize int32
@ -311,7 +311,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
isSecure: isSecure, isSecure: isSecure,
perRPCCreds: perRPCCreds, perRPCCreds: perRPCCreds,
kp: kp, kp: kp,
statsHandler: opts.StatsHandler, statsHandlers: opts.StatsHandlers,
initialWindowSize: initialWindowSize, initialWindowSize: initialWindowSize,
onPrefaceReceipt: onPrefaceReceipt, onPrefaceReceipt: onPrefaceReceipt,
nextID: 1, nextID: 1,
@ -341,15 +341,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
updateFlowControl: t.updateFlowControl, updateFlowControl: t.updateFlowControl,
} }
} }
if t.statsHandler != nil { for _, sh := range t.statsHandlers {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr, RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr, LocalAddr: t.localAddr,
}) })
connBegin := &stats.ConnBegin{ connBegin := &stats.ConnBegin{
Client: true, Client: true,
} }
t.statsHandler.HandleConn(t.ctx, connBegin) sh.HandleConn(t.ctx, connBegin)
} }
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil { if err != nil {
@ -773,15 +773,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true} return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
} }
} }
if t.statsHandler != nil { if len(t.statsHandlers) != 0 {
header, ok := metadata.FromOutgoingContext(ctx) header, ok := metadata.FromOutgoingContext(ctx)
if ok { if ok {
header.Set("user-agent", t.userAgent) header.Set("user-agent", t.userAgent)
} else { } else {
header = metadata.Pairs("user-agent", t.userAgent) header = metadata.Pairs("user-agent", t.userAgent)
} }
for _, sh := range t.statsHandlers {
// Note: The header fields are compressed with hpack after this call returns. // Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here. // No WireLength field is set here.
// Note: Creating a new stats object to prevent pollution.
outHeader := &stats.OutHeader{ outHeader := &stats.OutHeader{
Client: true, Client: true,
FullMethod: callHdr.Method, FullMethod: callHdr.Method,
@ -790,7 +792,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
Compression: callHdr.SendCompress, Compression: callHdr.SendCompress,
Header: header, Header: header,
} }
t.statsHandler.HandleRPC(s.ctx, outHeader) sh.HandleRPC(s.ctx, outHeader)
}
} }
return s, nil return s, nil
} }
@ -916,11 +919,11 @@ func (t *http2Client) Close(err error) {
for _, s := range streams { for _, s := range streams {
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false) t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
} }
if t.statsHandler != nil { for _, sh := range t.statsHandlers {
connEnd := &stats.ConnEnd{ connEnd := &stats.ConnEnd{
Client: true, Client: true,
} }
t.statsHandler.HandleConn(t.ctx, connEnd) sh.HandleConn(t.ctx, connEnd)
} }
} }
@ -1432,7 +1435,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
close(s.headerChan) close(s.headerChan)
} }
if t.statsHandler != nil { for _, sh := range t.statsHandlers {
if isHeader { if isHeader {
inHeader := &stats.InHeader{ inHeader := &stats.InHeader{
Client: true, Client: true,
@ -1440,14 +1443,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
Header: metadata.MD(mdata).Copy(), Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress, Compression: s.recvCompress,
} }
t.statsHandler.HandleRPC(s.ctx, inHeader) sh.HandleRPC(s.ctx, inHeader)
} else { } else {
inTrailer := &stats.InTrailer{ inTrailer := &stats.InTrailer{
Client: true, Client: true,
WireLength: int(frame.Header().Length), WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(), Trailer: metadata.MD(mdata).Copy(),
} }
t.statsHandler.HandleRPC(s.ctx, inTrailer) sh.HandleRPC(s.ctx, inTrailer)
} }
} }

View File

@ -82,7 +82,7 @@ type http2Server struct {
// updates, reset streams, and various settings) to the controller. // updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer controlBuf *controlBuffer
fc *trInFlow fc *trInFlow
stats stats.Handler stats []stats.Handler
// Keepalive and max-age parameters for the server. // Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters kp keepalive.ServerParameters
// Keepalive enforcement policy. // Keepalive enforcement policy.
@ -257,7 +257,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
fc: &trInFlow{limit: uint32(icwz)}, fc: &trInFlow{limit: uint32(icwz)},
state: reachable, state: reachable,
activeStreams: make(map[uint32]*Stream), activeStreams: make(map[uint32]*Stream),
stats: config.StatsHandler, stats: config.StatsHandlers,
kp: kp, kp: kp,
idle: time.Now(), idle: time.Now(),
kep: kep, kep: kep,
@ -272,13 +272,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl, updateFlowControl: t.updateFlowControl,
} }
} }
if t.stats != nil { for _, sh := range t.stats {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr, RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr, LocalAddr: t.localAddr,
}) })
connBegin := &stats.ConnBegin{} connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin) sh.HandleConn(t.ctx, connBegin)
} }
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil { if err != nil {
@ -570,8 +570,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.adjustWindow(s, uint32(n)) t.adjustWindow(s, uint32(n))
} }
s.ctx = traceCtx(s.ctx, s.method) s.ctx = traceCtx(s.ctx, s.method)
if t.stats != nil { for _, sh := range t.stats {
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{ inHeader := &stats.InHeader{
FullMethod: s.method, FullMethod: s.method,
RemoteAddr: t.remoteAddr, RemoteAddr: t.remoteAddr,
@ -580,7 +580,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
WireLength: int(frame.Header().Length), WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(), Header: metadata.MD(mdata).Copy(),
} }
t.stats.HandleRPC(s.ctx, inHeader) sh.HandleRPC(s.ctx, inHeader)
} }
s.ctxDone = s.ctx.Done() s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
@ -996,14 +996,14 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
t.closeStream(s, true, http2.ErrCodeInternal, false) t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation return ErrHeaderListSizeLimitViolation
} }
if t.stats != nil { for _, sh := range t.stats {
// Note: Headers are compressed with hpack after this call returns. // Note: Headers are compressed with hpack after this call returns.
// No WireLength field is set here. // No WireLength field is set here.
outHeader := &stats.OutHeader{ outHeader := &stats.OutHeader{
Header: s.header.Copy(), Header: s.header.Copy(),
Compression: s.sendCompress, Compression: s.sendCompress,
} }
t.stats.HandleRPC(s.Context(), outHeader) sh.HandleRPC(s.Context(), outHeader)
} }
return nil return nil
} }
@ -1064,10 +1064,10 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Send a RST_STREAM after the trailers if the client has not already half-closed. // Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true) t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil { for _, sh := range t.stats {
// Note: The trailer fields are compressed with hpack after this call returns. // Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here. // No WireLength field is set here.
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{ sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(), Trailer: s.trailer.Copy(),
}) })
} }
@ -1222,9 +1222,9 @@ func (t *http2Server) Close() {
for _, s := range streams { for _, s := range streams {
s.cancel() s.cancel()
} }
if t.stats != nil { for _, sh := range t.stats {
connEnd := &stats.ConnEnd{} connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd) sh.HandleConn(t.ctx, connEnd)
} }
} }

View File

@ -322,8 +322,6 @@ type bufWriter struct {
batchSize int batchSize int
conn net.Conn conn net.Conn
err error err error
onFlush func()
} }
func newBufWriter(conn net.Conn, batchSize int) *bufWriter { func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
@ -360,9 +358,6 @@ func (w *bufWriter) Flush() error {
if w.offset == 0 { if w.offset == 0 {
return nil return nil
} }
if w.onFlush != nil {
w.onFlush()
}
_, w.err = w.conn.Write(w.buf[:w.offset]) _, w.err = w.conn.Write(w.buf[:w.offset])
w.offset = 0 w.offset = 0
return w.err return w.err

View File

@ -523,7 +523,7 @@ type ServerConfig struct {
ConnectionTimeout time.Duration ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle InTapHandle tap.ServerInHandle
StatsHandler stats.Handler StatsHandlers []stats.Handler
KeepaliveParams keepalive.ServerParameters KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32 InitialWindowSize int32
@ -553,8 +553,8 @@ type ConnectOptions struct {
CredsBundle credentials.Bundle CredsBundle credentials.Bundle
// KeepaliveParams stores the keepalive parameters. // KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats. // StatsHandlers stores the handler for stats.
StatsHandler stats.Handler StatsHandlers []stats.Handler
// InitialWindowSize sets the initial window size for a stream. // InitialWindowSize sets the initial window size for a stream.
InitialWindowSize int32 InitialWindowSize int32
// InitialConnWindowSize sets the initial window size for a connection. // InitialConnWindowSize sets the initial window size for a connection.

View File

@ -68,7 +68,6 @@ SOURCES=(
${WORKDIR}/grpc-proto/grpc/gcp/transport_security_common.proto ${WORKDIR}/grpc-proto/grpc/gcp/transport_security_common.proto
${WORKDIR}/grpc-proto/grpc/lookup/v1/rls.proto ${WORKDIR}/grpc-proto/grpc/lookup/v1/rls.proto
${WORKDIR}/grpc-proto/grpc/lookup/v1/rls_config.proto ${WORKDIR}/grpc-proto/grpc/lookup/v1/rls_config.proto
${WORKDIR}/grpc-proto/grpc/service_config/service_config.proto
${WORKDIR}/grpc-proto/grpc/testing/*.proto ${WORKDIR}/grpc-proto/grpc/testing/*.proto
${WORKDIR}/grpc-proto/grpc/core/*.proto ${WORKDIR}/grpc-proto/grpc/core/*.proto
) )
@ -80,8 +79,7 @@ SOURCES=(
# Note that the protos listed here are all for testing purposes. All protos to # Note that the protos listed here are all for testing purposes. All protos to
# be used externally should have a go_package option (and they don't need to be # be used externally should have a go_package option (and they don't need to be
# listed here). # listed here).
OPTS=Mgrpc/service_config/service_config.proto=/internal/proto/grpc_service_config,\ OPTS=Mgrpc/core/stats.proto=google.golang.org/grpc/interop/grpc_testing/core,\
Mgrpc/core/stats.proto=google.golang.org/grpc/interop/grpc_testing/core,\
Mgrpc/testing/benchmark_service.proto=google.golang.org/grpc/interop/grpc_testing,\ Mgrpc/testing/benchmark_service.proto=google.golang.org/grpc/interop/grpc_testing,\
Mgrpc/testing/stats.proto=google.golang.org/grpc/interop/grpc_testing,\ Mgrpc/testing/stats.proto=google.golang.org/grpc/interop/grpc_testing,\
Mgrpc/testing/report_qps_scenario_service.proto=google.golang.org/grpc/interop/grpc_testing,\ Mgrpc/testing/report_qps_scenario_service.proto=google.golang.org/grpc/interop/grpc_testing,\
@ -121,9 +119,6 @@ mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/
# see grpc_testing_not_regenerate/README.md for details. # see grpc_testing_not_regenerate/README.md for details.
rm ${WORKDIR}/out/google.golang.org/grpc/reflection/grpc_testing_not_regenerate/*.pb.go rm ${WORKDIR}/out/google.golang.org/grpc/reflection/grpc_testing_not_regenerate/*.pb.go
# grpc/service_config/service_config.proto does not have a go_package option.
mv ${WORKDIR}/out/grpc/service_config/service_config.pb.go internal/proto/grpc_service_config
# grpc/testing does not have a go_package option. # grpc/testing does not have a go_package option.
mv ${WORKDIR}/out/grpc/testing/*.pb.go interop/grpc_testing/ mv ${WORKDIR}/out/grpc/testing/*.pb.go interop/grpc_testing/
mv ${WORKDIR}/out/grpc/core/*.pb.go interop/grpc_testing/core/ mv ${WORKDIR}/out/grpc/core/*.pb.go interop/grpc_testing/core/

View File

@ -28,25 +28,40 @@ type addressMapEntry struct {
// Multiple accesses may not be performed concurrently. Must be created via // Multiple accesses may not be performed concurrently. Must be created via
// NewAddressMap; do not construct directly. // NewAddressMap; do not construct directly.
type AddressMap struct { type AddressMap struct {
m map[string]addressMapEntryList // The underlying map is keyed by an Address with fields that we don't care
// about being set to their zero values. The only fields that we care about
// are `Addr`, `ServerName` and `Attributes`. Since we need to be able to
// distinguish between addresses with same `Addr` and `ServerName`, but
// different `Attributes`, we cannot store the `Attributes` in the map key.
//
// The comparison operation for structs work as follows:
// Struct values are comparable if all their fields are comparable. Two
// struct values are equal if their corresponding non-blank fields are equal.
//
// The value type of the map contains a slice of addresses which match the key
// in their `Addr` and `ServerName` fields and contain the corresponding value
// associated with them.
m map[Address]addressMapEntryList
}
func toMapKey(addr *Address) Address {
return Address{Addr: addr.Addr, ServerName: addr.ServerName}
} }
type addressMapEntryList []*addressMapEntry type addressMapEntryList []*addressMapEntry
// NewAddressMap creates a new AddressMap. // NewAddressMap creates a new AddressMap.
func NewAddressMap() *AddressMap { func NewAddressMap() *AddressMap {
return &AddressMap{m: make(map[string]addressMapEntryList)} return &AddressMap{m: make(map[Address]addressMapEntryList)}
} }
// find returns the index of addr in the addressMapEntry slice, or -1 if not // find returns the index of addr in the addressMapEntry slice, or -1 if not
// present. // present.
func (l addressMapEntryList) find(addr Address) int { func (l addressMapEntryList) find(addr Address) int {
if len(l) == 0 {
return -1
}
for i, entry := range l { for i, entry := range l {
if entry.addr.ServerName == addr.ServerName && // Attributes are the only thing to match on here, since `Addr` and
entry.addr.Attributes.Equal(addr.Attributes) { // `ServerName` are already equal.
if entry.addr.Attributes.Equal(addr.Attributes) {
return i return i
} }
} }
@ -55,7 +70,8 @@ func (l addressMapEntryList) find(addr Address) int {
// Get returns the value for the address in the map, if present. // Get returns the value for the address in the map, if present.
func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) { func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) {
entryList := a.m[addr.Addr] addrKey := toMapKey(&addr)
entryList := a.m[addrKey]
if entry := entryList.find(addr); entry != -1 { if entry := entryList.find(addr); entry != -1 {
return entryList[entry].value, true return entryList[entry].value, true
} }
@ -64,17 +80,19 @@ func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) {
// Set updates or adds the value to the address in the map. // Set updates or adds the value to the address in the map.
func (a *AddressMap) Set(addr Address, value interface{}) { func (a *AddressMap) Set(addr Address, value interface{}) {
entryList := a.m[addr.Addr] addrKey := toMapKey(&addr)
entryList := a.m[addrKey]
if entry := entryList.find(addr); entry != -1 { if entry := entryList.find(addr); entry != -1 {
a.m[addr.Addr][entry].value = value entryList[entry].value = value
return return
} }
a.m[addr.Addr] = append(a.m[addr.Addr], &addressMapEntry{addr: addr, value: value}) a.m[addrKey] = append(entryList, &addressMapEntry{addr: addr, value: value})
} }
// Delete removes addr from the map. // Delete removes addr from the map.
func (a *AddressMap) Delete(addr Address) { func (a *AddressMap) Delete(addr Address) {
entryList := a.m[addr.Addr] addrKey := toMapKey(&addr)
entryList := a.m[addrKey]
entry := entryList.find(addr) entry := entryList.find(addr)
if entry == -1 { if entry == -1 {
return return
@ -85,7 +103,7 @@ func (a *AddressMap) Delete(addr Address) {
copy(entryList[entry:], entryList[entry+1:]) copy(entryList[entry:], entryList[entry+1:])
entryList = entryList[:len(entryList)-1] entryList = entryList[:len(entryList)-1]
} }
a.m[addr.Addr] = entryList a.m[addrKey] = entryList
} }
// Len returns the number of entries in the map. // Len returns the number of entries in the map.
@ -107,3 +125,14 @@ func (a *AddressMap) Keys() []Address {
} }
return ret return ret
} }
// Values returns a slice of all current map values.
func (a *AddressMap) Values() []interface{} {
ret := make([]interface{}, 0, a.Len())
for _, entryList := range a.m {
for _, entry := range entryList {
ret = append(ret, entry.value)
}
}
return ret
}

View File

@ -73,6 +73,12 @@ func init() {
internal.DrainServerTransports = func(srv *Server, addr string) { internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr) srv.drainServerTransports(addr)
} }
internal.AddExtraServerOptions = func(opt ...ServerOption) {
extraServerOptions = opt
}
internal.ClearExtraServerOptions = func() {
extraServerOptions = nil
}
} }
var statusOK = status.New(codes.OK, "") var statusOK = status.New(codes.OK, "")
@ -150,7 +156,7 @@ type serverOptions struct {
chainUnaryInts []UnaryServerInterceptor chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor chainStreamInts []StreamServerInterceptor
inTapHandle tap.ServerInHandle inTapHandle tap.ServerInHandle
statsHandler stats.Handler statsHandlers []stats.Handler
maxConcurrentStreams uint32 maxConcurrentStreams uint32
maxReceiveMessageSize int maxReceiveMessageSize int
maxSendMessageSize int maxSendMessageSize int
@ -174,6 +180,7 @@ var defaultServerOptions = serverOptions{
writeBufferSize: defaultWriteBufSize, writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize, readBufferSize: defaultReadBufSize,
} }
var extraServerOptions []ServerOption
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface { type ServerOption interface {
@ -435,7 +442,7 @@ func InTapHandle(h tap.ServerInHandle) ServerOption {
// StatsHandler returns a ServerOption that sets the stats handler for the server. // StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption { func StatsHandler(h stats.Handler) ServerOption {
return newFuncServerOption(func(o *serverOptions) { return newFuncServerOption(func(o *serverOptions) {
o.statsHandler = h o.statsHandlers = append(o.statsHandlers, h)
}) })
} }
@ -560,6 +567,9 @@ func (s *Server) stopServerWorkers() {
// started to accept requests yet. // started to accept requests yet.
func NewServer(opt ...ServerOption) *Server { func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions opts := defaultServerOptions
for _, o := range extraServerOptions {
o.apply(&opts)
}
for _, o := range opt { for _, o := range opt {
o.apply(&opts) o.apply(&opts)
} }
@ -867,7 +877,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
ConnectionTimeout: s.opts.connectionTimeout, ConnectionTimeout: s.opts.connectionTimeout,
Credentials: s.opts.creds, Credentials: s.opts.creds,
InTapHandle: s.opts.inTapHandle, InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler, StatsHandlers: s.opts.statsHandlers,
KeepaliveParams: s.opts.keepaliveParams, KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy, KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize, InitialWindowSize: s.opts.initialWindowSize,
@ -963,7 +973,7 @@ var _ http.Handler = (*Server)(nil)
// Notice: This API is EXPERIMENTAL and may be changed or removed in a // Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release. // later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -1076,8 +1086,10 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
} }
err = t.Write(stream, hdr, payload, opts) err = t.Write(stream, hdr, payload, opts)
if err == nil && s.opts.statsHandler != nil { if err == nil {
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) for _, sh := range s.opts.statsHandlers {
sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
}
} }
return err return err
} }
@ -1124,13 +1136,13 @@ func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerIn
} }
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
sh := s.opts.statsHandler shs := s.opts.statsHandlers
if sh != nil || trInfo != nil || channelz.IsOn() { if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
if channelz.IsOn() { if channelz.IsOn() {
s.incrCallsStarted() s.incrCallsStarted()
} }
var statsBegin *stats.Begin var statsBegin *stats.Begin
if sh != nil { for _, sh := range shs {
beginTime := time.Now() beginTime := time.Now()
statsBegin = &stats.Begin{ statsBegin = &stats.Begin{
BeginTime: beginTime, BeginTime: beginTime,
@ -1161,7 +1173,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
trInfo.tr.Finish() trInfo.tr.Finish()
} }
if sh != nil { for _, sh := range shs {
end := &stats.End{ end := &stats.End{
BeginTime: statsBegin.BeginTime, BeginTime: statsBegin.BeginTime,
EndTime: time.Now(), EndTime: time.Now(),
@ -1243,7 +1255,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
} }
var payInfo *payloadInfo var payInfo *payloadInfo
if sh != nil || binlog != nil { if len(shs) != 0 || binlog != nil {
payInfo = &payloadInfo{} payInfo = &payloadInfo{}
} }
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
@ -1260,7 +1272,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
} }
if sh != nil { for _, sh := range shs {
sh.HandleRPC(stream.Context(), &stats.InPayload{ sh.HandleRPC(stream.Context(), &stats.InPayload{
RecvTime: time.Now(), RecvTime: time.Now(),
Payload: v, Payload: v,
@ -1418,17 +1430,19 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if channelz.IsOn() { if channelz.IsOn() {
s.incrCallsStarted() s.incrCallsStarted()
} }
sh := s.opts.statsHandler shs := s.opts.statsHandlers
var statsBegin *stats.Begin var statsBegin *stats.Begin
if sh != nil { if len(shs) != 0 {
beginTime := time.Now() beginTime := time.Now()
statsBegin = &stats.Begin{ statsBegin = &stats.Begin{
BeginTime: beginTime, BeginTime: beginTime,
IsClientStream: sd.ClientStreams, IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams, IsServerStream: sd.ServerStreams,
} }
for _, sh := range shs {
sh.HandleRPC(stream.Context(), statsBegin) sh.HandleRPC(stream.Context(), statsBegin)
} }
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream) ctx := NewContextWithServerTransportStream(stream.Context(), stream)
ss := &serverStream{ ss := &serverStream{
ctx: ctx, ctx: ctx,
@ -1439,10 +1453,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo, trInfo: trInfo,
statsHandler: sh, statsHandler: shs,
} }
if sh != nil || trInfo != nil || channelz.IsOn() { if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
// See comment in processUnaryRPC on defers. // See comment in processUnaryRPC on defers.
defer func() { defer func() {
if trInfo != nil { if trInfo != nil {
@ -1456,7 +1470,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock() ss.mu.Unlock()
} }
if sh != nil { if len(shs) != 0 {
end := &stats.End{ end := &stats.End{
BeginTime: statsBegin.BeginTime, BeginTime: statsBegin.BeginTime,
EndTime: time.Now(), EndTime: time.Now(),
@ -1464,8 +1478,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
end.Error = toRPCErr(err) end.Error = toRPCErr(err)
} }
for _, sh := range shs {
sh.HandleRPC(stream.Context(), end) sh.HandleRPC(stream.Context(), end)
} }
}
if channelz.IsOn() { if channelz.IsOn() {
if err != nil && err != io.EOF { if err != nil && err != io.EOF {

View File

@ -374,9 +374,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp) ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method method := cs.callHdr.Method
sh := cs.cc.dopts.copts.StatsHandler
var beginTime time.Time var beginTime time.Time
if sh != nil { shs := cs.cc.dopts.copts.StatsHandlers
for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast}) ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
beginTime = time.Now() beginTime = time.Now()
begin := &stats.Begin{ begin := &stats.Begin{
@ -418,7 +418,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
beginTime: beginTime, beginTime: beginTime,
cs: cs, cs: cs,
dc: cs.cc.dopts.dc, dc: cs.cc.dopts.dc,
statsHandler: sh, statsHandlers: shs,
trInfo: trInfo, trInfo: trInfo,
}, nil }, nil
} }
@ -536,7 +536,7 @@ type csAttempt struct {
// and cleared when the finish method is called. // and cleared when the finish method is called.
trInfo *traceInfo trInfo *traceInfo
statsHandler stats.Handler statsHandlers []stats.Handler
beginTime time.Time beginTime time.Time
// set for newStream errors that may be transparently retried // set for newStream errors that may be transparently retried
@ -960,8 +960,8 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
} }
return io.EOF return io.EOF
} }
if a.statsHandler != nil { for _, sh := range a.statsHandlers {
a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now())) sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
} }
if channelz.IsOn() { if channelz.IsOn() {
a.t.IncrMsgSent() a.t.IncrMsgSent()
@ -971,7 +971,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
cs := a.cs cs := a.cs
if a.statsHandler != nil && payInfo == nil { if len(a.statsHandlers) != 0 && payInfo == nil {
payInfo = &payloadInfo{} payInfo = &payloadInfo{}
} }
@ -1008,8 +1008,8 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
} }
a.mu.Unlock() a.mu.Unlock()
} }
if a.statsHandler != nil { for _, sh := range a.statsHandlers {
a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{ sh.HandleRPC(a.ctx, &stats.InPayload{
Client: true, Client: true,
RecvTime: time.Now(), RecvTime: time.Now(),
Payload: m, Payload: m,
@ -1068,7 +1068,7 @@ func (a *csAttempt) finish(err error) {
ServerLoad: balancerload.Parse(tr), ServerLoad: balancerload.Parse(tr),
}) })
} }
if a.statsHandler != nil { for _, sh := range a.statsHandlers {
end := &stats.End{ end := &stats.End{
Client: true, Client: true,
BeginTime: a.beginTime, BeginTime: a.beginTime,
@ -1076,7 +1076,7 @@ func (a *csAttempt) finish(err error) {
Trailer: tr, Trailer: tr,
Error: err, Error: err,
} }
a.statsHandler.HandleRPC(a.ctx, end) sh.HandleRPC(a.ctx, end)
} }
if a.trInfo != nil && a.trInfo.tr != nil { if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil { if err == nil {
@ -1445,7 +1445,7 @@ type serverStream struct {
maxSendMessageSize int maxSendMessageSize int
trInfo *traceInfo trInfo *traceInfo
statsHandler stats.Handler statsHandler []stats.Handler
binlog binarylog.MethodLogger binlog binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It // serverHeaderBinlogged indicates whether server header has been logged. It
@ -1555,8 +1555,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
Message: data, Message: data,
}) })
} }
if ss.statsHandler != nil { if len(ss.statsHandler) != 0 {
ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
}
} }
return nil return nil
} }
@ -1590,7 +1592,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
} }
}() }()
var payInfo *payloadInfo var payInfo *payloadInfo
if ss.statsHandler != nil || ss.binlog != nil { if len(ss.statsHandler) != 0 || ss.binlog != nil {
payInfo = &payloadInfo{} payInfo = &payloadInfo{}
} }
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
@ -1605,8 +1607,9 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
} }
return toRPCErr(err) return toRPCErr(err)
} }
if ss.statsHandler != nil { if len(ss.statsHandler) != 0 {
ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{ for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), &stats.InPayload{
RecvTime: time.Now(), RecvTime: time.Now(),
Payload: m, Payload: m,
// TODO truncate large payload. // TODO truncate large payload.
@ -1615,6 +1618,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
Length: len(payInfo.uncompressedBytes), Length: len(payInfo.uncompressedBytes),
}) })
} }
}
if ss.binlog != nil { if ss.binlog != nil {
ss.binlog.Log(&binarylog.ClientMessage{ ss.binlog.Log(&binarylog.ClientMessage{
Message: payInfo.uncompressedBytes, Message: payInfo.uncompressedBytes,

View File

@ -19,4 +19,4 @@
package grpc package grpc
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.47.0" const Version = "1.48.0"

2
vendor/modules.txt vendored
View File

@ -662,7 +662,7 @@ google.golang.org/appengine/urlfetch
google.golang.org/genproto/googleapis/api/httpbody google.golang.org/genproto/googleapis/api/httpbody
google.golang.org/genproto/googleapis/rpc/status google.golang.org/genproto/googleapis/rpc/status
google.golang.org/genproto/protobuf/field_mask google.golang.org/genproto/protobuf/field_mask
# google.golang.org/grpc v1.47.0 # google.golang.org/grpc v1.48.0
## explicit; go 1.14 ## explicit; go 1.14
google.golang.org/grpc google.golang.org/grpc
google.golang.org/grpc/attributes google.golang.org/grpc/attributes