From bc92611d1b85043981b7a1c21be19def7d377e12 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 20 Nov 2024 10:12:59 +0100 Subject: [PATCH] cephfs: take lock on targetpath on node operation We should not depend on the CO to serialize requests; instead, we need our own internal locks to ensure that we don't run concurrent operations for the same request. Signed-off-by: Madhu Rajanna (cherry picked from commit 38b0a4cbadfb20e818e76cc0d3b603274513ec8e) --- internal/cephfs/nodeserver.go | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 7287cde76..eec82a8bf 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -452,10 +452,26 @@ func (ns *NodeServer) NodePublishVolume( targetPath := req.GetTargetPath() volID := fsutil.VolumeID(req.GetVolumeId()) - // Considering kubelet make sure the stage and publish operations - // are serialized, we dont need any extra locking in nodePublish + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) - if err := util.CreateMountPoint(targetPath); err != nil { + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + + volOptions := &store.VolumeOptions{} + defer volOptions.Destroy() + + if err := volOptions.DetectMounter(req.GetVolumeContext()); err != nil { + return nil, status.Errorf(codes.Internal, "failed to detect mounter for volume %s: %v", volID, err.Error()) + } + + volMounter, err := mounter.New(volOptions) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error()) + } + + if err = util.CreateMountPoint(targetPath); err != nil { log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err) 
return nil, status.Error(codes.Internal, err.Error()) @@ -542,9 +558,18 @@ func (ns *NodeServer) NodeUnpublishVolume( return nil, err } - // considering kubelet make sure node operations like unpublish/unstage...etc can not be called - // at same time, an explicit locking at time of nodeunpublish is not required. targetPath := req.GetTargetPath() + volID := req.GetVolumeId() + if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired { + log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath) + + return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath) + } + defer ns.VolumeLocks.Release(targetPath) + + // stop the health-checker that may have been started in NodeGetVolumeStats() + ns.healthChecker.StopChecker(volID, targetPath) + isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err)