diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index b1170b905..d8fb91094 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -709,8 +709,12 @@ func (ns *NodeServer) NodePublishVolume( volID := req.GetVolumeId() stagingPath += "/" + volID - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) // Check if that target path exists properly notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock) @@ -913,8 +917,14 @@ func (ns *NodeServer) NodeUnpublishVolume( } targetPath := req.GetTargetPath() - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. + + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + isMnt, err := ns.Mounter.IsMountPoint(targetPath) if err != nil { if os.IsNotExist(err) { diff --git a/internal/util/idlocker.go b/internal/util/idlocker.go index 92733c19c..211081a13 100644 --- a/internal/util/idlocker.go +++ b/internal/util/idlocker.go @@ -28,6 +28,9 @@ const ( // SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation. SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists" + + // TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path. + TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists" ) // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs