prometheus liveness probe sidecar

Signed-off-by: Daniel-Pivonka dpivonka@redhat.com
Daniel-Pivonka 2019-06-20 15:30:40 -04:00 committed by mergify[bot]
parent 2ca575b99d
commit d621a58207
30 changed files with 2483 additions and 55 deletions

Gopkg.lock generated
View File

@ -33,8 +33,8 @@
name = "github.com/beorn7/perks"
packages = ["quantile"]
pruneopts = "NUT"
revision = "4b2b341e8d7715fae06375aa633dbb6e91b3fb46"
version = "v1.0.0"
revision = "37c8de3658fcb183f997c4e13e8337516ab753e6"
version = "v1.0.1"
[[projects]]
digest = "1:7f21fa1f8ab9a529dba26a7e9cf20de217c307fa1d96cb599d3afd9e5c83e9d6"
@ -305,9 +305,13 @@
version = "v1.1.7"
[[projects]]
digest = "1:5f039a8e43dc5b00adee7b38e39baf6c36f607372940c11975f00ec9c5f298ae"
digest = "1:a0892607b4f5385bb9fb12759facc8fad4e61b8b557384e4a078150c6ba43623"
name = "github.com/kubernetes-csi/csi-lib-utils"
packages = ["protosanitizer"]
packages = [
"connection",
"protosanitizer",
"rpc",
]
pruneopts = "NUT"
revision = "b8b7a89535d80e12f2c0f4c53cfb981add8aaca2"
version = "v0.6.1"
@ -457,15 +461,16 @@
version = "v1.0.0"
[[projects]]
digest = "1:623090ece42820e4121c5c85a4373b10b922e79f3b8c44caad67a45a50e10054"
digest = "1:097cc61836050f45cbb712ae3bb45d66fba464c16b8fac09907fa3c1f753eff6"
name = "github.com/prometheus/client_golang"
packages = [
"prometheus",
"prometheus/internal",
"prometheus/promhttp",
]
pruneopts = "NUT"
revision = "4ab88e80c249ed361d3299e2930427d9ac43ef8d"
version = "v1.0.0"
revision = "170205fb58decfd011f1550d4cfb737230d7ae4f"
version = "v1.1.0"
[[projects]]
branch = "master"
@ -588,7 +593,7 @@
[[projects]]
branch = "master"
digest = "1:7a9d37d6b986937aadaf23b125ee6a6d4e3e6eec8560b51ac18fd2adf62e6406"
digest = "1:816780136a1ee09b3070cdfed092c244010fa3f2bde27beb1b1f80dfef4338e1"
name = "golang.org/x/sys"
packages = [
"cpu",
@ -596,7 +601,7 @@
"windows",
]
pruneopts = "NUT"
revision = "fc99dfbffb4e5ed5758a37e31dd861afe285406b"
revision = "cbf593c0f2f39034e9104bbf77e2ec7c48c98fc5"
[[projects]]
digest = "1:0b5dc8c3581fc3ea2b80cc2e360dfb9c2d61dd0cba0d2fe247e8edd3e83f7551"
@ -1106,7 +1111,7 @@
"volume/helpers",
]
pruneopts = "NUT"
revision = "21f9d6b59624871aea170656c0433ad603e67ecd"
revision = "4405817736360488dfda1950e6266d2ff4bcba9e"
[[projects]]
branch = "master"
@ -1117,7 +1122,7 @@
"featuregate",
]
pruneopts = "NUT"
revision = "042c00bc1f9e2571ae04255e09819fa8f3e870d1"
revision = "8d609ba1a28a47e7880ecf03fd75faba56797aad"
[[projects]]
branch = "master"
@ -1125,7 +1130,7 @@
name = "k8s.io/cri-api"
packages = ["pkg/apis/runtime/v1alpha2"]
pruneopts = "NUT"
revision = "247eeb1afecb07101bb58883f79b8b463864b035"
revision = "3baa588ab5670ff6ff8a6fcb4a8ec9234a2346c4"
[[projects]]
digest = "1:43099cc4ed575c40f80277c7ba7168df37d0c663bdc4f541325430bd175cce8a"
@ -1276,7 +1281,7 @@
[[projects]]
branch = "master"
digest = "1:1541afde648991d68b2579076b29afb2d51a54aec822b434644f60f435eeee6e"
digest = "1:9300c13de75a1813f7c386de9aebc7dc49f490bb77369cb9d50af571dd9acf52"
name = "k8s.io/utils"
packages = [
"buffer",
@ -1292,7 +1297,7 @@
"trace",
]
pruneopts = "NUT"
revision = "3dccf664f023863740c508fb4284e49742bedfa4"
revision = "581e00157fb1a0435d4fac54a52d1ca1e481d60e"
[[projects]]
digest = "1:cb422c75bab66a8339a38b64e837f3b28f3d5a8c06abd7b9048f420363baa18a"
@ -1341,13 +1346,17 @@
"github.com/golang/protobuf/ptypes",
"github.com/golang/protobuf/ptypes/timestamp",
"github.com/grpc-ecosystem/go-grpc-middleware",
"github.com/kubernetes-csi/csi-lib-utils/connection",
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer",
"github.com/kubernetes-csi/csi-lib-utils/rpc",
"github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1",
"github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/typed/volumesnapshot/v1alpha1",
"github.com/onsi/ginkgo",
"github.com/onsi/gomega",
"github.com/pborman/uuid",
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promhttp",
"golang.org/x/net/context",
"google.golang.org/grpc",
"google.golang.org/grpc/codes",

View File

@ -22,8 +22,10 @@ import (
"path"
"path/filepath"
"strings"
"time"
"github.com/ceph/ceph-csi/pkg/cephfs"
"github.com/ceph/ceph-csi/pkg/liveness"
"github.com/ceph/ceph-csi/pkg/rbd"
"github.com/ceph/ceph-csi/pkg/util"
"k8s.io/klog"
@ -32,14 +34,16 @@ import (
const (
rbdType = "rbd"
cephfsType = "cephfs"
livenessType = "liveness"
rbdDefaultName = "rbd.csi.ceph.com"
cephfsDefaultName = "cephfs.csi.ceph.com"
livenessDefaultName = "liveness.csi.ceph.com"
)
var (
// common flags
vtype = flag.String("type", "", "driver type [rbd|cephfs]")
vtype = flag.String("type", "", "driver type [rbd|cephfs|liveness]")
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
@ -55,6 +59,12 @@ var (
// cephfs related flags
volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
mountCacheDir = flag.String("mountcachedir", "", "mount info cache save dir")
// liveness related flags
livenessport = flag.Int("livenessport", 8080, "TCP port for liveness requests")
livenesspath = flag.String("livenesspath", "/metrics", "path of prometheus endpoint where metrics will be available")
pollTime = flag.Duration("polltime", time.Second*60, "time interval in seconds between each poll")
timeout = flag.Duration("timeout", time.Second*3, "probe timeout in seconds")
)
func init() {
@ -90,6 +100,8 @@ func getDriverName() string {
return rbdDefaultName
case cephfsType:
return cephfsDefaultName
case livenessType:
return livenessDefaultName
default:
return ""
}
@ -148,6 +160,9 @@ func main() {
driver := cephfs.NewDriver()
driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, *instanceID, csipluginPath, cp, driverType)
case livenessType:
liveness.Run(*endpoint, *livenesspath, *livenessport, *pollTime, *timeout)
default:
klog.Fatalln("invalid volume type", vtype) // calls exit
}

View File

@ -4,13 +4,15 @@ apiVersion: v1
metadata:
name: csi-cephfsplugin-provisioner
labels:
app: csi-cephfsplugin-provisioner
app: csi-liveness
spec:
selector:
app: csi-cephfsplugin-provisioner
ports:
- name: dummy
port: 12345
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8081
---
kind: StatefulSet
@ -97,6 +99,26 @@ spec:
mountPath: /etc/ceph-csi-config/
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: liveness-prometheus
image: quay.io/cephcsi/cephcsi:canary
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8081"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: /csi
imagePullPolicy: "IfNotPresent"
volumes:
- name: socket-dir
hostPath:

View File

@ -90,8 +90,29 @@ spec:
mountPath: /dev
- name: ceph-csi-config
mountPath: /etc/ceph-csi-config/
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: liveness-prometheus
image: quay.io/cephcsi/cephcsi:canary
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8081"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: /csi
imagePullPolicy: "IfNotPresent"
volumes:
- name: mount-cache-dir
emptyDir: {}
@ -127,3 +148,19 @@ spec:
emptyDir: {
medium: "Memory"
}
---
# This is a service to expose the liveness sidecar
apiVersion: v1
kind: Service
metadata:
name: csi-liveness-cephfsplugin
labels:
app: csi-liveness
spec:
ports:
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8081
selector:
app: csi-cephfsplugin

View File

@ -108,6 +108,28 @@ spec:
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
- name: liveness-prometheus
image: "{{ .Values.nodeplugin.plugin.image.repository }}:{{ .Values.nodeplugin.plugin.image.tag }}"
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8081"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }}
env:
- name: CSI_ENDPOINT
value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: plugin-dir
mountPath: {{ .Values.socketDir }}
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
volumes:
- name: mount-cache-dir

View File

@ -4,7 +4,7 @@ metadata:
name: {{ include "ceph-csi-cephfs.provisioner.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
app: {{ include "ceph-csi-cephfs.name" . }}
app: csi-liveness
chart: {{ include "ceph-csi-cephfs.chart" . }}
component: {{ .Values.provisioner.name }}
release: {{ .Release.Name }}
@ -15,5 +15,7 @@ spec:
component: {{ .Values.provisioner.name }}
release: {{ .Release.Name }}
ports:
- name: dummy
port: 12345
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8081

View File

@ -95,6 +95,28 @@ spec:
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
- name: liveness-prometheus
image: "{{ .Values.nodeplugin.plugin.image.repository }}:{{ .Values.nodeplugin.plugin.image.tag }}"
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8081"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }}
env:
- name: CSI_ENDPOINT
value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: {{ .Values.socketDir }}
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
volumes:
- name: socket-dir

View File

@ -1,3 +1,19 @@
---
kind: Service
apiVersion: v1
metadata:
name: csi-cephfsplugin-provisioner
labels:
app: csi-liveness
spec:
selector:
app: csi-cephfsplugin-provisioner
ports:
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8081
---
kind: Deployment
apiVersion: apps/v1
@ -86,6 +102,26 @@ spec:
mountPath: /etc/ceph-csi-config/
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: liveness-prometheus
image: quay.io/cephcsi/cephcsi:canary
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8081"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: /csi
imagePullPolicy: "IfNotPresent"
volumes:
- name: socket-dir
hostPath:

View File

@ -92,6 +92,26 @@ spec:
mountPath: /etc/ceph-csi-config/
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: liveness-prometheus
image: quay.io/cephcsi/cephcsi:canary
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8081"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: /csi
imagePullPolicy: "IfNotPresent"
volumes:
- name: mount-cache-dir
emptyDir: {}
@ -127,3 +147,19 @@ spec:
emptyDir: {
medium: "Memory"
}
---
# This is a service to expose the liveness sidecar
apiVersion: v1
kind: Service
metadata:
name: csi-liveness-cephfsplugin
labels:
app: csi-liveness
spec:
ports:
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8081
selector:
app: csi-cephfsplugin

View File

@ -4,13 +4,15 @@ apiVersion: v1
metadata:
name: csi-rbdplugin-provisioner
labels:
app: csi-rbdplugin-provisioner
app: csi-liveness
spec:
selector:
app: csi-rbdplugin-provisioner
ports:
- name: dummy
port: 12345
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8080
---
kind: StatefulSet
@ -110,6 +112,26 @@ spec:
mountPath: /etc/ceph-csi-config/
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: liveness-prometheus
image: quay.io/cephcsi/cephcsi:canary
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8080"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: /csi
imagePullPolicy: "IfNotPresent"
volumes:
- name: host-dev
hostPath:

View File

@ -90,6 +90,26 @@ spec:
mountPropagation: "Bidirectional"
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: liveness-prometheus
image: quay.io/cephcsi/cephcsi:canary
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8080"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: /csi
imagePullPolicy: "IfNotPresent"
volumes:
- name: socket-dir
hostPath:
@ -126,3 +146,19 @@ spec:
emptyDir: {
medium: "Memory"
}
---
# This is a service to expose the liveness sidecar
apiVersion: v1
kind: Service
metadata:
name: csi-liveness-rbdplugin
labels:
app: csi-liveness
spec:
ports:
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8080
selector:
app: csi-rbdplugin

View File

@ -107,6 +107,28 @@ spec:
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
- name: liveness-prometheus
image: "{{ .Values.nodeplugin.plugin.image.repository }}:{{ .Values.nodeplugin.plugin.image.tag }}"
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8080"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }}
env:
- name: CSI_ENDPOINT
value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: plugin-dir
mountPath: {{ .Values.socketDir }}
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
volumes:
- name: socket-dir

View File

@ -4,7 +4,7 @@ metadata:
name: {{ include "ceph-csi-rbd.provisioner.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
app: {{ include "ceph-csi-rbd.name" . }}
app: csi-liveness
chart: {{ include "ceph-csi-rbd.chart" . }}
component: {{ .Values.provisioner.name }}
release: {{ .Release.Name }}
@ -15,5 +15,7 @@ spec:
component: {{ .Values.provisioner.name }}
release: {{ .Release.Name }}
ports:
- name: dummy
port: 12345
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8080

View File

@ -25,6 +25,7 @@ spec:
component: {{ .Values.provisioner.name }}
release: {{ .Release.Name }}
heritage: {{ .Release.Service }}
contains: liveness
spec:
serviceAccountName: {{ include "ceph-csi-rbd.serviceAccountName.provisioner" . }}
containers:
@ -112,6 +113,28 @@ spec:
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
- name: liveness-prometheus
image: "{{ .Values.nodeplugin.plugin.image.repository }}:{{ .Values.nodeplugin.plugin.image.tag }}"
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8080"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }}
env:
- name: CSI_ENDPOINT
value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: {{ .Values.socketDir }}
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
volumes:
- name: socket-dir

View File

@ -1,3 +1,19 @@
---
kind: Service
apiVersion: v1
metadata:
name: csi-rbdplugin-provisioner
labels:
app: csi-liveness
spec:
selector:
app: csi-rbdplugin-provisioner
ports:
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8080
---
kind: Deployment
apiVersion: apps/v1
@ -100,6 +116,26 @@ spec:
mountPath: /etc/ceph-csi-config/
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: liveness-prometheus
image: quay.io/cephcsi/cephcsi:canary
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8080"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: /csi
imagePullPolicy: "IfNotPresent"
volumes:
- name: host-dev
hostPath:

View File

@ -90,6 +90,26 @@ spec:
mountPropagation: "Bidirectional"
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: liveness-prometheus
image: quay.io/cephcsi/cephcsi:canary
args:
- "--type=liveness"
- "--endpoint=$(CSI_ENDPOINT)"
- "--livenessport=8080"
- "--livenesspath=/metrics"
- "--polltime=60s"
- "--timeout=3s"
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
volumeMounts:
- name: socket-dir
mountPath: /csi
imagePullPolicy: "IfNotPresent"
volumes:
- name: socket-dir
hostPath:
@ -126,3 +146,19 @@ spec:
emptyDir: {
medium: "Memory"
}
---
# This is a service to expose the liveness sidecar
apiVersion: v1
kind: Service
metadata:
name: csi-liveness-rbdplugin
labels:
app: csi-liveness
spec:
ports:
- name: http-metrics
port: 8080
protocol: TCP
targetPort: 8080
selector:
app: csi-rbdplugin

View File

@ -1,3 +1,4 @@
# CSI CephFS plugin
The CSI CephFS plugin is able to both provision new CephFS volumes
@ -43,7 +44,7 @@ that should be resolved in v14.2.3.
**Available command line arguments:**
| Option | Default value | Description |
| ------------------- | --------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| ------------------- | --------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket |
| `--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) |
| `--nodeid` | _empty_ | This node's ID |
@ -54,6 +55,10 @@ that should be resolved in v14.2.3.
| `--pluginpath` | "/var/lib/kubelet/plugins/" | The location of cephcsi plugin on host |
| `--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) |
| `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. |
| `--livenessport` | `8080` | TCP port for liveness requests |
| `--livenesspath` | `/metrics` | Path of the Prometheus endpoint where metrics will be available |
| `--polltime` | `60s` | Time interval between each poll |
| `--timeout` | `3s` | Probe timeout in seconds |
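Putting these flags together, a minimal sketch of how the liveness sidecar is invoked (the binary name, socket path, and port are illustrative and follow the example CephFS deployment manifests in this change):

```bash
# Illustrative only: socket path and port follow the example CephFS manifests.
cephcsi \
  --type=liveness \
  --endpoint=unix:///csi/csi.sock \
  --livenessport=8081 \
  --livenesspath=/metrics \
  --polltime=60s \
  --timeout=3s
```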
**Available environmental variables:**
@ -163,11 +168,11 @@ After successfully completing the steps above, you should see output similar to
```bash
$ kubectl get all
NAME READY STATUS RESTARTS AGE
pod/csi-cephfsplugin-provisioner-0 3/3 Running 0 25s
pod/csi-cephfsplugin-rljcv 2/2 Running 0 24s
pod/csi-cephfsplugin-provisioner-0 4/4 Running 0 25s
pod/csi-cephfsplugin-rljcv 3/3 Running 0 24s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/csi-cephfsplugin-provisioner ClusterIP 10.101.78.75 <none> 12345/TCP 26s
service/csi-cephfsplugin-provisioner ClusterIP 10.101.78.75 <none> 8080/TCP 26s
...
```

View File

@ -1,3 +1,4 @@
# CSI RBD Plugin
The RBD CSI plugin is able to provision new RBD images and
@ -36,6 +37,10 @@ make image-cephcsi
| `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning |
| `--metadatastorage` | _empty_ | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) |
| `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. |
| `--livenessport` | `8080` | TCP port for liveness requests |
| `--livenesspath` | `/metrics` | Path of the Prometheus endpoint where metrics will be available |
| `--polltime` | `60s` | Time interval between each poll |
| `--timeout` | `3s` | Probe timeout in seconds |
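To check that the sidecar is serving metrics, one possible sketch uses the `csi-liveness-rbdplugin` service added by the manifests in this change (adjust the namespace to your deployment):

```bash
# Port-forward the liveness service and check the csi_liveness gauge.
kubectl port-forward service/csi-liveness-rbdplugin 8080:8080 &
curl -s http://127.0.0.1:8080/metrics | grep csi_liveness
```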
**Available volume parameters:**
@ -126,11 +131,11 @@ After successfully completing the steps above, you should see output similar to
```bash
$ kubectl get all
NAME READY STATUS RESTARTS AGE
pod/csi-rbdplugin-fptqr 2/2 Running 0 21s
pod/csi-rbdplugin-provisioner-0 4/4 Running 0 22s
pod/csi-rbdplugin-fptqr 3/3 Running 0 21s
pod/csi-rbdplugin-provisioner-0 5/5 Running 0 22s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/csi-rbdplugin-provisioner ClusterIP 10.104.2.130 <none> 12345/TCP 23s
service/csi-rbdplugin-provisioner ClusterIP 10.104.2.130 <none> 8080/TCP 23s
...
```

docs/metrics.md Normal file
View File

@ -0,0 +1,32 @@
# Metrics
CSI deploys a sidecar container that is responsible for collecting metrics.
## Liveness
Liveness metrics are intended to be collected by Prometheus, but they can also be
accessed through a GET request to a specific pod IP.
For example:
`curl -X GET http://[pod ip]:[liveness-port][liveness-path] 2>/dev/null | grep csi`
The expected output should be
```bash
[root@worker2 /]# curl -X GET http://10.109.65.142:8080/metrics 2>/dev/null | grep csi
# HELP csi_liveness Liveness Probe
# TYPE csi_liveness gauge
csi_liveness 1
```
Prometheus can be deployed through the Prometheus Operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html).
The [service-monitor](../examples/service-monitor.yaml) tells Prometheus how
to pull metrics out of CSI.
Each CSI pod has a service that exposes the endpoint to Prometheus. By default,
RBD pods serve metrics on port 8080 and CephFS pods on port 8081.
These ports can be changed if desired; if multiple Ceph clusters are deployed,
additional ports will be used for the additional CSI pods.
Depending on how your cluster is set up, you may need to open the ports used in
your firewall.
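For clusters without the Prometheus Operator, a static scrape configuration can
target the liveness services directly. The fragment below is only a sketch: the
service names come from the example manifests and the `default` namespace is an
assumption.

```yaml
# prometheus.yml fragment (sketch); service names are from the example
# manifests and the namespace is assumed to be "default".
scrape_configs:
  - job_name: ceph-csi-liveness
    metrics_path: /metrics
    static_configs:
      - targets:
          - csi-liveness-rbdplugin.default.svc:8080
          - csi-liveness-cephfsplugin.default.svc:8080
```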

View File

@ -0,0 +1,22 @@
---
# This is a ServiceMonitor that would be used by a Prometheus collector to pick
# up liveness metrics. The label your Prometheus instance is looking for will
# need to be supplied, and the namespace may need to be changed.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: csi-liveness
namespace: rook-ceph
labels:
team: rook
spec:
namespaceSelector:
matchNames:
- default
selector:
matchLabels:
app: csi-liveness
endpoints:
- port: http-metrics
path: /metrics
interval: 5s
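As a usage sketch, assuming this file is saved as `examples/service-monitor.yaml` (the path referenced from `docs/metrics.md`) and the Prometheus Operator CRDs are installed:

```bash
# Apply the ServiceMonitor and confirm it exists in the rook-ceph namespace.
kubectl apply -f examples/service-monitor.yaml
kubectl -n rook-ceph get servicemonitor csi-liveness
```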

pkg/liveness/liveness.go Normal file
View File

@ -0,0 +1,106 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
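// Package liveness implements the Prometheus liveness sidecar: it
// periodically sends a CSI Probe() request to the driver endpoint and
// exposes the result as the csi_liveness gauge over HTTP.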
package liveness
import (
"context"
"net"
"net/http"
"os"
"strconv"
"time"
connlib "github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/rpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog"
)
var (
liveness = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "csi",
Name: "liveness",
Help: "Liveness Probe",
})
)
func getLiveness(endpoint string, timeout time.Duration) {
csiConn, err := connlib.Connect(endpoint)
if err != nil {
// connlib should retry forever so a returned error should mean
// the grpc client is misconfigured rather than an error on the network
klog.Fatalf("failed to establish connection to CSI driver: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
klog.Info("Sending probe request to CSI driver")
ready, err := rpc.Probe(ctx, csiConn)
if err != nil {
liveness.Set(0)
klog.Errorf("health check failed: %v", err)
return
}
if !ready {
liveness.Set(0)
klog.Error("driver responded but is not ready")
return
}
liveness.Set(1)
klog.Infof("Health check succeeded")
}
func recordLiveness(endpoint string, pollTime, timeout time.Duration) {
// register prometheus metrics
err := prometheus.Register(liveness)
if err != nil {
klog.Fatalln(err)
}
// get liveness periodically
ticker := time.NewTicker(pollTime)
defer ticker.Stop()
for range ticker.C {
getLiveness(endpoint, timeout)
}
}
func Run(endpoint, livenessendpoint string, port int, pollTime, timeout time.Duration) {
klog.Infof("Liveness Running")
ip := os.Getenv("POD_IP")
if ip == "" {
klog.Warning("missing POD_IP env var defaulting to 0.0.0.0")
ip = "0.0.0.0"
}
// start liveness collection
go recordLiveness(endpoint, pollTime, timeout)
// start up prometheus endpoint
addr := net.JoinHostPort(ip, strconv.Itoa(port))
http.Handle(livenessendpoint, promhttp.Handler())
err := http.ListenAndServe(addr, nil)
if err != nil {
klog.Fatalln(err)
}
}

View File

@ -0,0 +1,310 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package connection
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"strings"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc"
"k8s.io/klog"
)
const (
// Interval of logging connection errors
connectionLoggingInterval = 10 * time.Second
// Interval of trying to call Probe() until it succeeds
probeInterval = 1 * time.Second
)
const terminationLogPath = "/dev/termination-log"
// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket
// file or have format '<protocol>://', following gRPC name resolution mechanism at
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
//
// The function tries to connect indefinitely every second until it connects. The function automatically disables TLS
// and adds interceptor for logging of all gRPC messages at level 5.
//
// For a connection to a Unix Domain socket, the behavior after
// loosing the connection is configurable. The default is to
// log the connection loss and reestablish a connection. Applications
// which need to know about a connection loss can be notified by
// passing a callback with OnConnectionLoss and in that callback
// can decide what to do:
// - exit the application with os.Exit
// - invalidate cached information
// - disable the reconnect, which will cause all gRPC method calls to fail with status.Unavailable
//
// For other connections, the default behavior from gRPC is used and
// loss of connection is not detected reliably.
func Connect(address string, options ...Option) (*grpc.ClientConn, error) {
return connect(address, []grpc.DialOption{}, options)
}
// Option is the type of all optional parameters for Connect.
type Option func(o *options)
// OnConnectionLoss registers a callback that will be invoked when the
// connection got lost. If that callback returns true, the connection
// is reestablished. Otherwise the connection is left as it is and
// all future gRPC calls using it will fail with status.Unavailable.
func OnConnectionLoss(reconnect func() bool) Option {
return func(o *options) {
o.reconnect = reconnect
}
}
// ExitOnConnectionLoss returns callback for OnConnectionLoss() that writes
// an error to /dev/termination-log and exits.
func ExitOnConnectionLoss() func() bool {
return func() bool {
terminationMsg := "Lost connection to CSI driver, exiting"
if err := ioutil.WriteFile(terminationLogPath, []byte(terminationMsg), 0644); err != nil {
klog.Errorf("%s: %s", terminationLogPath, err)
}
klog.Fatalf(terminationMsg)
return false
}
}
type options struct {
reconnect func() bool
}
// connect is the internal implementation of Connect. It has more options to enable testing.
func connect(address string, dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
var o options
for _, option := range connectOptions {
option(&o)
}
dialOptions = append(dialOptions,
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
grpc.WithBlock(), // Block until connection succeeds.
grpc.WithUnaryInterceptor(LogGRPC), // Log all messages.
)
unixPrefix := "unix://"
if strings.HasPrefix(address, "/") {
// It looks like filesystem path.
address = unixPrefix + address
}
if strings.HasPrefix(address, unixPrefix) {
// state variables for the custom dialer
haveConnected := false
lostConnection := false
reconnect := true
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
if haveConnected && !lostConnection {
// We have detected a loss of connection for the first time. Decide what to do...
// Record this once. TODO (?): log at regular time intervals.
klog.Errorf("Lost connection to %s.", address)
// Inform caller and let it decide? Default is to reconnect.
if o.reconnect != nil {
reconnect = o.reconnect()
}
lostConnection = true
}
if !reconnect {
return nil, errors.New("connection lost, reconnecting disabled")
}
conn, err := net.DialTimeout("unix", address[len(unixPrefix):], timeout)
if err == nil {
// Connection restablished.
haveConnected = true
lostConnection = false
}
return conn, err
}))
} else if o.reconnect != nil {
return nil, errors.New("OnConnectionLoss callback only supported for unix:// addresses")
}
klog.Infof("Connecting to %s", address)
// Connect in background.
var conn *grpc.ClientConn
var err error
ready := make(chan bool)
go func() {
conn, err = grpc.Dial(address, dialOptions...)
close(ready)
}()
// Log error every connectionLoggingInterval
ticker := time.NewTicker(connectionLoggingInterval)
defer ticker.Stop()
// Wait until Dial() succeeds.
for {
select {
case <-ticker.C:
klog.Warningf("Still connecting to %s", address)
case <-ready:
return conn, err
}
}
}
// LogGRPC is gPRC unary interceptor for logging of CSI messages at level 5. It removes any secrets from the message.
func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
klog.V(5).Infof("GRPC call: %s", method)
klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
err := invoker(ctx, method, req, reply, cc, opts...)
klog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(reply))
klog.V(5).Infof("GRPC error: %v", err)
return err
}
// GetDriverName returns name of CSI driver.
func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
client := csi.NewIdentityClient(conn)
req := csi.GetPluginInfoRequest{}
rsp, err := client.GetPluginInfo(ctx, &req)
if err != nil {
return "", err
}
name := rsp.GetName()
if name == "" {
return "", fmt.Errorf("driver name is empty")
}
return name, nil
}
// PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map.
type PluginCapabilitySet map[csi.PluginCapability_Service_Type]bool
// GetPluginCapabilities returns set of supported capabilities of CSI driver.
func GetPluginCapabilities(ctx context.Context, conn *grpc.ClientConn) (PluginCapabilitySet, error) {
client := csi.NewIdentityClient(conn)
req := csi.GetPluginCapabilitiesRequest{}
rsp, err := client.GetPluginCapabilities(ctx, &req)
if err != nil {
return nil, err
}
caps := PluginCapabilitySet{}
for _, cap := range rsp.GetCapabilities() {
if cap == nil {
continue
}
srv := cap.GetService()
if srv == nil {
continue
}
t := srv.GetType()
caps[t] = true
}
return caps, nil
}
// ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map.
type ControllerCapabilitySet map[csi.ControllerServiceCapability_RPC_Type]bool
// GetControllerCapabilities returns set of supported controller capabilities of CSI driver.
func GetControllerCapabilities(ctx context.Context, conn *grpc.ClientConn) (ControllerCapabilitySet, error) {
client := csi.NewControllerClient(conn)
req := csi.ControllerGetCapabilitiesRequest{}
rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return nil, err
}
caps := ControllerCapabilitySet{}
for _, cap := range rsp.GetCapabilities() {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
t := rpc.GetType()
caps[t] = true
}
return caps, nil
}
// ProbeForever calls Probe() of a CSI driver and waits until the driver becomes ready.
// Any error other than timeout is returned.
func ProbeForever(conn *grpc.ClientConn, singleProbeTimeout time.Duration) error {
for {
klog.Info("Probing CSI driver for readiness")
ready, err := probeOnce(conn, singleProbeTimeout)
if err != nil {
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error. The probe must have failed before gRPC
// method was called, otherwise we would get gRPC error.
return fmt.Errorf("CSI driver probe failed: %s", err)
}
if st.Code() != codes.DeadlineExceeded {
return fmt.Errorf("CSI driver probe failed: %s", err)
}
// Timeout -> driver is not ready. Fall through to sleep() below.
klog.Warning("CSI driver probe timed out")
} else {
if ready {
return nil
}
klog.Warning("CSI driver is not ready")
}
// Timeout was returned or driver is not ready.
time.Sleep(probeInterval)
}
}
// probeOnce is a helper to simplify defer cancel()
func probeOnce(conn *grpc.ClientConn, timeout time.Duration) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return Probe(ctx, conn)
}
// Probe calls driver Probe() just once and returns its result without any processing.
func Probe(ctx context.Context, conn *grpc.ClientConn) (ready bool, err error) {
client := csi.NewIdentityClient(conn)
req := csi.ProbeRequest{}
rsp, err := client.Probe(ctx, &req)
if err != nil {
return false, err
}
r := rsp.GetReady()
if r == nil {
// "If not present, the caller SHALL assume that the plugin is in a ready state"
return true, nil
}
return r.GetValue(), nil
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rpc
import (
"context"
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/klog"
)
const (
// Interval of trying to call Probe() until it succeeds
probeInterval = 1 * time.Second
)
// GetDriverName returns name of CSI driver.
func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
client := csi.NewIdentityClient(conn)
req := csi.GetPluginInfoRequest{}
rsp, err := client.GetPluginInfo(ctx, &req)
if err != nil {
return "", err
}
name := rsp.GetName()
if name == "" {
return "", fmt.Errorf("driver name is empty")
}
return name, nil
}
// PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map.
type PluginCapabilitySet map[csi.PluginCapability_Service_Type]bool
// GetPluginCapabilities returns set of supported capabilities of CSI driver.
func GetPluginCapabilities(ctx context.Context, conn *grpc.ClientConn) (PluginCapabilitySet, error) {
client := csi.NewIdentityClient(conn)
req := csi.GetPluginCapabilitiesRequest{}
rsp, err := client.GetPluginCapabilities(ctx, &req)
if err != nil {
return nil, err
}
caps := PluginCapabilitySet{}
for _, cap := range rsp.GetCapabilities() {
if cap == nil {
continue
}
srv := cap.GetService()
if srv == nil {
continue
}
t := srv.GetType()
caps[t] = true
}
return caps, nil
}
// ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map.
type ControllerCapabilitySet map[csi.ControllerServiceCapability_RPC_Type]bool
// GetControllerCapabilities returns set of supported controller capabilities of CSI driver.
func GetControllerCapabilities(ctx context.Context, conn *grpc.ClientConn) (ControllerCapabilitySet, error) {
client := csi.NewControllerClient(conn)
req := csi.ControllerGetCapabilitiesRequest{}
rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return nil, err
}
caps := ControllerCapabilitySet{}
for _, cap := range rsp.GetCapabilities() {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
t := rpc.GetType()
caps[t] = true
}
return caps, nil
}
// ProbeForever calls Probe() of a CSI driver and waits until the driver becomes ready.
// Any error other than timeout is returned.
func ProbeForever(conn *grpc.ClientConn, singleProbeTimeout time.Duration) error {
for {
klog.Info("Probing CSI driver for readiness")
ready, err := probeOnce(conn, singleProbeTimeout)
if err != nil {
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error. The probe must have failed before gRPC
// method was called, otherwise we would get gRPC error.
return fmt.Errorf("CSI driver probe failed: %s", err)
}
if st.Code() != codes.DeadlineExceeded {
return fmt.Errorf("CSI driver probe failed: %s", err)
}
// Timeout -> driver is not ready. Fall through to sleep() below.
klog.Warning("CSI driver probe timed out")
} else {
if ready {
return nil
}
klog.Warning("CSI driver is not ready")
}
// Timeout was returned or driver is not ready.
time.Sleep(probeInterval)
}
}
// probeOnce is a helper to simplify defer cancel()
func probeOnce(conn *grpc.ClientConn, timeout time.Duration) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return Probe(ctx, conn)
}
// Probe calls driver Probe() just once and returns its result without any processing.
func Probe(ctx context.Context, conn *grpc.ClientConn) (ready bool, err error) {
client := csi.NewIdentityClient(conn)
req := csi.ProbeRequest{}
rsp, err := client.Probe(ctx, &req)
if err != nil {
return false, err
}
r := rsp.GetReady()
if r == nil {
// "If not present, the caller SHALL assume that the plugin is in a ready state"
return true, nil
}
return r.GetValue(), nil
}

View File

@ -0,0 +1,357 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promhttp
import (
"bufio"
"io"
"net"
"net/http"
)
const (
closeNotifier = 1 << iota
flusher
hijacker
readerFrom
pusher
)
type delegator interface {
http.ResponseWriter
Status() int
Written() int64
}
type responseWriterDelegator struct {
http.ResponseWriter
status int
written int64
wroteHeader bool
observeWriteHeader func(int)
}
func (r *responseWriterDelegator) Status() int {
return r.status
}
func (r *responseWriterDelegator) Written() int64 {
return r.written
}
func (r *responseWriterDelegator) WriteHeader(code int) {
r.status = code
r.wroteHeader = true
r.ResponseWriter.WriteHeader(code)
if r.observeWriteHeader != nil {
r.observeWriteHeader(code)
}
}
func (r *responseWriterDelegator) Write(b []byte) (int, error) {
if !r.wroteHeader {
r.WriteHeader(http.StatusOK)
}
n, err := r.ResponseWriter.Write(b)
r.written += int64(n)
return n, err
}
type closeNotifierDelegator struct{ *responseWriterDelegator }
type flusherDelegator struct{ *responseWriterDelegator }
type hijackerDelegator struct{ *responseWriterDelegator }
type readerFromDelegator struct{ *responseWriterDelegator }
type pusherDelegator struct{ *responseWriterDelegator }
func (d closeNotifierDelegator) CloseNotify() <-chan bool {
//lint:ignore SA1019 http.CloseNotifier is deprecated but we don't want to
//remove support from client_golang yet.
return d.ResponseWriter.(http.CloseNotifier).CloseNotify()
}
func (d flusherDelegator) Flush() {
d.ResponseWriter.(http.Flusher).Flush()
}
func (d hijackerDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return d.ResponseWriter.(http.Hijacker).Hijack()
}
func (d readerFromDelegator) ReadFrom(re io.Reader) (int64, error) {
if !d.wroteHeader {
d.WriteHeader(http.StatusOK)
}
n, err := d.ResponseWriter.(io.ReaderFrom).ReadFrom(re)
d.written += n
return n, err
}
func (d pusherDelegator) Push(target string, opts *http.PushOptions) error {
return d.ResponseWriter.(http.Pusher).Push(target, opts)
}
var pickDelegator = make([]func(*responseWriterDelegator) delegator, 32)
func init() {
// TODO(beorn7): Code generation would help here.
pickDelegator[0] = func(d *responseWriterDelegator) delegator { // 0
return d
}
pickDelegator[closeNotifier] = func(d *responseWriterDelegator) delegator { // 1
return closeNotifierDelegator{d}
}
pickDelegator[flusher] = func(d *responseWriterDelegator) delegator { // 2
return flusherDelegator{d}
}
pickDelegator[flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 3
return struct {
*responseWriterDelegator
http.Flusher
http.CloseNotifier
}{d, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[hijacker] = func(d *responseWriterDelegator) delegator { // 4
return hijackerDelegator{d}
}
pickDelegator[hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 5
return struct {
*responseWriterDelegator
http.Hijacker
http.CloseNotifier
}{d, hijackerDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 6
return struct {
*responseWriterDelegator
http.Hijacker
http.Flusher
}{d, hijackerDelegator{d}, flusherDelegator{d}}
}
pickDelegator[hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 7
return struct {
*responseWriterDelegator
http.Hijacker
http.Flusher
http.CloseNotifier
}{d, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[readerFrom] = func(d *responseWriterDelegator) delegator { // 8
return readerFromDelegator{d}
}
pickDelegator[readerFrom+closeNotifier] = func(d *responseWriterDelegator) delegator { // 9
return struct {
*responseWriterDelegator
io.ReaderFrom
http.CloseNotifier
}{d, readerFromDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[readerFrom+flusher] = func(d *responseWriterDelegator) delegator { // 10
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Flusher
}{d, readerFromDelegator{d}, flusherDelegator{d}}
}
pickDelegator[readerFrom+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 11
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Flusher
http.CloseNotifier
}{d, readerFromDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[readerFrom+hijacker] = func(d *responseWriterDelegator) delegator { // 12
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Hijacker
}{d, readerFromDelegator{d}, hijackerDelegator{d}}
}
pickDelegator[readerFrom+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 13
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Hijacker
http.CloseNotifier
}{d, readerFromDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[readerFrom+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 14
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Hijacker
http.Flusher
}{d, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}}
}
pickDelegator[readerFrom+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 15
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Hijacker
http.Flusher
http.CloseNotifier
}{d, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher] = func(d *responseWriterDelegator) delegator { // 16
return pusherDelegator{d}
}
pickDelegator[pusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 17
return struct {
*responseWriterDelegator
http.Pusher
http.CloseNotifier
}{d, pusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+flusher] = func(d *responseWriterDelegator) delegator { // 18
return struct {
*responseWriterDelegator
http.Pusher
http.Flusher
}{d, pusherDelegator{d}, flusherDelegator{d}}
}
pickDelegator[pusher+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 19
return struct {
*responseWriterDelegator
http.Pusher
http.Flusher
http.CloseNotifier
}{d, pusherDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+hijacker] = func(d *responseWriterDelegator) delegator { // 20
return struct {
*responseWriterDelegator
http.Pusher
http.Hijacker
}{d, pusherDelegator{d}, hijackerDelegator{d}}
}
pickDelegator[pusher+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 21
return struct {
*responseWriterDelegator
http.Pusher
http.Hijacker
http.CloseNotifier
}{d, pusherDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 22
return struct {
*responseWriterDelegator
http.Pusher
http.Hijacker
http.Flusher
}{d, pusherDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}}
}
pickDelegator[pusher+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { //23
return struct {
*responseWriterDelegator
http.Pusher
http.Hijacker
http.Flusher
http.CloseNotifier
}{d, pusherDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+readerFrom] = func(d *responseWriterDelegator) delegator { // 24
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
}{d, pusherDelegator{d}, readerFromDelegator{d}}
}
pickDelegator[pusher+readerFrom+closeNotifier] = func(d *responseWriterDelegator) delegator { // 25
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.CloseNotifier
}{d, pusherDelegator{d}, readerFromDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+readerFrom+flusher] = func(d *responseWriterDelegator) delegator { // 26
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Flusher
}{d, pusherDelegator{d}, readerFromDelegator{d}, flusherDelegator{d}}
}
pickDelegator[pusher+readerFrom+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 27
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Flusher
http.CloseNotifier
}{d, pusherDelegator{d}, readerFromDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+readerFrom+hijacker] = func(d *responseWriterDelegator) delegator { // 28
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Hijacker
}{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}}
}
pickDelegator[pusher+readerFrom+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 29
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Hijacker
http.CloseNotifier
}{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+readerFrom+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 30
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Hijacker
http.Flusher
}{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}}
}
pickDelegator[pusher+readerFrom+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 31
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Hijacker
http.Flusher
http.CloseNotifier
}{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
}
func newDelegator(w http.ResponseWriter, observeWriteHeaderFunc func(int)) delegator {
d := &responseWriterDelegator{
ResponseWriter: w,
observeWriteHeader: observeWriteHeaderFunc,
}
id := 0
//lint:ignore SA1019 http.CloseNotifier is deprecated but we don't want to
//remove support from client_golang yet.
if _, ok := w.(http.CloseNotifier); ok {
id += closeNotifier
}
if _, ok := w.(http.Flusher); ok {
id += flusher
}
if _, ok := w.(http.Hijacker); ok {
id += hijacker
}
if _, ok := w.(io.ReaderFrom); ok {
id += readerFrom
}
if _, ok := w.(http.Pusher); ok {
id += pusher
}
return pickDelegator[id](d)
}

View File

@ -0,0 +1,349 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package promhttp provides tooling around HTTP servers and clients.
//
// First, the package allows the creation of http.Handler instances to expose
// Prometheus metrics via HTTP. promhttp.Handler acts on the
// prometheus.DefaultGatherer. With HandlerFor, you can create a handler for a
// custom registry or anything that implements the Gatherer interface. It also
// allows the creation of handlers that act differently on errors or allow to
// log errors.
//
// Second, the package provides tooling to instrument instances of http.Handler
// via middleware. Middleware wrappers follow the naming scheme
// InstrumentHandlerX, where X describes the intended use of the middleware.
// See each function's doc comment for specific details.
//
// Finally, the package allows for an http.RoundTripper to be instrumented via
// middleware. Middleware wrappers follow the naming scheme
// InstrumentRoundTripperX, where X describes the intended use of the
// middleware. See each function's doc comment for specific details.
package promhttp
import (
"compress/gzip"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/client_golang/prometheus"
)
const (
contentTypeHeader = "Content-Type"
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
)
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
// Handler returns an http.Handler for the prometheus.DefaultGatherer, using
// default HandlerOpts, i.e. it reports the first error as an HTTP error, it has
// no error logging, and it applies compression if requested by the client.
//
// The returned http.Handler is already instrumented using the
// InstrumentMetricHandler function and the prometheus.DefaultRegisterer. If you
// create multiple http.Handlers by separate calls of the Handler function, the
// metrics used for instrumentation will be shared between them, providing
// global scrape counts.
//
// This function is meant to cover the bulk of basic use cases. If you are doing
// anything that requires more customization (including using a non-default
// Gatherer, different instrumentation, and non-default HandlerOpts), use the
// HandlerFor function. See there for details.
func Handler() http.Handler {
return InstrumentMetricHandler(
prometheus.DefaultRegisterer, HandlerFor(prometheus.DefaultGatherer, HandlerOpts{}),
)
}
// HandlerFor returns an uninstrumented http.Handler for the provided
// Gatherer. The behavior of the Handler is defined by the provided
// HandlerOpts. Thus, HandlerFor is useful to create http.Handlers for custom
// Gatherers, with non-default HandlerOpts, and/or with custom (or no)
// instrumentation. Use the InstrumentMetricHandler function to apply the same
// kind of instrumentation as it is used by the Handler function.
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
var (
inFlightSem chan struct{}
errCnt = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "promhttp_metric_handler_errors_total",
Help: "Total number of internal errors encountered by the promhttp metric handler.",
},
[]string{"cause"},
)
)
if opts.MaxRequestsInFlight > 0 {
inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
}
if opts.Registry != nil {
// Initialize all possibilites that can occur below.
errCnt.WithLabelValues("gathering")
errCnt.WithLabelValues("encoding")
if err := opts.Registry.Register(errCnt); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
errCnt = are.ExistingCollector.(*prometheus.CounterVec)
} else {
panic(err)
}
}
}
h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
if inFlightSem != nil {
select {
case inFlightSem <- struct{}{}: // All good, carry on.
defer func() { <-inFlightSem }()
default:
http.Error(rsp, fmt.Sprintf(
"Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight,
), http.StatusServiceUnavailable)
return
}
}
mfs, err := reg.Gather()
if err != nil {
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error gathering metrics:", err)
}
errCnt.WithLabelValues("gathering").Inc()
switch opts.ErrorHandling {
case PanicOnError:
panic(err)
case ContinueOnError:
if len(mfs) == 0 {
// Still report the error if no metrics have been gathered.
httpError(rsp, err)
return
}
case HTTPErrorOnError:
httpError(rsp, err)
return
}
}
contentType := expfmt.Negotiate(req.Header)
header := rsp.Header()
header.Set(contentTypeHeader, string(contentType))
w := io.Writer(rsp)
if !opts.DisableCompression && gzipAccepted(req.Header) {
header.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
}
enc := expfmt.NewEncoder(w, contentType)
var lastErr error
for _, mf := range mfs {
if err := enc.Encode(mf); err != nil {
lastErr = err
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error encoding and sending metric family:", err)
}
errCnt.WithLabelValues("encoding").Inc()
switch opts.ErrorHandling {
case PanicOnError:
panic(err)
case ContinueOnError:
// Handled later.
case HTTPErrorOnError:
httpError(rsp, err)
return
}
}
}
if lastErr != nil {
httpError(rsp, lastErr)
}
})
if opts.Timeout <= 0 {
return h
}
return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf(
"Exceeded configured timeout of %v.\n",
opts.Timeout,
))
}
// InstrumentMetricHandler is usually used with an http.Handler returned by the
// HandlerFor function. It instruments the provided http.Handler with two
// metrics: A counter vector "promhttp_metric_handler_requests_total" to count
// scrapes partitioned by HTTP status code, and a gauge
// "promhttp_metric_handler_requests_in_flight" to track the number of
// simultaneous scrapes. This function idempotently registers collectors for
// both metrics with the provided Registerer. It panics if the registration
// fails. The provided metrics are useful to see how many scrapes hit the
// monitored target (which could be from different Prometheus servers or other
// scrapers), and how often they overlap (which would result in more than one
// scrape in flight at the same time). Note that the scrapes-in-flight gauge
// will contain the scrape by which it is exposed, while the scrape counter will
// only get incremented after the scrape is complete (as only then is the status
// code known). For tracking scrape durations, use the
// "scrape_duration_seconds" gauge created by the Prometheus server upon each
// scrape.
func InstrumentMetricHandler(reg prometheus.Registerer, handler http.Handler) http.Handler {
cnt := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "promhttp_metric_handler_requests_total",
Help: "Total number of scrapes by HTTP status code.",
},
[]string{"code"},
)
// Initialize the most likely HTTP status codes.
cnt.WithLabelValues("200")
cnt.WithLabelValues("500")
cnt.WithLabelValues("503")
if err := reg.Register(cnt); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
cnt = are.ExistingCollector.(*prometheus.CounterVec)
} else {
panic(err)
}
}
gge := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "promhttp_metric_handler_requests_in_flight",
Help: "Current number of scrapes being served.",
})
if err := reg.Register(gge); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
gge = are.ExistingCollector.(prometheus.Gauge)
} else {
panic(err)
}
}
return InstrumentHandlerCounter(cnt, InstrumentHandlerInFlight(gge, handler))
}
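// A sketch of combining InstrumentMetricHandler with HandlerFor for a custom
// registry; the registry and endpoint are illustrative:
//
//	reg := prometheus.NewRegistry()
//	http.Handle("/metrics", promhttp.InstrumentMetricHandler(
//		reg, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
//	))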
// HandlerErrorHandling defines how a Handler serving metrics will handle
// errors.
type HandlerErrorHandling int
// These constants cause handlers serving metrics to behave as described if
// errors are encountered.
const (
// Serve an HTTP status code 500 upon the first error
// encountered. Report the error message in the body.
HTTPErrorOnError HandlerErrorHandling = iota
// Ignore errors and try to serve as many metrics as possible. However,
// if no metrics can be served, serve an HTTP status code 500 and the
// last error message in the body. Only use this in deliberate "best
// effort" metrics collection scenarios. In this case, it is highly
// recommended to provide other means of detecting errors: By setting an
// ErrorLog in HandlerOpts, the errors are logged. By providing a
// Registry in HandlerOpts, the exposed metrics include an error counter
// "promhttp_metric_handler_errors_total", which can be used for
// alerts.
ContinueOnError
// Panic upon the first error encountered (useful for "crash only" apps).
PanicOnError
)
// Logger is the minimal interface HandlerOpts needs for logging. Note that
// log.Logger from the standard library implements this interface, and it is
// easy to implement by custom loggers, if they don't do so already anyway.
type Logger interface {
Println(v ...interface{})
}
// HandlerOpts specifies options for how to serve metrics via an http.Handler. The
// zero value of HandlerOpts is a reasonable default.
type HandlerOpts struct {
// ErrorLog specifies an optional logger for errors collecting and
// serving metrics. If nil, errors are not logged at all.
ErrorLog Logger
// ErrorHandling defines how errors are handled. Note that errors are
// logged regardless of the configured ErrorHandling, provided ErrorLog
// is not nil.
ErrorHandling HandlerErrorHandling
// If Registry is not nil, it is used to register a metric
// "promhttp_metric_handler_errors_total", partitioned by "cause". A
// failed registration causes a panic. Note that this error counter is
// different from the instrumentation you get from the various
// InstrumentHandler... helpers. It counts errors that don't necessarily
// result in a non-2xx HTTP status code. There are two typical cases:
// (1) Encoding errors that only happen after streaming of the HTTP body
// has already started (and the status code 200 has been sent). This
// should only happen with custom collectors. (2) Collection errors with
// no effect on the HTTP status code because ErrorHandling is set to
// ContinueOnError.
Registry prometheus.Registerer
// If DisableCompression is true, the handler will never compress the
// response, even if requested by the client.
DisableCompression bool
// The number of concurrent HTTP requests is limited to
// MaxRequestsInFlight. Additional requests are responded to with 503
// Service Unavailable and a suitable message in the body. If
// MaxRequestsInFlight is 0 or negative, no limit is applied.
MaxRequestsInFlight int
// If handling a request takes longer than Timeout, it is responded to
// with 503 Service Unavailable and a suitable message. No timeout is
// applied if Timeout is 0 or negative. Note that with the current
// implementation, reaching the timeout simply ends the HTTP request as
// described above (and even that only if sending of the body hasn't
// started yet), while the bulk work of gathering all the metrics keeps
// running in the background (with the eventual result to be thrown
// away). Until the implementation is improved, it is recommended to
// implement a separate timeout in potentially slow Collectors.
Timeout time.Duration
}
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(header http.Header) bool {
a := header.Get(acceptEncodingHeader)
parts := strings.Split(a, ",")
for _, part := range parts {
part = strings.TrimSpace(part)
if part == "gzip" || strings.HasPrefix(part, "gzip;") {
return true
}
}
return false
}
// httpError removes any content-encoding header and then calls http.Error with
// the provided error and http.StatusInternalServerError. Error contents are
// supposed to be uncompressed plain text. However, as with a plain
// http.Error, any header settings will be void if the header has already been
// sent. The error message will still be written to the writer, but it will
// probably be of limited use.
func httpError(rsp http.ResponseWriter, err error) {
rsp.Header().Del(contentEncodingHeader)
http.Error(
rsp,
"An error has occurred while serving metrics:\n\n"+err.Error(),
http.StatusInternalServerError,
)
}

View File

@ -0,0 +1,219 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promhttp
import (
"crypto/tls"
"net/http"
"net/http/httptrace"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// The RoundTripperFunc type is an adapter to allow the use of ordinary
// functions as RoundTrippers. If f is a function with the appropriate
// signature, RoundTripperFunc(f) is a RoundTripper that calls f.
type RoundTripperFunc func(req *http.Request) (*http.Response, error)
// RoundTrip implements the RoundTripper interface.
func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return rt(r)
}
// InstrumentRoundTripperInFlight is a middleware that wraps the provided
// http.RoundTripper. It sets the provided prometheus.Gauge to the number of
// requests currently handled by the wrapped http.RoundTripper.
//
// See the example for InstrumentRoundTripperDuration for example usage.
func InstrumentRoundTripperInFlight(gauge prometheus.Gauge, next http.RoundTripper) RoundTripperFunc {
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
gauge.Inc()
defer gauge.Dec()
return next.RoundTrip(r)
})
}
// InstrumentRoundTripperCounter is a middleware that wraps the provided
// http.RoundTripper to observe the request result with the provided CounterVec.
// The CounterVec must have zero, one, or two non-const non-curried labels. For
// those, the only allowed label names are "code" and "method". The function
// panics otherwise. Partitioning of the CounterVec happens by HTTP status code
// and/or HTTP method if the respective instance label names are present in the
// CounterVec. For unpartitioned counting, use a CounterVec with zero labels.
//
// If the wrapped RoundTripper panics or returns a non-nil error, the Counter
// is not incremented.
//
// See the example for InstrumentRoundTripperDuration for example usage.
func InstrumentRoundTripperCounter(counter *prometheus.CounterVec, next http.RoundTripper) RoundTripperFunc {
code, method := checkLabels(counter)
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
resp, err := next.RoundTrip(r)
if err == nil {
counter.With(labels(code, method, r.Method, resp.StatusCode)).Inc()
}
return resp, err
})
}
// InstrumentRoundTripperDuration is a middleware that wraps the provided
// http.RoundTripper to observe the request duration with the provided
// ObserverVec. The ObserverVec must have zero, one, or two non-const
// non-curried labels. For those, the only allowed label names are "code" and
// "method". The function panics otherwise. The Observe method of the Observer
// in the ObserverVec is called with the request duration in
// seconds. Partitioning happens by HTTP status code and/or HTTP method if the
// respective instance label names are present in the ObserverVec. For
// unpartitioned observations, use an ObserverVec with zero labels. Note that
// partitioning of Histograms is expensive and should be used judiciously.
//
// If the wrapped RoundTripper panics or returns a non-nil error, no values are
// reported.
//
// Note that this method is only guaranteed to never observe negative durations
// if used with Go1.9+.
func InstrumentRoundTripperDuration(obs prometheus.ObserverVec, next http.RoundTripper) RoundTripperFunc {
code, method := checkLabels(obs)
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
start := time.Now()
resp, err := next.RoundTrip(r)
if err == nil {
obs.With(labels(code, method, r.Method, resp.StatusCode)).Observe(time.Since(start).Seconds())
}
return resp, err
})
}
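// A sketch of composing the client middlewares around http.DefaultTransport;
// the metric names and label choices are illustrative:
//
//	inFlight := prometheus.NewGauge(prometheus.GaugeOpts{
//		Name: "client_in_flight_requests",
//		Help: "Current number of in-flight requests of the instrumented client.",
//	})
//	counter := prometheus.NewCounterVec(
//		prometheus.CounterOpts{
//			Name: "client_api_requests_total",
//			Help: "Client requests by status code and method.",
//		},
//		[]string{"code", "method"},
//	)
//	duration := prometheus.NewHistogramVec(
//		prometheus.HistogramOpts{
//			Name:    "client_request_duration_seconds",
//			Help:    "Request latencies in seconds.",
//			Buckets: prometheus.DefBuckets,
//		},
//		[]string{"method"},
//	)
//	prometheus.MustRegister(inFlight, counter, duration)
//	client := &http.Client{
//		Transport: promhttp.InstrumentRoundTripperInFlight(inFlight,
//			promhttp.InstrumentRoundTripperCounter(counter,
//				promhttp.InstrumentRoundTripperDuration(duration, http.DefaultTransport),
//			),
//		),
//	}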
// InstrumentTrace is used to offer flexibility in instrumenting the available
// httptrace.ClientTrace hook functions. Each function is passed a float64
// representing the time in seconds since the start of the HTTP request. A user
// may choose to use separately bucketed Histograms, or implement custom
// instance labels on a per-function basis.
type InstrumentTrace struct {
GotConn func(float64)
PutIdleConn func(float64)
GotFirstResponseByte func(float64)
Got100Continue func(float64)
DNSStart func(float64)
DNSDone func(float64)
ConnectStart func(float64)
ConnectDone func(float64)
TLSHandshakeStart func(float64)
TLSHandshakeDone func(float64)
WroteHeaders func(float64)
Wait100Continue func(float64)
WroteRequest func(float64)
}
// InstrumentRoundTripperTrace is a middleware that wraps the provided
// RoundTripper and reports times to hook functions provided in the
// InstrumentTrace struct. Hook functions that are not present in the provided
// InstrumentTrace struct are ignored. Times reported to the hook functions are
// measured from the start of the request. Only with Go1.9+ are those times
// guaranteed to never be negative. (Earlier Go versions do not use a
// monotonic clock.) Note that partitioning of Histograms is expensive and
// should be used judiciously.
//
// For hook functions that receive an error as an argument, no observations are
// made in the event of a non-nil error value.
//
// See the example for InstrumentRoundTripperDuration for example usage.
func InstrumentRoundTripperTrace(it *InstrumentTrace, next http.RoundTripper) RoundTripperFunc {
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
start := time.Now()
trace := &httptrace.ClientTrace{
GotConn: func(_ httptrace.GotConnInfo) {
if it.GotConn != nil {
it.GotConn(time.Since(start).Seconds())
}
},
PutIdleConn: func(err error) {
if err != nil {
return
}
if it.PutIdleConn != nil {
it.PutIdleConn(time.Since(start).Seconds())
}
},
DNSStart: func(_ httptrace.DNSStartInfo) {
if it.DNSStart != nil {
it.DNSStart(time.Since(start).Seconds())
}
},
DNSDone: func(_ httptrace.DNSDoneInfo) {
if it.DNSDone != nil {
it.DNSDone(time.Since(start).Seconds())
}
},
ConnectStart: func(_, _ string) {
if it.ConnectStart != nil {
it.ConnectStart(time.Since(start).Seconds())
}
},
ConnectDone: func(_, _ string, err error) {
if err != nil {
return
}
if it.ConnectDone != nil {
it.ConnectDone(time.Since(start).Seconds())
}
},
GotFirstResponseByte: func() {
if it.GotFirstResponseByte != nil {
it.GotFirstResponseByte(time.Since(start).Seconds())
}
},
Got100Continue: func() {
if it.Got100Continue != nil {
it.Got100Continue(time.Since(start).Seconds())
}
},
TLSHandshakeStart: func() {
if it.TLSHandshakeStart != nil {
it.TLSHandshakeStart(time.Since(start).Seconds())
}
},
TLSHandshakeDone: func(_ tls.ConnectionState, err error) {
if err != nil {
return
}
if it.TLSHandshakeDone != nil {
it.TLSHandshakeDone(time.Since(start).Seconds())
}
},
WroteHeaders: func() {
if it.WroteHeaders != nil {
it.WroteHeaders(time.Since(start).Seconds())
}
},
Wait100Continue: func() {
if it.Wait100Continue != nil {
it.Wait100Continue(time.Since(start).Seconds())
}
},
WroteRequest: func(_ httptrace.WroteRequestInfo) {
if it.WroteRequest != nil {
it.WroteRequest(time.Since(start).Seconds())
}
},
}
r = r.WithContext(httptrace.WithClientTrace(r.Context(), trace))
return next.RoundTrip(r)
})
}
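// A sketch of wiring DNS trace hooks into a client; the histogram name and
// "event" label are illustrative:
//
//	dnsLatency := prometheus.NewHistogramVec(
//		prometheus.HistogramOpts{
//			Name: "dns_duration_seconds",
//			Help: "DNS lookup latencies as observed from trace hooks.",
//		},
//		[]string{"event"},
//	)
//	prometheus.MustRegister(dnsLatency)
//	trace := &promhttp.InstrumentTrace{
//		DNSStart: func(t float64) { dnsLatency.WithLabelValues("start").Observe(t) },
//		DNSDone:  func(t float64) { dnsLatency.WithLabelValues("done").Observe(t) },
//	}
//	client := &http.Client{
//		Transport: promhttp.InstrumentRoundTripperTrace(trace, http.DefaultTransport),
//	}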

View File

@ -0,0 +1,447 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promhttp
import (
"errors"
"net/http"
"strconv"
"strings"
"time"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/client_golang/prometheus"
)
// magicString is used for the hacky label test in checkLabels. Remove once fixed.
const magicString = "zZgWfBxLqvG8kc8IMv3POi2Bb0tZI3vAnBx+gBaFi9FyPzB/CzKUer1yufDa"
// InstrumentHandlerInFlight is a middleware that wraps the provided
// http.Handler. It sets the provided prometheus.Gauge to the number of
// requests currently handled by the wrapped http.Handler.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerInFlight(g prometheus.Gauge, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
g.Inc()
defer g.Dec()
next.ServeHTTP(w, r)
})
}
// InstrumentHandlerDuration is a middleware that wraps the provided
// http.Handler to observe the request duration with the provided ObserverVec.
// The ObserverVec must have zero, one, or two non-const non-curried labels. For
// those, the only allowed label names are "code" and "method". The function
// panics otherwise. The Observe method of the Observer in the ObserverVec is
// called with the request duration in seconds. Partitioning happens by HTTP
// status code and/or HTTP method if the respective instance label names are
// present in the ObserverVec. For unpartitioned observations, use an
// ObserverVec with zero labels. Note that partitioning of Histograms is
// expensive and should be used judiciously.
//
// If the wrapped Handler does not set a status code, a status code of 200 is assumed.
//
// If the wrapped Handler panics, no values are reported.
//
// Note that this method is only guaranteed to never observe negative durations
// if used with Go1.9+.
func InstrumentHandlerDuration(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc {
code, method := checkLabels(obs)
if code {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
d := newDelegator(w, nil)
next.ServeHTTP(d, r)
obs.With(labels(code, method, r.Method, d.Status())).Observe(time.Since(now).Seconds())
})
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
next.ServeHTTP(w, r)
obs.With(labels(code, method, r.Method, 0)).Observe(time.Since(now).Seconds())
})
}
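// A sketch of chaining the server middlewares around an application handler;
// the metric names and handler body are illustrative:
//
//	requests := prometheus.NewCounterVec(
//		prometheus.CounterOpts{
//			Name: "api_requests_total",
//			Help: "Requests by status code and method.",
//		},
//		[]string{"code", "method"},
//	)
//	latency := prometheus.NewHistogramVec(
//		prometheus.HistogramOpts{
//			Name:    "api_request_duration_seconds",
//			Help:    "Request latencies in seconds.",
//			Buckets: prometheus.DefBuckets,
//		},
//		[]string{"method"},
//	)
//	prometheus.MustRegister(requests, latency)
//	apiHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
//		w.Write([]byte("ok"))
//	})
//	http.Handle("/api",
//		promhttp.InstrumentHandlerCounter(requests,
//			promhttp.InstrumentHandlerDuration(latency, apiHandler),
//		),
//	)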
// InstrumentHandlerCounter is a middleware that wraps the provided http.Handler
// to observe the request result with the provided CounterVec. The CounterVec
// must have zero, one, or two non-const non-curried labels. For those, the only
// allowed label names are "code" and "method". The function panics
// otherwise. Partitioning of the CounterVec happens by HTTP status code and/or
// HTTP method if the respective instance label names are present in the
// CounterVec. For unpartitioned counting, use a CounterVec with zero labels.
//
// If the wrapped Handler does not set a status code, a status code of 200 is assumed.
//
// If the wrapped Handler panics, the Counter is not incremented.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerCounter(counter *prometheus.CounterVec, next http.Handler) http.HandlerFunc {
code, method := checkLabels(counter)
if code {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
d := newDelegator(w, nil)
next.ServeHTTP(d, r)
counter.With(labels(code, method, r.Method, d.Status())).Inc()
})
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
counter.With(labels(code, method, r.Method, 0)).Inc()
})
}
// InstrumentHandlerTimeToWriteHeader is a middleware that wraps the provided
// http.Handler to observe with the provided ObserverVec the request duration
// until the response headers are written. The ObserverVec must have zero, one,
// or two non-const non-curried labels. For those, the only allowed label names
// are "code" and "method". The function panics otherwise. The Observe method of
// the Observer in the ObserverVec is called with the request duration in
// seconds. Partitioning happens by HTTP status code and/or HTTP method if the
// respective instance label names are present in the ObserverVec. For
// unpartitioned observations, use an ObserverVec with zero labels. Note that
// partitioning of Histograms is expensive and should be used judiciously.
//
// If the wrapped Handler panics before calling WriteHeader, no value is
// reported.
//
// Note that this method is only guaranteed to never observe negative durations
// if used with Go1.9+.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerTimeToWriteHeader(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc {
code, method := checkLabels(obs)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
d := newDelegator(w, func(status int) {
obs.With(labels(code, method, r.Method, status)).Observe(time.Since(now).Seconds())
})
next.ServeHTTP(d, r)
})
}
// InstrumentHandlerRequestSize is a middleware that wraps the provided
// http.Handler to observe the request size with the provided ObserverVec. The
// ObserverVec must have zero, one, or two non-const non-curried labels. For
// those, the only allowed label names are "code" and "method". The function
// panics otherwise. The Observe method of the Observer in the ObserverVec is
// called with the request size in bytes. Partitioning happens by HTTP status
// code and/or HTTP method if the respective instance label names are present in
// the ObserverVec. For unpartitioned observations, use an ObserverVec with zero
// labels. Note that partitioning of Histograms is expensive and should be used
// judiciously.
//
// If the wrapped Handler does not set a status code, a status code of 200 is assumed.
//
// If the wrapped Handler panics, no values are reported.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerRequestSize(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc {
code, method := checkLabels(obs)
if code {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
d := newDelegator(w, nil)
next.ServeHTTP(d, r)
size := computeApproximateRequestSize(r)
obs.With(labels(code, method, r.Method, d.Status())).Observe(float64(size))
})
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
size := computeApproximateRequestSize(r)
obs.With(labels(code, method, r.Method, 0)).Observe(float64(size))
})
}
// InstrumentHandlerResponseSize is a middleware that wraps the provided
// http.Handler to observe the response size with the provided ObserverVec. The
// ObserverVec must have zero, one, or two non-const non-curried labels. For
// those, the only allowed label names are "code" and "method". The function
// panics otherwise. The Observe method of the Observer in the ObserverVec is
// called with the response size in bytes. Partitioning happens by HTTP status
// code and/or HTTP method if the respective instance label names are present in
// the ObserverVec. For unpartitioned observations, use an ObserverVec with zero
// labels. Note that partitioning of Histograms is expensive and should be used
// judiciously.
//
// If the wrapped Handler does not set a status code, a status code of 200 is assumed.
//
// If the wrapped Handler panics, no values are reported.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerResponseSize(obs prometheus.ObserverVec, next http.Handler) http.Handler {
code, method := checkLabels(obs)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
d := newDelegator(w, nil)
next.ServeHTTP(d, r)
obs.With(labels(code, method, r.Method, d.Status())).Observe(float64(d.Written()))
})
}
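// A sketch of recording response sizes, reusing the illustrative apiHandler
// from the sketch above; the metric name and bucket boundaries are illustrative:
//
//	responseSize := prometheus.NewHistogramVec(
//		prometheus.HistogramOpts{
//			Name:    "api_response_size_bytes",
//			Help:    "Response sizes in bytes.",
//			Buckets: []float64{200, 500, 900, 1500},
//		},
//		[]string{},
//	)
//	prometheus.MustRegister(responseSize)
//	http.Handle("/api/sized", promhttp.InstrumentHandlerResponseSize(responseSize, apiHandler))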
func checkLabels(c prometheus.Collector) (code bool, method bool) {
// TODO(beorn7): Remove this hacky way to check for instance labels
// once Descriptors can have their dimensionality queried.
var (
desc *prometheus.Desc
m prometheus.Metric
pm dto.Metric
lvs []string
)
// Get the Desc from the Collector.
descc := make(chan *prometheus.Desc, 1)
c.Describe(descc)
select {
case desc = <-descc:
default:
panic("no description provided by collector")
}
select {
case <-descc:
panic("more than one description provided by collector")
default:
}
close(descc)
// Create a ConstMetric with the Desc. Since we don't know how many
// variable labels there are, keep adding label values until it succeeds.
for err := errors.New("dummy"); err != nil; lvs = append(lvs, magicString) {
m, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, 0, lvs...)
}
// Write out the metric into a proto message and look at the labels.
// If the value is not the magicString, it is a constLabel, which doesn't interest us.
// If the label is curried, it doesn't interest us.
// In all other cases, only "code" or "method" is allowed.
if err := m.Write(&pm); err != nil {
panic("error checking metric for labels")
}
for _, label := range pm.Label {
name, value := label.GetName(), label.GetValue()
if value != magicString || isLabelCurried(c, name) {
continue
}
switch name {
case "code":
code = true
case "method":
method = true
default:
panic("metric partitioned with non-supported labels")
}
}
return
}
func isLabelCurried(c prometheus.Collector, label string) bool {
// This is even hackier than the label test above.
// We essentially try to curry again and see if it works.
// But for that, we need to type-convert to the two
// types we use here, ObserverVec or *CounterVec.
switch v := c.(type) {
case *prometheus.CounterVec:
if _, err := v.CurryWith(prometheus.Labels{label: "dummy"}); err == nil {
return false
}
case prometheus.ObserverVec:
if _, err := v.CurryWith(prometheus.Labels{label: "dummy"}); err == nil {
return false
}
default:
panic("unsupported metric vec type")
}
return true
}
// emptyLabels is a one-time allocation for non-partitioned metrics to avoid
// unnecessary allocations on each request.
var emptyLabels = prometheus.Labels{}
func labels(code, method bool, reqMethod string, status int) prometheus.Labels {
if !(code || method) {
return emptyLabels
}
labels := prometheus.Labels{}
if code {
labels["code"] = sanitizeCode(status)
}
if method {
labels["method"] = sanitizeMethod(reqMethod)
}
return labels
}
func computeApproximateRequestSize(r *http.Request) int {
s := 0
if r.URL != nil {
s += len(r.URL.String())
}
s += len(r.Method)
s += len(r.Proto)
for name, values := range r.Header {
s += len(name)
for _, value := range values {
s += len(value)
}
}
s += len(r.Host)
// N.B. r.Form and r.MultipartForm are assumed to be included in r.URL.
if r.ContentLength != -1 {
s += int(r.ContentLength)
}
return s
}
func sanitizeMethod(m string) string {
switch m {
case "GET", "get":
return "get"
case "PUT", "put":
return "put"
case "HEAD", "head":
return "head"
case "POST", "post":
return "post"
case "DELETE", "delete":
return "delete"
case "CONNECT", "connect":
return "connect"
case "OPTIONS", "options":
return "options"
case "NOTIFY", "notify":
return "notify"
default:
return strings.ToLower(m)
}
}
// If the wrapped http.Handler has not set a status code, i.e. the value is
// currently 0, sanitizeCode will return 200, for consistency with behavior in
// the stdlib.
func sanitizeCode(s int) string {
switch s {
case 100:
return "100"
case 101:
return "101"
case 200, 0:
return "200"
case 201:
return "201"
case 202:
return "202"
case 203:
return "203"
case 204:
return "204"
case 205:
return "205"
case 206:
return "206"
case 300:
return "300"
case 301:
return "301"
case 302:
return "302"
case 304:
return "304"
case 305:
return "305"
case 307:
return "307"
case 400:
return "400"
case 401:
return "401"
case 402:
return "402"
case 403:
return "403"
case 404:
return "404"
case 405:
return "405"
case 406:
return "406"
case 407:
return "407"
case 408:
return "408"
case 409:
return "409"
case 410:
return "410"
case 411:
return "411"
case 412:
return "412"
case 413:
return "413"
case 414:
return "414"
case 415:
return "415"
case 416:
return "416"
case 417:
return "417"
case 418:
return "418"
case 500:
return "500"
case 501:
return "501"
case 502:
return "502"
case 503:
return "503"
case 504:
return "504"
case 505:
return "505"
case 428:
return "428"
case 429:
return "429"
case 431:
return "431"
case 511:
return "511"
default:
return strconv.Itoa(s)
}
}

View File

@ -91,9 +91,13 @@ func onesCount64(x uint64) int {
const m0 = 0x5555555555555555 // 01010101 ...
const m1 = 0x3333333333333333 // 00110011 ...
const m2 = 0x0f0f0f0f0f0f0f0f // 00001111 ...
const m3 = 0x00ff00ff00ff00ff // etc.
const m4 = 0x0000ffff0000ffff
// Unused in this function, but definitions preserved for
// documentation purposes:
//
// const m3 = 0x00ff00ff00ff00ff // etc.
// const m4 = 0x0000ffff0000ffff
//
// Implementation: Parallel summing of adjacent bits.
// See "Hacker's Delight", Chap. 5: Counting Bits.
// The following pattern shows the general approach:

View File

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build aix darwin dragonfly freebsd linux nacl netbsd openbsd solaris
// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris
package unix

52
vendor/k8s.io/utils/trace/trace.go generated vendored
View File

@ -25,32 +25,55 @@ import (
"k8s.io/klog"
)
// Field is a key value pair that provides additional details about the trace.
type Field struct {
Key string
Value interface{}
}
func (f Field) format() string {
return fmt.Sprintf("%s:%v", f.Key, f.Value)
}
func writeFields(b *bytes.Buffer, l []Field) {
for i, f := range l {
b.WriteString(f.format())
if i < len(l)-1 {
b.WriteString(",")
}
}
}
type traceStep struct {
stepTime time.Time
msg string
fields []Field
}
// Trace keeps track of a set of "steps" and allows us to log a specific
// step if it took longer than its share of the total allowed time
type Trace struct {
name string
fields []Field
startTime time.Time
steps []traceStep
}
// New creates a Trace with the specified name
func New(name string) *Trace {
return &Trace{name, time.Now(), nil}
// New creates a Trace with the specified name. The name identifies the operation to be traced. The
// Fields add key value pairs to provide additional details about the trace, such as operation inputs.
func New(name string, fields ...Field) *Trace {
return &Trace{name: name, startTime: time.Now(), fields: fields}
}
// Step adds a new step with a specific message. Call this at the end of an
// execution step to record how long it took.
func (t *Trace) Step(msg string) {
// Step adds a new step with a specific message. Call this at the end of an execution step to record
// how long it took. The Fields add key value pairs to provide additional details about the trace
// step.
func (t *Trace) Step(msg string, fields ...Field) {
if t.steps == nil {
// traces almost always have less than 6 steps, do this to avoid more than a single allocation
t.steps = make([]traceStep, 0, 6)
}
t.steps = append(t.steps, traceStep{time.Now(), msg})
t.steps = append(t.steps, traceStep{stepTime: time.Now(), msg: msg, fields: fields})
}
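// A caller-side sketch of the extended API; the operation name and field
// values are illustrative:
//
//	t := trace.New("rebuild cache", trace.Field{Key: "reason", Value: "startup"})
//	// ... perform the first step ...
//	t.Step("objects loaded", trace.Field{Key: "count", Value: 42})
//	t.Log()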
// Log is used to dump all the steps in the Trace
@ -65,12 +88,23 @@ func (t *Trace) logWithStepThreshold(stepThreshold time.Duration) {
endTime := time.Now()
totalTime := endTime.Sub(t.startTime)
buffer.WriteString(fmt.Sprintf("Trace[%d]: %q (started: %v) (total time: %v):\n", tracenum, t.name, t.startTime, totalTime))
buffer.WriteString(fmt.Sprintf("Trace[%d]: %q ", tracenum, t.name))
if len(t.fields) > 0 {
writeFields(&buffer, t.fields)
buffer.WriteString(" ")
}
buffer.WriteString(fmt.Sprintf("(started: %v) (total time: %v):\n", t.startTime, totalTime))
lastStepTime := t.startTime
for _, step := range t.steps {
stepDuration := step.stepTime.Sub(lastStepTime)
if stepThreshold == 0 || stepDuration > stepThreshold || klog.V(4) {
buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] %v\n", tracenum, step.stepTime.Sub(t.startTime), stepDuration, step.msg))
buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] ", tracenum, step.stepTime.Sub(t.startTime), stepDuration))
buffer.WriteString(step.msg)
if len(step.fields) > 0 {
buffer.WriteString(" ")
writeFields(&buffer, step.fields)
}
buffer.WriteString("\n")
}
lastStepTime = step.stepTime
}