// Part of internal/util/idlocker.go. The declarations below rely on the
// "fmt" and "sync" imports.

// operation identifies the kind of work a caller wants to perform on a volume.
type operation string

// Operation kinds tracked by OperationLock. createOp is the snapshot create
// operation acquired through GetSnapshotCreateLock.
const (
	createOp  operation = "create"
	deleteOp  operation = "delete"
	cloneOpt  operation = "clone"
	restoreOp operation = "restore"
	expandOp  operation = "expand"
)

// OperationLock tracks, per operation kind, how many operations are currently
// in progress on each volume ID and rejects operations that conflict with
// each other.
type OperationLock struct {
	// locks is a map of maps: the outer key is the operation kind (one of
	// the constants above), the inner key is the volume ID and the value is
	// the number of operations of that kind in progress on that ID.
	//
	// example map[restore][xxx-xxx-xxx-xxx]1
	//         map[restore][xxx-xxx-xxx-xxx]2
	//
	// The counter is incremented on every successful acquisition and
	// decremented on release; when it drops to zero the volume ID is removed
	// from the inner map.
	locks map[operation]map[string]int
	// mux guards every access to locks.
	mux sync.Mutex
}

// NewOperationLock returns an OperationLock with an empty counter map for
// every supported operation.
func NewOperationLock() *OperationLock {
	locks := make(map[operation]map[string]int)
	for _, op := range []operation{createOp, deleteOp, cloneOpt, restoreOp, expandOp} {
		locks[op] = make(map[string]int)
	}
	return &OperationLock{locks: locks}
}

// firstConflict returns an error describing the first operation in blockers
// that is in progress on volumeID, or nil when none of them is. The blockers
// are examined in the given order. The caller must hold ol.mux.
func (ol *OperationLock) firstConflict(volumeID string, blockers ...operation) error {
	for _, blocker := range blockers {
		if _, busy := ol.locks[blocker][volumeID]; !busy {
			continue
		}
		switch blocker {
		case deleteOp:
			return fmt.Errorf("a Delete operation with given id %s already exists", volumeID)
		case expandOp:
			return fmt.Errorf("an Expand operation with given id %s already exists", volumeID)
		case cloneOpt:
			return fmt.Errorf("a Clone operation with given id %s already exists", volumeID)
		case createOp:
			return fmt.Errorf("a Create operation with given id %s already exists", volumeID)
		case restoreOp:
			return fmt.Errorf("a Restore operation with given id %s already exists", volumeID)
		default:
			return fmt.Errorf("a %v operation with given id %s already exists", blocker, volumeID)
		}
	}
	return nil
}

// tryAcquire registers one more op in progress on volumeID. It returns nil on
// success, an error naming the first conflicting operation already in
// progress on volumeID, or an error when op is not a supported operation.
//
// Conflicts, checked in the listed order:
//   - create (snapshot): delete
//   - clone: delete, expand
//   - delete: expand, clone, create, restore
//   - restore: delete
//   - expand: delete, clone, create
//
// Operations of the same kind never block each other, so every successful
// acquisition is counted. Delete and expand are counted as well: storing a
// fixed 1 for them would let the first release drop the entry while a second
// holder of the same lock is still running.
func (ol *OperationLock) tryAcquire(op operation, volumeID string) error {
	ol.mux.Lock()
	defer ol.mux.Unlock()

	var blockers []operation
	switch op {
	case createOp:
		// A snapshot must not be created while the volume is being deleted.
		blockers = []operation{deleteOp}
	case cloneOpt:
		// A volume must not be cloned while it is being deleted or expanded.
		blockers = []operation{deleteOp, expandOp}
	case deleteOp:
		// A volume must not be deleted while it is being expanded, cloned,
		// snapshotted or restored.
		blockers = []operation{expandOp, cloneOpt, createOp, restoreOp}
	case restoreOp:
		// A restore must not run while a delete is in progress.
		blockers = []operation{deleteOp}
	case expandOp:
		// A volume must not be expanded while it is being deleted, cloned or
		// snapshotted.
		blockers = []operation{deleteOp, cloneOpt, createOp}
	default:
		return fmt.Errorf("%v operation not supported", op)
	}

	if err := ol.firstConflict(volumeID, blockers...); err != nil {
		return err
	}
	ol.locks[op][volumeID]++

	return nil
}

// GetSnapshotCreateLock gets the snapshot create lock on given volumeID. It
// fails while a delete operation is in progress on volumeID.
func (ol *OperationLock) GetSnapshotCreateLock(volumeID string) error {
	return ol.tryAcquire(createOp, volumeID)
}

// GetCloneLock gets the clone lock on given volumeID. It fails while a delete
// or expand operation is in progress on volumeID.
func (ol *OperationLock) GetCloneLock(volumeID string) error {
	return ol.tryAcquire(cloneOpt, volumeID)
}

// GetDeleteLock gets the delete lock on given volumeID. It fails while an
// expand, clone, snapshot create or restore operation is in progress on
// volumeID.
func (ol *OperationLock) GetDeleteLock(volumeID string) error {
	return ol.tryAcquire(deleteOp, volumeID)
}

