From 7a727c2a4382afccb5fbf0482d568e295bcfbd26 Mon Sep 17 00:00:00 2001 From: Robert Vasek Date: Tue, 17 Sep 2024 15:52:30 +0200 Subject: [PATCH] util: added logs for slow gRPC calls This commit adds a gRPC middleware that logs calls that keep running after their deadline. Adds --logslowopinterval cmdline argument to pass the log rate. Signed-off-by: Robert Vasek --- cmd/cephcsi.go | 5 ++ internal/cephfs/driver.go | 8 ++- internal/csi-addons/server/server.go | 4 +- internal/csi-common/server.go | 18 +++++-- internal/csi-common/utils.go | 77 +++++++++++++++++++++++++++- internal/nfs/driver/driver.go | 5 +- internal/rbd/driver/driver.go | 8 ++- internal/util/util.go | 3 ++ 8 files changed, 114 insertions(+), 14 deletions(-) diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index a43ac2763..bc6e1c7a3 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -120,6 +120,11 @@ func init() { "path of prometheus endpoint where metrics will be available") flag.DurationVar(&conf.PollTime, "polltime", time.Second*pollTime, "time interval in seconds between each poll") flag.DurationVar(&conf.PoolTimeout, "timeout", time.Second*probeTimeout, "probe timeout in seconds") + flag.DurationVar( + &conf.LogSlowOpInterval, + "logslowopinterval", + time.Second*30, + "how often to inform about slow gRPC calls") flag.UintVar( &conf.RbdHardMaxCloneDepth, diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 97509956d..8023bb1a7 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -199,7 +199,9 @@ func (fs *Driver) Run(conf *util.Config) { NS: fs.ns, GS: fs.cs, } - server.Start(conf.Endpoint, srv) + server.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{ + LogSlowOpInterval: conf.LogSlowOpInterval, + }) if conf.EnableProfiling { go util.StartMetricsServer(conf) @@ -230,7 +232,9 @@ func (fs *Driver) setupCSIAddonsServer(conf *util.Config) error { } // start the server, this does not block, it runs a new go-routine - err = fs.cas.Start() + err = fs.cas.Start(csicommon.MiddlewareServerOptionConfig{ + LogSlowOpInterval: conf.LogSlowOpInterval, + }) if err != nil { return fmt.Errorf("failed to start CSI-Addons server: %w", err) } diff --git a/internal/csi-addons/server/server.go b/internal/csi-addons/server/server.go index e5c6310ee..c41e27100 100644 --- a/internal/csi-addons/server/server.go +++ b/internal/csi-addons/server/server.go @@ -85,9 +85,9 @@ func (cas *CSIAddonsServer) RegisterService(svc CSIAddonsService) { // Start creates the internal gRPC server, and registers the CSIAddonsServices. // The internal gRPC server is started in it's own go-routine when no error is // returned. -func (cas *CSIAddonsServer) Start() error { +func (cas *CSIAddonsServer) Start(middlewareConfig csicommon.MiddlewareServerOptionConfig) error { // create the gRPC server and register services - cas.server = grpc.NewServer(csicommon.NewMiddlewareServerOption()) + cas.server = grpc.NewServer(csicommon.NewMiddlewareServerOption(middlewareConfig)) for _, svc := range cas.services { svc.RegisterService(cas.server) diff --git a/internal/csi-common/server.go b/internal/csi-common/server.go index 727ef57f8..b758f8d4a 100644 --- a/internal/csi-common/server.go +++ b/internal/csi-common/server.go @@ -31,7 +31,7 @@ import ( // NonBlockingGRPCServer defines Non blocking GRPC server interfaces. type NonBlockingGRPCServer interface { // Start services at the endpoint - Start(endpoint string, srv Servers) + Start(endpoint string, srv Servers, middlewareConfig MiddlewareServerOptionConfig) // Waits for the service to stop Wait() // Stops the service gracefully @@ -60,9 +60,13 @@ type nonBlockingGRPCServer struct { } // Start start service on endpoint. -func (s *nonBlockingGRPCServer) Start(endpoint string, srv Servers) { +func (s *nonBlockingGRPCServer) Start( + endpoint string, + srv Servers, + middlewareConfig MiddlewareServerOptionConfig, +) { s.wg.Add(1) - go s.serve(endpoint, srv) + go s.serve(endpoint, srv, middlewareConfig) } // Wait blocks until the WaitGroup counter. @@ -80,7 +84,11 @@ func (s *nonBlockingGRPCServer) ForceStop() { s.server.Stop() } -func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) { +func (s *nonBlockingGRPCServer) serve( + endpoint string, + srv Servers, + middlewareConfig MiddlewareServerOptionConfig, +) { proto, addr, err := parseEndpoint(endpoint) if err != nil { klog.Fatal(err.Error()) @@ -98,7 +106,7 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) { klog.Fatalf("Failed to listen: %v", err) } - server := grpc.NewServer(NewMiddlewareServerOption()) + server := grpc.NewServer(NewMiddlewareServerOption(middlewareConfig)) s.server = server if srv.IS != nil { diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index 91db95f93..b541e68f6 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -23,6 +23,7 @@ import ( "runtime/debug" "strings" "sync/atomic" + "time" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -108,10 +109,35 @@ func NewGroupControllerServiceCapability(ctrlCap csi.GroupControllerServiceCapab } } +// MiddlewareServerOptionConfig contains configuration parameters +// that are passed to the respective middleware interceptors that +// are instantiated when starting gRPC servers. +type MiddlewareServerOptionConfig struct { + LogSlowOpInterval time.Duration +} + // NewMiddlewareServerOption creates a new grpc.ServerOption that configures a // common format for log messages and other gRPC related handlers. -func NewMiddlewareServerOption() grpc.ServerOption { - middleWare := []grpc.UnaryServerInterceptor{contextIDInjector, logGRPC, panicHandler} +func NewMiddlewareServerOption(config MiddlewareServerOptionConfig) grpc.ServerOption { + middleWare := []grpc.UnaryServerInterceptor{ + contextIDInjector, + logGRPC, + } + + if config.LogSlowOpInterval > 0 { + middleWare = append(middleWare, func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (interface{}, error) { + return logSlowGRPC( + config.LogSlowOpInterval, ctx, req, info, handler, + ) + }) + } + + middleWare = append(middleWare, panicHandler) return grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(middleWare...)) } @@ -250,6 +276,53 @@ func logGRPC( return resp, err } +func logSlowGRPC( + logInterval time.Duration, + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (interface{}, error) { + handlerFinished := make(chan struct{}) + callStartTime := time.Now() + + // Ticks at a logInterval rate and logs a slow-call message until handler finishes. + // This is called once the handler outlives its context, see below. + doLogSlowGRPC := func() { + ticker := time.NewTicker(logInterval) + defer ticker.Stop() + + for { + select { + case t := <-ticker.C: + timePassed := t.Sub(callStartTime).Truncate(time.Second) + log.ExtendedLog(ctx, + "Slow GRPC call %s (%s)", info.FullMethod, timePassed) + log.TraceLog(ctx, + "Slow GRPC request: %s", protosanitizer.StripSecrets(req)) + case <-handlerFinished: + return + } + } + } + + go func() { + select { + case <-ctx.Done(): + // The call (most likely) outlived its context. Start logging slow messages. + doLogSlowGRPC() + case <-handlerFinished: + // The call finished, exit. + return + } + }() + + resp, err := handler(ctx, req) + close(handlerFinished) + + return resp, err +} + //nolint:nonamedreturns // named return used to send recovered panic error. func panicHandler( ctx context.Context, diff --git a/internal/nfs/driver/driver.go b/internal/nfs/driver/driver.go index 51eefc568..890fba815 100644 --- a/internal/nfs/driver/driver.go +++ b/internal/nfs/driver/driver.go @@ -77,7 +77,10 @@ func (fs *Driver) Run(conf *util.Config) { srv.CS = controller.NewControllerServer(cd) } - server.Start(conf.Endpoint, srv) + server.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{ + LogSlowOpInterval: conf.LogSlowOpInterval, + }) + if conf.EnableProfiling { go util.StartMetricsServer(conf) log.DebugLogMsg("Registering profiling handler") diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index cebfc53cf..0ad8109f8 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -179,7 +179,9 @@ func (r *Driver) Run(conf *util.Config) { CS: r.cs, NS: r.ns, } - s.Start(conf.Endpoint, srv) + s.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{ + LogSlowOpInterval: conf.LogSlowOpInterval, + }) r.startProfiling(conf) @@ -233,7 +235,9 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { } // start the server, this does not block, it runs a new go-routine - err = r.cas.Start() + err = r.cas.Start(csicommon.MiddlewareServerOptionConfig{ + LogSlowOpInterval: conf.LogSlowOpInterval, + }) if err != nil { return fmt.Errorf("failed to start CSI-Addons server: %w", err) } diff --git a/internal/util/util.go b/internal/util/util.go index 869df991b..1d62c6502 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -131,6 +131,9 @@ type Config struct { MetricsPort int // TCP port for liveness/grpc metrics requests PollTime time.Duration // time interval in seconds between each poll PoolTimeout time.Duration // probe timeout in seconds + // Log interval for slow GRPC calls. Calls that outlive their context deadline + // are considered slow. + LogSlowOpInterval time.Duration EnableProfiling bool // flag to enable profiling IsControllerServer bool // if set to true start provisioner server