diff --git a/Gopkg.lock b/Gopkg.lock index 5b3c50466..c7dd640c2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -33,8 +33,8 @@ name = "github.com/beorn7/perks" packages = ["quantile"] pruneopts = "NUT" - revision = "4b2b341e8d7715fae06375aa633dbb6e91b3fb46" - version = "v1.0.0" + revision = "37c8de3658fcb183f997c4e13e8337516ab753e6" + version = "v1.0.1" [[projects]] digest = "1:7f21fa1f8ab9a529dba26a7e9cf20de217c307fa1d96cb599d3afd9e5c83e9d6" @@ -305,9 +305,13 @@ version = "v1.1.7" [[projects]] - digest = "1:5f039a8e43dc5b00adee7b38e39baf6c36f607372940c11975f00ec9c5f298ae" + digest = "1:a0892607b4f5385bb9fb12759facc8fad4e61b8b557384e4a078150c6ba43623" name = "github.com/kubernetes-csi/csi-lib-utils" - packages = ["protosanitizer"] + packages = [ + "connection", + "protosanitizer", + "rpc", + ] pruneopts = "NUT" revision = "b8b7a89535d80e12f2c0f4c53cfb981add8aaca2" version = "v0.6.1" @@ -457,15 +461,16 @@ version = "v1.0.0" [[projects]] - digest = "1:623090ece42820e4121c5c85a4373b10b922e79f3b8c44caad67a45a50e10054" + digest = "1:097cc61836050f45cbb712ae3bb45d66fba464c16b8fac09907fa3c1f753eff6" name = "github.com/prometheus/client_golang" packages = [ "prometheus", "prometheus/internal", + "prometheus/promhttp", ] pruneopts = "NUT" - revision = "4ab88e80c249ed361d3299e2930427d9ac43ef8d" - version = "v1.0.0" + revision = "170205fb58decfd011f1550d4cfb737230d7ae4f" + version = "v1.1.0" [[projects]] branch = "master" @@ -588,7 +593,7 @@ [[projects]] branch = "master" - digest = "1:7a9d37d6b986937aadaf23b125ee6a6d4e3e6eec8560b51ac18fd2adf62e6406" + digest = "1:816780136a1ee09b3070cdfed092c244010fa3f2bde27beb1b1f80dfef4338e1" name = "golang.org/x/sys" packages = [ "cpu", @@ -596,7 +601,7 @@ "windows", ] pruneopts = "NUT" - revision = "fc99dfbffb4e5ed5758a37e31dd861afe285406b" + revision = "cbf593c0f2f39034e9104bbf77e2ec7c48c98fc5" [[projects]] digest = "1:0b5dc8c3581fc3ea2b80cc2e360dfb9c2d61dd0cba0d2fe247e8edd3e83f7551" @@ -1106,7 +1111,7 @@ "volume/helpers", ] pruneopts = "NUT" - revision = "21f9d6b59624871aea170656c0433ad603e67ecd" + revision = "4405817736360488dfda1950e6266d2ff4bcba9e" [[projects]] branch = "master" @@ -1117,7 +1122,7 @@ "featuregate", ] pruneopts = "NUT" - revision = "042c00bc1f9e2571ae04255e09819fa8f3e870d1" + revision = "8d609ba1a28a47e7880ecf03fd75faba56797aad" [[projects]] branch = "master" @@ -1125,7 +1130,7 @@ name = "k8s.io/cri-api" packages = ["pkg/apis/runtime/v1alpha2"] pruneopts = "NUT" - revision = "247eeb1afecb07101bb58883f79b8b463864b035" + revision = "3baa588ab5670ff6ff8a6fcb4a8ec9234a2346c4" [[projects]] digest = "1:43099cc4ed575c40f80277c7ba7168df37d0c663bdc4f541325430bd175cce8a" @@ -1276,7 +1281,7 @@ [[projects]] branch = "master" - digest = "1:1541afde648991d68b2579076b29afb2d51a54aec822b434644f60f435eeee6e" + digest = "1:9300c13de75a1813f7c386de9aebc7dc49f490bb77369cb9d50af571dd9acf52" name = "k8s.io/utils" packages = [ "buffer", @@ -1292,7 +1297,7 @@ "trace", ] pruneopts = "NUT" - revision = "3dccf664f023863740c508fb4284e49742bedfa4" + revision = "581e00157fb1a0435d4fac54a52d1ca1e481d60e" [[projects]] digest = "1:cb422c75bab66a8339a38b64e837f3b28f3d5a8c06abd7b9048f420363baa18a" @@ -1341,13 +1346,17 @@ "github.com/golang/protobuf/ptypes", "github.com/golang/protobuf/ptypes/timestamp", "github.com/grpc-ecosystem/go-grpc-middleware", + "github.com/kubernetes-csi/csi-lib-utils/connection", "github.com/kubernetes-csi/csi-lib-utils/protosanitizer", + "github.com/kubernetes-csi/csi-lib-utils/rpc",
"github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1", "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/typed/volumesnapshot/v1alpha1", "github.com/onsi/ginkgo", "github.com/onsi/gomega", "github.com/pborman/uuid", "github.com/pkg/errors", + "github.com/prometheus/client_golang/prometheus", + "github.com/prometheus/client_golang/prometheus/promhttp", "golang.org/x/net/context", "google.golang.org/grpc", "google.golang.org/grpc/codes", diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index cd97787ad..43031b811 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -22,24 +22,28 @@ import ( "path" "path/filepath" "strings" + "time" "github.com/ceph/ceph-csi/pkg/cephfs" + "github.com/ceph/ceph-csi/pkg/liveness" "github.com/ceph/ceph-csi/pkg/rbd" "github.com/ceph/ceph-csi/pkg/util" "k8s.io/klog" ) const ( - rbdType = "rbd" - cephfsType = "cephfs" + rbdType = "rbd" + cephfsType = "cephfs" + livenessType = "liveness" - rbdDefaultName = "rbd.csi.ceph.com" - cephfsDefaultName = "cephfs.csi.ceph.com" + rbdDefaultName = "rbd.csi.ceph.com" + cephfsDefaultName = "cephfs.csi.ceph.com" + livenessDefaultName = "liveness.csi.ceph.com" ) var ( // common flags - vtype = flag.String("type", "", "driver type [rbd|cephfs]") + vtype = flag.String("type", "", "driver type [rbd|cephfs|liveness]") endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint") driverName = flag.String("drivername", "", "name of the driver") nodeID = flag.String("nodeid", "", "node id") @@ -55,6 +59,12 @@ var ( // cephfs related flags volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')") mountCacheDir = flag.String("mountcachedir", "", "mount info cache save dir") + + // liveness related flags + livenessport = flag.Int("livenessport", 8080, "TCP port for liveness requests") + livenesspath = flag.String("livenesspath", "/metrics", "path of the Prometheus endpoint where metrics will be available") + pollTime = flag.Duration("polltime", time.Second*60, "time interval between each poll") + timeout = flag.Duration("timeout", time.Second*3, "probe timeout") ) func init() { @@ -90,6 +100,8 @@ func getDriverName() string { return rbdDefaultName case cephfsType: return cephfsDefaultName + case livenessType: + return livenessDefaultName default: return "" } @@ -148,6 +160,9 @@ func main() { driver := cephfs.NewDriver() driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, *instanceID, csipluginPath, cp, driverType) + case livenessType: + liveness.Run(*endpoint, *livenesspath, *livenessport, *pollTime, *timeout) + default: klog.Fatalln("invalid volume type", vtype) // calls exit } diff --git a/deploy/cephfs/kubernetes/v1.13/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/v1.13/csi-cephfsplugin-provisioner.yaml index 738675004..0502e3615 100644 --- a/deploy/cephfs/kubernetes/v1.13/csi-cephfsplugin-provisioner.yaml +++ b/deploy/cephfs/kubernetes/v1.13/csi-cephfsplugin-provisioner.yaml @@ -4,13 +4,15 @@ apiVersion: v1 metadata: name: csi-cephfsplugin-provisioner labels: - app: csi-cephfsplugin-provisioner + app: csi-liveness spec: selector: app: csi-cephfsplugin-provisioner ports: - - name: dummy - port: 12345 + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8081 --- kind: StatefulSet @@ -97,6 +99,26 @@ spec: mountPath: /etc/ceph-csi-config/ - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: liveness-prometheus + image: quay.io/cephcsi/cephcsi:canary + args:
+ - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8081" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: /csi + imagePullPolicy: "IfNotPresent" volumes: - name: socket-dir hostPath: diff --git a/deploy/cephfs/kubernetes/v1.13/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/v1.13/csi-cephfsplugin.yaml index ac6dbd21c..e817a51a1 100644 --- a/deploy/cephfs/kubernetes/v1.13/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/v1.13/csi-cephfsplugin.yaml @@ -90,8 +90,29 @@ spec: mountPath: /dev - name: ceph-csi-config mountPath: /etc/ceph-csi-config/ + - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: liveness-prometheus + image: quay.io/cephcsi/cephcsi:canary + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8081" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: /csi + imagePullPolicy: "IfNotPresent" volumes: - name: mount-cache-dir emptyDir: {} @@ -127,3 +148,19 @@ spec: emptyDir: { medium: "Memory" } +--- +# This is a service to expose the liveness side car +apiVersion: v1 +kind: Service +metadata: + name: csi-liveness-cephfsplugin + labels: + app: csi-liveness +spec: + ports: + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8081 + selector: + app: csi-cephfsplugin diff --git a/deploy/cephfs/kubernetes/v1.13/helm/templates/nodeplugin-daemonset.yaml b/deploy/cephfs/kubernetes/v1.13/helm/templates/nodeplugin-daemonset.yaml index 33f613e2e..a3c105114 100644 --- a/deploy/cephfs/kubernetes/v1.13/helm/templates/nodeplugin-daemonset.yaml +++ b/deploy/cephfs/kubernetes/v1.13/helm/templates/nodeplugin-daemonset.yaml @@ -108,6 +108,28 @@ spec: - name: keys-tmp-dir mountPath: /tmp/csi/keys resources: +{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} + - name: liveness-prometheus + image: "{{ .Values.nodeplugin.plugin.image.repository }}:{{ .Values.nodeplugin.plugin.image.tag }}" + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8081" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }} + env: + - name: CSI_ENDPOINT + value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}" + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: plugin-dir + mountPath: {{ .Values.socketDir }} + resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} volumes: - name: mount-cache-dir diff --git a/deploy/cephfs/kubernetes/v1.13/helm/templates/provisioner-service.yaml b/deploy/cephfs/kubernetes/v1.13/helm/templates/provisioner-service.yaml index a1f92dcde..073f09440 100644 --- a/deploy/cephfs/kubernetes/v1.13/helm/templates/provisioner-service.yaml +++ b/deploy/cephfs/kubernetes/v1.13/helm/templates/provisioner-service.yaml @@ -4,7 +4,7 @@ metadata: name: {{ include "ceph-csi-cephfs.provisioner.fullname" . }} namespace: {{ .Release.Namespace }} labels: - app: {{ include "ceph-csi-cephfs.name" . }} + app: csi-liveness chart: {{ include "ceph-csi-cephfs.chart" . 
}} component: {{ .Values.provisioner.name }} release: {{ .Release.Name }} @@ -15,5 +15,7 @@ spec: component: {{ .Values.provisioner.name }} release: {{ .Release.Name }} ports: - - name: dummy - port: 12345 + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8081 diff --git a/deploy/cephfs/kubernetes/v1.13/helm/templates/provisioner-statefulset.yaml b/deploy/cephfs/kubernetes/v1.13/helm/templates/provisioner-statefulset.yaml index 2e51787e8..7ae18fbfd 100644 --- a/deploy/cephfs/kubernetes/v1.13/helm/templates/provisioner-statefulset.yaml +++ b/deploy/cephfs/kubernetes/v1.13/helm/templates/provisioner-statefulset.yaml @@ -95,6 +95,28 @@ spec: - name: keys-tmp-dir mountPath: /tmp/csi/keys resources: +{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} + - name: liveness-prometheus + image: "{{ .Values.nodeplugin.plugin.image.repository }}:{{ .Values.nodeplugin.plugin.image.tag }}" + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8081" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }} + env: + - name: CSI_ENDPOINT + value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}" + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: {{ .Values.socketDir }} + resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} volumes: - name: socket-dir diff --git a/deploy/cephfs/kubernetes/v1.14+/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/v1.14+/csi-cephfsplugin-provisioner.yaml index 371643f35..a593b1fdc 100644 --- a/deploy/cephfs/kubernetes/v1.14+/csi-cephfsplugin-provisioner.yaml +++ b/deploy/cephfs/kubernetes/v1.14+/csi-cephfsplugin-provisioner.yaml @@ -1,3 +1,19 @@ +--- +kind: Service +apiVersion: v1 +metadata: + name: csi-cephfsplugin-provisioner + labels: + app: csi-liveness +spec: + selector: + app: csi-cephfsplugin-provisioner + ports: + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8081 + --- kind: Deployment apiVersion: apps/v1 @@ -86,6 +102,26 @@ spec: mountPath: /etc/ceph-csi-config/ - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: liveness-prometheus + image: quay.io/cephcsi/cephcsi:canary + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8081" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: /csi + imagePullPolicy: "IfNotPresent" volumes: - name: socket-dir hostPath: diff --git a/deploy/cephfs/kubernetes/v1.14+/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/v1.14+/csi-cephfsplugin.yaml index ac6dbd21c..45f59a371 100644 --- a/deploy/cephfs/kubernetes/v1.14+/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/v1.14+/csi-cephfsplugin.yaml @@ -92,6 +92,26 @@ spec: mountPath: /etc/ceph-csi-config/ - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: liveness-prometheus + image: quay.io/cephcsi/cephcsi:canary + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8081" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: /csi + imagePullPolicy: "IfNotPresent" volumes: - name: 
mount-cache-dir emptyDir: {} @@ -127,3 +147,19 @@ spec: emptyDir: { medium: "Memory" } +--- +# This is a service to expose the liveness side car +apiVersion: v1 +kind: Service +metadata: + name: csi-liveness-cephfsplugin + labels: + app: csi-liveness +spec: + ports: + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8081 + selector: + app: csi-cephfsplugin diff --git a/deploy/rbd/kubernetes/v1.13/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/v1.13/csi-rbdplugin-provisioner.yaml index 3b03ec341..37c823460 100644 --- a/deploy/rbd/kubernetes/v1.13/csi-rbdplugin-provisioner.yaml +++ b/deploy/rbd/kubernetes/v1.13/csi-rbdplugin-provisioner.yaml @@ -4,13 +4,15 @@ apiVersion: v1 metadata: name: csi-rbdplugin-provisioner labels: - app: csi-rbdplugin-provisioner + app: csi-liveness spec: selector: app: csi-rbdplugin-provisioner ports: - - name: dummy - port: 12345 + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8080 --- kind: StatefulSet @@ -110,6 +112,26 @@ spec: mountPath: /etc/ceph-csi-config/ - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: liveness-prometheus + image: quay.io/cephcsi/cephcsi:canary + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8080" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: /csi + imagePullPolicy: "IfNotPresent" volumes: - name: host-dev hostPath: diff --git a/deploy/rbd/kubernetes/v1.13/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/v1.13/csi-rbdplugin.yaml index 1632703b0..dd5dad428 100644 --- a/deploy/rbd/kubernetes/v1.13/csi-rbdplugin.yaml +++ b/deploy/rbd/kubernetes/v1.13/csi-rbdplugin.yaml @@ -90,6 +90,26 @@ spec: mountPropagation: "Bidirectional" - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: liveness-prometheus + image: quay.io/cephcsi/cephcsi:canary + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8080" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: /csi + imagePullPolicy: "IfNotPresent" volumes: - name: socket-dir hostPath: @@ -126,3 +146,19 @@ spec: emptyDir: { medium: "Memory" } +--- +# This is a service to expose the liveness side car +apiVersion: v1 +kind: Service +metadata: + name: csi-liveness-rbdplugin + labels: + app: csi-liveness +spec: + ports: + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8080 + selector: + app: csi-rbdplugin diff --git a/deploy/rbd/kubernetes/v1.13/helm/templates/nodeplugin-daemonset.yaml b/deploy/rbd/kubernetes/v1.13/helm/templates/nodeplugin-daemonset.yaml index ff6f9b7ec..831777b50 100644 --- a/deploy/rbd/kubernetes/v1.13/helm/templates/nodeplugin-daemonset.yaml +++ b/deploy/rbd/kubernetes/v1.13/helm/templates/nodeplugin-daemonset.yaml @@ -107,6 +107,28 @@ spec: - name: keys-tmp-dir mountPath: /tmp/csi/keys resources: +{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} + - name: liveness-prometheus + image: "{{ .Values.nodeplugin.plugin.image.repository }}:{{ .Values.nodeplugin.plugin.image.tag }}" + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8080" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + imagePullPolicy: {{ 
.Values.nodeplugin.plugin.image.pullPolicy }} + env: + - name: CSI_ENDPOINT + value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}" + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: plugin-dir + mountPath: {{ .Values.socketDir }} + resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} volumes: - name: socket-dir diff --git a/deploy/rbd/kubernetes/v1.13/helm/templates/provisioner-service.yaml b/deploy/rbd/kubernetes/v1.13/helm/templates/provisioner-service.yaml index b0a6cf477..73e8798c5 100644 --- a/deploy/rbd/kubernetes/v1.13/helm/templates/provisioner-service.yaml +++ b/deploy/rbd/kubernetes/v1.13/helm/templates/provisioner-service.yaml @@ -4,7 +4,7 @@ metadata: name: {{ include "ceph-csi-rbd.provisioner.fullname" . }} namespace: {{ .Release.Namespace }} labels: - app: {{ include "ceph-csi-rbd.name" . }} + app: csi-liveness chart: {{ include "ceph-csi-rbd.chart" . }} component: {{ .Values.provisioner.name }} release: {{ .Release.Name }} @@ -15,5 +15,7 @@ spec: component: {{ .Values.provisioner.name }} release: {{ .Release.Name }} ports: - - name: dummy - port: 12345 + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8080 diff --git a/deploy/rbd/kubernetes/v1.13/helm/templates/provisioner-statefulset.yaml b/deploy/rbd/kubernetes/v1.13/helm/templates/provisioner-statefulset.yaml index 5ab218318..a603d167e 100644 --- a/deploy/rbd/kubernetes/v1.13/helm/templates/provisioner-statefulset.yaml +++ b/deploy/rbd/kubernetes/v1.13/helm/templates/provisioner-statefulset.yaml @@ -25,6 +25,7 @@ spec: component: {{ .Values.provisioner.name }} release: {{ .Release.Name }} heritage: {{ .Release.Service }} + contains: liveness spec: serviceAccountName: {{ include "ceph-csi-rbd.serviceAccountName.provisioner" . 
}} containers: @@ -112,6 +113,28 @@ spec: - name: keys-tmp-dir mountPath: /tmp/csi/keys resources: +{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} + - name: liveness-prometheus + image: "{{ .Values.nodeplugin.plugin.image.repository }}:{{ .Values.nodeplugin.plugin.image.tag }}" + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8080" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }} + env: + - name: CSI_ENDPOINT + value: "unix:/{{ .Values.socketDir }}/{{ .Values.socketFile }}" + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: {{ .Values.socketDir }} + resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} volumes: - name: socket-dir diff --git a/deploy/rbd/kubernetes/v1.14+/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/v1.14+/csi-rbdplugin-provisioner.yaml index 02aecd603..b3bd49cb4 100644 --- a/deploy/rbd/kubernetes/v1.14+/csi-rbdplugin-provisioner.yaml +++ b/deploy/rbd/kubernetes/v1.14+/csi-rbdplugin-provisioner.yaml @@ -1,3 +1,19 @@ +--- +kind: Service +apiVersion: v1 +metadata: + name: csi-rbdplugin-provisioner + labels: + app: csi-liveness +spec: + selector: + app: csi-rbdplugin-provisioner + ports: + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8080 + --- kind: Deployment apiVersion: apps/v1 @@ -100,6 +116,26 @@ spec: mountPath: /etc/ceph-csi-config/ - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: liveness-prometheus + image: quay.io/cephcsi/cephcsi:canary + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8080" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: /csi + imagePullPolicy: "IfNotPresent" volumes: - name: host-dev hostPath: diff --git a/deploy/rbd/kubernetes/v1.14+/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/v1.14+/csi-rbdplugin.yaml index 1632703b0..dd5dad428 100644 --- a/deploy/rbd/kubernetes/v1.14+/csi-rbdplugin.yaml +++ b/deploy/rbd/kubernetes/v1.14+/csi-rbdplugin.yaml @@ -90,6 +90,26 @@ spec: mountPropagation: "Bidirectional" - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: liveness-prometheus + image: quay.io/cephcsi/cephcsi:canary + args: + - "--type=liveness" + - "--endpoint=$(CSI_ENDPOINT)" + - "--livenessport=8080" + - "--livenesspath=/metrics" + - "--polltime=60s" + - "--timeout=3s" + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + volumeMounts: + - name: socket-dir + mountPath: /csi + imagePullPolicy: "IfNotPresent" volumes: - name: socket-dir hostPath: @@ -126,3 +146,19 @@ spec: emptyDir: { medium: "Memory" } +--- +# This is a service to expose the liveness side car +apiVersion: v1 +kind: Service +metadata: + name: csi-liveness-rbdplugin + labels: + app: csi-liveness +spec: + ports: + - name: http-metrics + port: 8080 + protocol: TCP + targetPort: 8080 + selector: + app: csi-rbdplugin diff --git a/docs/deploy-cephfs.md b/docs/deploy-cephfs.md index 7b1c69447..1b78e2d86 100644 --- a/docs/deploy-cephfs.md +++ b/docs/deploy-cephfs.md @@ -1,3 +1,4 @@ + # CSI CephFS plugin The CSI CephFS plugin is able to both provision new CephFS volumes @@ -42,18 +43,22 @@ that should be resolved in v14.2.3. 
**Available command line arguments:** -| Option | Default value | Description | -| ------------------- | --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket | -| `--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) | -| `--nodeid` | _empty_ | This node's ID | -| `--type` | _empty_ | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` | +| Option | Default value | Description | +| ------------------- | --------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket | +| `--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) | +| `--nodeid` | _empty_ | This node's ID | +| `--type` | _empty_ | Driver type `[rbd | cephfs | liveness]` If the driver type is set to `rbd` it will act as a `rbd plugin`, if it's set to `cephfs` it will act as a `cephfs plugin`, and if it's set to `liveness` it will act as a `liveness plugin` | | `--volumemounter` | _empty_ | Default volume mounter. Available options are `kernel` and `fuse`. This is the mount method used if volume parameters don't specify otherwise. If left unspecified, the driver will first probe for `ceph-fuse` in system's path and will choose Ceph kernel client if probing failed. | | `--mountcachedir` | _empty_ | Volume mount cache info save dir. If left unspecified, the dirver will not record mount info, or it will save mount info and when driver restart it will remount volume it cached. | | `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning | | `--pluginpath` | "/var/lib/kubelet/plugins/" | The location of cephcsi plugin on host | | `--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) | | `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all.
| +| `--livenessport` | `8080` | TCP port for liveness requests | +| `--livenesspath` | `/metrics` | Path of the Prometheus endpoint where metrics will be available | +| `--polltime` | `60s` | Time interval between each poll | +| `--timeout` | `3s` | Probe timeout | **Available environmental variables:** @@ -163,11 +168,11 @@ After successfully completing the steps above, you should see output similar to ```bash $ kubectl get all NAME READY STATUS RESTARTS AGE -pod/csi-cephfsplugin-provisioner-0 3/3 Running 0 25s -pod/csi-cephfsplugin-rljcv 2/2 Running 0 24s +pod/csi-cephfsplugin-provisioner-0 4/4 Running 0 25s +pod/csi-cephfsplugin-rljcv 3/3 Running 0 24s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE -service/csi-cephfsplugin-provisioner ClusterIP 10.101.78.75 <none> 12345/TCP 26s +service/csi-cephfsplugin-provisioner ClusterIP 10.101.78.75 <none> 8080/TCP 26s ... ``` diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index 3210e3ace..f73a0b2f7 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -1,3 +1,4 @@ + # CSI RBD Plugin The RBD CSI plugin is able to provision new RBD images and @@ -36,6 +37,10 @@ make image-cephcsi | `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning | | `--metadatastorage` | _empty_ | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) | | `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. | +| `--livenessport` | `8080` | TCP port for liveness requests | +| `--livenesspath` | `/metrics` | Path of the Prometheus endpoint where metrics will be available | +| `--polltime` | `60s` | Time interval between each poll | +| `--timeout` | `3s` | Probe timeout | **Available volume parameters:** @@ -126,11 +131,11 @@ After successfully completing the steps above, you should see output similar to ```bash $ kubectl get all NAME READY STATUS RESTARTS AGE -pod/csi-rbdplugin-fptqr 2/2 Running 0 21s -pod/csi-rbdplugin-provisioner-0 4/4 Running 0 22s +pod/csi-rbdplugin-fptqr 3/3 Running 0 21s +pod/csi-rbdplugin-provisioner-0 5/5 Running 0 22s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE -service/csi-rbdplugin-provisioner ClusterIP 10.104.2.130 <none> 12345/TCP 23s +service/csi-rbdplugin-provisioner ClusterIP 10.104.2.130 <none> 8080/TCP 23s ... ``` diff --git a/docs/metrics.md b/docs/metrics.md new file mode 100644 index 000000000..7f4a9ef62 --- /dev/null +++ b/docs/metrics.md @@ -0,0 +1,32 @@ +# Metrics + +CSI deploys a sidecar container that is responsible for collecting metrics. + +## Liveness + +Liveness metrics are intended to be collected by Prometheus, but they can also +be accessed through a GET request to a specific pod IP. + +For example: +`curl -X GET http://[pod ip]:[liveness-port][liveness-path] 2>/dev/null | grep csi` + +The expected output should be: + +```bash +[root@worker2 /]# curl -X GET http://10.109.65.142:8080/metrics 2>/dev/null | grep csi +# HELP csi_liveness Liveness Probe +# TYPE csi_liveness gauge +csi_liveness 1 +``` + +Prometheus can be deployed through the Prometheus Operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html).
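+
+If you are not running the Prometheus Operator and the ServiceMonitor described
+below, a plain Prometheus scrape job pointed at the liveness service can be
+used instead. The snippet below is an illustrative sketch rather than a file in
+this repository: the target assumes the default `csi-liveness-rbdplugin`
+service in the `default` namespace, so adjust the service name, namespace, and
+port to match your deployment.
+
+```yaml
+# prometheus.yml -- minimal scrape job for the CSI liveness gauge (sketch;
+# service name and namespace are assumptions, not shipped defaults)
+scrape_configs:
+  - job_name: 'csi-liveness'
+    scrape_interval: 5s
+    metrics_path: /metrics
+    static_configs:
+      - targets: ['csi-liveness-rbdplugin.default.svc:8080']
+```
+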
+The [service-monitor](../examples/service-monitor.yaml) will tell Prometheus how +to pull metrics out of CSI. + +Each CSI pod has a service to expose the endpoint to Prometheus. By default, RBD +pods run on port 8080 and CephFS pods on port 8081. +These can be changed if desired; if multiple Ceph clusters are deployed, more +ports will be used for the additional CSI pods. + +You may need to open the ports used in your firewall depending on how your +cluster is set up. diff --git a/examples/service-monitor.yaml b/examples/service-monitor.yaml new file mode 100644 index 000000000..29065bc7f --- /dev/null +++ b/examples/service-monitor.yaml @@ -0,0 +1,22 @@ +--- +# This is a ServiceMonitor that would be used by a Prometheus collector to pick +# up liveness metrics. The label your Prometheus instance is looking for will +# need to be supplied and the namespace may need to be changed +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + name: csi-liveness + namespace: rook-ceph + labels: + team: rook +spec: + namespaceSelector: + matchNames: + - default + selector: + matchLabels: + app: csi-liveness + endpoints: + - port: http-metrics + path: /metrics + interval: 5s diff --git a/pkg/liveness/liveness.go b/pkg/liveness/liveness.go new file mode 100644 index 000000000..411045a57 --- /dev/null +++ b/pkg/liveness/liveness.go @@ -0,0 +1,106 @@ +/* +Copyright 2019 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
*/ +package liveness + +import ( + "context" + "net" + "net/http" + "os" + "strconv" + "time" + + connlib "github.com/kubernetes-csi/csi-lib-utils/connection" + "github.com/kubernetes-csi/csi-lib-utils/rpc" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "k8s.io/klog" +) + +var ( + liveness = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "csi", + Name: "liveness", + Help: "Liveness Probe", + }) +) + +func getLiveness(endpoint string, timeout time.Duration) { + csiConn, err := connlib.Connect(endpoint) + if err != nil { + // connlib should retry forever so a returned error should mean + // the grpc client is misconfigured rather than an error on the network + klog.Fatalf("failed to establish connection to CSI driver: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + klog.Info("Sending probe request to CSI driver") + ready, err := rpc.Probe(ctx, csiConn) + if err != nil { + liveness.Set(0) + klog.Errorf("health check failed: %v", err) + return + } + + if !ready { + liveness.Set(0) + klog.Error("driver responded but is not ready") + return + } + liveness.Set(1) + klog.Info("Health check succeeded") +} + +func recordLiveness(endpoint string, pollTime, timeout time.Duration) { + // register prometheus metrics + err := prometheus.Register(liveness) + if err != nil { + klog.Fatalln(err) + } + + // get liveness periodically + ticker := time.NewTicker(pollTime) + defer ticker.Stop() + for range ticker.C { + getLiveness(endpoint, timeout) + } +} + +// Run starts the periodic liveness probe loop and serves the resulting gauge on a Prometheus HTTP endpoint. +func Run(endpoint, livenessendpoint string, port int, pollTime, timeout time.Duration) { + klog.Info("Liveness Running") + + ip := os.Getenv("POD_IP") + + if ip == "" { + klog.Warning("missing POD_IP env var defaulting to 0.0.0.0") + ip = "0.0.0.0" + } + + // start liveness collection + go recordLiveness(endpoint, pollTime, timeout) + + // start up prometheus endpoint + addr := net.JoinHostPort(ip, strconv.Itoa(port)) + http.Handle(livenessendpoint, promhttp.Handler()) + err := http.ListenAndServe(addr, nil) + if err != nil { + klog.Fatalln(err) + } +} diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go new file mode 100644 index 000000000..588826ee3 --- /dev/null +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go @@ -0,0 +1,310 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
*/ +package connection + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "strings" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "google.golang.org/grpc" + "k8s.io/klog" +) + +const ( + // Interval of logging connection errors + connectionLoggingInterval = 10 * time.Second + + // Interval of trying to call Probe() until it succeeds + probeInterval = 1 * time.Second +) + +const terminationLogPath = "/dev/termination-log" + +// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket +// file or have format '<protocol>://', following gRPC name resolution mechanism at +// https://github.com/grpc/grpc/blob/master/doc/naming.md. +// +// The function tries to connect indefinitely every second until it connects. The function automatically disables TLS +// and adds interceptor for logging of all gRPC messages at level 5. +// +// For a connection to a Unix Domain socket, the behavior after +// loosing the connection is configurable. The default is to +// log the connection loss and reestablish a connection. Applications +// which need to know about a connection loss can be notified by +// passing a callback with OnConnectionLoss and in that callback +// can decide what to do: +// - exit the application with os.Exit +// - invalidate cached information +// - disable the reconnect, which will cause all gRPC method calls to fail with status.Unavailable +// +// For other connections, the default behavior from gRPC is used and +// loss of connection is not detected reliably. +func Connect(address string, options ...Option) (*grpc.ClientConn, error) { + return connect(address, []grpc.DialOption{}, options) +} + +// Option is the type of all optional parameters for Connect. +type Option func(o *options) + +// OnConnectionLoss registers a callback that will be invoked when the +// connection got lost. If that callback returns true, the connection +// is reestablished. Otherwise the connection is left as it is and +// all future gRPC calls using it will fail with status.Unavailable. +func OnConnectionLoss(reconnect func() bool) Option { + return func(o *options) { + o.reconnect = reconnect + } +} + +// ExitOnConnectionLoss returns callback for OnConnectionLoss() that writes +// an error to /dev/termination-log and exits. +func ExitOnConnectionLoss() func() bool { + return func() bool { + terminationMsg := "Lost connection to CSI driver, exiting" + if err := ioutil.WriteFile(terminationLogPath, []byte(terminationMsg), 0644); err != nil { + klog.Errorf("%s: %s", terminationLogPath, err) + } + klog.Fatalf(terminationMsg) + return false + } +} + +type options struct { + reconnect func() bool +} + +// connect is the internal implementation of Connect. It has more options to enable testing. +func connect(address string, dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) { + var o options + for _, option := range connectOptions { + option(&o) + } + + dialOptions = append(dialOptions, + grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container. + grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure. + grpc.WithBlock(), // Block until connection succeeds. + grpc.WithUnaryInterceptor(LogGRPC), // Log all messages.
+ ) + unixPrefix := "unix://" + if strings.HasPrefix(address, "/") { + // It looks like filesystem path. + address = unixPrefix + address + } + + if strings.HasPrefix(address, unixPrefix) { + // state variables for the custom dialer + haveConnected := false + lostConnection := false + reconnect := true + + dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + if haveConnected && !lostConnection { + // We have detected a loss of connection for the first time. Decide what to do... + // Record this once. TODO (?): log at regular time intervals. + klog.Errorf("Lost connection to %s.", address) + // Inform caller and let it decide? Default is to reconnect. + if o.reconnect != nil { + reconnect = o.reconnect() + } + lostConnection = true + } + if !reconnect { + return nil, errors.New("connection lost, reconnecting disabled") + } + conn, err := net.DialTimeout("unix", address[len(unixPrefix):], timeout) + if err == nil { + // Connection restablished. + haveConnected = true + lostConnection = false + } + return conn, err + })) + } else if o.reconnect != nil { + return nil, errors.New("OnConnectionLoss callback only supported for unix:// addresses") + } + + klog.Infof("Connecting to %s", address) + + // Connect in background. + var conn *grpc.ClientConn + var err error + ready := make(chan bool) + go func() { + conn, err = grpc.Dial(address, dialOptions...) + close(ready) + }() + + // Log error every connectionLoggingInterval + ticker := time.NewTicker(connectionLoggingInterval) + defer ticker.Stop() + + // Wait until Dial() succeeds. + for { + select { + case <-ticker.C: + klog.Warningf("Still connecting to %s", address) + + case <-ready: + return conn, err + } + } +} + +// LogGRPC is gPRC unary interceptor for logging of CSI messages at level 5. It removes any secrets from the message. +func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + klog.V(5).Infof("GRPC call: %s", method) + klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) + err := invoker(ctx, method, req, reply, cc, opts...) + klog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(reply)) + klog.V(5).Infof("GRPC error: %v", err) + return err +} + +// GetDriverName returns name of CSI driver. +func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) { + client := csi.NewIdentityClient(conn) + + req := csi.GetPluginInfoRequest{} + rsp, err := client.GetPluginInfo(ctx, &req) + if err != nil { + return "", err + } + name := rsp.GetName() + if name == "" { + return "", fmt.Errorf("driver name is empty") + } + return name, nil +} + +// PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map. +type PluginCapabilitySet map[csi.PluginCapability_Service_Type]bool + +// GetPluginCapabilities returns set of supported capabilities of CSI driver. 
+func GetPluginCapabilities(ctx context.Context, conn *grpc.ClientConn) (PluginCapabilitySet, error) { + client := csi.NewIdentityClient(conn) + req := csi.GetPluginCapabilitiesRequest{} + rsp, err := client.GetPluginCapabilities(ctx, &req) + if err != nil { + return nil, err + } + caps := PluginCapabilitySet{} + for _, cap := range rsp.GetCapabilities() { + if cap == nil { + continue + } + srv := cap.GetService() + if srv == nil { + continue + } + t := srv.GetType() + caps[t] = true + } + return caps, nil +} + +// ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map. +type ControllerCapabilitySet map[csi.ControllerServiceCapability_RPC_Type]bool + +// GetControllerCapabilities returns set of supported controller capabilities of CSI driver. +func GetControllerCapabilities(ctx context.Context, conn *grpc.ClientConn) (ControllerCapabilitySet, error) { + client := csi.NewControllerClient(conn) + req := csi.ControllerGetCapabilitiesRequest{} + rsp, err := client.ControllerGetCapabilities(ctx, &req) + if err != nil { + return nil, err + } + + caps := ControllerCapabilitySet{} + for _, cap := range rsp.GetCapabilities() { + if cap == nil { + continue + } + rpc := cap.GetRpc() + if rpc == nil { + continue + } + t := rpc.GetType() + caps[t] = true + } + return caps, nil +} + +// ProbeForever calls Probe() of a CSI driver and waits until the driver becomes ready. +// Any error other than timeout is returned. +func ProbeForever(conn *grpc.ClientConn, singleProbeTimeout time.Duration) error { + for { + klog.Info("Probing CSI driver for readiness") + ready, err := probeOnce(conn, singleProbeTimeout) + if err != nil { + st, ok := status.FromError(err) + if !ok { + // This is not gRPC error. The probe must have failed before gRPC + // method was called, otherwise we would get gRPC error. + return fmt.Errorf("CSI driver probe failed: %s", err) + } + if st.Code() != codes.DeadlineExceeded { + return fmt.Errorf("CSI driver probe failed: %s", err) + } + // Timeout -> driver is not ready. Fall through to sleep() below. + klog.Warning("CSI driver probe timed out") + } else { + if ready { + return nil + } + klog.Warning("CSI driver is not ready") + } + // Timeout was returned or driver is not ready. + time.Sleep(probeInterval) + } +} + +// probeOnce is a helper to simplify defer cancel() +func probeOnce(conn *grpc.ClientConn, timeout time.Duration) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return Probe(ctx, conn) +} + +// Probe calls driver Probe() just once and returns its result without any processing. +func Probe(ctx context.Context, conn *grpc.ClientConn) (ready bool, err error) { + client := csi.NewIdentityClient(conn) + + req := csi.ProbeRequest{} + rsp, err := client.Probe(ctx, &req) + + if err != nil { + return false, err + } + + r := rsp.GetReady() + if r == nil { + // "If not present, the caller SHALL assume that the plugin is in a ready state" + return true, nil + } + return r.GetValue(), nil +} diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/rpc/common.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/rpc/common.go new file mode 100644 index 000000000..bb4a5c448 --- /dev/null +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/rpc/common.go @@ -0,0 +1,160 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rpc + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/container-storage-interface/spec/lib/go/csi" + + "k8s.io/klog" +) + +const ( + // Interval of trying to call Probe() until it succeeds + probeInterval = 1 * time.Second +) + +// GetDriverName returns name of CSI driver. +func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) { + client := csi.NewIdentityClient(conn) + + req := csi.GetPluginInfoRequest{} + rsp, err := client.GetPluginInfo(ctx, &req) + if err != nil { + return "", err + } + name := rsp.GetName() + if name == "" { + return "", fmt.Errorf("driver name is empty") + } + return name, nil +} + +// PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map. +type PluginCapabilitySet map[csi.PluginCapability_Service_Type]bool + +// GetPluginCapabilities returns set of supported capabilities of CSI driver. +func GetPluginCapabilities(ctx context.Context, conn *grpc.ClientConn) (PluginCapabilitySet, error) { + client := csi.NewIdentityClient(conn) + req := csi.GetPluginCapabilitiesRequest{} + rsp, err := client.GetPluginCapabilities(ctx, &req) + if err != nil { + return nil, err + } + caps := PluginCapabilitySet{} + for _, cap := range rsp.GetCapabilities() { + if cap == nil { + continue + } + srv := cap.GetService() + if srv == nil { + continue + } + t := srv.GetType() + caps[t] = true + } + return caps, nil +} + +// ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map. +type ControllerCapabilitySet map[csi.ControllerServiceCapability_RPC_Type]bool + +// GetControllerCapabilities returns set of supported controller capabilities of CSI driver. +func GetControllerCapabilities(ctx context.Context, conn *grpc.ClientConn) (ControllerCapabilitySet, error) { + client := csi.NewControllerClient(conn) + req := csi.ControllerGetCapabilitiesRequest{} + rsp, err := client.ControllerGetCapabilities(ctx, &req) + if err != nil { + return nil, err + } + + caps := ControllerCapabilitySet{} + for _, cap := range rsp.GetCapabilities() { + if cap == nil { + continue + } + rpc := cap.GetRpc() + if rpc == nil { + continue + } + t := rpc.GetType() + caps[t] = true + } + return caps, nil +} + +// ProbeForever calls Probe() of a CSI driver and waits until the driver becomes ready. +// Any error other than timeout is returned. +func ProbeForever(conn *grpc.ClientConn, singleProbeTimeout time.Duration) error { + for { + klog.Info("Probing CSI driver for readiness") + ready, err := probeOnce(conn, singleProbeTimeout) + if err != nil { + st, ok := status.FromError(err) + if !ok { + // This is not gRPC error. The probe must have failed before gRPC + // method was called, otherwise we would get gRPC error. + return fmt.Errorf("CSI driver probe failed: %s", err) + } + if st.Code() != codes.DeadlineExceeded { + return fmt.Errorf("CSI driver probe failed: %s", err) + } + // Timeout -> driver is not ready. Fall through to sleep() below. 
+ klog.Warning("CSI driver probe timed out") + } else { + if ready { + return nil + } + klog.Warning("CSI driver is not ready") + } + // Timeout was returned or driver is not ready. + time.Sleep(probeInterval) + } +} + +// probeOnce is a helper to simplify defer cancel() +func probeOnce(conn *grpc.ClientConn, timeout time.Duration) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return Probe(ctx, conn) +} + +// Probe calls driver Probe() just once and returns its result without any processing. +func Probe(ctx context.Context, conn *grpc.ClientConn) (ready bool, err error) { + client := csi.NewIdentityClient(conn) + + req := csi.ProbeRequest{} + rsp, err := client.Probe(ctx, &req) + + if err != nil { + return false, err + } + + r := rsp.GetReady() + if r == nil { + // "If not present, the caller SHALL assume that the plugin is in a ready state" + return true, nil + } + return r.GetValue(), nil +} diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go new file mode 100644 index 000000000..fa535684f --- /dev/null +++ b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go @@ -0,0 +1,357 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package promhttp + +import ( + "bufio" + "io" + "net" + "net/http" +) + +const ( + closeNotifier = 1 << iota + flusher + hijacker + readerFrom + pusher +) + +type delegator interface { + http.ResponseWriter + + Status() int + Written() int64 +} + +type responseWriterDelegator struct { + http.ResponseWriter + + status int + written int64 + wroteHeader bool + observeWriteHeader func(int) +} + +func (r *responseWriterDelegator) Status() int { + return r.status +} + +func (r *responseWriterDelegator) Written() int64 { + return r.written +} + +func (r *responseWriterDelegator) WriteHeader(code int) { + r.status = code + r.wroteHeader = true + r.ResponseWriter.WriteHeader(code) + if r.observeWriteHeader != nil { + r.observeWriteHeader(code) + } +} + +func (r *responseWriterDelegator) Write(b []byte) (int, error) { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + n, err := r.ResponseWriter.Write(b) + r.written += int64(n) + return n, err +} + +type closeNotifierDelegator struct{ *responseWriterDelegator } +type flusherDelegator struct{ *responseWriterDelegator } +type hijackerDelegator struct{ *responseWriterDelegator } +type readerFromDelegator struct{ *responseWriterDelegator } +type pusherDelegator struct{ *responseWriterDelegator } + +func (d closeNotifierDelegator) CloseNotify() <-chan bool { + //lint:ignore SA1019 http.CloseNotifier is deprecated but we don't want to + //remove support from client_golang yet. 
+ return d.ResponseWriter.(http.CloseNotifier).CloseNotify() +} +func (d flusherDelegator) Flush() { + d.ResponseWriter.(http.Flusher).Flush() +} +func (d hijackerDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return d.ResponseWriter.(http.Hijacker).Hijack() +} +func (d readerFromDelegator) ReadFrom(re io.Reader) (int64, error) { + if !d.wroteHeader { + d.WriteHeader(http.StatusOK) + } + n, err := d.ResponseWriter.(io.ReaderFrom).ReadFrom(re) + d.written += n + return n, err +} +func (d pusherDelegator) Push(target string, opts *http.PushOptions) error { + return d.ResponseWriter.(http.Pusher).Push(target, opts) +} + +var pickDelegator = make([]func(*responseWriterDelegator) delegator, 32) + +func init() { + // TODO(beorn7): Code generation would help here. + pickDelegator[0] = func(d *responseWriterDelegator) delegator { // 0 + return d + } + pickDelegator[closeNotifier] = func(d *responseWriterDelegator) delegator { // 1 + return closeNotifierDelegator{d} + } + pickDelegator[flusher] = func(d *responseWriterDelegator) delegator { // 2 + return flusherDelegator{d} + } + pickDelegator[flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 3 + return struct { + *responseWriterDelegator + http.Flusher + http.CloseNotifier + }{d, flusherDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[hijacker] = func(d *responseWriterDelegator) delegator { // 4 + return hijackerDelegator{d} + } + pickDelegator[hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 5 + return struct { + *responseWriterDelegator + http.Hijacker + http.CloseNotifier + }{d, hijackerDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 6 + return struct { + *responseWriterDelegator + http.Hijacker + http.Flusher + }{d, hijackerDelegator{d}, flusherDelegator{d}} + } + pickDelegator[hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 7 + return struct { + *responseWriterDelegator + http.Hijacker + http.Flusher + http.CloseNotifier + }{d, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[readerFrom] = func(d *responseWriterDelegator) delegator { // 8 + return readerFromDelegator{d} + } + pickDelegator[readerFrom+closeNotifier] = func(d *responseWriterDelegator) delegator { // 9 + return struct { + *responseWriterDelegator + io.ReaderFrom + http.CloseNotifier + }{d, readerFromDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[readerFrom+flusher] = func(d *responseWriterDelegator) delegator { // 10 + return struct { + *responseWriterDelegator + io.ReaderFrom + http.Flusher + }{d, readerFromDelegator{d}, flusherDelegator{d}} + } + pickDelegator[readerFrom+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 11 + return struct { + *responseWriterDelegator + io.ReaderFrom + http.Flusher + http.CloseNotifier + }{d, readerFromDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[readerFrom+hijacker] = func(d *responseWriterDelegator) delegator { // 12 + return struct { + *responseWriterDelegator + io.ReaderFrom + http.Hijacker + }{d, readerFromDelegator{d}, hijackerDelegator{d}} + } + pickDelegator[readerFrom+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 13 + return struct { + *responseWriterDelegator + io.ReaderFrom + http.Hijacker + http.CloseNotifier + }{d, readerFromDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}} + } + 
pickDelegator[readerFrom+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 14 + return struct { + *responseWriterDelegator + io.ReaderFrom + http.Hijacker + http.Flusher + }{d, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}} + } + pickDelegator[readerFrom+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 15 + return struct { + *responseWriterDelegator + io.ReaderFrom + http.Hijacker + http.Flusher + http.CloseNotifier + }{d, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[pusher] = func(d *responseWriterDelegator) delegator { // 16 + return pusherDelegator{d} + } + pickDelegator[pusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 17 + return struct { + *responseWriterDelegator + http.Pusher + http.CloseNotifier + }{d, pusherDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[pusher+flusher] = func(d *responseWriterDelegator) delegator { // 18 + return struct { + *responseWriterDelegator + http.Pusher + http.Flusher + }{d, pusherDelegator{d}, flusherDelegator{d}} + } + pickDelegator[pusher+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 19 + return struct { + *responseWriterDelegator + http.Pusher + http.Flusher + http.CloseNotifier + }{d, pusherDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[pusher+hijacker] = func(d *responseWriterDelegator) delegator { // 20 + return struct { + *responseWriterDelegator + http.Pusher + http.Hijacker + }{d, pusherDelegator{d}, hijackerDelegator{d}} + } + pickDelegator[pusher+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 21 + return struct { + *responseWriterDelegator + http.Pusher + http.Hijacker + http.CloseNotifier + }{d, pusherDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[pusher+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 22 + return struct { + *responseWriterDelegator + http.Pusher + http.Hijacker + http.Flusher + }{d, pusherDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}} + } + pickDelegator[pusher+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { //23 + return struct { + *responseWriterDelegator + http.Pusher + http.Hijacker + http.Flusher + http.CloseNotifier + }{d, pusherDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[pusher+readerFrom] = func(d *responseWriterDelegator) delegator { // 24 + return struct { + *responseWriterDelegator + http.Pusher + io.ReaderFrom + }{d, pusherDelegator{d}, readerFromDelegator{d}} + } + pickDelegator[pusher+readerFrom+closeNotifier] = func(d *responseWriterDelegator) delegator { // 25 + return struct { + *responseWriterDelegator + http.Pusher + io.ReaderFrom + http.CloseNotifier + }{d, pusherDelegator{d}, readerFromDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[pusher+readerFrom+flusher] = func(d *responseWriterDelegator) delegator { // 26 + return struct { + *responseWriterDelegator + http.Pusher + io.ReaderFrom + http.Flusher + }{d, pusherDelegator{d}, readerFromDelegator{d}, flusherDelegator{d}} + } + pickDelegator[pusher+readerFrom+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 27 + return struct { + *responseWriterDelegator + http.Pusher + io.ReaderFrom + http.Flusher + http.CloseNotifier + }{d, pusherDelegator{d}, readerFromDelegator{d}, flusherDelegator{d}, 
closeNotifierDelegator{d}} + } + pickDelegator[pusher+readerFrom+hijacker] = func(d *responseWriterDelegator) delegator { // 28 + return struct { + *responseWriterDelegator + http.Pusher + io.ReaderFrom + http.Hijacker + }{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}} + } + pickDelegator[pusher+readerFrom+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 29 + return struct { + *responseWriterDelegator + http.Pusher + io.ReaderFrom + http.Hijacker + http.CloseNotifier + }{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}} + } + pickDelegator[pusher+readerFrom+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 30 + return struct { + *responseWriterDelegator + http.Pusher + io.ReaderFrom + http.Hijacker + http.Flusher + }{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}} + } + pickDelegator[pusher+readerFrom+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 31 + return struct { + *responseWriterDelegator + http.Pusher + io.ReaderFrom + http.Hijacker + http.Flusher + http.CloseNotifier + }{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}} + } +} + +func newDelegator(w http.ResponseWriter, observeWriteHeaderFunc func(int)) delegator { + d := &responseWriterDelegator{ + ResponseWriter: w, + observeWriteHeader: observeWriteHeaderFunc, + } + + id := 0 + //lint:ignore SA1019 http.CloseNotifier is deprecated but we don't want to + //remove support from client_golang yet. + if _, ok := w.(http.CloseNotifier); ok { + id += closeNotifier + } + if _, ok := w.(http.Flusher); ok { + id += flusher + } + if _, ok := w.(http.Hijacker); ok { + id += hijacker + } + if _, ok := w.(io.ReaderFrom); ok { + id += readerFrom + } + if _, ok := w.(http.Pusher); ok { + id += pusher + } + + return pickDelegator[id](d) +} diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go new file mode 100644 index 000000000..cea5a90fd --- /dev/null +++ b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go @@ -0,0 +1,349 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package promhttp provides tooling around HTTP servers and clients. +// +// First, the package allows the creation of http.Handler instances to expose +// Prometheus metrics via HTTP. promhttp.Handler acts on the +// prometheus.DefaultGatherer. With HandlerFor, you can create a handler for a +// custom registry or anything that implements the Gatherer interface. It also +// allows the creation of handlers that act differently on errors or allow to +// log errors. +// +// Second, the package provides tooling to instrument instances of http.Handler +// via middleware. 
Middleware wrappers follow the naming scheme
+// InstrumentHandlerX, where X describes the intended use of the middleware.
+// See each function's doc comment for specific details.
+//
+// Finally, the package allows for an http.RoundTripper to be instrumented via
+// middleware. Middleware wrappers follow the naming scheme
+// InstrumentRoundTripperX, where X describes the intended use of the
+// middleware. See each function's doc comment for specific details.
+package promhttp
+
+import (
+	"compress/gzip"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/prometheus/common/expfmt"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	contentTypeHeader     = "Content-Type"
+	contentEncodingHeader = "Content-Encoding"
+	acceptEncodingHeader  = "Accept-Encoding"
+)
+
+var gzipPool = sync.Pool{
+	New: func() interface{} {
+		return gzip.NewWriter(nil)
+	},
+}
+
+// Handler returns an http.Handler for the prometheus.DefaultGatherer, using
+// default HandlerOpts, i.e. it reports the first error as an HTTP error, it has
+// no error logging, and it applies compression if requested by the client.
+//
+// The returned http.Handler is already instrumented using the
+// InstrumentMetricHandler function and the prometheus.DefaultRegisterer. If you
+// create multiple http.Handlers by separate calls of the Handler function, the
+// metrics used for instrumentation will be shared between them, providing
+// global scrape counts.
+//
+// This function is meant to cover the bulk of basic use cases. If you are doing
+// anything that requires more customization (including using a non-default
+// Gatherer, different instrumentation, and non-default HandlerOpts), use the
+// HandlerFor function. See there for details.
+func Handler() http.Handler {
+	return InstrumentMetricHandler(
+		prometheus.DefaultRegisterer, HandlerFor(prometheus.DefaultGatherer, HandlerOpts{}),
+	)
+}
+
+// HandlerFor returns an uninstrumented http.Handler for the provided
+// Gatherer. The behavior of the Handler is defined by the provided
+// HandlerOpts. Thus, HandlerFor is useful to create http.Handlers for custom
+// Gatherers, with non-default HandlerOpts, and/or with custom (or no)
+// instrumentation. Use the InstrumentMetricHandler function to apply the same
+// kind of instrumentation as it is used by the Handler function.
+func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
+	var (
+		inFlightSem chan struct{}
+		errCnt      = prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "promhttp_metric_handler_errors_total",
+				Help: "Total number of internal errors encountered by the promhttp metric handler.",
+			},
+			[]string{"cause"},
+		)
+	)
+
+	if opts.MaxRequestsInFlight > 0 {
+		inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
+	}
+	if opts.Registry != nil {
+		// Initialize all possibilities that can occur below.
+		errCnt.WithLabelValues("gathering")
+		errCnt.WithLabelValues("encoding")
+		if err := opts.Registry.Register(errCnt); err != nil {
+			if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+				errCnt = are.ExistingCollector.(*prometheus.CounterVec)
+			} else {
+				panic(err)
+			}
+		}
+	}
+
+	h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
+		if inFlightSem != nil {
+			select {
+			case inFlightSem <- struct{}{}: // All good, carry on.
+ defer func() { <-inFlightSem }() + default: + http.Error(rsp, fmt.Sprintf( + "Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight, + ), http.StatusServiceUnavailable) + return + } + } + mfs, err := reg.Gather() + if err != nil { + if opts.ErrorLog != nil { + opts.ErrorLog.Println("error gathering metrics:", err) + } + errCnt.WithLabelValues("gathering").Inc() + switch opts.ErrorHandling { + case PanicOnError: + panic(err) + case ContinueOnError: + if len(mfs) == 0 { + // Still report the error if no metrics have been gathered. + httpError(rsp, err) + return + } + case HTTPErrorOnError: + httpError(rsp, err) + return + } + } + + contentType := expfmt.Negotiate(req.Header) + header := rsp.Header() + header.Set(contentTypeHeader, string(contentType)) + + w := io.Writer(rsp) + if !opts.DisableCompression && gzipAccepted(req.Header) { + header.Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) + + gz.Reset(w) + defer gz.Close() + + w = gz + } + + enc := expfmt.NewEncoder(w, contentType) + + var lastErr error + for _, mf := range mfs { + if err := enc.Encode(mf); err != nil { + lastErr = err + if opts.ErrorLog != nil { + opts.ErrorLog.Println("error encoding and sending metric family:", err) + } + errCnt.WithLabelValues("encoding").Inc() + switch opts.ErrorHandling { + case PanicOnError: + panic(err) + case ContinueOnError: + // Handled later. + case HTTPErrorOnError: + httpError(rsp, err) + return + } + } + } + + if lastErr != nil { + httpError(rsp, lastErr) + } + }) + + if opts.Timeout <= 0 { + return h + } + return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf( + "Exceeded configured timeout of %v.\n", + opts.Timeout, + )) +} + +// InstrumentMetricHandler is usually used with an http.Handler returned by the +// HandlerFor function. It instruments the provided http.Handler with two +// metrics: A counter vector "promhttp_metric_handler_requests_total" to count +// scrapes partitioned by HTTP status code, and a gauge +// "promhttp_metric_handler_requests_in_flight" to track the number of +// simultaneous scrapes. This function idempotently registers collectors for +// both metrics with the provided Registerer. It panics if the registration +// fails. The provided metrics are useful to see how many scrapes hit the +// monitored target (which could be from different Prometheus servers or other +// scrapers), and how often they overlap (which would result in more than one +// scrape in flight at the same time). Note that the scrapes-in-flight gauge +// will contain the scrape by which it is exposed, while the scrape counter will +// only get incremented after the scrape is complete (as only then the status +// code is known). For tracking scrape durations, use the +// "scrape_duration_seconds" gauge created by the Prometheus server upon each +// scrape. +func InstrumentMetricHandler(reg prometheus.Registerer, handler http.Handler) http.Handler { + cnt := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "promhttp_metric_handler_requests_total", + Help: "Total number of scrapes by HTTP status code.", + }, + []string{"code"}, + ) + // Initialize the most likely HTTP status codes. 
+	cnt.WithLabelValues("200")
+	cnt.WithLabelValues("500")
+	cnt.WithLabelValues("503")
+	if err := reg.Register(cnt); err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			cnt = are.ExistingCollector.(*prometheus.CounterVec)
+		} else {
+			panic(err)
+		}
+	}
+
+	gge := prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "promhttp_metric_handler_requests_in_flight",
+		Help: "Current number of scrapes being served.",
+	})
+	if err := reg.Register(gge); err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			gge = are.ExistingCollector.(prometheus.Gauge)
+		} else {
+			panic(err)
+		}
+	}
+
+	return InstrumentHandlerCounter(cnt, InstrumentHandlerInFlight(gge, handler))
+}
+
+// HandlerErrorHandling defines how a Handler serving metrics will handle
+// errors.
+type HandlerErrorHandling int
+
+// These constants cause handlers serving metrics to behave as described if
+// errors are encountered.
+const (
+	// Serve an HTTP status code 500 upon the first error
+	// encountered. Report the error message in the body.
+	HTTPErrorOnError HandlerErrorHandling = iota
+	// Ignore errors and try to serve as many metrics as possible. However,
+	// if no metrics can be served, serve an HTTP status code 500 and the
+	// last error message in the body. Only use this in deliberate "best
+	// effort" metrics collection scenarios. In this case, it is highly
+	// recommended to provide other means of detecting errors: By setting an
+	// ErrorLog in HandlerOpts, the errors are logged. By providing a
+	// Registry in HandlerOpts, the exposed metrics include an error counter
+	// "promhttp_metric_handler_errors_total", which can be used for
+	// alerts.
+	ContinueOnError
+	// Panic upon the first error encountered (useful for "crash only" apps).
+	PanicOnError
+)
+
+// Logger is the minimal interface HandlerOpts needs for logging. Note that
+// log.Logger from the standard library implements this interface, and it is
+// easy to implement by custom loggers, if they don't do so already anyway.
+type Logger interface {
+	Println(v ...interface{})
+}
+
+// HandlerOpts specifies options for how to serve metrics via an http.Handler.
+// The zero value of HandlerOpts is a reasonable default.
+type HandlerOpts struct {
+	// ErrorLog specifies an optional logger for errors collecting and
+	// serving metrics. If nil, errors are not logged at all.
+	ErrorLog Logger
+	// ErrorHandling defines how errors are handled. Note that errors are
+	// logged regardless of the configured ErrorHandling, provided ErrorLog
+	// is not nil.
+	ErrorHandling HandlerErrorHandling
+	// If Registry is not nil, it is used to register a metric
+	// "promhttp_metric_handler_errors_total", partitioned by "cause". A
+	// failed registration causes a panic. Note that this error counter is
+	// different from the instrumentation you get from the various
+	// InstrumentHandler... helpers. It counts errors that don't necessarily
+	// result in a non-2xx HTTP status code. There are two typical cases:
+	// (1) Encoding errors that only happen after streaming of the HTTP body
+	// has already started (and the status code 200 has been sent). This
+	// should only happen with custom collectors. (2) Collection errors with
+	// no effect on the HTTP status code because ErrorHandling is set to
+	// ContinueOnError.
+	Registry prometheus.Registerer
+	// If DisableCompression is true, the handler will never compress the
+	// response, even if requested by the client.
+	DisableCompression bool
+	// The number of concurrent HTTP requests is limited to
+	// MaxRequestsInFlight. Additional requests are responded to with 503
+	// Service Unavailable and a suitable message in the body. If
+	// MaxRequestsInFlight is 0 or negative, no limit is applied.
+	MaxRequestsInFlight int
+	// If handling a request takes longer than Timeout, it is responded to
+	// with 503 Service Unavailable and a suitable message. No timeout is
+	// applied if Timeout is 0 or negative. Note that with the current
+	// implementation, reaching the timeout simply ends the HTTP requests as
+	// described above (and even that only if sending of the body hasn't
+	// started yet), while the bulk work of gathering all the metrics keeps
+	// running in the background (with the eventual result to be thrown
+	// away). Until the implementation is improved, it is recommended to
+	// implement a separate timeout in potentially slow Collectors.
+	Timeout time.Duration
+}
+
+// gzipAccepted returns whether the client will accept gzip-encoded content.
+func gzipAccepted(header http.Header) bool {
+	a := header.Get(acceptEncodingHeader)
+	parts := strings.Split(a, ",")
+	for _, part := range parts {
+		part = strings.TrimSpace(part)
+		if part == "gzip" || strings.HasPrefix(part, "gzip;") {
+			return true
+		}
+	}
+	return false
+}
+
+// httpError removes any content-encoding header and then calls http.Error with
+// the provided error and http.StatusInternalServerError. Error contents are
+// supposed to be uncompressed plain text. However, same as with a plain
+// http.Error, any header settings will be void if the header has already been
+// sent. The error message will still be written to the writer, but it will
+// probably be of limited use.
+func httpError(rsp http.ResponseWriter, err error) {
+	rsp.Header().Del(contentEncodingHeader)
+	http.Error(
+		rsp,
+		"An error has occurred while serving metrics:\n\n"+err.Error(),
+		http.StatusInternalServerError,
+	)
+}
diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_client.go b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_client.go
new file mode 100644
index 000000000..83c49b66a
--- /dev/null
+++ b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_client.go
@@ -0,0 +1,219 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package promhttp
+
+import (
+	"crypto/tls"
+	"net/http"
+	"net/http/httptrace"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// The RoundTripperFunc type is an adapter to allow the use of ordinary
+// functions as RoundTrippers. If f is a function with the appropriate
+// signature, RoundTripperFunc(f) is a RoundTripper that calls f.
+type RoundTripperFunc func(req *http.Request) (*http.Response, error)
+
+// RoundTrip implements the RoundTripper interface.
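As a usage aside (not part of the vendored file): a minimal sketch of wiring HandlerFor and InstrumentMetricHandler together for a custom registry. The registry, logger prefix, port, and option values below are illustrative assumptions, not something this change prescribes.

package main

import (
	"log"
	"net/http"
	"os"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Custom registry instead of the prometheus.DefaultGatherer.
	reg := prometheus.NewRegistry()

	handler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{
		// *log.Logger satisfies the promhttp.Logger interface.
		ErrorLog:            log.New(os.Stderr, "promhttp: ", log.LstdFlags),
		ErrorHandling:       promhttp.ContinueOnError,
		Registry:            reg, // exposes promhttp_metric_handler_errors_total
		MaxRequestsInFlight: 3,   // additional concurrent scrapes receive 503
		Timeout:             10 * time.Second,
	})

	// Apply the same scrape instrumentation the default promhttp.Handler gets.
	http.Handle("/metrics", promhttp.InstrumentMetricHandler(reg, handler))
	log.Fatal(http.ListenAndServe(":8080", nil))
}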
+func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
+	return rt(r)
+}
+
+// InstrumentRoundTripperInFlight is a middleware that wraps the provided
+// http.RoundTripper. It sets the provided prometheus.Gauge to the number of
+// requests currently handled by the wrapped http.RoundTripper.
+//
+// See the example for ExampleInstrumentRoundTripperDuration for example usage.
+func InstrumentRoundTripperInFlight(gauge prometheus.Gauge, next http.RoundTripper) RoundTripperFunc {
+	return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
+		gauge.Inc()
+		defer gauge.Dec()
+		return next.RoundTrip(r)
+	})
+}
+
+// InstrumentRoundTripperCounter is a middleware that wraps the provided
+// http.RoundTripper to observe the request result with the provided CounterVec.
+// The CounterVec must have zero, one, or two non-const non-curried labels. For
+// those, the only allowed label names are "code" and "method". The function
+// panics otherwise. Partitioning of the CounterVec happens by HTTP status code
+// and/or HTTP method if the respective instance label names are present in the
+// CounterVec. For unpartitioned counting, use a CounterVec with zero labels.
+//
+// If the wrapped RoundTripper panics or returns a non-nil error, the Counter
+// is not incremented.
+//
+// See the example for ExampleInstrumentRoundTripperDuration for example usage.
+func InstrumentRoundTripperCounter(counter *prometheus.CounterVec, next http.RoundTripper) RoundTripperFunc {
+	code, method := checkLabels(counter)
+
+	return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
+		resp, err := next.RoundTrip(r)
+		if err == nil {
+			counter.With(labels(code, method, r.Method, resp.StatusCode)).Inc()
+		}
+		return resp, err
+	})
+}
+
+// InstrumentRoundTripperDuration is a middleware that wraps the provided
+// http.RoundTripper to observe the request duration with the provided
+// ObserverVec. The ObserverVec must have zero, one, or two non-const
+// non-curried labels. For those, the only allowed label names are "code" and
+// "method". The function panics otherwise. The Observe method of the Observer
+// in the ObserverVec is called with the request duration in
+// seconds. Partitioning happens by HTTP status code and/or HTTP method if the
+// respective instance label names are present in the ObserverVec. For
+// unpartitioned observations, use an ObserverVec with zero labels. Note that
+// partitioning of Histograms is expensive and should be used judiciously.
+//
+// If the wrapped RoundTripper panics or returns a non-nil error, no values are
+// reported.
+//
+// Note that this method is only guaranteed to never observe negative durations
+// if used with Go1.9+.
+func InstrumentRoundTripperDuration(obs prometheus.ObserverVec, next http.RoundTripper) RoundTripperFunc {
+	code, method := checkLabels(obs)
+
+	return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
+		start := time.Now()
+		resp, err := next.RoundTrip(r)
+		if err == nil {
+			obs.With(labels(code, method, r.Method, resp.StatusCode)).Observe(time.Since(start).Seconds())
+		}
+		return resp, err
+	})
+}
+
+// InstrumentTrace is used to offer flexibility in instrumenting the available
+// httptrace.ClientTrace hook functions. Each function is passed a float64
+// representing the time in seconds since the start of the http request. A user
+// may choose to use separately bucketed Histograms, or implement custom
+// instance labels on a per function basis.
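Stepping back from the vendored source for a moment: the three round-tripper middlewares above compose by nesting. A hedged sketch follows; the metric names, buckets, and the choice of http.DefaultTransport are illustrative assumptions rather than anything this patch ships.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func newInstrumentedClient(reg prometheus.Registerer) *http.Client {
	inFlight := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "client_in_flight_requests",
		Help: "Number of in-flight outbound HTTP requests.",
	})
	counter := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "client_api_requests_total",
			Help: "Outbound HTTP requests, partitioned by code and method.",
		},
		[]string{"code", "method"},
	)
	duration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "client_request_duration_seconds",
			Help:    "Outbound request latencies.",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"method"}, // *HistogramVec satisfies ObserverVec
	)
	reg.MustRegister(inFlight, counter, duration)

	// The innermost wrapper runs closest to the wire; the gauge therefore
	// counts a request for the full duration of the nested calls.
	return &http.Client{
		Transport: promhttp.InstrumentRoundTripperInFlight(inFlight,
			promhttp.InstrumentRoundTripperCounter(counter,
				promhttp.InstrumentRoundTripperDuration(duration, http.DefaultTransport),
			),
		),
	}
}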
+type InstrumentTrace struct { + GotConn func(float64) + PutIdleConn func(float64) + GotFirstResponseByte func(float64) + Got100Continue func(float64) + DNSStart func(float64) + DNSDone func(float64) + ConnectStart func(float64) + ConnectDone func(float64) + TLSHandshakeStart func(float64) + TLSHandshakeDone func(float64) + WroteHeaders func(float64) + Wait100Continue func(float64) + WroteRequest func(float64) +} + +// InstrumentRoundTripperTrace is a middleware that wraps the provided +// RoundTripper and reports times to hook functions provided in the +// InstrumentTrace struct. Hook functions that are not present in the provided +// InstrumentTrace struct are ignored. Times reported to the hook functions are +// time since the start of the request. Only with Go1.9+, those times are +// guaranteed to never be negative. (Earlier Go versions are not using a +// monotonic clock.) Note that partitioning of Histograms is expensive and +// should be used judiciously. +// +// For hook functions that receive an error as an argument, no observations are +// made in the event of a non-nil error value. +// +// See the example for ExampleInstrumentRoundTripperDuration for example usage. +func InstrumentRoundTripperTrace(it *InstrumentTrace, next http.RoundTripper) RoundTripperFunc { + return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + start := time.Now() + + trace := &httptrace.ClientTrace{ + GotConn: func(_ httptrace.GotConnInfo) { + if it.GotConn != nil { + it.GotConn(time.Since(start).Seconds()) + } + }, + PutIdleConn: func(err error) { + if err != nil { + return + } + if it.PutIdleConn != nil { + it.PutIdleConn(time.Since(start).Seconds()) + } + }, + DNSStart: func(_ httptrace.DNSStartInfo) { + if it.DNSStart != nil { + it.DNSStart(time.Since(start).Seconds()) + } + }, + DNSDone: func(_ httptrace.DNSDoneInfo) { + if it.DNSDone != nil { + it.DNSDone(time.Since(start).Seconds()) + } + }, + ConnectStart: func(_, _ string) { + if it.ConnectStart != nil { + it.ConnectStart(time.Since(start).Seconds()) + } + }, + ConnectDone: func(_, _ string, err error) { + if err != nil { + return + } + if it.ConnectDone != nil { + it.ConnectDone(time.Since(start).Seconds()) + } + }, + GotFirstResponseByte: func() { + if it.GotFirstResponseByte != nil { + it.GotFirstResponseByte(time.Since(start).Seconds()) + } + }, + Got100Continue: func() { + if it.Got100Continue != nil { + it.Got100Continue(time.Since(start).Seconds()) + } + }, + TLSHandshakeStart: func() { + if it.TLSHandshakeStart != nil { + it.TLSHandshakeStart(time.Since(start).Seconds()) + } + }, + TLSHandshakeDone: func(_ tls.ConnectionState, err error) { + if err != nil { + return + } + if it.TLSHandshakeDone != nil { + it.TLSHandshakeDone(time.Since(start).Seconds()) + } + }, + WroteHeaders: func() { + if it.WroteHeaders != nil { + it.WroteHeaders(time.Since(start).Seconds()) + } + }, + Wait100Continue: func() { + if it.Wait100Continue != nil { + it.Wait100Continue(time.Since(start).Seconds()) + } + }, + WroteRequest: func(_ httptrace.WroteRequestInfo) { + if it.WroteRequest != nil { + it.WroteRequest(time.Since(start).Seconds()) + } + }, + } + r = r.WithContext(httptrace.WithClientTrace(r.Context(), trace)) + + return next.RoundTrip(r) + }) +} diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_server.go b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_server.go new file mode 100644 index 000000000..9db243805 --- /dev/null +++ 
b/vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_server.go @@ -0,0 +1,447 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package promhttp + +import ( + "errors" + "net/http" + "strconv" + "strings" + "time" + + dto "github.com/prometheus/client_model/go" + + "github.com/prometheus/client_golang/prometheus" +) + +// magicString is used for the hacky label test in checkLabels. Remove once fixed. +const magicString = "zZgWfBxLqvG8kc8IMv3POi2Bb0tZI3vAnBx+gBaFi9FyPzB/CzKUer1yufDa" + +// InstrumentHandlerInFlight is a middleware that wraps the provided +// http.Handler. It sets the provided prometheus.Gauge to the number of +// requests currently handled by the wrapped http.Handler. +// +// See the example for InstrumentHandlerDuration for example usage. +func InstrumentHandlerInFlight(g prometheus.Gauge, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + g.Inc() + defer g.Dec() + next.ServeHTTP(w, r) + }) +} + +// InstrumentHandlerDuration is a middleware that wraps the provided +// http.Handler to observe the request duration with the provided ObserverVec. +// The ObserverVec must have zero, one, or two non-const non-curried labels. For +// those, the only allowed label names are "code" and "method". The function +// panics otherwise. The Observe method of the Observer in the ObserverVec is +// called with the request duration in seconds. Partitioning happens by HTTP +// status code and/or HTTP method if the respective instance label names are +// present in the ObserverVec. For unpartitioned observations, use an +// ObserverVec with zero labels. Note that partitioning of Histograms is +// expensive and should be used judiciously. +// +// If the wrapped Handler does not set a status code, a status code of 200 is assumed. +// +// If the wrapped Handler panics, no values are reported. +// +// Note that this method is only guaranteed to never observe negative durations +// if used with Go1.9+. +func InstrumentHandlerDuration(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc { + code, method := checkLabels(obs) + + if code { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + now := time.Now() + d := newDelegator(w, nil) + next.ServeHTTP(d, r) + + obs.With(labels(code, method, r.Method, d.Status())).Observe(time.Since(now).Seconds()) + }) + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + now := time.Now() + next.ServeHTTP(w, r) + obs.With(labels(code, method, r.Method, 0)).Observe(time.Since(now).Seconds()) + }) +} + +// InstrumentHandlerCounter is a middleware that wraps the provided http.Handler +// to observe the request result with the provided CounterVec. The CounterVec +// must have zero, one, or two non-const non-curried labels. For those, the only +// allowed label names are "code" and "method". The function panics +// otherwise. 
Partitioning of the CounterVec happens by HTTP status code and/or +// HTTP method if the respective instance label names are present in the +// CounterVec. For unpartitioned counting, use a CounterVec with zero labels. +// +// If the wrapped Handler does not set a status code, a status code of 200 is assumed. +// +// If the wrapped Handler panics, the Counter is not incremented. +// +// See the example for InstrumentHandlerDuration for example usage. +func InstrumentHandlerCounter(counter *prometheus.CounterVec, next http.Handler) http.HandlerFunc { + code, method := checkLabels(counter) + + if code { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + d := newDelegator(w, nil) + next.ServeHTTP(d, r) + counter.With(labels(code, method, r.Method, d.Status())).Inc() + }) + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + next.ServeHTTP(w, r) + counter.With(labels(code, method, r.Method, 0)).Inc() + }) +} + +// InstrumentHandlerTimeToWriteHeader is a middleware that wraps the provided +// http.Handler to observe with the provided ObserverVec the request duration +// until the response headers are written. The ObserverVec must have zero, one, +// or two non-const non-curried labels. For those, the only allowed label names +// are "code" and "method". The function panics otherwise. The Observe method of +// the Observer in the ObserverVec is called with the request duration in +// seconds. Partitioning happens by HTTP status code and/or HTTP method if the +// respective instance label names are present in the ObserverVec. For +// unpartitioned observations, use an ObserverVec with zero labels. Note that +// partitioning of Histograms is expensive and should be used judiciously. +// +// If the wrapped Handler panics before calling WriteHeader, no value is +// reported. +// +// Note that this method is only guaranteed to never observe negative durations +// if used with Go1.9+. +// +// See the example for InstrumentHandlerDuration for example usage. +func InstrumentHandlerTimeToWriteHeader(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc { + code, method := checkLabels(obs) + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + now := time.Now() + d := newDelegator(w, func(status int) { + obs.With(labels(code, method, r.Method, status)).Observe(time.Since(now).Seconds()) + }) + next.ServeHTTP(d, r) + }) +} + +// InstrumentHandlerRequestSize is a middleware that wraps the provided +// http.Handler to observe the request size with the provided ObserverVec. The +// ObserverVec must have zero, one, or two non-const non-curried labels. For +// those, the only allowed label names are "code" and "method". The function +// panics otherwise. The Observe method of the Observer in the ObserverVec is +// called with the request size in bytes. Partitioning happens by HTTP status +// code and/or HTTP method if the respective instance label names are present in +// the ObserverVec. For unpartitioned observations, use an ObserverVec with zero +// labels. Note that partitioning of Histograms is expensive and should be used +// judiciously. +// +// If the wrapped Handler does not set a status code, a status code of 200 is assumed. +// +// If the wrapped Handler panics, no values are reported. +// +// See the example for InstrumentHandlerDuration for example usage. 
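The same nesting pattern applies server-side; InstrumentMetricHandler above is itself just InstrumentHandlerCounter wrapped around InstrumentHandlerInFlight. A minimal sketch of chaining the handler middlewares documented here, with invented metric names:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func instrument(next http.Handler, reg prometheus.Registerer) http.Handler {
	inFlight := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "http_in_flight_requests",
		Help: "Requests currently being served.",
	})
	requests := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "http_requests_total",
			Help: "Requests served, partitioned by status code and method.",
		},
		[]string{"code", "method"},
	)
	duration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "http_request_duration_seconds",
			Help:    "Request latencies.",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"method"},
	)
	reg.MustRegister(inFlight, requests, duration)

	// Each wrapper returns an http.Handler (or http.HandlerFunc), so the
	// middlewares chain directly.
	return promhttp.InstrumentHandlerInFlight(inFlight,
		promhttp.InstrumentHandlerDuration(duration,
			promhttp.InstrumentHandlerCounter(requests, next),
		),
	)
}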
+func InstrumentHandlerRequestSize(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc { + code, method := checkLabels(obs) + + if code { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + d := newDelegator(w, nil) + next.ServeHTTP(d, r) + size := computeApproximateRequestSize(r) + obs.With(labels(code, method, r.Method, d.Status())).Observe(float64(size)) + }) + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + next.ServeHTTP(w, r) + size := computeApproximateRequestSize(r) + obs.With(labels(code, method, r.Method, 0)).Observe(float64(size)) + }) +} + +// InstrumentHandlerResponseSize is a middleware that wraps the provided +// http.Handler to observe the response size with the provided ObserverVec. The +// ObserverVec must have zero, one, or two non-const non-curried labels. For +// those, the only allowed label names are "code" and "method". The function +// panics otherwise. The Observe method of the Observer in the ObserverVec is +// called with the response size in bytes. Partitioning happens by HTTP status +// code and/or HTTP method if the respective instance label names are present in +// the ObserverVec. For unpartitioned observations, use an ObserverVec with zero +// labels. Note that partitioning of Histograms is expensive and should be used +// judiciously. +// +// If the wrapped Handler does not set a status code, a status code of 200 is assumed. +// +// If the wrapped Handler panics, no values are reported. +// +// See the example for InstrumentHandlerDuration for example usage. +func InstrumentHandlerResponseSize(obs prometheus.ObserverVec, next http.Handler) http.Handler { + code, method := checkLabels(obs) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + d := newDelegator(w, nil) + next.ServeHTTP(d, r) + obs.With(labels(code, method, r.Method, d.Status())).Observe(float64(d.Written())) + }) +} + +func checkLabels(c prometheus.Collector) (code bool, method bool) { + // TODO(beorn7): Remove this hacky way to check for instance labels + // once Descriptors can have their dimensionality queried. + var ( + desc *prometheus.Desc + m prometheus.Metric + pm dto.Metric + lvs []string + ) + + // Get the Desc from the Collector. + descc := make(chan *prometheus.Desc, 1) + c.Describe(descc) + + select { + case desc = <-descc: + default: + panic("no description provided by collector") + } + select { + case <-descc: + panic("more than one description provided by collector") + default: + } + + close(descc) + + // Create a ConstMetric with the Desc. Since we don't know how many + // variable labels there are, try for as long as it needs. + for err := errors.New("dummy"); err != nil; lvs = append(lvs, magicString) { + m, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, 0, lvs...) + } + + // Write out the metric into a proto message and look at the labels. + // If the value is not the magicString, it is a constLabel, which doesn't interest us. + // If the label is curried, it doesn't interest us. + // In all other cases, only "code" or "method" is allowed. 
+	if err := m.Write(&pm); err != nil {
+		panic("error checking metric for labels")
+	}
+	for _, label := range pm.Label {
+		name, value := label.GetName(), label.GetValue()
+		if value != magicString || isLabelCurried(c, name) {
+			continue
+		}
+		switch name {
+		case "code":
+			code = true
+		case "method":
+			method = true
+		default:
+			panic("metric partitioned with non-supported labels")
+		}
+	}
+	return
+}
+
+func isLabelCurried(c prometheus.Collector, label string) bool {
+	// This is even hackier than the label test above.
+	// We essentially try to curry again and see if it works.
+	// But for that, we need to type-convert to the two
+	// types we use here, ObserverVec or *CounterVec.
+	switch v := c.(type) {
+	case *prometheus.CounterVec:
+		if _, err := v.CurryWith(prometheus.Labels{label: "dummy"}); err == nil {
+			return false
+		}
+	case prometheus.ObserverVec:
+		if _, err := v.CurryWith(prometheus.Labels{label: "dummy"}); err == nil {
+			return false
+		}
+	default:
+		panic("unsupported metric vec type")
+	}
+	return true
+}
+
+// emptyLabels is a one-time allocation for non-partitioned metrics to avoid
+// unnecessary allocations on each request.
+var emptyLabels = prometheus.Labels{}
+
+func labels(code, method bool, reqMethod string, status int) prometheus.Labels {
+	if !(code || method) {
+		return emptyLabels
+	}
+	labels := prometheus.Labels{}
+
+	if code {
+		labels["code"] = sanitizeCode(status)
+	}
+	if method {
+		labels["method"] = sanitizeMethod(reqMethod)
+	}
+
+	return labels
+}
+
+func computeApproximateRequestSize(r *http.Request) int {
+	s := 0
+	if r.URL != nil {
+		s += len(r.URL.String())
+	}
+
+	s += len(r.Method)
+	s += len(r.Proto)
+	for name, values := range r.Header {
+		s += len(name)
+		for _, value := range values {
+			s += len(value)
+		}
+	}
+	s += len(r.Host)
+
+	// N.B. r.Form and r.MultipartForm are assumed to be included in r.URL.
+
+	if r.ContentLength != -1 {
+		s += int(r.ContentLength)
+	}
+	return s
+}
+
+func sanitizeMethod(m string) string {
+	switch m {
+	case "GET", "get":
+		return "get"
+	case "PUT", "put":
+		return "put"
+	case "HEAD", "head":
+		return "head"
+	case "POST", "post":
+		return "post"
+	case "DELETE", "delete":
+		return "delete"
+	case "CONNECT", "connect":
+		return "connect"
+	case "OPTIONS", "options":
+		return "options"
+	case "NOTIFY", "notify":
+		return "notify"
+	default:
+		return strings.ToLower(m)
+	}
+}
+
+// If the wrapped http.Handler has not set a status code, i.e. the value is
+// currently 0, sanitizeCode will return 200, for consistency with behavior in
+// the stdlib.
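To make the label rules enforced by checkLabels concrete, here is a small hypothetical sketch: only "code" and "method" may remain as variable labels, but any extra label can be curried to a constant before wrapping, which is exactly what isLabelCurried detects and tolerates. Metric and route names are invented.

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})

	ok := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "api_requests_total", Help: "Requests served."},
		[]string{"code", "method"}, // both allowed as variable labels
	)

	perHandler := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "handler_requests_total", Help: "Requests served per handler."},
		[]string{"code", "handler"},
	)
	prometheus.MustRegister(ok, perHandler)

	http.Handle("/a", promhttp.InstrumentHandlerCounter(ok, next))

	// Passing "handler" through as a variable label would panic at wrap
	// time; currying it to a constant value first passes the check.
	http.Handle("/b", promhttp.InstrumentHandlerCounter(
		perHandler.MustCurryWith(prometheus.Labels{"handler": "b"}), next,
	))
}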
+func sanitizeCode(s int) string { + switch s { + case 100: + return "100" + case 101: + return "101" + + case 200, 0: + return "200" + case 201: + return "201" + case 202: + return "202" + case 203: + return "203" + case 204: + return "204" + case 205: + return "205" + case 206: + return "206" + + case 300: + return "300" + case 301: + return "301" + case 302: + return "302" + case 304: + return "304" + case 305: + return "305" + case 307: + return "307" + + case 400: + return "400" + case 401: + return "401" + case 402: + return "402" + case 403: + return "403" + case 404: + return "404" + case 405: + return "405" + case 406: + return "406" + case 407: + return "407" + case 408: + return "408" + case 409: + return "409" + case 410: + return "410" + case 411: + return "411" + case 412: + return "412" + case 413: + return "413" + case 414: + return "414" + case 415: + return "415" + case 416: + return "416" + case 417: + return "417" + case 418: + return "418" + + case 500: + return "500" + case 501: + return "501" + case 502: + return "502" + case 503: + return "503" + case 504: + return "504" + case 505: + return "505" + + case 428: + return "428" + case 429: + return "429" + case 431: + return "431" + case 511: + return "511" + + default: + return strconv.Itoa(s) + } +} diff --git a/vendor/golang.org/x/sys/unix/affinity_linux.go b/vendor/golang.org/x/sys/unix/affinity_linux.go index 72afe3338..14e4d5caa 100644 --- a/vendor/golang.org/x/sys/unix/affinity_linux.go +++ b/vendor/golang.org/x/sys/unix/affinity_linux.go @@ -91,9 +91,13 @@ func onesCount64(x uint64) int { const m0 = 0x5555555555555555 // 01010101 ... const m1 = 0x3333333333333333 // 00110011 ... const m2 = 0x0f0f0f0f0f0f0f0f // 00001111 ... - const m3 = 0x00ff00ff00ff00ff // etc. - const m4 = 0x0000ffff0000ffff + // Unused in this function, but definitions preserved for + // documentation purposes: + // + // const m3 = 0x00ff00ff00ff00ff // etc. + // const m4 = 0x0000ffff0000ffff + // // Implementation: Parallel summing of adjacent bits. // See "Hacker's Delight", Chap. 5: Counting Bits. // The following pattern shows the general approach: diff --git a/vendor/golang.org/x/sys/unix/dirent.go b/vendor/golang.org/x/sys/unix/dirent.go index 6f3460e69..304016b68 100644 --- a/vendor/golang.org/x/sys/unix/dirent.go +++ b/vendor/golang.org/x/sys/unix/dirent.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// +build aix darwin dragonfly freebsd linux nacl netbsd openbsd solaris +// +build aix darwin dragonfly freebsd linux netbsd openbsd solaris package unix diff --git a/vendor/k8s.io/utils/trace/trace.go b/vendor/k8s.io/utils/trace/trace.go index f672d88f2..3b424104a 100644 --- a/vendor/k8s.io/utils/trace/trace.go +++ b/vendor/k8s.io/utils/trace/trace.go @@ -25,32 +25,55 @@ import ( "k8s.io/klog" ) +// Field is a key value pair that provides additional details about the trace. 
+type Field struct { + Key string + Value interface{} +} + +func (f Field) format() string { + return fmt.Sprintf("%s:%v", f.Key, f.Value) +} + +func writeFields(b *bytes.Buffer, l []Field) { + for i, f := range l { + b.WriteString(f.format()) + if i < len(l)-1 { + b.WriteString(",") + } + } +} + type traceStep struct { stepTime time.Time msg string + fields []Field } // Trace keeps track of a set of "steps" and allows us to log a specific // step if it took longer than its share of the total allowed time type Trace struct { name string + fields []Field startTime time.Time steps []traceStep } -// New creates a Trace with the specified name -func New(name string) *Trace { - return &Trace{name, time.Now(), nil} +// New creates a Trace with the specified name. The name identifies the operation to be traced. The +// Fields add key value pairs to provide additional details about the trace, such as operation inputs. +func New(name string, fields ...Field) *Trace { + return &Trace{name: name, startTime: time.Now(), fields: fields} } -// Step adds a new step with a specific message. Call this at the end of an -// execution step to record how long it took. -func (t *Trace) Step(msg string) { +// Step adds a new step with a specific message. Call this at the end of an execution step to record +// how long it took. The Fields add key value pairs to provide additional details about the trace +// step. +func (t *Trace) Step(msg string, fields ...Field) { if t.steps == nil { // traces almost always have less than 6 steps, do this to avoid more than a single allocation t.steps = make([]traceStep, 0, 6) } - t.steps = append(t.steps, traceStep{time.Now(), msg}) + t.steps = append(t.steps, traceStep{stepTime: time.Now(), msg: msg, fields: fields}) } // Log is used to dump all the steps in the Trace @@ -65,12 +88,23 @@ func (t *Trace) logWithStepThreshold(stepThreshold time.Duration) { endTime := time.Now() totalTime := endTime.Sub(t.startTime) - buffer.WriteString(fmt.Sprintf("Trace[%d]: %q (started: %v) (total time: %v):\n", tracenum, t.name, t.startTime, totalTime)) + buffer.WriteString(fmt.Sprintf("Trace[%d]: %q ", tracenum, t.name)) + if len(t.fields) > 0 { + writeFields(&buffer, t.fields) + buffer.WriteString(" ") + } + buffer.WriteString(fmt.Sprintf("(started: %v) (total time: %v):\n", t.startTime, totalTime)) lastStepTime := t.startTime for _, step := range t.steps { stepDuration := step.stepTime.Sub(lastStepTime) if stepThreshold == 0 || stepDuration > stepThreshold || klog.V(4) { - buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] %v\n", tracenum, step.stepTime.Sub(t.startTime), stepDuration, step.msg)) + buffer.WriteString(fmt.Sprintf("Trace[%d]: [%v] [%v] ", tracenum, step.stepTime.Sub(t.startTime), stepDuration)) + buffer.WriteString(step.msg) + if len(step.fields) > 0 { + buffer.WriteString(" ") + writeFields(&buffer, step.fields) + } + buffer.WriteString("\n") } lastStepTime = step.stepTime }
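Finally, a short hedged sketch of the extended trace API this hunk introduces; the operation name, thresholds, and field values are invented for illustration.

package main

import (
	"time"

	utiltrace "k8s.io/utils/trace"
)

func createVolume() {
	t := utiltrace.New("create volume",
		utiltrace.Field{Key: "name", Value: "pvc-1234"},
		utiltrace.Field{Key: "size", Value: "1Gi"},
	)
	defer t.Log() // dumps all steps, annotated with the fields above

	time.Sleep(50 * time.Millisecond) // stand-in for real work
	t.Step("reserved name", utiltrace.Field{Key: "pool", Value: "replicapool"})

	time.Sleep(50 * time.Millisecond)
	t.Step("created image")
}

func main() { createVolume() }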