cephfs: implement DeleteVolumeGroupSnapshot RPC

implemented DeleteVolumeGroupSnapshot RPC which
does below operations

* Basic request validation
* Get the snapshotId's and volumeId's
mapping reserved for the UUID
* Delete snapshot and remove its mapping
from the omap
* Repeat above steps until all the mapping
are removed
* Remove the reserved uuid from the omap
* Reset the filesystem quiesce, This might be
required as cephfs doesnt provide any options to
remove the quiesce, if we get any request with same
ID again we can reuse the quiesce API for same set-id
* Return success if the received error is
Pool not found or key not found.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2024-02-14 13:52:58 +01:00 committed by mergify[bot]
parent df770e4139
commit 0f724480f5

View File

@ -671,3 +671,109 @@ func (cs *ControllerServer) deleteSnapshotsAndUndoReservation(ctx context.Contex
return nil return nil
} }
// validateVolumeGroupSnapshotDeleteRequest validates the request for creating a group
// snapshot of volumes.
func (cs *ControllerServer) validateVolumeGroupSnapshotDeleteRequest(
ctx context.Context,
req *csi.DeleteVolumeGroupSnapshotRequest,
) error {
if err := cs.Driver.ValidateGroupControllerServiceRequest(
csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil {
log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req))
return err
}
// Check sanity of request volume group snapshot Name, Source Volume Id's
if req.GetGroupSnapshotId() == "" {
return status.Error(codes.InvalidArgument, "volume group snapshot id cannot be empty")
}
return nil
}
// DeleteVolumeGroupSnapshot deletes a group snapshot of volumes.
func (cs *ControllerServer) DeleteVolumeGroupSnapshot(ctx context.Context,
req *csi.DeleteVolumeGroupSnapshotRequest) (
*csi.DeleteVolumeGroupSnapshotResponse,
error,
) {
if err := cs.validateVolumeGroupSnapshotDeleteRequest(ctx, req); err != nil {
return nil, err
}
groupSnapshotID := req.GroupSnapshotId
// Existence and conflict checks
if acquired := cs.VolumeGroupLocks.TryAcquire(groupSnapshotID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID)
}
defer cs.VolumeGroupLocks.Release(groupSnapshotID)
cr, err := util.NewAdminCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, req.GroupSnapshotId, cr)
if err != nil {
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
err = extractDeleteVolumeGroupError(err)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}
vgo.Destroy()
volIds := vgsi.GetVolumeIDs()
fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), volIds, cr)
err = extractDeleteVolumeGroupError(err)
if err != nil {
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
err = extractDeleteVolumeGroupError(err)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}
defer destroyFSConnections(fsMap)
err = cs.deleteSnapshotsAndUndoReservation(ctx, vgsi, cr, fsMap, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", err)
err = extractDeleteVolumeGroupError(err)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
}
// extractDeleteVolumeGroupError extracts the error from the delete volume
// group snapshot and returns the error if it is not a ErrKeyNotFound or
// ErrPoolNotFound error.
func extractDeleteVolumeGroupError(err error) error {
switch {
case errors.Is(err, util.ErrPoolNotFound):
// if error is ErrPoolNotFound, the pool is already deleted we dont
// need to worry about deleting snapshot or omap data, return success
return nil
case errors.Is(err, util.ErrKeyNotFound):
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
// or partially complete (snap and snapOMap are garbage collected already), hence return
// success as deletion is complete
return nil
}
return err
}