implement grpc metrics for ceph-csi

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2019-08-21 14:58:02 +05:30
committed by mergify[bot]
parent 01a78cace5
commit a81a3bf96b
46 changed files with 1363 additions and 158 deletions

View File

@ -173,6 +173,9 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
}
server := csicommon.NewNonBlockingGRPCServer()
server.Start(conf.Endpoint, fs.is, fs.cs, fs.ns)
server.Start(conf.Endpoint, conf.HistogramOption, fs.is, fs.cs, fs.ns, conf.EnableGRPCMetrics)
if conf.EnableGRPCMetrics {
go util.StartMetricsServer(conf)
}
server.Wait()
}

View File

@ -19,10 +19,14 @@ package csicommon
import (
"net"
"os"
"strconv"
"strings"
"sync"
"github.com/container-storage-interface/spec/lib/go/csi"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"k8s.io/klog"
)
@ -30,7 +34,7 @@ import (
// NonBlockingGRPCServer defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
Start(endpoint, hstOptions string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, metrics bool)
// Waits for the service to stop
Wait()
// Stops the service gracefully
@ -51,10 +55,10 @@ type nonBlockingGRPCServer struct {
}
// Start start service on endpoint
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
func (s *nonBlockingGRPCServer) Start(endpoint, hstOptions string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, metrics bool) {
s.wg.Add(1)
go s.serve(endpoint, ids, cs, ns)
go s.serve(endpoint, hstOptions, ids, cs, ns, metrics)
}
// Wait blocks until the WaitGroup counter
@ -72,7 +76,7 @@ func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
func (s *nonBlockingGRPCServer) serve(endpoint, hstOptions string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer, metrics bool) {
proto, addr, err := parseEndpoint(endpoint)
if err != nil {
@ -91,13 +95,14 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c
klog.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc_middleware.WithUnaryServerChain(
contextIDInjector,
logGRPC,
panicHandler,
),
middleWare := []grpc.UnaryServerInterceptor{contextIDInjector, logGRPC, panicHandler}
if metrics {
middleWare = append(middleWare, grpc_prometheus.UnaryServerInterceptor)
}
opts := []grpc.ServerOption{
grpc_middleware.WithUnaryServerChain(middleWare...),
}
server := grpc.NewServer(opts...)
s.server = server
@ -110,9 +115,29 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, c
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
klog.Infof("Listening for connections on address: %#v", listener.Addr())
if metrics {
ho := strings.Split(hstOptions, ",")
if len(ho) != 3 {
klog.Fatalf("invalid histogram options provided: %v", hstOptions)
}
start, e := strconv.ParseFloat(ho[0], 32)
if e != nil {
klog.Fatalf("failed to parse histogram start value: %v", e)
}
factor, e := strconv.ParseFloat(ho[1], 32)
if err != nil {
klog.Fatalf("failed to parse histogram factor value: %v", e)
}
count, e := strconv.Atoi(ho[2])
if err != nil {
klog.Fatalf("failed to parse histogram count value: %v", e)
}
buckets := prometheus.ExponentialBuckets(start, factor, count)
bktOptios := grpc_prometheus.WithHistogramBuckets(buckets)
grpc_prometheus.EnableHandlingTimeHistogram(bktOptios)
grpc_prometheus.Register(server)
}
err = server.Serve(listener)
if err != nil {
klog.Fatalf("Failed to server: %v", err)

View File

@ -81,29 +81,29 @@ func NewControllerServiceCapability(ctrlCap csi.ControllerServiceCapability_RPC_
}
// RunNodePublishServer starts node server
func RunNodePublishServer(endpoint string, d *CSIDriver, ns csi.NodeServer) {
func RunNodePublishServer(endpoint, hstOption string, d *CSIDriver, ns csi.NodeServer, m bool) {
ids := NewDefaultIdentityServer(d)
s := NewNonBlockingGRPCServer()
s.Start(endpoint, ids, nil, ns)
s.Start(endpoint, hstOption, ids, nil, ns, m)
s.Wait()
}
// RunControllerPublishServer starts controller server
func RunControllerPublishServer(endpoint string, d *CSIDriver, cs csi.ControllerServer) {
func RunControllerPublishServer(endpoint, hstOption string, d *CSIDriver, cs csi.ControllerServer, m bool) {
ids := NewDefaultIdentityServer(d)
s := NewNonBlockingGRPCServer()
s.Start(endpoint, ids, cs, nil)
s.Start(endpoint, hstOption, ids, cs, nil, m)
s.Wait()
}
// RunControllerandNodePublishServer starts both controller and node server
func RunControllerandNodePublishServer(endpoint string, d *CSIDriver, cs csi.ControllerServer, ns csi.NodeServer) {
func RunControllerandNodePublishServer(endpoint, hstOption string, d *CSIDriver, cs csi.ControllerServer, ns csi.NodeServer, m bool) {
ids := NewDefaultIdentityServer(d)
s := NewNonBlockingGRPCServer()
s.Start(endpoint, ids, cs, ns)
s.Start(endpoint, hstOption, ids, cs, ns, m)
s.Wait()
}

View File

@ -18,10 +18,6 @@ package liveness
import (
"context"
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/ceph/ceph-csi/pkg/util"
@ -29,7 +25,6 @@ import (
connlib "github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/rpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog"
)
@ -88,21 +83,9 @@ func recordLiveness(endpoint string, pollTime, timeout time.Duration) {
func Run(conf *util.Config) {
klog.Infof("Liveness Running")
ip := os.Getenv("POD_IP")
if ip == "" {
klog.Warning("missing POD_IP env var defaulting to 0.0.0.0")
ip = "0.0.0.0"
}
// start liveness collection
go recordLiveness(conf.Endpoint, conf.PollTime, conf.PoolTimeout)
// start up prometheus endpoint
addr := net.JoinHostPort(ip, strconv.Itoa(conf.LivenessPort))
http.Handle(conf.LivenessPath, promhttp.Handler())
err := http.ListenAndServe(addr, nil)
if err != nil {
klog.Fatalln(err)
}
util.StartMetricsServer(conf)
}

View File

@ -158,6 +158,9 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
}
s := csicommon.NewNonBlockingGRPCServer()
s.Start(conf.Endpoint, r.ids, r.cs, r.ns)
s.Start(conf.Endpoint, conf.HistogramOption, r.ids, r.cs, r.ns, conf.EnableGRPCMetrics)
if conf.EnableGRPCMetrics {
go util.StartMetricsServer(conf)
}
s.Wait()
}

27
pkg/util/httpserver.go Normal file
View File

@ -0,0 +1,27 @@
package util
import (
"net"
"net/http"
"net/url"
"strconv"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog"
)
// ValidateURL validates the url
func ValidateURL(c *Config) error {
_, err := url.Parse(c.MetricsPath)
return err
}
// StartMetricsServer starts http server
func StartMetricsServer(c *Config) {
addr := net.JoinHostPort(c.MetricsIP, strconv.Itoa(c.MetricsPort))
http.Handle(c.MetricsPath, promhttp.Handler())
err := http.ListenAndServe(addr, nil)
if err != nil {
klog.Fatalln(err)
}
}

View File

@ -54,30 +54,32 @@ var (
// Config holds the parameters list which can be configured
type Config struct {
// common flags
Vtype string // driver type [rbd|cephfs|liveness]
Endpoint string // CSI endpoint
DriverName string // name of the driver
NodeID string // node id
InstanceID string // unique ID distinguishing this instance of Ceph CSI
MetadataStorage string // metadata persistence method [node|k8s_configmap]
PluginPath string // location of cephcsi plugin
PidLimit int // PID limit to configure through cgroups")
IsControllerServer bool // if set to true start provisoner server
IsNodeServer bool // if set to true start node server
// rbd related flags
Containerized bool // whether run as containerized
Vtype string // driver type [rbd|cephfs|liveness]
Endpoint string // CSI endpoint
DriverName string // name of the driver
NodeID string // node id
InstanceID string // unique ID distinguishing this instance of Ceph CSI
MetadataStorage string // metadata persistence method [node|k8s_configmap]
PluginPath string // location of cephcsi plugin
// cephfs related flags
VolumeMounter string // default volume mounter (possible options are 'kernel', 'fuse')
MountCacheDir string // mount info cache save dir
// livenes related flags
LivenessPort int // TCP port for liveness requests"
LivenessPath string // path of prometheus endpoint where metrics will be available
PollTime time.Duration // time interval in seconds between each poll
PoolTimeout time.Duration // probe timeout in seconds
// metrics related flags
MetricsPath string // path of prometheus endpoint where metrics will be available
HistogramOption string // Histogram option for grpc metrics, should be comma separated value, ex:= "0.5,2,6" where start=0.5 factor=2, count=6
MetricsIP string // TCP port for liveness/ metrics requests
PidLimit int // PID limit to configure through cgroups")
MetricsPort int // TCP port for liveness/grpc metrics requests
PollTime time.Duration // time interval in seconds between each poll
PoolTimeout time.Duration // probe timeout in seconds
EnableGRPCMetrics bool // option to enable grpc metrics
IsControllerServer bool // if set to true start provisoner server
IsNodeServer bool // if set to true start node server
// rbd related flag
Containerized bool // whether run as containerized
}