mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-09 16:00:22 +00:00
journal: Add additional operation based locking
As we are adding new functionalities like Create/Delete snapshot,Clone from Snapshot and Clone from Volume. with the current implementation, there are only serial operations allowed for this functionalities, for some function we can allow parallel operations like Clone from snapshot and Clone from Volume and Create `N` snapshots on a single volume. Delete Volume: Need to ensure that there is no clone, Snapshot create and Expand volume in progress. Expand Volume: Need to ensure that there is no clone, snapshot create and cloning in progress Delete Snapshot: Need to ensure that there is no cloning in progress Restore Volume/Snapshot: Need to ensure that there is no Expand or delete operation in progress. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
parent
71ddf51544
commit
d6348545ab
@ -14,9 +14,11 @@ limitations under the License.
|
||||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
klog "k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -59,3 +61,199 @@ func (vl *VolumeLocks) Release(volumeID string) {
|
||||
defer vl.mux.Unlock()
|
||||
vl.locks.Delete(volumeID)
|
||||
}
|
||||
|
||||
type operation string
|
||||
|
||||
const (
|
||||
createOp operation = "create"
|
||||
deleteOp operation = "delete"
|
||||
cloneOpt operation = "clone"
|
||||
restoreOp operation = "restore"
|
||||
expandOp operation = "expand"
|
||||
)
|
||||
|
||||
// OperationLock implements a map with atomic operations.
|
||||
type OperationLock struct {
|
||||
// lock is a map of map, internal key is the list of id and its counters
|
||||
// and the outer map key is the operation type it will be one of the above
|
||||
// const
|
||||
//
|
||||
// example map[restore][xxx-xxx-xxx-xxx]1
|
||||
// map[restore][xxx-xxx-xxx-xxx]2
|
||||
// the counter value will be increased for allowed parallel operations and
|
||||
// it will be decreased when the operation is completed, when the counter
|
||||
// value goes to zero the `xxx-xxx-xxx` key will be removed from the
|
||||
// operation map.
|
||||
locks map[operation]map[string]int
|
||||
// lock to avoid concurrent operation on map
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
// NewOperationLock returns new OperationLock.
|
||||
func NewOperationLock() *OperationLock {
|
||||
lock := make(map[operation]map[string]int)
|
||||
lock[createOp] = make(map[string]int)
|
||||
lock[deleteOp] = make(map[string]int)
|
||||
lock[cloneOpt] = make(map[string]int)
|
||||
lock[restoreOp] = make(map[string]int)
|
||||
lock[expandOp] = make(map[string]int)
|
||||
return &OperationLock{
|
||||
locks: lock,
|
||||
}
|
||||
}
|
||||
|
||||
// tryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
|
||||
// If another operation is already using volumeID, returns false.
|
||||
func (ol *OperationLock) tryAcquire(op operation, volumeID string) error {
|
||||
ol.mux.Lock()
|
||||
defer ol.mux.Unlock()
|
||||
switch op {
|
||||
case createOp:
|
||||
// During snapshot create operation there should not be any delete
|
||||
// operation going on for the volume.
|
||||
|
||||
// check any delete operation is going on for given volume ID
|
||||
if _, ok := ol.locks[deleteOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Delete operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// increment the counter for snapshot create operation
|
||||
val := ol.locks[createOp][volumeID]
|
||||
ol.locks[createOp][volumeID] = val + 1
|
||||
case cloneOpt:
|
||||
// During clone operation we need to check any ongoing delete,expand
|
||||
// operation. if yes we need to return an error to avoid issues.
|
||||
|
||||
// check any delete operation is going on for given volume ID
|
||||
if _, ok := ol.locks[deleteOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Delete operation with given id %s already exists", volumeID)
|
||||
}
|
||||
|
||||
// check any expand operation is going on for given volume ID
|
||||
if _, ok := ol.locks[expandOp][volumeID]; ok {
|
||||
return fmt.Errorf("an Expand operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// increment the counter for clone operation
|
||||
val := ol.locks[cloneOpt][volumeID]
|
||||
ol.locks[cloneOpt][volumeID] = val + 1
|
||||
case deleteOp:
|
||||
// / During delete operation the volume should not be under expand,
|
||||
// clone or snapshot operation.
|
||||
// check any expand operation is going on for given volume ID
|
||||
if _, ok := ol.locks[expandOp][volumeID]; ok {
|
||||
return fmt.Errorf("an Expand operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any clone operation is going on for given volume ID
|
||||
if _, ok := ol.locks[cloneOpt][volumeID]; ok {
|
||||
return fmt.Errorf("a Clone operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any delete operation is going on for given volume ID
|
||||
if _, ok := ol.locks[createOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Create operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any restore operation is going on for given volume ID
|
||||
if _, ok := ol.locks[restoreOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Restore operation with given id %s already exists", volumeID)
|
||||
}
|
||||
ol.locks[deleteOp][volumeID] = 1
|
||||
case restoreOp:
|
||||
// During restore operation the volume should not be deleted
|
||||
// check any delete operation is going on for given volume ID
|
||||
if _, ok := ol.locks[deleteOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Delete operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// increment the counter for restore operation
|
||||
val := ol.locks[restoreOp][volumeID]
|
||||
ol.locks[restoreOp][volumeID] = val + 1
|
||||
case expandOp:
|
||||
// During expand operation the volume should not be deleted or cloned
|
||||
// and there should not be a create operation also.
|
||||
// check any delete operation is going on for given volume ID
|
||||
if _, ok := ol.locks[deleteOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Delete operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any clone operation is going on for given volume ID
|
||||
if _, ok := ol.locks[cloneOpt][volumeID]; ok {
|
||||
return fmt.Errorf("a Clone operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any delete operation is going on for given volume ID
|
||||
if _, ok := ol.locks[createOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Create operation with given id %s already exists", volumeID)
|
||||
}
|
||||
|
||||
ol.locks[expandOp][volumeID] = 1
|
||||
default:
|
||||
return fmt.Errorf("%v operation not supported", op)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSnapshotCreateLock gets the snapshot lock on given volumeID.
|
||||
func (ol *OperationLock) GetSnapshotCreateLock(volumeID string) error {
|
||||
return ol.tryAcquire(createOp, volumeID)
|
||||
}
|
||||
|
||||
// GetCloneLock gets the clone lock on given volumeID.
|
||||
func (ol *OperationLock) GetCloneLock(volumeID string) error {
|
||||
return ol.tryAcquire(cloneOpt, volumeID)
|
||||
}
|
||||
|
||||
// GetDeleteLock gets the delete lock on given volumeID,ensures that there is
|
||||
// no clone,restore and expand operation on given volumeID.
|
||||
func (ol *OperationLock) GetDeleteLock(volumeID string) error {
|
||||
return ol.tryAcquire(deleteOp, volumeID)
|
||||
}
|
||||
|
||||
// GetRestoreLock gets the restore lock on given volumeID,ensures that there is
|
||||
// no delete operation on given volumeID.
|
||||
func (ol *OperationLock) GetRestoreLock(volumeID string) error {
|
||||
return ol.tryAcquire(restoreOp, volumeID)
|
||||
}
|
||||
|
||||
// GetExpandLock gets the expand lock on given volumeID,ensures that there is
|
||||
// no delete and clone operation on given volumeID.
|
||||
func (ol *OperationLock) GetExpandLock(volumeID string) error {
|
||||
return ol.tryAcquire(expandOp, volumeID)
|
||||
}
|
||||
|
||||
// ReleaseSnapshotCreateLock releases the create lock on given volumeID.
|
||||
func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) {
|
||||
ol.release(createOp, volumeID)
|
||||
}
|
||||
|
||||
// ReleaseCloneLock releases the clone lock on given volumeID.
|
||||
func (ol *OperationLock) ReleaseCloneLock(volumeID string) {
|
||||
ol.release(cloneOpt, volumeID)
|
||||
}
|
||||
|
||||
// ReleaseDeleteLock releases the delete lock on given volumeID.
|
||||
func (ol *OperationLock) ReleaseDeleteLock(volumeID string) {
|
||||
ol.release(deleteOp, volumeID)
|
||||
}
|
||||
|
||||
// ReleaseRestoreLock releases the restore lock on given volumeID.
|
||||
func (ol *OperationLock) ReleaseRestoreLock(volumeID string) {
|
||||
ol.release(restoreOp, volumeID)
|
||||
}
|
||||
|
||||
// ReleaseExpandLock releases the expand lock on given volumeID.
|
||||
func (ol *OperationLock) ReleaseExpandLock(volumeID string) {
|
||||
ol.release(expandOp, volumeID)
|
||||
}
|
||||
|
||||
// release deletes the lock on volumeID.
|
||||
func (ol *OperationLock) release(op operation, volumeID string) {
|
||||
ol.mux.Lock()
|
||||
defer ol.mux.Unlock()
|
||||
switch op {
|
||||
case cloneOpt, createOp, expandOp, restoreOp, deleteOp:
|
||||
if val, ok := ol.locks[op][volumeID]; ok {
|
||||
// decrement the counter for operation
|
||||
ol.locks[op][volumeID] = val - 1
|
||||
if ol.locks[op][volumeID] == 0 {
|
||||
delete(ol.locks[op], volumeID)
|
||||
}
|
||||
}
|
||||
default:
|
||||
klog.Errorf("%v operation not supported", op)
|
||||
}
|
||||
}
|
||||
|
@ -50,3 +50,84 @@ func TestIDLocker(t *testing.T) {
|
||||
true, ok)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperationLocks(t *testing.T) {
|
||||
volumeID := "test-vol"
|
||||
lock := NewOperationLock()
|
||||
err := lock.GetCloneLock(volumeID)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("failed to acquire clone lock for %s %s", volumeID, err)
|
||||
}
|
||||
err = lock.GetDeleteLock(volumeID)
|
||||
if err == nil {
|
||||
t.Errorf("expected to fail for GetDeleteLock for %s", volumeID)
|
||||
}
|
||||
|
||||
err = lock.GetExpandLock(volumeID)
|
||||
if err == nil {
|
||||
t.Errorf("expected to fail for GetExpandLock for %s", volumeID)
|
||||
}
|
||||
lock.ReleaseCloneLock(volumeID)
|
||||
|
||||
// Get multiple clone operation
|
||||
err = lock.GetCloneLock(volumeID)
|
||||
if err != nil {
|
||||
t.Errorf("failed to acquire clone lock for %s %s", volumeID, err)
|
||||
}
|
||||
err = lock.GetCloneLock(volumeID)
|
||||
if err != nil {
|
||||
t.Errorf("failed to acquire clone lock for %s %s", volumeID, err)
|
||||
}
|
||||
err = lock.GetCloneLock(volumeID)
|
||||
if err != nil {
|
||||
t.Errorf("failed to acquire clone lock for %s %s", volumeID, err)
|
||||
}
|
||||
// release all clone locks
|
||||
lock.ReleaseCloneLock(volumeID)
|
||||
lock.ReleaseCloneLock(volumeID)
|
||||
lock.ReleaseCloneLock(volumeID)
|
||||
|
||||
// release extra lock it should not cause any issue as the key is already
|
||||
// deleted from the map
|
||||
lock.ReleaseCloneLock(volumeID)
|
||||
|
||||
// get multiple restore lock
|
||||
err = lock.GetRestoreLock(volumeID)
|
||||
if err != nil {
|
||||
t.Errorf("failed to acquire restore lock for %s %s", volumeID, err)
|
||||
}
|
||||
err = lock.GetRestoreLock(volumeID)
|
||||
if err != nil {
|
||||
t.Errorf("failed to acquire restore lock for %s %s", volumeID, err)
|
||||
}
|
||||
err = lock.GetRestoreLock(volumeID)
|
||||
if err != nil {
|
||||
t.Errorf("failed to acquire restore lock for %s %s", volumeID, err)
|
||||
}
|
||||
// try for snapshot delete lock
|
||||
err = lock.GetDeleteLock(volumeID)
|
||||
if err == nil {
|
||||
t.Errorf("expected to fail for GetDeleteLock for %s", volumeID)
|
||||
}
|
||||
// release all restore locks
|
||||
lock.ReleaseRestoreLock(volumeID)
|
||||
lock.ReleaseRestoreLock(volumeID)
|
||||
lock.ReleaseRestoreLock(volumeID)
|
||||
|
||||
err = lock.GetSnapshotCreateLock(volumeID)
|
||||
if err != nil {
|
||||
t.Errorf("failed to acquire createSnapshot lock for %s %s", volumeID, err)
|
||||
}
|
||||
err = lock.GetDeleteLock(volumeID)
|
||||
if err == nil {
|
||||
t.Errorf("expected to fail for GetDeleteLock for %s", volumeID)
|
||||
}
|
||||
lock.ReleaseSnapshotCreateLock(volumeID)
|
||||
|
||||
err = lock.GetDeleteLock(volumeID)
|
||||
if err != nil {
|
||||
t.Errorf("failed to get GetDeleteLock for %s %v", volumeID, err)
|
||||
}
|
||||
lock.ReleaseDeleteLock(volumeID)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user