diff --git a/docs/metrics.md b/docs/metrics.md
index 8f19c8d2b..c4f45b0a5 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -22,8 +22,8 @@ curl -X GET http://10.109.65.142:8080/metrics 2>/dev/null | grep csi
 csi_liveness 1
 ```
 
-Promethues can be deployed through the promethues operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html).
-The [service-monitor](../deploy/service-monitor.yaml) will tell promethues how
+Prometheus can be deployed through the prometheus operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html).
+The [service-monitor](../deploy/service-monitor.yaml) will tell prometheus how
 to pull metrics out of CSI.
 
 Each CSI pod has a service to expose the endpoint to prometheus. By default, rbd
diff --git a/examples/rbd/storageclass.yaml b/examples/rbd/storageclass.yaml
index 601a6696a..5d2fc5eba 100644
--- a/examples/rbd/storageclass.yaml
+++ b/examples/rbd/storageclass.yaml
@@ -125,7 +125,7 @@ parameters:
    # "file": Enable file encryption on the mounted filesystem
    # "block": Encrypt RBD block device
    # When unspecified assume type "block". "file" and "block" are
-   # mutally exclusive.
+   # mutually exclusive.
    # encryptionType: "block"
 
    # (optional) Use external key management system for encryption passphrases by
diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go
index ede292b31..81757cce0 100644
--- a/internal/cephfs/nodeserver.go
+++ b/internal/cephfs/nodeserver.go
@@ -450,6 +450,13 @@ func (ns *NodeServer) NodePublishVolume(
 	targetPath := req.GetTargetPath()
 	volID := fsutil.VolumeID(req.GetVolumeId())
 
+	if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
+		log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+
+		return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+	}
+	defer ns.VolumeLocks.Release(targetPath)
+
 	volOptions := &store.VolumeOptions{}
 	defer volOptions.Destroy()
 
@@ -462,9 +469,6 @@ func (ns *NodeServer) NodePublishVolume(
 		return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
 	}
 
-	// Considering kubelet make sure the stage and publish operations
-	// are serialized, we dont need any extra locking in nodePublish
-
 	if err = util.CreateMountPoint(targetPath); err != nil {
 		log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)
 
@@ -555,12 +559,17 @@ func (ns *NodeServer) NodeUnpublishVolume(
 		return nil, err
 	}
 
-	// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
-	// at same time, an explicit locking at time of nodeunpublish is not required.
 	targetPath := req.GetTargetPath()
+	volID := req.GetVolumeId()
+	if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
+		log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+
+		return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+	}
+	defer ns.VolumeLocks.Release(targetPath)
 
 	// stop the health-checker that may have been started in NodeGetVolumeStats()
-	ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
+	ns.healthChecker.StopChecker(volID, targetPath)
 
 	isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
 	if err != nil {
@@ -583,7 +592,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
 		isMnt = true
 	}
 	if !isMnt {
-		if err = os.RemoveAll(targetPath); err != nil {
+		if err = os.Remove(targetPath); err != nil {
 			return nil, status.Error(codes.Internal, err.Error())
 		}
 
diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go
index 8a91b44b2..8c3feadee 100644
--- a/internal/cephfs/store/volumeoptions.go
+++ b/internal/cephfs/store/volumeoptions.go
@@ -151,8 +151,8 @@ func validateMounter(m string) error {
 	return nil
 }
 
-func (v *VolumeOptions) DetectMounter(options map[string]string) error {
-	return extractMounter(&v.Mounter, options)
+func (vo *VolumeOptions) DetectMounter(options map[string]string) error {
+	return extractMounter(&vo.Mounter, options)
 }
 
 func extractMounter(dest *string, options map[string]string) error {
diff --git a/internal/health-checker/checker.go b/internal/health-checker/checker.go
index 5eef779b5..14322b2d3 100644
--- a/internal/health-checker/checker.go
+++ b/internal/health-checker/checker.go
@@ -37,7 +37,7 @@ type checker struct {
 	// timeout contains the delay (interval + timeout)
 	timeout time.Duration
 
-	// mutex protects against concurrent access to healty, err and
+	// mutex protects against concurrent access to healthy, err and
 	// lastUpdate
 	mutex *sync.RWMutex
 
diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go
index b1170b905..21b5c9c1d 100644
--- a/internal/rbd/nodeserver.go
+++ b/internal/rbd/nodeserver.go
@@ -709,8 +709,12 @@ func (ns *NodeServer) NodePublishVolume(
 	volID := req.GetVolumeId()
 	stagingPath += "/" + volID
 
-	// Considering kubelet make sure the stage and publish operations
-	// are serialized, we dont need any extra locking in nodePublish
+	if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
+		log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+
+		return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+	}
+	defer ns.VolumeLocks.Release(targetPath)
 
 	// Check if that target path exists properly
 	notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock)
@@ -913,8 +917,14 @@ func (ns *NodeServer) NodeUnpublishVolume(
 	}
 
 	targetPath := req.GetTargetPath()
-	// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
-	// at same time, an explicit locking at time of nodeunpublish is not required.
+
+	if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
+		log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+
+		return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+	}
+	defer ns.VolumeLocks.Release(targetPath)
+
 	isMnt, err := ns.Mounter.IsMountPoint(targetPath)
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -927,7 +937,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
 		return nil, status.Error(codes.NotFound, err.Error())
 	}
 	if !isMnt {
-		if err = os.RemoveAll(targetPath); err != nil {
+		if err = os.Remove(targetPath); err != nil {
 			return nil, status.Error(codes.Internal, err.Error())
 		}
 
@@ -938,7 +948,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	if err = os.RemoveAll(targetPath); err != nil {
+	if err = os.Remove(targetPath); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
diff --git a/internal/util/crushlocation_test.go b/internal/util/crushlocation_test.go
index 141f64d78..a761b63bc 100644
--- a/internal/util/crushlocation_test.go
+++ b/internal/util/crushlocation_test.go
@@ -60,7 +60,7 @@ func Test_getCrushLocationMap(t *testing.T) {
 			want: map[string]string{"zone": "zone1"},
 		},
 		{
-			name: "multuple matching crushlocation and node labels",
+			name: "multiple matching crushlocation and node labels",
 			args: input{
 				crushLocationLabels: "topology.io/zone,topology.io/rack",
 				nodeLabels: map[string]string{
diff --git a/internal/util/idlocker.go b/internal/util/idlocker.go
index 92733c19c..211081a13 100644
--- a/internal/util/idlocker.go
+++ b/internal/util/idlocker.go
@@ -28,6 +28,9 @@ const (
 
 	// SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation.
 	SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"
+
+	// TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path.
+	TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists"
 )
 
 // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
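
A note on the locking pattern these hunks introduce: the removed comments assumed kubelet serializes publish/unpublish calls on a target path, but that guarantee does not hold for retried or concurrent gRPC calls, so both node servers now take a per-target-path try-lock and return `codes.Aborted` when one is already held. Below is a minimal, self-contained sketch of that try-acquire/release idiom; the real implementation is `VolumeLocks` in `internal/util/idlocker.go`, and this standalone version (including the example path in `main`) is illustrative only, mirroring the shape of the real API rather than reproducing it.

```go
package main

import (
	"fmt"
	"sync"
)

// VolumeLocks tracks keys (volume IDs or target paths) that currently
// have an operation in flight. Sketch only; see internal/util/idlocker.go.
type VolumeLocks struct {
	mux   sync.Mutex
	locks map[string]struct{}
}

func NewVolumeLocks() *VolumeLocks {
	return &VolumeLocks{locks: make(map[string]struct{})}
}

// TryAcquire fails fast instead of blocking, so a concurrent caller can
// immediately return codes.Aborted and let the CO retry later.
func (vl *VolumeLocks) TryAcquire(key string) bool {
	vl.mux.Lock()
	defer vl.mux.Unlock()
	if _, held := vl.locks[key]; held {
		return false
	}
	vl.locks[key] = struct{}{}

	return true
}

// Release drops the lock taken by TryAcquire.
func (vl *VolumeLocks) Release(key string) {
	vl.mux.Lock()
	defer vl.mux.Unlock()
	delete(vl.locks, key)
}

func main() {
	vl := NewVolumeLocks()
	target := "/var/lib/kubelet/pods/example/volumes/target" // hypothetical path

	if acquired := vl.TryAcquire(target); !acquired {
		fmt.Println("an operation with the given target path already exists")

		return
	}
	defer vl.Release(target)

	// perform the publish/unpublish work while the lock is held
	fmt.Println("lock held for", target)
}
```

Failing fast matters here: a blocking lock would stall the caller's retry loop, while `Aborted` tells the CO that an operation is already in progress and can safely be retried.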
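The switch from `os.RemoveAll` to `os.Remove` in both `NodeUnpublishVolume` implementations is also deliberate: `os.Remove` refuses to delete a non-empty directory, so data unexpectedly left under a target path surfaces as an error instead of being recursively wiped. A small sketch of the difference, using a temporary directory as a stand-in for a target path:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// Stand-in for a CSI target path; created fresh for this demo.
	dir, err := os.MkdirTemp("", "targetpath")
	if err != nil {
		panic(err)
	}

	// Simulate a stray file left behind under the mount point.
	if err := os.WriteFile(filepath.Join(dir, "stale"), []byte("data"), 0o600); err != nil {
		panic(err)
	}

	// os.Remove fails with "directory not empty", so the caller notices.
	if err := os.Remove(dir); err != nil {
		fmt.Println("os.Remove:", err)
	}

	// os.RemoveAll would have deleted the stray file silently; used here
	// only to clean up after the demo.
	_ = os.RemoveAll(dir)
}
```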