mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-04-11 18:13:00 +00:00
Several packages are only used while running the e2e suite. These packages are less important to update, as the they can not influence the final executable that is part of the Ceph-CSI container-image. By moving these dependencies out of the main Ceph-CSI go.mod, it is easier to identify if a reported CVE affects Ceph-CSI, or only the testing (like most of the Kubernetes CVEs). Signed-off-by: Niels de Vos <ndevos@ibm.com>
186 lines
7.3 KiB
Go
186 lines
7.3 KiB
Go
package grpc_prometheus
|
|
|
|
import (
|
|
prom "github.com/prometheus/client_golang/prometheus"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// ServerMetrics represents a collection of metrics to be registered on a
|
|
// Prometheus metrics registry for a gRPC server.
|
|
type ServerMetrics struct {
|
|
serverStartedCounter *prom.CounterVec
|
|
serverHandledCounter *prom.CounterVec
|
|
serverStreamMsgReceived *prom.CounterVec
|
|
serverStreamMsgSent *prom.CounterVec
|
|
serverHandledHistogramEnabled bool
|
|
serverHandledHistogramOpts prom.HistogramOpts
|
|
serverHandledHistogram *prom.HistogramVec
|
|
}
|
|
|
|
// NewServerMetrics returns a ServerMetrics object. Use a new instance of
|
|
// ServerMetrics when not using the default Prometheus metrics registry, for
|
|
// example when wanting to control which metrics are added to a registry as
|
|
// opposed to automatically adding metrics via init functions.
|
|
func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
|
|
opts := counterOptions(counterOpts)
|
|
return &ServerMetrics{
|
|
serverStartedCounter: prom.NewCounterVec(
|
|
opts.apply(prom.CounterOpts{
|
|
Name: "grpc_server_started_total",
|
|
Help: "Total number of RPCs started on the server.",
|
|
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
|
|
serverHandledCounter: prom.NewCounterVec(
|
|
opts.apply(prom.CounterOpts{
|
|
Name: "grpc_server_handled_total",
|
|
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
|
|
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
|
|
serverStreamMsgReceived: prom.NewCounterVec(
|
|
opts.apply(prom.CounterOpts{
|
|
Name: "grpc_server_msg_received_total",
|
|
Help: "Total number of RPC stream messages received on the server.",
|
|
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
|
|
serverStreamMsgSent: prom.NewCounterVec(
|
|
opts.apply(prom.CounterOpts{
|
|
Name: "grpc_server_msg_sent_total",
|
|
Help: "Total number of gRPC stream messages sent by the server.",
|
|
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
|
|
serverHandledHistogramEnabled: false,
|
|
serverHandledHistogramOpts: prom.HistogramOpts{
|
|
Name: "grpc_server_handling_seconds",
|
|
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
|
|
Buckets: prom.DefBuckets,
|
|
},
|
|
serverHandledHistogram: nil,
|
|
}
|
|
}
|
|
|
|
// EnableHandlingTimeHistogram enables histograms being registered when
|
|
// registering the ServerMetrics on a Prometheus registry. Histograms can be
|
|
// expensive on Prometheus servers. It takes options to configure histogram
|
|
// options such as the defined buckets.
|
|
func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
|
|
for _, o := range opts {
|
|
o(&m.serverHandledHistogramOpts)
|
|
}
|
|
if !m.serverHandledHistogramEnabled {
|
|
m.serverHandledHistogram = prom.NewHistogramVec(
|
|
m.serverHandledHistogramOpts,
|
|
[]string{"grpc_type", "grpc_service", "grpc_method"},
|
|
)
|
|
}
|
|
m.serverHandledHistogramEnabled = true
|
|
}
|
|
|
|
// Describe sends the super-set of all possible descriptors of metrics
|
|
// collected by this Collector to the provided channel and returns once
|
|
// the last descriptor has been sent.
|
|
func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
|
|
m.serverStartedCounter.Describe(ch)
|
|
m.serverHandledCounter.Describe(ch)
|
|
m.serverStreamMsgReceived.Describe(ch)
|
|
m.serverStreamMsgSent.Describe(ch)
|
|
if m.serverHandledHistogramEnabled {
|
|
m.serverHandledHistogram.Describe(ch)
|
|
}
|
|
}
|
|
|
|
// Collect is called by the Prometheus registry when collecting
|
|
// metrics. The implementation sends each collected metric via the
|
|
// provided channel and returns once the last metric has been sent.
|
|
func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
|
|
m.serverStartedCounter.Collect(ch)
|
|
m.serverHandledCounter.Collect(ch)
|
|
m.serverStreamMsgReceived.Collect(ch)
|
|
m.serverStreamMsgSent.Collect(ch)
|
|
if m.serverHandledHistogramEnabled {
|
|
m.serverHandledHistogram.Collect(ch)
|
|
}
|
|
}
|
|
|
|
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
|
func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
|
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
|
monitor := newServerReporter(m, Unary, info.FullMethod)
|
|
monitor.ReceivedMessage()
|
|
resp, err := handler(ctx, req)
|
|
st, _ := status.FromError(err)
|
|
monitor.Handled(st.Code())
|
|
if err == nil {
|
|
monitor.SentMessage()
|
|
}
|
|
return resp, err
|
|
}
|
|
}
|
|
|
|
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
|
func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
|
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
|
monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
|
|
err := handler(srv, &monitoredServerStream{ss, monitor})
|
|
st, _ := status.FromError(err)
|
|
monitor.Handled(st.Code())
|
|
return err
|
|
}
|
|
}
|
|
|
|
// InitializeMetrics initializes all metrics, with their appropriate null
|
|
// value, for all gRPC methods registered on a gRPC server. This is useful, to
|
|
// ensure that all metrics exist when collecting and querying.
|
|
func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
|
|
serviceInfo := server.GetServiceInfo()
|
|
for serviceName, info := range serviceInfo {
|
|
for _, mInfo := range info.Methods {
|
|
preRegisterMethod(m, serviceName, &mInfo)
|
|
}
|
|
}
|
|
}
|
|
|
|
func streamRPCType(info *grpc.StreamServerInfo) grpcType {
|
|
if info.IsClientStream && !info.IsServerStream {
|
|
return ClientStream
|
|
} else if !info.IsClientStream && info.IsServerStream {
|
|
return ServerStream
|
|
}
|
|
return BidiStream
|
|
}
|
|
|
|
// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
|
|
type monitoredServerStream struct {
|
|
grpc.ServerStream
|
|
monitor *serverReporter
|
|
}
|
|
|
|
func (s *monitoredServerStream) SendMsg(m interface{}) error {
|
|
err := s.ServerStream.SendMsg(m)
|
|
if err == nil {
|
|
s.monitor.SentMessage()
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
|
|
err := s.ServerStream.RecvMsg(m)
|
|
if err == nil {
|
|
s.monitor.ReceivedMessage()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
|
|
func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
|
|
methodName := mInfo.Name
|
|
methodType := string(typeFromMethodInfo(mInfo))
|
|
// These are just references (no increments), as just referencing will create the labels but not set values.
|
|
metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
|
metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
|
metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
|
if metrics.serverHandledHistogramEnabled {
|
|
metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
|
}
|
|
for _, code := range allCodes {
|
|
metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
|
|
}
|
|
}
|