cephfs: take lock on targetpath on node operation

We should not be dependent on the CO to ensure
that it will serialize the request instead of
that we need to have own internal locks to ensure
that we dont do concurrent operations for same
request.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
(cherry picked from commit 38b0a4cbad)
This commit is contained in:
Madhu Rajanna 2024-11-20 10:12:59 +01:00 committed by yati1998
parent 6bad9bcafc
commit bc92611d1b

View File

@ -452,10 +452,26 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId()) volID := fsutil.VolumeID(req.GetVolumeId())
// Considering kubelet make sure the stage and publish operations if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
// are serialized, we dont need any extra locking in nodePublish log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
if err := util.CreateMountPoint(targetPath); err != nil { return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)
volOptions := &store.VolumeOptions{}
defer volOptions.Destroy()
if err := volOptions.DetectMounter(req.GetVolumeContext()); err != nil {
return nil, status.Errorf(codes.Internal, "failed to detect mounter for volume %s: %v", volID, err.Error())
}
volMounter, err := mounter.New(volOptions)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
}
if err = util.CreateMountPoint(targetPath); err != nil {
log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err) log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -542,9 +558,18 @@ func (ns *NodeServer) NodeUnpublishVolume(
return nil, err return nil, err
} }
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
}
defer ns.VolumeLocks.Release(targetPath)
// stop the health-checker that may have been started in NodeGetVolumeStats()
ns.healthChecker.StopChecker(volID, targetPath)
isMnt, err := util.IsMountPoint(ns.Mounter, targetPath) isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil { if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err) log.ErrorLog(ctx, "stat failed: %v", err)