diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 44c068601..a97cc0919 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -494,6 +494,13 @@ func (ns *NodeServer) NodePublishVolume( targetPath := req.GetTargetPath() volID := fsutil.VolumeID(req.GetVolumeId()) + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + volOptions := &store.VolumeOptions{} defer volOptions.Destroy() @@ -506,9 +513,6 @@ func (ns *NodeServer) NodePublishVolume( return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error()) } - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish - if err = util.CreateMountPoint(targetPath); err != nil { log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err) @@ -599,12 +603,17 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, err } - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. targetPath := req.GetTargetPath() + volID := req.GetVolumeId() + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) // stop the health-checker that may have been started in NodeGetVolumeStats() - ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath) + ns.healthChecker.StopChecker(volID, targetPath) isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) if err != nil {