mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-26 22:59:29 +00:00
cephfs: add modfify volume support
add ControllerModifyVolume RPC support for cephfs Signed-off-by: Nikhil-Ladha <nikhilladha1999@gmail.com>
This commit is contained in:
parent
a32ba13045
commit
2dff79fab5
@ -746,6 +746,67 @@ func (cs *ControllerServer) ControllerExpandVolume(
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ControllerExpandVolume expands CephFS Volumes on demand based on resizer request.
|
||||
func (cs *ControllerServer) ControllerModifyVolume(
|
||||
ctx context.Context,
|
||||
req *csi.ControllerModifyVolumeRequest,
|
||||
) (*csi.ControllerModifyVolumeResponse, error) {
|
||||
if err := cs.validateModifyVolumeRequest(req); err != nil {
|
||||
log.ErrorLog(ctx, "ControllerModifyVolumeRequest validation failed: %v", err)
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
secret := req.GetSecrets()
|
||||
params := req.GetMutableParameters()
|
||||
|
||||
// lock out parallel delete operations
|
||||
if acquired := cs.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
}
|
||||
defer cs.VolumeLocks.Release(volID)
|
||||
|
||||
// lock out volumeID for clone, delete, expand and restore operation
|
||||
if err := cs.OperationLocks.GetModifyLock(volID); err != nil {
|
||||
log.ErrorLog(ctx, err.Error())
|
||||
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
defer cs.OperationLocks.ReleaseModifyLock(volID)
|
||||
|
||||
cr, err := util.NewAdminCredentials(secret)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, params, secret,
|
||||
cs.ClusterName, cs.SetMetadata)
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
|
||||
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
defer volOptions.Destroy()
|
||||
|
||||
if volOptions.BackingSnapshot {
|
||||
return nil, status.Error(codes.InvalidArgument, "cannot modify snapshot-backed volume")
|
||||
}
|
||||
|
||||
volClient := core.NewSubVolume(volOptions.GetConnection(),
|
||||
&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
|
||||
if err = volClient.CreateVolume(ctx); err != nil {
|
||||
log.ErrorLog(ctx, "failed to modify volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err)
|
||||
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return &csi.ControllerModifyVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// CreateSnapshot creates the snapshot in backend and stores metadata
|
||||
// in store
|
||||
//
|
||||
|
@ -139,6 +139,7 @@ func (fs *Driver) Run(conf *util.Config) {
|
||||
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
|
||||
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
|
||||
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
|
||||
csi.ControllerServiceCapability_RPC_MODIFY_VOLUME,
|
||||
})
|
||||
|
||||
fs.cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
|
||||
|
@ -221,7 +221,7 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error {
|
||||
optName, actual, expected)
|
||||
}
|
||||
|
||||
// getVolumeOptions validates the basic required basic options provided in the
|
||||
// getVolumeOptions validates the required basic options provided in the
|
||||
// volume parameters and extract the volumeOptions from volume parameters.
|
||||
// It contains the following checks:
|
||||
// - clusterID must be set
|
||||
|
@ -111,3 +111,21 @@ func (cs *ControllerServer) validateExpandVolumeRequest(req *csi.ControllerExpan
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateModifyVolumeRequest validates the Controller ModifyVolume request.
|
||||
func (cs *ControllerServer) validateModifyVolumeRequest(req *csi.ControllerModifyVolumeRequest) error {
|
||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_MODIFY_VOLUME); err != nil {
|
||||
return fmt.Errorf("invalid ModifyVolumeRequest: %w", err)
|
||||
}
|
||||
|
||||
if req.GetVolumeId() == "" {
|
||||
return status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
|
||||
}
|
||||
|
||||
mutableParam := req.GetMutableParameters()
|
||||
if mutableParam == nil {
|
||||
return status.Error(codes.InvalidArgument, "MutableParameters cannot be empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -75,6 +75,7 @@ const (
|
||||
cloneOpt operation = "clone"
|
||||
restoreOp operation = "restore"
|
||||
expandOp operation = "expand"
|
||||
modifyOp operation = "modify"
|
||||
)
|
||||
|
||||
// OperationLock implements a map with atomic operations.
|
||||
@ -102,6 +103,7 @@ func NewOperationLock() *OperationLock {
|
||||
lock[cloneOpt] = make(map[string]int)
|
||||
lock[restoreOp] = make(map[string]int)
|
||||
lock[expandOp] = make(map[string]int)
|
||||
lock[modifyOp] = make(map[string]int)
|
||||
|
||||
return &OperationLock{
|
||||
locks: lock,
|
||||
@ -165,12 +167,37 @@ func (ol *OperationLock) tryAcquire(op operation, volumeID string) error {
|
||||
if _, ok := ol.locks[cloneOpt][volumeID]; ok {
|
||||
return fmt.Errorf("a Clone operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any delete operation is going on for given volume ID
|
||||
// check any create operation is going on for given volume ID
|
||||
if _, ok := ol.locks[createOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Create operation with given id %s already exists", volumeID)
|
||||
}
|
||||
|
||||
ol.locks[expandOp][volumeID] = 1
|
||||
case modifyOp:
|
||||
// During modify operation the volume should not be deleted, cloned,
|
||||
// resized and restored and there should not be a create operation also.
|
||||
// check any delete operation is going on for given volume ID
|
||||
if _, ok := ol.locks[deleteOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Delete operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any clone operation is going on for given volume ID
|
||||
if _, ok := ol.locks[cloneOpt][volumeID]; ok {
|
||||
return fmt.Errorf("a Clone operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any expand operation is going on for given volume ID
|
||||
if _, ok := ol.locks[expandOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Expand operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any restore operation is going on for given volume ID
|
||||
if _, ok := ol.locks[restoreOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Restore operation with given id %s already exists", volumeID)
|
||||
}
|
||||
// check any create operation is going on for given volume ID
|
||||
if _, ok := ol.locks[createOp][volumeID]; ok {
|
||||
return fmt.Errorf("a Expand operation with given id %s already exists", volumeID)
|
||||
}
|
||||
|
||||
ol.locks[modifyOp][volumeID] = 1
|
||||
default:
|
||||
return fmt.Errorf("%v operation not supported", op)
|
||||
}
|
||||
@ -206,6 +233,12 @@ func (ol *OperationLock) GetExpandLock(volumeID string) error {
|
||||
return ol.tryAcquire(expandOp, volumeID)
|
||||
}
|
||||
|
||||
// GetModifyLock gets the modify lock on given volumeID,ensures that there is
|
||||
// no create, delete, clone, expand, and restore operation on given volumeID.
|
||||
func (ol *OperationLock) GetModifyLock(volumeID string) error {
|
||||
return ol.tryAcquire(modifyOp, volumeID)
|
||||
}
|
||||
|
||||
// ReleaseSnapshotCreateLock releases the create lock on given volumeID.
|
||||
func (ol *OperationLock) ReleaseSnapshotCreateLock(volumeID string) {
|
||||
ol.release(createOp, volumeID)
|
||||
@ -231,12 +264,17 @@ func (ol *OperationLock) ReleaseExpandLock(volumeID string) {
|
||||
ol.release(expandOp, volumeID)
|
||||
}
|
||||
|
||||
// ReleaseModifyLock releases the modify lock on given volumeID.
|
||||
func (ol *OperationLock) ReleaseModifyLock(volumeID string) {
|
||||
ol.release(modifyOp, volumeID)
|
||||
}
|
||||
|
||||
// release deletes the lock on volumeID.
|
||||
func (ol *OperationLock) release(op operation, volumeID string) {
|
||||
ol.mux.Lock()
|
||||
defer ol.mux.Unlock()
|
||||
switch op {
|
||||
case cloneOpt, createOp, expandOp, restoreOp, deleteOp:
|
||||
case cloneOpt, createOp, expandOp, restoreOp, deleteOp, modifyOp:
|
||||
if val, ok := ol.locks[op][volumeID]; ok {
|
||||
// decrement the counter for operation
|
||||
ol.locks[op][volumeID] = val - 1
|
||||
|
Loading…
Reference in New Issue
Block a user