mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-17 20:00:23 +00:00
rbd: take operation locks before operating on resource
Take operation locks on the resources before operating on the resouces. This allows us to do parallel operations for some RPC calls such as Clone and Restore of PVC. This operations will only be blocked if the image is expanding or Snapshot and RBD image is getting deleted. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
parent
d6348545ab
commit
b3a4f510e6
@ -47,6 +47,9 @@ type ControllerServer struct {
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same snapshot (as defined by SnapshotID/snapshot name) return an Aborted error
|
||||
SnapshotLocks *util.VolumeLocks
|
||||
|
||||
// A map storing all volumes/snapshots with ongoing operations.
|
||||
OperationLocks *util.OperationLock
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error {
|
||||
@ -421,17 +424,23 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre
|
||||
|
||||
// nolint:gocritic // this ifElseChain can not be rewritten to a switch statement
|
||||
if rbdSnap != nil {
|
||||
if err = cs.OperationLocks.GetRestoreLock(rbdSnap.SnapID); err != nil {
|
||||
klog.Error(util.Log(ctx, err.Error()))
|
||||
return status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
defer cs.OperationLocks.ReleaseRestoreLock(rbdSnap.SnapID)
|
||||
|
||||
err = cs.createVolumeFromSnapshot(ctx, cr, rbdVol, rbdSnap.SnapID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
util.DebugLog(ctx, "created volume %s from snapshot %s", rbdVol.RequestName, rbdSnap.RbdSnapName)
|
||||
} else if parentVol != nil {
|
||||
if acquired := cs.VolumeLocks.TryAcquire(parentVol.VolID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), parentVol.VolID)
|
||||
return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, parentVol.VolID)
|
||||
if err = cs.OperationLocks.GetCloneLock(parentVol.VolID); err != nil {
|
||||
klog.Error(util.Log(ctx, err.Error()))
|
||||
return status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
defer cs.VolumeLocks.Release(parentVol.VolID)
|
||||
defer cs.OperationLocks.ReleaseCloneLock(parentVol.VolID)
|
||||
return rbdVol.createCloneFromImage(ctx, parentVol)
|
||||
} else {
|
||||
err = createImage(ctx, rbdVol, cr)
|
||||
@ -560,6 +569,13 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
}
|
||||
defer cs.VolumeLocks.Release(volumeID)
|
||||
|
||||
// lock out volumeID for clone and expand operation
|
||||
if err = cs.OperationLocks.GetDeleteLock(volumeID); err != nil {
|
||||
klog.Error(util.Log(ctx, err.Error()))
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
defer cs.OperationLocks.ReleaseDeleteLock(volumeID)
|
||||
|
||||
rbdVol := &rbdVolume{}
|
||||
defer rbdVol.Destroy()
|
||||
|
||||
@ -739,11 +755,11 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
||||
defer cs.SnapshotLocks.Release(req.GetName())
|
||||
|
||||
// Take lock on parent rbd image
|
||||
if acquired := cs.VolumeLocks.TryAcquire(rbdSnap.SourceVolumeID); !acquired {
|
||||
klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdSnap.SourceVolumeID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.SourceVolumeID)
|
||||
if err = cs.OperationLocks.GetSnapshotCreateLock(rbdSnap.SourceVolumeID); err != nil {
|
||||
klog.Error(util.Log(ctx, err.Error()))
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
defer cs.VolumeLocks.Release(rbdSnap.SourceVolumeID)
|
||||
defer cs.OperationLocks.ReleaseSnapshotCreateLock(rbdSnap.SourceVolumeID)
|
||||
|
||||
// Need to check for already existing snapshot name, and if found
|
||||
// check for the requested source volume id and already allocated source volume id
|
||||
@ -956,6 +972,13 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
||||
}
|
||||
defer cs.SnapshotLocks.Release(snapshotID)
|
||||
|
||||
// lock out snapshotID for restore operation
|
||||
if err = cs.OperationLocks.GetDeleteLock(snapshotID); err != nil {
|
||||
klog.Error(util.Log(ctx, err.Error()))
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
defer cs.OperationLocks.ReleaseDeleteLock(snapshotID)
|
||||
|
||||
rbdSnap := &rbdSnapshot{}
|
||||
if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
|
||||
// if error is ErrPoolNotFound, the pool is already deleted we dont
|
||||
@ -1047,6 +1070,13 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
|
||||
}
|
||||
defer cs.VolumeLocks.Release(volID)
|
||||
|
||||
// lock out volumeID for clone and delete operation
|
||||
if err := cs.OperationLocks.GetExpandLock(volID); err != nil {
|
||||
klog.Error(util.Log(ctx, err.Error()))
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
defer cs.OperationLocks.ReleaseExpandLock(volID)
|
||||
|
||||
cr, err := util.NewUserCredentials(req.GetSecrets())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
|
@ -80,6 +80,7 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
|
||||
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
SnapshotLocks: util.NewVolumeLocks(),
|
||||
OperationLocks: util.NewOperationLock(),
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user