mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-17 02:09:29 +00:00
util: added logs for slow gRPC calls
This commit adds a gRPC middleware that logs calls that keep running after their deadline. Adds --logslowopinterval cmdline argument to pass the log rate. Signed-off-by: Robert Vasek <robert.vasek@clyso.com>
This commit is contained in:
parent
56d08e1b4d
commit
7a727c2a43
@ -120,6 +120,11 @@ func init() {
|
||||
"path of prometheus endpoint where metrics will be available")
|
||||
flag.DurationVar(&conf.PollTime, "polltime", time.Second*pollTime, "time interval in seconds between each poll")
|
||||
flag.DurationVar(&conf.PoolTimeout, "timeout", time.Second*probeTimeout, "probe timeout in seconds")
|
||||
flag.DurationVar(
|
||||
&conf.LogSlowOpInterval,
|
||||
"logslowopinterval",
|
||||
time.Second*30,
|
||||
"how often to inform about slow gRPC calls")
|
||||
|
||||
flag.UintVar(
|
||||
&conf.RbdHardMaxCloneDepth,
|
||||
|
@ -199,7 +199,9 @@ func (fs *Driver) Run(conf *util.Config) {
|
||||
NS: fs.ns,
|
||||
GS: fs.cs,
|
||||
}
|
||||
server.Start(conf.Endpoint, srv)
|
||||
server.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{
|
||||
LogSlowOpInterval: conf.LogSlowOpInterval,
|
||||
})
|
||||
|
||||
if conf.EnableProfiling {
|
||||
go util.StartMetricsServer(conf)
|
||||
@ -230,7 +232,9 @@ func (fs *Driver) setupCSIAddonsServer(conf *util.Config) error {
|
||||
}
|
||||
|
||||
// start the server, this does not block, it runs a new go-routine
|
||||
err = fs.cas.Start()
|
||||
err = fs.cas.Start(csicommon.MiddlewareServerOptionConfig{
|
||||
LogSlowOpInterval: conf.LogSlowOpInterval,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start CSI-Addons server: %w", err)
|
||||
}
|
||||
|
@ -85,9 +85,9 @@ func (cas *CSIAddonsServer) RegisterService(svc CSIAddonsService) {
|
||||
// Start creates the internal gRPC server, and registers the CSIAddonsServices.
|
||||
// The internal gRPC server is started in it's own go-routine when no error is
|
||||
// returned.
|
||||
func (cas *CSIAddonsServer) Start() error {
|
||||
func (cas *CSIAddonsServer) Start(middlewareConfig csicommon.MiddlewareServerOptionConfig) error {
|
||||
// create the gRPC server and register services
|
||||
cas.server = grpc.NewServer(csicommon.NewMiddlewareServerOption())
|
||||
cas.server = grpc.NewServer(csicommon.NewMiddlewareServerOption(middlewareConfig))
|
||||
|
||||
for _, svc := range cas.services {
|
||||
svc.RegisterService(cas.server)
|
||||
|
@ -31,7 +31,7 @@ import (
|
||||
// NonBlockingGRPCServer defines Non blocking GRPC server interfaces.
|
||||
type NonBlockingGRPCServer interface {
|
||||
// Start services at the endpoint
|
||||
Start(endpoint string, srv Servers)
|
||||
Start(endpoint string, srv Servers, middlewareConfig MiddlewareServerOptionConfig)
|
||||
// Waits for the service to stop
|
||||
Wait()
|
||||
// Stops the service gracefully
|
||||
@ -60,9 +60,13 @@ type nonBlockingGRPCServer struct {
|
||||
}
|
||||
|
||||
// Start start service on endpoint.
|
||||
func (s *nonBlockingGRPCServer) Start(endpoint string, srv Servers) {
|
||||
func (s *nonBlockingGRPCServer) Start(
|
||||
endpoint string,
|
||||
srv Servers,
|
||||
middlewareConfig MiddlewareServerOptionConfig,
|
||||
) {
|
||||
s.wg.Add(1)
|
||||
go s.serve(endpoint, srv)
|
||||
go s.serve(endpoint, srv, middlewareConfig)
|
||||
}
|
||||
|
||||
// Wait blocks until the WaitGroup counter.
|
||||
@ -80,7 +84,11 @@ func (s *nonBlockingGRPCServer) ForceStop() {
|
||||
s.server.Stop()
|
||||
}
|
||||
|
||||
func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) {
|
||||
func (s *nonBlockingGRPCServer) serve(
|
||||
endpoint string,
|
||||
srv Servers,
|
||||
middlewareConfig MiddlewareServerOptionConfig,
|
||||
) {
|
||||
proto, addr, err := parseEndpoint(endpoint)
|
||||
if err != nil {
|
||||
klog.Fatal(err.Error())
|
||||
@ -98,7 +106,7 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) {
|
||||
klog.Fatalf("Failed to listen: %v", err)
|
||||
}
|
||||
|
||||
server := grpc.NewServer(NewMiddlewareServerOption())
|
||||
server := grpc.NewServer(NewMiddlewareServerOption(middlewareConfig))
|
||||
s.server = server
|
||||
|
||||
if srv.IS != nil {
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
"github.com/ceph/ceph-csi/internal/util/log"
|
||||
@ -108,10 +109,35 @@ func NewGroupControllerServiceCapability(ctrlCap csi.GroupControllerServiceCapab
|
||||
}
|
||||
}
|
||||
|
||||
// MiddlewareServerOptionConfig contains configuration parameters
|
||||
// that are passed to the respective middleware interceptors that
|
||||
// are instantiated when starting gRPC servers.
|
||||
type MiddlewareServerOptionConfig struct {
|
||||
LogSlowOpInterval time.Duration
|
||||
}
|
||||
|
||||
// NewMiddlewareServerOption creates a new grpc.ServerOption that configures a
|
||||
// common format for log messages and other gRPC related handlers.
|
||||
func NewMiddlewareServerOption() grpc.ServerOption {
|
||||
middleWare := []grpc.UnaryServerInterceptor{contextIDInjector, logGRPC, panicHandler}
|
||||
func NewMiddlewareServerOption(config MiddlewareServerOptionConfig) grpc.ServerOption {
|
||||
middleWare := []grpc.UnaryServerInterceptor{
|
||||
contextIDInjector,
|
||||
logGRPC,
|
||||
}
|
||||
|
||||
if config.LogSlowOpInterval > 0 {
|
||||
middleWare = append(middleWare, func(
|
||||
ctx context.Context,
|
||||
req interface{},
|
||||
info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler,
|
||||
) (interface{}, error) {
|
||||
return logSlowGRPC(
|
||||
config.LogSlowOpInterval, ctx, req, info, handler,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
middleWare = append(middleWare, panicHandler)
|
||||
|
||||
return grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(middleWare...))
|
||||
}
|
||||
@ -250,6 +276,53 @@ func logGRPC(
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func logSlowGRPC(
|
||||
logInterval time.Duration,
|
||||
ctx context.Context,
|
||||
req interface{},
|
||||
info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler,
|
||||
) (interface{}, error) {
|
||||
handlerFinished := make(chan struct{})
|
||||
callStartTime := time.Now()
|
||||
|
||||
// Ticks at a logInterval rate and logs a slow-call message until handler finishes.
|
||||
// This is called once the handler outlives its context, see below.
|
||||
doLogSlowGRPC := func() {
|
||||
ticker := time.NewTicker(logInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case t := <-ticker.C:
|
||||
timePassed := t.Sub(callStartTime).Truncate(time.Second)
|
||||
log.ExtendedLog(ctx,
|
||||
"Slow GRPC call %s (%s)", info.FullMethod, timePassed)
|
||||
log.TraceLog(ctx,
|
||||
"Slow GRPC request: %s", protosanitizer.StripSecrets(req))
|
||||
case <-handlerFinished:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// The call (most likely) outlived its context. Start logging slow messages.
|
||||
doLogSlowGRPC()
|
||||
case <-handlerFinished:
|
||||
// The call finished, exit.
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
resp, err := handler(ctx, req)
|
||||
close(handlerFinished)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
//nolint:nonamedreturns // named return used to send recovered panic error.
|
||||
func panicHandler(
|
||||
ctx context.Context,
|
||||
|
@ -77,7 +77,10 @@ func (fs *Driver) Run(conf *util.Config) {
|
||||
srv.CS = controller.NewControllerServer(cd)
|
||||
}
|
||||
|
||||
server.Start(conf.Endpoint, srv)
|
||||
server.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{
|
||||
LogSlowOpInterval: conf.LogSlowOpInterval,
|
||||
})
|
||||
|
||||
if conf.EnableProfiling {
|
||||
go util.StartMetricsServer(conf)
|
||||
log.DebugLogMsg("Registering profiling handler")
|
||||
|
@ -179,7 +179,9 @@ func (r *Driver) Run(conf *util.Config) {
|
||||
CS: r.cs,
|
||||
NS: r.ns,
|
||||
}
|
||||
s.Start(conf.Endpoint, srv)
|
||||
s.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{
|
||||
LogSlowOpInterval: conf.LogSlowOpInterval,
|
||||
})
|
||||
|
||||
r.startProfiling(conf)
|
||||
|
||||
@ -233,7 +235,9 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
|
||||
}
|
||||
|
||||
// start the server, this does not block, it runs a new go-routine
|
||||
err = r.cas.Start()
|
||||
err = r.cas.Start(csicommon.MiddlewareServerOptionConfig{
|
||||
LogSlowOpInterval: conf.LogSlowOpInterval,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start CSI-Addons server: %w", err)
|
||||
}
|
||||
|
@ -131,6 +131,9 @@ type Config struct {
|
||||
MetricsPort int // TCP port for liveness/grpc metrics requests
|
||||
PollTime time.Duration // time interval in seconds between each poll
|
||||
PoolTimeout time.Duration // probe timeout in seconds
|
||||
// Log interval for slow GRPC calls. Calls that outlive their context deadline
|
||||
// are considered slow.
|
||||
LogSlowOpInterval time.Duration
|
||||
|
||||
EnableProfiling bool // flag to enable profiling
|
||||
IsControllerServer bool // if set to true start provisioner server
|
||||
|
Loading…
Reference in New Issue
Block a user