mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-18 02:39:30 +00:00
Merge pull request #208 from red-hat-storage/sync_us--devel
Syncing latest changes from devel for ceph-csi
This commit is contained in:
commit
817ac33312
16
.github/pull_request_template.md
vendored
16
.github/pull_request_template.md
vendored
@ -34,6 +34,20 @@ Fixes: #issue_number
|
||||
List items that are not part of the PR and do not impact its
|
||||
functionality, but are work items that can be taken up subsequently.
|
||||
|
||||
**Checklist:**
|
||||
|
||||
* [ ] **Commit Message Formatting**: Commit titles and messages follow
|
||||
guidelines in the [developer
|
||||
guide](https://github.com/ceph/ceph-csi/blob/devel/docs/development-guide.md#commit-messages).
|
||||
* [ ] Reviewed the developer guide on [Submitting a Pull
|
||||
Request](https://github.com/ceph/ceph-csi/blob/devel/docs/development-guide.md#development-workflow)
|
||||
* [ ] [Pending release
|
||||
notes](https://github.com/ceph/ceph-csi/blob/devel/PendingReleaseNotes.md)
|
||||
updated with breaking and/or notable changes for the next major release.
|
||||
* [ ] Documentation has been updated, if necessary.
|
||||
* [ ] Unit tests have been added, if necessary.
|
||||
* [ ] Integration tests have been added, if necessary.
|
||||
|
||||
---
|
||||
|
||||
<details>
|
||||
@ -42,7 +56,7 @@ functionality, but are work items that can be taken up subsequently.
|
||||
These commands are normally not required, but in case of issues, leave any of
|
||||
the following bot commands in an otherwise empty comment in this PR:
|
||||
|
||||
- `/retest ci/centos/<job-name>`: retest the `<job-name>` after unrelated
|
||||
* `/retest ci/centos/<job-name>`: retest the `<job-name>` after unrelated
|
||||
failure (please report the failure too!)
|
||||
|
||||
</details>
|
||||
|
4
Makefile
4
Makefile
@ -47,9 +47,7 @@ endif
|
||||
GO_PROJECT=github.com/ceph/ceph-csi
|
||||
|
||||
CEPH_VERSION ?= $(shell . $(CURDIR)/build.env ; echo $${CEPH_VERSION})
|
||||
# TODO: ceph_preview tag may be removed with go-ceph 0.17.0
|
||||
# TODO: ceph_ci_untested is added for subvolume metadata (go-ceph#691) and snapshot metadata management (go-ceph#698)
|
||||
GO_TAGS_LIST ?= $(CEPH_VERSION) ceph_preview ceph_ci_untested ceph_pre_quincy
|
||||
GO_TAGS_LIST ?= $(CEPH_VERSION)
|
||||
|
||||
# go build flags
|
||||
LDFLAGS ?=
|
||||
|
5
PendingReleaseNotes.md
Normal file
5
PendingReleaseNotes.md
Normal file
@ -0,0 +1,5 @@
|
||||
# v3.10 Pending Release Notes
|
||||
|
||||
## Breaking Changes
|
||||
|
||||
## Features
|
@ -106,8 +106,8 @@ func init() {
|
||||
"",
|
||||
"Comma separated string of mount options accepted by ceph-fuse mounter")
|
||||
|
||||
// liveness/grpc metrics related flags
|
||||
flag.IntVar(&conf.MetricsPort, "metricsport", 8080, "TCP port for liveness/grpc metrics requests")
|
||||
// liveness/profile metrics related flags
|
||||
flag.IntVar(&conf.MetricsPort, "metricsport", 8080, "TCP port for liveness/profile metrics requests")
|
||||
flag.StringVar(
|
||||
&conf.MetricsPath,
|
||||
"metricspath",
|
||||
@ -116,14 +116,6 @@ func init() {
|
||||
flag.DurationVar(&conf.PollTime, "polltime", time.Second*pollTime, "time interval in seconds between each poll")
|
||||
flag.DurationVar(&conf.PoolTimeout, "timeout", time.Second*probeTimeout, "probe timeout in seconds")
|
||||
|
||||
flag.BoolVar(&conf.EnableGRPCMetrics, "enablegrpcmetrics", false, "[DEPRECATED] enable grpc metrics")
|
||||
flag.StringVar(
|
||||
&conf.HistogramOption,
|
||||
"histogramoption",
|
||||
"0.5,2,6",
|
||||
"[DEPRECATED] Histogram option for grpc metrics, should be comma separated value, "+
|
||||
"ex:= 0.5,2,6 where start=0.5 factor=2, count=6")
|
||||
|
||||
flag.UintVar(
|
||||
&conf.RbdHardMaxCloneDepth,
|
||||
"rbdhardmaxclonedepth",
|
||||
@ -210,7 +202,7 @@ func main() {
|
||||
|
||||
setPIDLimit(&conf)
|
||||
|
||||
if conf.EnableGRPCMetrics || conf.Vtype == livenessType {
|
||||
if conf.EnableProfiling || conf.Vtype == livenessType {
|
||||
// validate metrics endpoint
|
||||
conf.MetricsIP = os.Getenv("POD_IP")
|
||||
|
||||
|
10
deploy.sh
10
deploy.sh
@ -14,10 +14,10 @@ GITHUB_EMAIL=${GITHUB_EMAIL:-"ceph-csi-bot@users.noreply.github.com"}
|
||||
|
||||
# Build and push images. Steps as below:
|
||||
# 1. get base image from ./build.env (BASE_IMAGE=ceph/ceph:v14.2)
|
||||
# 2. parse manifest to get image digest per arch (sha256:XXX, sha256:YYY)
|
||||
# 3. patch Dockerfile with amd64 base image (FROM ceph/ceph:v14.2@sha256:XXX)
|
||||
# 2. parse manifest to get image digest per arch (sha256:XYZ, sha256:ZYX)
|
||||
# 3. patch Dockerfile with amd64 base image (FROM ceph/ceph:v14.2@sha256:XYZ)
|
||||
# 4. build and push amd64 image
|
||||
# 5. patch Dockerfile with arm64 base image (FROM ceph/ceph:v14.2@sha256:YYY)
|
||||
# 5. patch Dockerfile with arm64 base image (FROM ceph/ceph:v14.2@sha256:ZYX)
|
||||
# 6. build and push arm64 image
|
||||
build_push_images() {
|
||||
# "docker manifest" requires experimental feature enabled
|
||||
@ -29,11 +29,11 @@ build_push_images() {
|
||||
# get image digest per architecture
|
||||
# {
|
||||
# "arch": "amd64",
|
||||
# "digest": "sha256:XXX"
|
||||
# "digest": "sha256:XYZ"
|
||||
# }
|
||||
# {
|
||||
# "arch": "arm64",
|
||||
# "digest": "sha256:YYY"
|
||||
# "digest": "sha256:ZYX"
|
||||
# }
|
||||
manifests=$(docker manifest inspect "${baseimg}" | jq '.manifests[] | {arch: .platform.architecture, digest: .digest}')
|
||||
# qemu-user-static enables QEMU to execute containers built for other architectures
|
||||
|
@ -40,11 +40,9 @@ make image-cephcsi
|
||||
| `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. |
|
||||
| `--metricsport` | `8080` | TCP port for liveness metrics requests |
|
||||
| `--metricspath` | `/metrics` | Path of prometheus endpoint where metrics will be available |
|
||||
| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server |
|
||||
| `--polltime` | `60s` | Time interval in between each poll |
|
||||
| `--timeout` | `3s` | Probe timeout in seconds |
|
||||
| `--clustername` | _empty_ | Cluster name to set on subvolume |
|
||||
| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) |
|
||||
| `--forcecephkernelclient` | `false` | Force enabling Ceph Kernel clients for mounting on kernels < 4.17 |
|
||||
| `--kernelmountoptions` | _empty_ | Comma separated string of mount options accepted by cephfs kernel mounter |
|
||||
| `--fusemountoptions` | _empty_ | Comma separated string of mount options accepted by ceph-fuse mounter |
|
||||
|
@ -37,11 +37,9 @@ make image-cephcsi
|
||||
| `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. |
|
||||
| `--metricsport` | `8080` | TCP port for liveness metrics requests |
|
||||
| `--metricspath` | `"/metrics"` | Path of prometheus endpoint where metrics will be available |
|
||||
| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server |
|
||||
| `--polltime` | `"60s"` | Time interval in between each poll |
|
||||
| `--timeout` | `"3s"` | Probe timeout in seconds |
|
||||
| `--clustername` | _empty_ | Cluster name to set on RBD image |
|
||||
| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) |
|
||||
| `--domainlabels` | _empty_ | Kubernetes node labels to use as CSI domain labels for topology aware provisioning, should be a comma separated value (ex:= "failure-domain/region,failure-domain/zone") |
|
||||
| `--rbdhardmaxclonedepth` | `8` | Hard limit for maximum number of nested volume clones that are taken before a flatten occurs |
|
||||
| `--rbdsoftmaxclonedepth` | `4` | Soft limit for maximum number of nested volume clones that are taken before a flatten occurs |
|
||||
|
25
docs/design/proposals/volume-condition.md
Normal file
25
docs/design/proposals/volume-condition.md
Normal file
@ -0,0 +1,25 @@
|
||||
# Support for CSI `VolumeCondition` aka Volume Health Checker
|
||||
|
||||
## health-checker API
|
||||
|
||||
Under `internal/health-checker` the Manager for health-checking is
|
||||
implemented. The Manager can start a checking process for a given path, return
|
||||
the (un)healthy state and stop the checking process when the volume is not
|
||||
needed anymore.
|
||||
|
||||
The Manager is responsible for creating a suitable checker for the requested
|
||||
path. If the path is a block-device, the BlockChecker should be created. For a
|
||||
filesystem path (directory), the FileChecker is appropriate.
|
||||
|
||||
## CephFS
|
||||
|
||||
The health-checker writes to the file `csi-volume-condition.ts` in the root of
|
||||
the volume. This file contains a JSON formatted timestamp.
|
||||
|
||||
A new `data` directory is introduced for newly created volumes. During the
|
||||
`NodeStageVolume` call the root of the volume is mounted, and the `data`
|
||||
directory is bind-mounted inside the container when `NodePublishVolume` is
|
||||
called.
|
||||
|
||||
The `data` directory makes it possible to place Ceph-CSI internal files in the
|
||||
root of the volume, without the user/application having access to them.
|
17
e2e/node.go
17
e2e/node.go
@ -26,9 +26,10 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
|
||||
testutils "k8s.io/kubernetes/test/utils"
|
||||
)
|
||||
|
||||
func createNodeLabel(f *framework.Framework, labelKey, labelValue string) error {
|
||||
func addLabelsToNodes(f *framework.Framework, labels map[string]string) error {
|
||||
// NOTE: This makes all nodes (in a multi-node setup) in the test take
|
||||
// the same label values, which is fine for the test
|
||||
nodes, err := f.ClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
|
||||
@ -36,19 +37,27 @@ func createNodeLabel(f *framework.Framework, labelKey, labelValue string) error
|
||||
return fmt.Errorf("failed to list node: %w", err)
|
||||
}
|
||||
for i := range nodes.Items {
|
||||
e2enode.AddOrUpdateLabelOnNode(f.ClientSet, nodes.Items[i].Name, labelKey, labelValue)
|
||||
if err := testutils.AddLabelsToNode(f.ClientSet, nodes.Items[i].Name, labels); err != nil {
|
||||
return fmt.Errorf("failed to add labels to node: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteNodeLabel(c kubernetes.Interface, labelKey string) error {
|
||||
func deleteNodeLabels(c kubernetes.Interface, labelKeys []string) error {
|
||||
nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list node: %w", err)
|
||||
}
|
||||
for i := range nodes.Items {
|
||||
e2enode.RemoveLabelOffNode(c, nodes.Items[i].Name, labelKey)
|
||||
if err := testutils.RemoveLabelOffNode(c, nodes.Items[i].Name, labelKeys); err != nil {
|
||||
return fmt.Errorf("failed to remove label off node: %w", err)
|
||||
}
|
||||
|
||||
if err := testutils.VerifyLabelsRemoved(c, nodes.Items[i].Name, labelKeys); err != nil {
|
||||
return fmt.Errorf("failed to verify label removed from node: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
54
e2e/rbd.go
54
e2e/rbd.go
@ -275,21 +275,14 @@ var _ = Describe("RBD", func() {
|
||||
}
|
||||
c = f.ClientSet
|
||||
if deployRBD {
|
||||
err := createNodeLabel(f, nodeRegionLabel, regionValue)
|
||||
err := addLabelsToNodes(f, map[string]string{
|
||||
nodeRegionLabel: regionValue,
|
||||
nodeZoneLabel: zoneValue,
|
||||
crushLocationRegionLabel: crushLocationRegionValue,
|
||||
crushLocationZoneLabel: crushLocationZoneValue,
|
||||
})
|
||||
if err != nil {
|
||||
framework.Failf("failed to create node label: %v", err)
|
||||
}
|
||||
err = createNodeLabel(f, nodeZoneLabel, zoneValue)
|
||||
if err != nil {
|
||||
framework.Failf("failed to create node label: %v", err)
|
||||
}
|
||||
err = createNodeLabel(f, crushLocationRegionLabel, crushLocationRegionValue)
|
||||
if err != nil {
|
||||
framework.Failf("failed to create node label: %v", err)
|
||||
}
|
||||
err = createNodeLabel(f, crushLocationZoneLabel, crushLocationZoneValue)
|
||||
if err != nil {
|
||||
framework.Failf("failed to create node label: %v", err)
|
||||
framework.Failf("failed to add node labels: %v", err)
|
||||
}
|
||||
if cephCSINamespace != defaultNs {
|
||||
err = createNamespace(c, cephCSINamespace)
|
||||
@ -408,31 +401,16 @@ var _ = Describe("RBD", func() {
|
||||
}
|
||||
}
|
||||
}
|
||||
err = deleteNodeLabel(c, nodeRegionLabel)
|
||||
err = deleteNodeLabels(c, []string{
|
||||
nodeRegionLabel,
|
||||
nodeZoneLabel,
|
||||
nodeCSIRegionLabel,
|
||||
nodeCSIZoneLabel,
|
||||
crushLocationRegionLabel,
|
||||
crushLocationZoneLabel,
|
||||
})
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete node label: %v", err)
|
||||
}
|
||||
err = deleteNodeLabel(c, nodeZoneLabel)
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete node label: %v", err)
|
||||
}
|
||||
// Remove the CSI labels that get added
|
||||
err = deleteNodeLabel(c, nodeCSIRegionLabel)
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete node label: %v", err)
|
||||
}
|
||||
err = deleteNodeLabel(c, nodeCSIZoneLabel)
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete node label: %v", err)
|
||||
}
|
||||
// Remove the CRUSH Location labels
|
||||
err = deleteNodeLabel(c, crushLocationRegionLabel)
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete node label: %v", err)
|
||||
}
|
||||
err = deleteNodeLabel(c, crushLocationZoneLabel)
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete node label: %v", err)
|
||||
framework.Failf("failed to delete node labels: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -107,14 +107,12 @@ var _ = Describe("RBD Upgrade Testing", func() {
|
||||
if err != nil {
|
||||
framework.Failf("failed to create snapshotclass: %v", err)
|
||||
}
|
||||
|
||||
err = createNodeLabel(f, nodeRegionLabel, regionValue)
|
||||
err = addLabelsToNodes(f, map[string]string{
|
||||
nodeRegionLabel: regionValue,
|
||||
nodeZoneLabel: zoneValue,
|
||||
})
|
||||
if err != nil {
|
||||
framework.Failf("failed to create node label: %v", err)
|
||||
}
|
||||
err = createNodeLabel(f, nodeZoneLabel, zoneValue)
|
||||
if err != nil {
|
||||
framework.Failf("failed to create node label: %v", err)
|
||||
framework.Failf("failed to add node labels: %v", err)
|
||||
}
|
||||
})
|
||||
AfterEach(func() {
|
||||
@ -167,13 +165,12 @@ var _ = Describe("RBD Upgrade Testing", func() {
|
||||
}
|
||||
}
|
||||
}
|
||||
err = deleteNodeLabel(c, nodeRegionLabel)
|
||||
err = deleteNodeLabels(c, []string{
|
||||
nodeRegionLabel,
|
||||
nodeZoneLabel,
|
||||
})
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete node label: %v", err)
|
||||
}
|
||||
err = deleteNodeLabel(c, nodeZoneLabel)
|
||||
if err != nil {
|
||||
framework.Failf("failed to delete node label: %v", err)
|
||||
framework.Failf("failed to delete node labels: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
|
1
go.mod
1
go.mod
@ -7,7 +7,6 @@ require (
|
||||
github.com/aws/aws-sdk-go v1.46.4
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.23.2
|
||||
github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
|
||||
// TODO: API for managing subvolume metadata and snapshot metadata requires `ceph_ci_untested` build-tag
|
||||
github.com/ceph/go-ceph v0.24.0
|
||||
github.com/container-storage-interface/spec v1.9.0
|
||||
github.com/csi-addons/replication-lib-utils v0.2.0
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
|
||||
csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
|
||||
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
|
||||
hc "github.com/ceph/ceph-csi/internal/health-checker"
|
||||
"github.com/ceph/ceph-csi/internal/journal"
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
"github.com/ceph/ceph-csi/internal/util/log"
|
||||
@ -82,6 +83,7 @@ func NewNodeServer(
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
kernelMountOptions: kernelMountOptions,
|
||||
fuseMountOptions: fuseMountOptions,
|
||||
healthChecker: hc.NewHealthCheckManager(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,15 +169,10 @@ func (fs *Driver) Run(conf *util.Config) {
|
||||
// passing nil for replication server as cephFS does not support mirroring.
|
||||
RS: nil,
|
||||
}
|
||||
server.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics)
|
||||
if conf.EnableGRPCMetrics {
|
||||
log.WarningLogMsg("EnableGRPCMetrics is deprecated")
|
||||
go util.StartMetricsServer(conf)
|
||||
}
|
||||
server.Start(conf.Endpoint, srv)
|
||||
|
||||
if conf.EnableProfiling {
|
||||
if !conf.EnableGRPCMetrics {
|
||||
go util.StartMetricsServer(conf)
|
||||
}
|
||||
log.DebugLogMsg("Registering profiling handler")
|
||||
go util.EnableProfiling()
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"github.com/ceph/ceph-csi/internal/cephfs/store"
|
||||
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
|
||||
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
|
||||
hc "github.com/ceph/ceph-csi/internal/health-checker"
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
"github.com/ceph/ceph-csi/internal/util/fscrypt"
|
||||
"github.com/ceph/ceph-csi/internal/util/log"
|
||||
@ -47,6 +48,7 @@ type NodeServer struct {
|
||||
VolumeLocks *util.VolumeLocks
|
||||
kernelMountOptions string
|
||||
fuseMountOptions string
|
||||
healthChecker hc.Manager
|
||||
}
|
||||
|
||||
func getCredentialsForVolume(
|
||||
@ -228,6 +230,8 @@ func (ns *NodeServer) NodeStageVolume(
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)
|
||||
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
@ -270,9 +274,24 @@ func (ns *NodeServer) NodeStageVolume(
|
||||
}
|
||||
}
|
||||
|
||||
ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)
|
||||
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// startSharedHealthChecker starts a health-checker on the stagingTargetPath.
|
||||
// This checker can be shared between multiple containers.
|
||||
//
|
||||
// TODO: start a FileChecker for read-writable volumes that have an app-data subdir.
|
||||
func (ns *NodeServer) startSharedHealthChecker(ctx context.Context, volumeID, dir string) {
|
||||
// The StatChecker works for volumes that do not have a dedicated app-data
|
||||
// subdirectory, or are read-only.
|
||||
err := ns.healthChecker.StartSharedChecker(volumeID, dir, hc.StatCheckerType)
|
||||
if err != nil {
|
||||
log.WarningLog(ctx, "failed to start healthchecker: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (ns *NodeServer) mount(
|
||||
ctx context.Context,
|
||||
mnt mounter.VolumeMounter,
|
||||
@ -479,7 +498,8 @@ func (ns *NodeServer) NodePublishVolume(
|
||||
|
||||
// Ensure staging target path is a mountpoint.
|
||||
|
||||
if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil {
|
||||
isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, "stat failed: %v", err)
|
||||
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -491,7 +511,7 @@ func (ns *NodeServer) NodePublishVolume(
|
||||
|
||||
// Check if the volume is already mounted
|
||||
|
||||
isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
|
||||
isMnt, err = util.IsMountPoint(ns.Mounter, targetPath)
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, "stat failed: %v", err)
|
||||
|
||||
@ -545,6 +565,10 @@ func (ns *NodeServer) NodeUnpublishVolume(
|
||||
// kubelet makes sure that node operations like unpublish/unstage etc. can not be called
|
||||
// at the same time, so explicit locking at the time of NodeUnpublishVolume is not required.
|
||||
targetPath := req.GetTargetPath()
|
||||
|
||||
// stop the health-checker that may have been started in NodeGetVolumeStats()
|
||||
ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
|
||||
|
||||
isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, "stat failed: %v", err)
|
||||
@ -599,6 +623,9 @@ func (ns *NodeServer) NodeUnstageVolume(
|
||||
}
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
|
||||
ns.healthChecker.StopSharedChecker(volID)
|
||||
|
||||
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
|
||||
@ -670,6 +697,13 @@ func (ns *NodeServer) NodeGetCapabilities(
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: &csi.NodeServiceCapability_Rpc{
|
||||
Rpc: &csi.NodeServiceCapability_RPC{
|
||||
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: &csi.NodeServiceCapability_Rpc{
|
||||
Rpc: &csi.NodeServiceCapability_RPC{
|
||||
@ -694,6 +728,35 @@ func (ns *NodeServer) NodeGetVolumeStats(
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
// health check first, return without stats if unhealthy
|
||||
healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId(), targetPath)
|
||||
|
||||
// If healthy and an error is returned, it means that the checker was not
|
||||
// started. This could happen when the node-plugin was restarted and the
|
||||
// volume is already staged and published.
|
||||
if healthy && msg != nil {
|
||||
// Start a StatChecker for the mounted targetPath, this prevents
|
||||
// writing a file in the user-visible location. Ideally a (shared)
|
||||
// FileChecker is started with the stagingTargetPath, but we can't
|
||||
// get the stagingPath from the request easily.
|
||||
// TODO: resolve the stagingPath like rbd.getStagingPath() does
|
||||
err = ns.healthChecker.StartChecker(req.GetVolumeId(), targetPath, hc.StatCheckerType)
|
||||
if err != nil {
|
||||
log.WarningLog(ctx, "failed to start healthchecker: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// !healthy indicates a problem with the volume
|
||||
if !healthy {
|
||||
return &csi.NodeGetVolumeStatsResponse{
|
||||
VolumeCondition: &csi.VolumeCondition{
|
||||
Abnormal: true,
|
||||
Message: msg.Error(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// warning: stat() may hang on an unhealthy volume
|
||||
stat, err := os.Stat(targetPath)
|
||||
if err != nil {
|
||||
if util.IsCorruptedMountError(err) {
|
||||
|
@ -19,16 +19,12 @@ package csicommon
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/ceph/ceph-csi/internal/util/log"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/csi-addons/spec/lib/go/replication"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@ -36,7 +32,7 @@ import (
|
||||
// NonBlockingGRPCServer defines Non blocking GRPC server interfaces.
|
||||
type NonBlockingGRPCServer interface {
|
||||
// Start services at the endpoint
|
||||
Start(endpoint, hstOptions string, srv Servers, metrics bool)
|
||||
Start(endpoint string, srv Servers)
|
||||
// Waits for the service to stop
|
||||
Wait()
|
||||
// Stops the service gracefully
|
||||
@ -65,9 +61,9 @@ type nonBlockingGRPCServer struct {
|
||||
}
|
||||
|
||||
// Start starts the service on the endpoint.
|
||||
func (s *nonBlockingGRPCServer) Start(endpoint, hstOptions string, srv Servers, metrics bool) {
|
||||
func (s *nonBlockingGRPCServer) Start(endpoint string, srv Servers) {
|
||||
s.wg.Add(1)
|
||||
go s.serve(endpoint, hstOptions, srv, metrics)
|
||||
go s.serve(endpoint, srv)
|
||||
}
|
||||
|
||||
// Wait blocks until the WaitGroup counter is zero.
|
||||
@ -85,7 +81,7 @@ func (s *nonBlockingGRPCServer) ForceStop() {
|
||||
s.server.Stop()
|
||||
}
|
||||
|
||||
func (s *nonBlockingGRPCServer) serve(endpoint, hstOptions string, srv Servers, metrics bool) {
|
||||
func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) {
|
||||
proto, addr, err := parseEndpoint(endpoint)
|
||||
if err != nil {
|
||||
klog.Fatal(err.Error())
|
||||
@ -103,11 +99,7 @@ func (s *nonBlockingGRPCServer) serve(endpoint, hstOptions string, srv Servers,
|
||||
klog.Fatalf("Failed to listen: %v", err)
|
||||
}
|
||||
|
||||
opts := []grpc.ServerOption{
|
||||
NewMiddlewareServerOption(metrics),
|
||||
}
|
||||
|
||||
server := grpc.NewServer(opts...)
|
||||
server := grpc.NewServer()
|
||||
s.server = server
|
||||
|
||||
if srv.IS != nil {
|
||||
@ -124,29 +116,6 @@ func (s *nonBlockingGRPCServer) serve(endpoint, hstOptions string, srv Servers,
|
||||
}
|
||||
|
||||
log.DefaultLog("Listening for connections on address: %#v", listener.Addr())
|
||||
if metrics {
|
||||
ho := strings.Split(hstOptions, ",")
|
||||
const expectedHo = 3
|
||||
if len(ho) != expectedHo {
|
||||
klog.Fatalf("invalid histogram options provided: %v", hstOptions)
|
||||
}
|
||||
start, e := strconv.ParseFloat(ho[0], 32)
|
||||
if e != nil {
|
||||
klog.Fatalf("failed to parse histogram start value: %v", e)
|
||||
}
|
||||
factor, e := strconv.ParseFloat(ho[1], 32)
|
||||
if err != nil {
|
||||
klog.Fatalf("failed to parse histogram factor value: %v", e)
|
||||
}
|
||||
count, e := strconv.Atoi(ho[2])
|
||||
if err != nil {
|
||||
klog.Fatalf("failed to parse histogram count value: %v", e)
|
||||
}
|
||||
buckets := prometheus.ExponentialBuckets(start, factor, count)
|
||||
bktOptions := grpc_prometheus.WithHistogramBuckets(buckets)
|
||||
grpc_prometheus.EnableHandlingTimeHistogram(bktOptions)
|
||||
grpc_prometheus.Register(server)
|
||||
}
|
||||
err = server.Serve(listener)
|
||||
if err != nil {
|
||||
klog.Fatalf("Failed to server: %v", err)
|
||||
|
@ -312,6 +312,12 @@ func FilesystemNodeGetVolumeStats(
|
||||
})
|
||||
}
|
||||
|
||||
// include marker for a healthy volume by default
|
||||
res.VolumeCondition = &csi.VolumeCondition{
|
||||
Abnormal: false,
|
||||
Message: "volume is in a healthy condition",
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
|
107
internal/health-checker/checker.go
Normal file
107
internal/health-checker/checker.go
Normal file
@ -0,0 +1,107 @@
|
||||
/*
|
||||
Copyright 2023 ceph-csi authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthchecker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// command is what is sent through the channel to terminate the go routine.
|
||||
type command string
|
||||
|
||||
const (
|
||||
// stopCommand is sent through the channel to stop checking.
|
||||
stopCommand = command("STOP")
|
||||
)
|
||||
|
||||
type checker struct {
|
||||
// interval contains the time to sleep between health checks.
|
||||
interval time.Duration
|
||||
|
||||
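// timeout is the extra delay allowed on top of interval; isHealthy reports
// the volume as unhealthy once (interval + timeout) has passed since lastUpdate.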
|
||||
timeout time.Duration
|
||||
|
||||
// mutex protects against concurrent access to healthy, err and
|
||||
// lastUpdate
|
||||
mutex *sync.RWMutex
|
||||
|
||||
// current status
|
||||
isRunning bool
|
||||
healthy bool
|
||||
err error
|
||||
lastUpdate time.Time
|
||||
|
||||
// commands is the channel to read commands from; when to stop.
|
||||
commands chan command
|
||||
|
||||
runChecker func()
|
||||
}
|
||||
|
||||
func (c *checker) initDefaults() {
|
||||
c.interval = 60 * time.Second
|
||||
c.timeout = 15 * time.Second
|
||||
c.mutex = &sync.RWMutex{}
|
||||
c.isRunning = false
|
||||
c.err = nil
|
||||
c.healthy = true
|
||||
c.lastUpdate = time.Now()
|
||||
c.commands = make(chan command)
|
||||
|
||||
c.runChecker = func() {
|
||||
panic("BUG: implement runChecker() in the final checker struct")
|
||||
}
|
||||
}
|
||||
|
||||
func (c *checker) start() {
|
||||
if c.isRunning {
|
||||
return
|
||||
}
|
||||
|
||||
go c.runChecker()
|
||||
}
|
||||
|
||||
func (c *checker) stop() {
|
||||
c.commands <- stopCommand
|
||||
}
|
||||
|
||||
func (c *checker) isHealthy() (bool, error) {
|
||||
// check for the last update, it should be within
|
||||
//
|
||||
// time.Since(c.lastUpdate) < (c.interval + c.timeout)
|
||||
//
|
||||
// Without such a check, a single slow write/read could trigger actions
|
||||
// to recover an unhealthy volume already.
|
||||
//
|
||||
// It is required to check, in case the write or read in the go routine
|
||||
// is blocked.
|
||||
|
||||
delay := time.Since(c.lastUpdate)
|
||||
if delay > (c.interval + c.timeout) {
|
||||
c.mutex.Lock()
|
||||
c.healthy = false
|
||||
c.err = fmt.Errorf("health-check has not responded for %f seconds", delay.Seconds())
|
||||
c.mutex.Unlock()
|
||||
}
|
||||
|
||||
// read lock to get consistency between the return values
|
||||
c.mutex.RLock()
|
||||
defer c.mutex.RUnlock()
|
||||
|
||||
return c.healthy, c.err
|
||||
}
|
118
internal/health-checker/filechecker.go
Normal file
118
internal/health-checker/filechecker.go
Normal file
@ -0,0 +1,118 @@
|
||||
/*
|
||||
Copyright 2023 ceph-csi authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthchecker
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
)
|
||||
|
||||
type fileChecker struct {
|
||||
checker
|
||||
|
||||
// filename contains the filename that is used for checking.
|
||||
filename string
|
||||
}
|
||||
|
||||
func newFileChecker(dir string) ConditionChecker {
|
||||
fc := &fileChecker{
|
||||
filename: path.Join(dir, "csi-volume-condition.ts"),
|
||||
}
|
||||
fc.initDefaults()
|
||||
|
||||
fc.checker.runChecker = func() {
|
||||
fc.isRunning = true
|
||||
|
||||
ticker := time.NewTicker(fc.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-fc.commands: // STOP command received
|
||||
fc.isRunning = false
|
||||
|
||||
return
|
||||
case now := <-ticker.C:
|
||||
err := fc.writeTimestamp(now)
|
||||
if err != nil {
|
||||
fc.mutex.Lock()
|
||||
fc.healthy = false
|
||||
fc.err = err
|
||||
fc.mutex.Unlock()
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
ts, err := fc.readTimestamp()
|
||||
if err != nil {
|
||||
fc.mutex.Lock()
|
||||
fc.healthy = false
|
||||
fc.err = err
|
||||
fc.mutex.Unlock()
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// verify that the written timestamp is read back
|
||||
if now.Compare(ts) != 0 {
|
||||
fc.mutex.Lock()
|
||||
fc.healthy = false
|
||||
fc.err = errors.New("timestamp read from file does not match what was written")
|
||||
fc.mutex.Unlock()
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// run health check, write a timestamp to a file, read it back
|
||||
fc.mutex.Lock()
|
||||
fc.healthy = true
|
||||
fc.err = nil
|
||||
fc.lastUpdate = ts
|
||||
fc.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return fc
|
||||
}
|
||||
|
||||
// readTimestamp reads the JSON formatted timestamp from the file.
|
||||
func (fc *fileChecker) readTimestamp() (time.Time, error) {
|
||||
var ts time.Time
|
||||
|
||||
data, err := os.ReadFile(fc.filename)
|
||||
if err != nil {
|
||||
return ts, err
|
||||
}
|
||||
|
||||
err = ts.UnmarshalJSON(data)
|
||||
|
||||
return ts, err
|
||||
}
|
||||
|
||||
// writeTimestamp writes the timestamp to the file in JSON format.
|
||||
func (fc *fileChecker) writeTimestamp(ts time.Time) error {
|
||||
data, err := ts.MarshalJSON()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//nolint:gosec // allow reading of the timestamp for debugging
|
||||
return os.WriteFile(fc.filename, data, 0o644)
|
||||
}
|
82
internal/health-checker/filechecker_test.go
Normal file
82
internal/health-checker/filechecker_test.go
Normal file
@ -0,0 +1,82 @@
|
||||
/*
|
||||
Copyright 2023 ceph-csi authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthchecker
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFileChecker(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
volumePath := t.TempDir()
|
||||
fc := newFileChecker(volumePath)
|
||||
checker, ok := fc.(*fileChecker)
|
||||
if !ok {
|
||||
t.Errorf("failed to convert fc to *fileChecker: %v", fc)
|
||||
}
|
||||
checker.interval = time.Second * 5
|
||||
|
||||
// start the checker
|
||||
checker.start()
|
||||
|
||||
// wait a second to get the go routine running
|
||||
time.Sleep(time.Second)
|
||||
if !checker.isRunning {
|
||||
t.Error("checker failed to start")
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
// check health, should be healthy
|
||||
healthy, msg := checker.isHealthy()
|
||||
if !healthy || msg != nil {
|
||||
t.Error("volume is unhealthy")
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
if !checker.isRunning {
|
||||
t.Error("runChecker() exited already")
|
||||
}
|
||||
|
||||
// stop the checker
|
||||
checker.stop()
|
||||
}
|
||||
|
||||
func TestWriteReadTimestamp(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
volumePath := t.TempDir()
|
||||
fc := newFileChecker(volumePath)
|
||||
checker, ok := fc.(*fileChecker)
|
||||
if !ok {
|
||||
t.Errorf("failed to convert fc to *fileChecker: %v", fc)
|
||||
}
|
||||
ts := time.Now()
|
||||
|
||||
err := checker.writeTimestamp(ts)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to write timestamp: %v", err)
|
||||
}
|
||||
|
||||
_, err = checker.readTimestamp()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read timestamp: %v", err)
|
||||
}
|
||||
}
|
208
internal/health-checker/manager.go
Normal file
208
internal/health-checker/manager.go
Normal file
@ -0,0 +1,208 @@
|
||||
/*
|
||||
Copyright 2023 ceph-csi authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthchecker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// CheckerType describes the type of health-check that needs to be done.
type CheckerType uint64

// The constants are explicitly typed as CheckerType; a bare 'iota' would
// declare them as untyped integer constants.
const (
	// StatCheckerType uses the stat() syscall to validate volume health.
	StatCheckerType CheckerType = iota
	// FileCheckerType writes and reads a timestamp to a file for checking the
	// volume health.
	FileCheckerType
)
|
||||
|
||||
// Manager provides the API for getting the health status of a volume. The main
|
||||
// usage is requesting the health status by volumeID.
|
||||
//
|
||||
// When the Manager detects that a new volumeID is used for checking, a new
|
||||
// instance of a ConditionChecker is created for the volumeID on the given
|
||||
// path, and started.
|
||||
//
|
||||
// Once the volumeID is not active anymore (when NodeUnstageVolume is called),
|
||||
// the ConditionChecker needs to be stopped, which can be done by
|
||||
// Manager.StopChecker().
|
||||
type Manager interface {
|
||||
// StartChecker starts a health-checker of the requested type for the
|
||||
// volumeID using the path. The path usually is the publishTargetPath, and
|
||||
// a unique path for this checker. If the path can be used by multiple
|
||||
// containers, use the StartSharedChecker function instead.
|
||||
StartChecker(volumeID, path string, ct CheckerType) error
|
||||
|
||||
// StartSharedChecker starts a health-checker of the requested type for the
|
||||
// volumeID using the path. The path usually is the stagingTargetPath, and
|
||||
// can be used for multiple containers.
|
||||
StartSharedChecker(volumeID, path string, ct CheckerType) error
|
||||
|
||||
StopChecker(volumeID, path string)
|
||||
StopSharedChecker(volumeID string)
|
||||
|
||||
// IsHealthy locates the checker for the volumeID and path. If no checker
|
||||
// is found, `true` is returned together with an error message.
|
||||
// When IsHealthy runs into an internal error, it is assumed that the
|
||||
// volume is healthy. Only when it is confirmed that the volume is
|
||||
// unhealthy, `false` is returned together with an error message.
|
||||
IsHealthy(volumeID, path string) (bool, error)
|
||||
}
|
||||
|
||||
// ConditionChecker describes the interface that a health status reporter needs
// to implement. It is used internally by the Manager only.
type ConditionChecker interface {
	// start runs the health checking function in a new go routine.
	start()

	// stop terminates the health checking function that runs in a go
	// routine.
	stop()

	// isHealthy returns the status of the volume, without blocking.
	isHealthy() (bool, error)
}
|
||||
|
||||
// healthCheckManager is the Manager implementation returned by
// NewHealthCheckManager.
type healthCheckManager struct {
	// checkers holds the started ConditionCheckers. The keys are strings,
	// either a volumeID or the volumeID combined with a path (see
	// fallbackKey); the values are ConditionChecker instances.
	checkers sync.Map
}
|
||||
|
||||
func NewHealthCheckManager() Manager {
|
||||
return &healthCheckManager{
|
||||
checkers: sync.Map{},
|
||||
}
|
||||
}
|
||||
|
||||
func (hcm *healthCheckManager) StartSharedChecker(volumeID, path string, ct CheckerType) error {
|
||||
return hcm.createChecker(volumeID, path, ct, true)
|
||||
}
|
||||
|
||||
func (hcm *healthCheckManager) StartChecker(volumeID, path string, ct CheckerType) error {
|
||||
return hcm.createChecker(volumeID, path, ct, false)
|
||||
}
|
||||
|
||||
// createChecker decides based on the CheckerType what checker to start for
|
||||
// the volume.
|
||||
func (hcm *healthCheckManager) createChecker(volumeID, path string, ct CheckerType, shared bool) error {
|
||||
switch ct {
|
||||
case FileCheckerType:
|
||||
return hcm.startFileChecker(volumeID, path, shared)
|
||||
case StatCheckerType:
|
||||
return hcm.startStatChecker(volumeID, path, shared)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// startFileChecker initializes the fileChecker and starts it.
|
||||
func (hcm *healthCheckManager) startFileChecker(volumeID, path string, shared bool) error {
|
||||
workdir := filepath.Join(path, ".csi")
|
||||
err := os.Mkdir(workdir, 0o755)
|
||||
if err != nil && !os.IsExist(err) {
|
||||
return fmt.Errorf("failed to created workdir %q for health-checker: %w", workdir, err)
|
||||
}
|
||||
|
||||
cc := newFileChecker(workdir)
|
||||
|
||||
return hcm.startChecker(cc, volumeID, path, shared)
|
||||
}
|
||||
|
||||
// startStatChecker initializes the statChecker and starts it.
|
||||
func (hcm *healthCheckManager) startStatChecker(volumeID, path string, shared bool) error {
|
||||
cc := newStatChecker(path)
|
||||
|
||||
return hcm.startChecker(cc, volumeID, path, shared)
|
||||
}
|
||||
|
||||
// startChecker adds the checker to its map and starts it.
|
||||
// Shared checkers are key'd by their volumeID, whereas non-shared checkers
|
||||
// are key'd by theit volumeID+path.
|
||||
func (hcm *healthCheckManager) startChecker(cc ConditionChecker, volumeID, path string, shared bool) error {
|
||||
key := volumeID
|
||||
if shared {
|
||||
key = fallbackKey(volumeID, path)
|
||||
}
|
||||
|
||||
// load the 'old' ConditionChecker if it exists, otherwise store 'cc'
|
||||
old, ok := hcm.checkers.LoadOrStore(key, cc)
|
||||
if ok {
|
||||
// 'old' was loaded, cast it to ConditionChecker
|
||||
_, ok = old.(ConditionChecker)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID)
|
||||
}
|
||||
} else {
|
||||
// 'cc' was stored, start it only once
|
||||
cc.start()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (hcm *healthCheckManager) StopSharedChecker(volumeID string) {
|
||||
hcm.StopChecker(volumeID, "")
|
||||
}
|
||||
|
||||
func (hcm *healthCheckManager) StopChecker(volumeID, path string) {
|
||||
old, ok := hcm.checkers.LoadAndDelete(fallbackKey(volumeID, path))
|
||||
if !ok {
|
||||
// nothing was loaded, nothing to do
|
||||
return
|
||||
}
|
||||
|
||||
// 'old' was loaded, cast it to ConditionChecker
|
||||
cc, ok := old.(ConditionChecker)
|
||||
if !ok {
|
||||
// failed to cast, should not be possible
|
||||
return
|
||||
}
|
||||
cc.stop()
|
||||
}
|
||||
|
||||
func (hcm *healthCheckManager) IsHealthy(volumeID, path string) (bool, error) {
|
||||
// load the 'old' ConditionChecker if it exists
|
||||
old, ok := hcm.checkers.Load(volumeID)
|
||||
if !ok {
|
||||
// try fallback which include an optional (unique) path (usually publishTargetPath)
|
||||
old, ok = hcm.checkers.Load(fallbackKey(volumeID, path))
|
||||
if !ok {
|
||||
return true, fmt.Errorf("no ConditionChecker for volume-id: %s", volumeID)
|
||||
}
|
||||
}
|
||||
|
||||
// 'old' was loaded, cast it to ConditionChecker
|
||||
cc, ok := old.(ConditionChecker)
|
||||
if !ok {
|
||||
return true, fmt.Errorf("failed to cast cc to ConditionChecker for volume-id %q", volumeID)
|
||||
}
|
||||
|
||||
return cc.isHealthy()
|
||||
}
|
||||
|
||||
// fallbackKey builds the map key for a checker: "<volumeID>:<path>" when a
// path is given, and the plain volumeID when the path is empty (the key of a
// shared checker).
func fallbackKey(volumeID, path string) string {
	if path != "" {
		return volumeID + ":" + path
	}

	return volumeID
}
|
85
internal/health-checker/manager_test.go
Normal file
85
internal/health-checker/manager_test.go
Normal file
@ -0,0 +1,85 @@
|
||||
/*
|
||||
Copyright 2023 ceph-csi authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthchecker
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestManager(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
volumeID := "fake-volume-id"
|
||||
volumePath := t.TempDir()
|
||||
mgr := NewHealthCheckManager()
|
||||
|
||||
// expected to have an error in msg
|
||||
healthy, msg := mgr.IsHealthy(volumeID, volumePath)
|
||||
if !(healthy && msg != nil) {
|
||||
t.Error("ConditionChecker was not started yet, did not get an error")
|
||||
}
|
||||
|
||||
t.Log("start the checker")
|
||||
err := mgr.StartChecker(volumeID, volumePath, StatCheckerType)
|
||||
if err != nil {
|
||||
t.Fatalf("ConditionChecker could not get started: %v", err)
|
||||
}
|
||||
|
||||
t.Log("check health, should be healthy")
|
||||
healthy, msg = mgr.IsHealthy(volumeID, volumePath)
|
||||
if !healthy || err != nil {
|
||||
t.Errorf("volume is unhealthy: %s", msg)
|
||||
}
|
||||
|
||||
t.Log("stop the checker")
|
||||
mgr.StopChecker(volumeID, volumePath)
|
||||
}
|
||||
|
||||
func TestSharedChecker(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
volumeID := "fake-volume-id"
|
||||
volumePath := t.TempDir()
|
||||
mgr := NewHealthCheckManager()
|
||||
|
||||
// expected to have an error in msg
|
||||
healthy, msg := mgr.IsHealthy(volumeID, volumePath)
|
||||
if !(healthy && msg != nil) {
|
||||
t.Error("ConditionChecker was not started yet, did not get an error")
|
||||
}
|
||||
|
||||
t.Log("start the checker")
|
||||
err := mgr.StartSharedChecker(volumeID, volumePath, StatCheckerType)
|
||||
if err != nil {
|
||||
t.Fatalf("ConditionChecker could not get started: %v", err)
|
||||
}
|
||||
|
||||
t.Log("check health, should be healthy")
|
||||
healthy, msg = mgr.IsHealthy(volumeID, volumePath)
|
||||
if !healthy || err != nil {
|
||||
t.Errorf("volume is unhealthy: %s", msg)
|
||||
}
|
||||
|
||||
t.Log("check health, should be healthy, path is ignored")
|
||||
healthy, msg = mgr.IsHealthy(volumeID, "different-path")
|
||||
if !healthy || err != nil {
|
||||
t.Errorf("volume is unhealthy: %s", msg)
|
||||
}
|
||||
|
||||
t.Log("stop the checker")
|
||||
mgr.StopSharedChecker(volumeID)
|
||||
}
|
70
internal/health-checker/statchecker.go
Normal file
70
internal/health-checker/statchecker.go
Normal file
@ -0,0 +1,70 @@
|
||||
/*
|
||||
Copyright 2023 ceph-csi authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthchecker
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
type statChecker struct {
|
||||
checker
|
||||
|
||||
// dirname points to the directory that is used for checking.
|
||||
dirname string
|
||||
}
|
||||
|
||||
func newStatChecker(dir string) ConditionChecker {
|
||||
sc := &statChecker{
|
||||
dirname: dir,
|
||||
}
|
||||
sc.initDefaults()
|
||||
|
||||
sc.checker.runChecker = func() {
|
||||
sc.isRunning = true
|
||||
|
||||
ticker := time.NewTicker(sc.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-sc.commands: // STOP command received
|
||||
sc.isRunning = false
|
||||
|
||||
return
|
||||
case now := <-ticker.C:
|
||||
_, err := os.Stat(sc.dirname)
|
||||
if err != nil {
|
||||
sc.mutex.Lock()
|
||||
sc.healthy = false
|
||||
sc.err = err
|
||||
sc.mutex.Unlock()
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
sc.mutex.Lock()
|
||||
sc.healthy = true
|
||||
sc.err = nil
|
||||
sc.lastUpdate = now
|
||||
sc.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return sc
|
||||
}
|
60
internal/health-checker/statchecker_test.go
Normal file
60
internal/health-checker/statchecker_test.go
Normal file
@ -0,0 +1,60 @@
|
||||
/*
|
||||
Copyright 2023 ceph-csi authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package healthchecker
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestStatChecker(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
volumePath := t.TempDir()
|
||||
sc := newStatChecker(volumePath)
|
||||
checker, ok := sc.(*statChecker)
|
||||
if !ok {
|
||||
t.Errorf("failed to convert fc to *fileChecker: %v", sc)
|
||||
}
|
||||
checker.interval = time.Second * 5
|
||||
|
||||
// start the checker
|
||||
checker.start()
|
||||
|
||||
// wait a second to get the go routine running
|
||||
time.Sleep(time.Second)
|
||||
if !checker.isRunning {
|
||||
t.Error("checker failed to start")
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
// check health, should be healthy
|
||||
healthy, msg := checker.isHealthy()
|
||||
if !healthy || msg != nil {
|
||||
t.Error("volume is unhealthy")
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
if !checker.isRunning {
|
||||
t.Error("runChecker() exited already")
|
||||
}
|
||||
|
||||
// stop the checker
|
||||
checker.stop()
|
||||
}
|
@ -77,15 +77,9 @@ func (fs *Driver) Run(conf *util.Config) {
|
||||
srv.CS = controller.NewControllerServer(cd)
|
||||
}
|
||||
|
||||
server.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics)
|
||||
if conf.EnableGRPCMetrics {
|
||||
log.WarningLogMsg("EnableGRPCMetrics is deprecated")
|
||||
go util.StartMetricsServer(conf)
|
||||
}
|
||||
server.Start(conf.Endpoint, srv)
|
||||
if conf.EnableProfiling {
|
||||
if !conf.EnableGRPCMetrics {
|
||||
go util.StartMetricsServer(conf)
|
||||
}
|
||||
log.DebugLogMsg("Registering profiling handler")
|
||||
go util.EnableProfiling()
|
||||
}
|
||||
|
@ -177,11 +177,7 @@ func (r *Driver) Run(conf *util.Config) {
|
||||
CS: r.cs,
|
||||
NS: r.ns,
|
||||
}
|
||||
s.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics)
|
||||
if conf.EnableGRPCMetrics {
|
||||
log.WarningLogMsg("EnableGRPCMetrics is deprecated")
|
||||
go util.StartMetricsServer(conf)
|
||||
}
|
||||
s.Start(conf.Endpoint, srv)
|
||||
|
||||
r.startProfiling(conf)
|
||||
|
||||
@ -241,9 +237,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
|
||||
// starts the required profiling services.
|
||||
func (r *Driver) startProfiling(conf *util.Config) {
|
||||
if conf.EnableProfiling {
|
||||
if !conf.EnableGRPCMetrics {
|
||||
go util.StartMetricsServer(conf)
|
||||
}
|
||||
log.DebugLogMsg("Registering profiling handler")
|
||||
go util.EnableProfiling()
|
||||
}
|
||||
|
@ -48,12 +48,6 @@ const (
|
||||
// with cryptsetup before updating the state to `rbdImageEncrypted`.
|
||||
rbdImageEncryptionPrepared = rbdEncryptionState("encryptionPrepared")
|
||||
|
||||
// rbdImageRequiresEncryption has been deprecated, it is used only for
|
||||
// volumes that have been created with an old provisioner, were never
|
||||
// attached/mounted and now get staged by a new node-plugin
|
||||
// TODO: remove this backwards compatibility support.
|
||||
rbdImageRequiresEncryption = rbdEncryptionState("requiresEncryption")
|
||||
|
||||
// image metadata key for encryption.
|
||||
encryptionMetaKey = "rbd.csi.ceph.com/encrypted"
|
||||
oldEncryptionMetaKey = ".rbd.csi.ceph.com/encrypted"
|
||||
|
@ -1234,23 +1234,6 @@ func (ns *NodeServer) processEncryptedDevice(
|
||||
}
|
||||
|
||||
switch {
|
||||
case encrypted == rbdImageRequiresEncryption:
|
||||
// If we get here, it means the image was created with a
|
||||
// ceph-csi version that creates a passphrase for the encrypted
|
||||
// device in NodeStage. New versions moved that to
|
||||
// CreateVolume.
|
||||
// Use the same setupEncryption() as CreateVolume does, and
|
||||
// continue with the common process to crypt-format the device.
|
||||
err = volOptions.setupBlockEncryption(ctx)
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, "failed to setup encryption for rbd"+
|
||||
"image %s: %v", imageSpec, err)
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
// make sure we continue with the encrypting of the device
|
||||
fallthrough
|
||||
case encrypted == rbdImageEncryptionPrepared:
|
||||
diskMounter := &mount.SafeFormatAndMount{Interface: ns.Mounter, Exec: utilexec.New()}
|
||||
// TODO: update this when adding support for static (pre-provisioned) PVs
|
||||
|
@ -40,8 +40,6 @@ const (
|
||||
type ClusterInfo struct {
|
||||
// ClusterID is used for unique identification
|
||||
ClusterID string `json:"clusterID"`
|
||||
// RadosNamespace is a rados namespace in the pool
|
||||
RadosNamespace string `json:"radosNamespace"` // For backward compatibility. TODO: Remove this in 3.7.0
|
||||
// Monitors is monitor list for corresponding cluster ID
|
||||
Monitors []string `json:"monitors"`
|
||||
// CephFS contains CephFS specific options
|
||||
@ -130,13 +128,9 @@ func GetRadosNamespace(pathToConfig, clusterID string) (string, error) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if cluster.RBD.RadosNamespace != "" {
|
||||
return cluster.RBD.RadosNamespace, nil
|
||||
}
|
||||
|
||||
return cluster.RadosNamespace, nil
|
||||
}
|
||||
|
||||
// CephFSSubvolumeGroup returns the subvolumeGroup for CephFS volumes. If not set, it returns the default value "csi".
|
||||
func CephFSSubvolumeGroup(pathToConfig, clusterID string) (string, error) {
|
||||
cluster, err := readClusterInfo(pathToConfig, clusterID)
|
||||
|
@ -93,37 +93,20 @@ type Config struct {
|
||||
PluginPath string // location of cephcsi plugin
|
||||
StagingPath string // location of cephcsi staging path
|
||||
DomainLabels string // list of domain labels to read from the node
|
||||
|
||||
// metrics related flags
|
||||
MetricsPath string // path of prometheus endpoint where metrics will be available
|
||||
HistogramOption string // Histogram option for grpc metrics, should be comma separated value,
|
||||
// ex:= "0.5,2,6" where start=0.5 factor=2, count=6
|
||||
MetricsIP string // TCP port for liveness/ metrics requests
|
||||
|
||||
// CSI-Addons endpoint
|
||||
CSIAddonsEndpoint string
|
||||
|
||||
// Cluster name
|
||||
ClusterName string
|
||||
|
||||
// mount option related flags
|
||||
KernelMountOptions string // Comma separated string of mount options accepted by cephfs kernel mounter
|
||||
FuseMountOptions string // Comma separated string of mount options accepted by ceph-fuse mounter
|
||||
|
||||
PidLimit int // PID limit to configure through cgroups")
|
||||
MetricsPort int // TCP port for liveness/grpc metrics requests
|
||||
PollTime time.Duration // time interval in seconds between each poll
|
||||
PoolTimeout time.Duration // probe timeout in seconds
|
||||
EnableGRPCMetrics bool // option to enable grpc metrics
|
||||
|
||||
EnableProfiling bool // flag to enable profiling
|
||||
IsControllerServer bool // if set to true start provisioner server
|
||||
IsNodeServer bool // if set to true start node server
|
||||
Version bool // cephcsi version
|
||||
|
||||
// SkipForceFlatten is set to false if the kernel supports mounting of
|
||||
// rbd image or the image chain has the deep-flatten feature.
|
||||
SkipForceFlatten bool
|
||||
|
||||
// cephfs related flags
|
||||
ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17
|
||||
|
||||
SetMetadata bool // set metadata on the volume
|
||||
|
||||
// RbdHardMaxCloneDepth is the hard limit for maximum number of nested volume clones that are taken before a flatten
|
||||
// occurs
|
||||
RbdHardMaxCloneDepth uint
|
||||
@ -142,11 +125,24 @@ type Config struct {
|
||||
// reached cephcsi will start flattening the older rbd images.
|
||||
MinSnapshotsOnImage uint
|
||||
|
||||
// CSI-Addons endpoint
|
||||
CSIAddonsEndpoint string
|
||||
PidLimit int // PID limit to configure through cgroups")
|
||||
MetricsPort int // TCP port for liveness/grpc metrics requests
|
||||
PollTime time.Duration // time interval in seconds between each poll
|
||||
PoolTimeout time.Duration // probe timeout in seconds
|
||||
|
||||
// Cluster name
|
||||
ClusterName string
|
||||
EnableProfiling bool // flag to enable profiling
|
||||
IsControllerServer bool // if set to true start provisioner server
|
||||
IsNodeServer bool // if set to true start node server
|
||||
Version bool // cephcsi version
|
||||
|
||||
// SkipForceFlatten is set to false if the kernel supports mounting of
|
||||
// rbd image or the image chain has the deep-flatten feature.
|
||||
SkipForceFlatten bool
|
||||
|
||||
// cephfs related flags
|
||||
ForceKernelCephFS bool // force to use the ceph kernel client even if the kernel is < 4.17
|
||||
|
||||
SetMetadata bool // set metadata on the volume
|
||||
|
||||
// Read affinity related options
|
||||
EnableReadAffinity bool // enable OSD read affinity.
|
||||
|
@ -16,11 +16,11 @@ baseimg=$(awk -F = '/^BASE_IMAGE=/ {print $NF}' "${build_env}")
|
||||
# get image digest per architecture
|
||||
# {
|
||||
# "arch": "amd64",
|
||||
# "digest": "sha256:XXX"
|
||||
# "digest": "sha256:XYZ"
|
||||
# }
|
||||
# {
|
||||
# "arch": "arm64",
|
||||
# "digest": "sha256:YYY"
|
||||
# "digest": "sha256:ZYX"
|
||||
# }
|
||||
manifests=$(docker manifest inspect "${baseimg}" | jq '.manifests[] | {arch: .platform.architecture, digest: .digest}')
|
||||
# qemu-user-static is to enable an execution of different multi-architecture containers by QEMU
|
||||
|
Loading…
Reference in New Issue
Block a user