diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index d5637b76d..d59e7a93c 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -47,6 +47,9 @@ type ControllerServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same snapshot (as defined by SnapshotID/snapshot name) return an Aborted error SnapshotLocks *util.VolumeLocks + + // A map storing all volumes/snapshots with ongoing operations. + OperationLocks *util.OperationLock } func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error { @@ -421,17 +424,23 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre // nolint:gocritic // this ifElseChain can not be rewritten to a switch statement if rbdSnap != nil { + if err = cs.OperationLocks.GetRestoreLock(rbdSnap.SnapID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseRestoreLock(rbdSnap.SnapID) + err = cs.createVolumeFromSnapshot(ctx, cr, rbdVol, rbdSnap.SnapID) if err != nil { return err } util.DebugLog(ctx, "created volume %s from snapshot %s", rbdVol.RequestName, rbdSnap.RbdSnapName) } else if parentVol != nil { - if acquired := cs.VolumeLocks.TryAcquire(parentVol.VolID); !acquired { - klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), parentVol.VolID) - return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, parentVol.VolID) + if err = cs.OperationLocks.GetCloneLock(parentVol.VolID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return status.Error(codes.Aborted, err.Error()) } - defer cs.VolumeLocks.Release(parentVol.VolID) + defer cs.OperationLocks.ReleaseCloneLock(parentVol.VolID) return rbdVol.createCloneFromImage(ctx, parentVol) } else { err = createImage(ctx, rbdVol, cr) @@ -560,6 +569,13 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } defer cs.VolumeLocks.Release(volumeID) + // lock out volumeID for clone and expand operation + if err = cs.OperationLocks.GetDeleteLock(volumeID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseDeleteLock(volumeID) + rbdVol := &rbdVolume{} defer rbdVol.Destroy() @@ -739,11 +755,11 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS defer cs.SnapshotLocks.Release(req.GetName()) // Take lock on parent rbd image - if acquired := cs.VolumeLocks.TryAcquire(rbdSnap.SourceVolumeID); !acquired { - klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdSnap.SourceVolumeID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.SourceVolumeID) + if err = cs.OperationLocks.GetSnapshotCreateLock(rbdSnap.SourceVolumeID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) } - defer cs.VolumeLocks.Release(rbdSnap.SourceVolumeID) + defer cs.OperationLocks.ReleaseSnapshotCreateLock(rbdSnap.SourceVolumeID) // Need to check for already existing snapshot name, and if found // check for the requested source volume id and already allocated source volume id @@ -956,6 +972,13 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS } defer cs.SnapshotLocks.Release(snapshotID) + // lock out snapshotID for restore operation + if err = cs.OperationLocks.GetDeleteLock(snapshotID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseDeleteLock(snapshotID) + rbdSnap := &rbdSnapshot{} if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil { // if error is ErrPoolNotFound, the pool is already deleted we dont @@ -1047,6 +1070,13 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi } defer cs.VolumeLocks.Release(volID) + // lock out volumeID for clone and delete operation + if err := cs.OperationLocks.GetExpandLock(volID); err != nil { + klog.Error(util.Log(ctx, err.Error())) + return nil, status.Error(codes.Aborted, err.Error()) + } + defer cs.OperationLocks.ReleaseExpandLock(volID) + cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) diff --git a/internal/rbd/driver.go b/internal/rbd/driver.go index faf6aeb99..0767f7acc 100644 --- a/internal/rbd/driver.go +++ b/internal/rbd/driver.go @@ -80,6 +80,7 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { DefaultControllerServer: csicommon.NewDefaultControllerServer(d), VolumeLocks: util.NewVolumeLocks(), SnapshotLocks: util.NewVolumeLocks(), + OperationLocks: util.NewOperationLock(), } }