diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 7287cde76..eec82a8bf 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -452,10 +452,26 @@ func (ns *NodeServer) NodePublishVolume( targetPath := req.GetTargetPath() volID := fsutil.VolumeID(req.GetVolumeId()) - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) - if err := util.CreateMountPoint(targetPath); err != nil { + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + + volOptions := &store.VolumeOptions{} + defer volOptions.Destroy() + + if err := volOptions.DetectMounter(req.GetVolumeContext()); err != nil { + return nil, status.Errorf(codes.Internal, "failed to detect mounter for volume %s: %v", volID, err.Error()) + } + + volMounter, err := mounter.New(volOptions) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error()) + } + + if err = util.CreateMountPoint(targetPath); err != nil { log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err) return nil, status.Error(codes.Internal, err.Error()) @@ -542,9 +558,18 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, err } - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. targetPath := req.GetTargetPath() + volID := req.GetVolumeId() + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + + // stop the health-checker that may have been started in NodeGetVolumeStats() + ns.healthChecker.StopChecker(volID, targetPath) + isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err)