// GetRestoreLock gets the restore lock on given volumeID. It fails while a
// delete operation is in progress on volumeID.
func (ol *OperationLock) GetRestoreLock(volumeID string) error {
	return ol.tryAcquire(restoreOp, volumeID)
}

// GetExpandLock gets the expand lock on given volumeID. It fails while a
// delete, clone or snapshot create operation is in progress on volumeID.
func (ol *OperationLock) GetExpandLock(volumeID string) error {
	return ol.tryAcquire(expandOp, volumeID)
}

// ReleaseSnapshotCreateLock releases the create lock on given volumeID.
+func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) { + ol.release(createOp, volumeID) +} + +// ReleaseCloneLock releases the clone lock on given volumeID. +func (ol *OperationLock) ReleaseCloneLock(volumeID string) { + ol.release(cloneOpt, volumeID) +} + +// ReleaseDeleteLock releases the delete lock on given volumeID. +func (ol *OperationLock) ReleaseDeleteLock(volumeID string) { + ol.release(deleteOp, volumeID) +} + +// ReleaseRestoreLock releases the restore lock on given volumeID. +func (ol *OperationLock) ReleaseRestoreLock(volumeID string) { + ol.release(restoreOp, volumeID) +} + +// ReleaseExpandLock releases the expand lock on given volumeID. +func (ol *OperationLock) ReleaseExpandLock(volumeID string) { + ol.release(expandOp, volumeID) +} + +// release deletes the lock on volumeID. +func (ol *OperationLock) release(op operation, volumeID string) { + ol.mux.Lock() + defer ol.mux.Unlock() + switch op { + case cloneOpt, createOp, expandOp, restoreOp, deleteOp: + if val, ok := ol.locks[op][volumeID]; ok { + // decrement the counter for operation + ol.locks[op][volumeID] = val - 1 + if ol.locks[op][volumeID] == 0 { + delete(ol.locks[op], volumeID) + } + } + default: + klog.Errorf("%v operation not supported", op) + } +} diff --git a/internal/util/idlocker_test.go b/internal/util/idlocker_test.go index 240c966e5..ba89f39d5 100644 --- a/internal/util/idlocker_test.go +++ b/internal/util/idlocker_test.go @@ -50,3 +50,84 @@ func TestIDLocker(t *testing.T) { true, ok) } } + +func TestOperationLocks(t *testing.T) { + volumeID := "test-vol" + lock := NewOperationLock() + err := lock.GetCloneLock(volumeID) + + if err != nil { + t.Errorf("failed to acquire clone lock for %s %s", volumeID, err) + } + err = lock.GetDeleteLock(volumeID) + if err == nil { + t.Errorf("expected to fail for GetDeleteLock for %s", volumeID) + } + + err = lock.GetExpandLock(volumeID) + if err == nil { + t.Errorf("expected to fail for GetExpandLock for %s", volumeID) + } + 
lock.ReleaseCloneLock(volumeID) + + // Get multiple clone operation + err = lock.GetCloneLock(volumeID) + if err != nil { + t.Errorf("failed to acquire clone lock for %s %s", volumeID, err) + } + err = lock.GetCloneLock(volumeID) + if err != nil { + t.Errorf("failed to acquire clone lock for %s %s", volumeID, err) + } + err = lock.GetCloneLock(volumeID) + if err != nil { + t.Errorf("failed to acquire clone lock for %s %s", volumeID, err) + } + // release all clone locks + lock.ReleaseCloneLock(volumeID) + lock.ReleaseCloneLock(volumeID) + lock.ReleaseCloneLock(volumeID) + + // release extra lock it should not cause any issue as the key is already + // deleted from the map + lock.ReleaseCloneLock(volumeID) + + // get multiple restore lock + err = lock.GetRestoreLock(volumeID) + if err != nil { + t.Errorf("failed to acquire restore lock for %s %s", volumeID, err) + } + err = lock.GetRestoreLock(volumeID) + if err != nil { + t.Errorf("failed to acquire restore lock for %s %s", volumeID, err) + } + err = lock.GetRestoreLock(volumeID) + if err != nil { + t.Errorf("failed to acquire restore lock for %s %s", volumeID, err) + } + // try for snapshot delete lock + err = lock.GetDeleteLock(volumeID) + if err == nil { + t.Errorf("expected to fail for GetDeleteLock for %s", volumeID) + } + // release all restore locks + lock.ReleaseRestoreLock(volumeID) + lock.ReleaseRestoreLock(volumeID) + lock.ReleaseRestoreLock(volumeID) + + err = lock.GetSnapshotCreateLock(volumeID) + if err != nil { + t.Errorf("failed to acquire createSnapshot lock for %s %s", volumeID, err) + } + err = lock.GetDeleteLock(volumeID) + if err == nil { + t.Errorf("expected to fail for GetDeleteLock for %s", volumeID) + } + lock.ReleaseSnapshotCreateLock(volumeID) + + err = lock.GetDeleteLock(volumeID) + if err != nil { + t.Errorf("failed to get GetDeleteLock for %s %v", volumeID, err) + } + lock.ReleaseDeleteLock(volumeID) +}