cephfs: added support for snapshot-backed volumes

This commit implements most of
docs/design/proposals/cephfs-snapshot-shallow-ro-vol.md design document;
specifically (de-)provisioning of snapshot-backed volumes, mounting such
volumes as well as mounting pre-provisioned snapshot-backed volumes.

Signed-off-by: Robert Vasek <robert.vasek@cern.ch>
This commit is contained in:
Robert Vasek
2022-04-06 15:26:07 +02:00
committed by mergify[bot]
parent 0807fd2e6c
commit fd7559a903
7 changed files with 710 additions and 107 deletions

View File

@ -29,6 +29,7 @@ import (
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
rterrors "github.com/ceph/ceph-csi/internal/util/reftracker/errors"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes/timestamp"
@ -65,47 +66,77 @@ func (cs *ControllerServer) createBackingVolume(
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
if sID != nil {
if err = cs.OperationLocks.GetRestoreLock(sID.SnapshotID); err != nil {
log.ErrorLog(ctx, err.Error())
return status.Error(codes.Aborted, err.Error())
}
defer cs.OperationLocks.ReleaseRestoreLock(sID.SnapshotID)
snap := core.Snapshot{
SnapshotID: sID.FsSnapshotName,
SubVolume: &parentVolOpt.SubVolume,
}
err = volClient.CreateCloneFromSnapshot(ctx, snap)
if err != nil {
log.ErrorLog(ctx, "failed to create clone from snapshot %s: %v", sID.FsSnapshotName, err)
return err
}
return err
return cs.createBackingVolumeFromSnapshotSource(ctx, volOptions, parentVolOpt, volClient, sID)
}
if parentVolOpt != nil {
if err = cs.OperationLocks.GetCloneLock(pvID.VolumeID); err != nil {
log.ErrorLog(ctx, err.Error())
return status.Error(codes.Aborted, err.Error())
}
defer cs.OperationLocks.ReleaseCloneLock(pvID.VolumeID)
err = volClient.CreateCloneFromSubvolume(
ctx, &parentVolOpt.SubVolume)
if err != nil {
log.ErrorLog(ctx, "failed to create clone from subvolume %s: %v", fsutil.VolumeID(pvID.FsSubvolName), err)
if parentVolOpt != nil {
return cs.createBackingVolumeFromVolumeSource(ctx, parentVolOpt, volClient, pvID)
}
if err = volClient.CreateVolume(ctx); err != nil {
log.ErrorLog(ctx, "failed to create volume %s: %v", volOptions.RequestName, err)
return status.Error(codes.Internal, err.Error())
}
return nil
}
func (cs *ControllerServer) createBackingVolumeFromSnapshotSource(
ctx context.Context,
volOptions *store.VolumeOptions,
parentVolOpt *store.VolumeOptions,
volClient core.SubVolumeClient,
sID *store.SnapshotIdentifier,
) error {
if err := cs.OperationLocks.GetRestoreLock(sID.SnapshotID); err != nil {
log.ErrorLog(ctx, err.Error())
return status.Error(codes.Aborted, err.Error())
}
defer cs.OperationLocks.ReleaseRestoreLock(sID.SnapshotID)
if volOptions.BackingSnapshot {
if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions); err != nil {
log.ErrorLog(ctx, "failed to create snapshot-backed volume from snapshot %s: %v",
sID.FsSnapshotName, err)
return err
}
return nil
}
if err = volClient.CreateVolume(ctx); err != nil {
log.ErrorLog(ctx, "failed to create volume %s: %v", volOptions.RequestName, err)
return status.Error(codes.Internal, err.Error())
err := volClient.CreateCloneFromSnapshot(ctx, core.Snapshot{
SnapshotID: sID.FsSnapshotName,
SubVolume: &parentVolOpt.SubVolume,
})
if err != nil {
log.ErrorLog(ctx, "failed to create clone from snapshot %s: %v", sID.FsSnapshotName, err)
return err
}
return nil
}
func (cs *ControllerServer) createBackingVolumeFromVolumeSource(
ctx context.Context,
parentVolOpt *store.VolumeOptions,
volClient core.SubVolumeClient,
pvID *store.VolumeIdentifier,
) error {
if err := cs.OperationLocks.GetCloneLock(pvID.VolumeID); err != nil {
log.ErrorLog(ctx, err.Error())
return status.Error(codes.Aborted, err.Error())
}
defer cs.OperationLocks.ReleaseCloneLock(pvID.VolumeID)
if err := volClient.CreateCloneFromSubvolume(ctx, &parentVolOpt.SubVolume); err != nil {
log.ErrorLog(ctx, "failed to create clone from subvolume %s: %v", fsutil.VolumeID(pvID.FsSubvolName), err)
return err
}
return nil
@ -158,6 +189,7 @@ func checkValidCreateVolumeRequest(
parentVol *store.VolumeOptions,
pvID *store.VolumeIdentifier,
sID *store.SnapshotIdentifier,
req *csi.CreateVolumeRequest,
) error {
switch {
case pvID != nil:
@ -168,6 +200,10 @@ func checkValidCreateVolumeRequest(
parentVol.Size,
vol.Size)
}
if vol.BackingSnapshot {
return errors.New("cloning snapshot-backed volumes is currently not supported")
}
case sID != nil:
if vol.Size < parentVol.Size {
return fmt.Errorf(
@ -176,6 +212,25 @@ func checkValidCreateVolumeRequest(
parentVol.Size,
vol.Size)
}
if vol.BackingSnapshot {
if vol.Size != parentVol.Size {
return fmt.Errorf(
"cannot create snapshot-backed volume of different size: expected %d bytes, got %d bytes",
parentVol.Size,
vol.Size,
)
}
volCaps := req.GetVolumeCapabilities()
for _, volCap := range volCaps {
mode := volCap.AccessMode.Mode
if mode != csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY &&
mode != csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY {
return errors.New("backingSnapshot may be used only with read-only access modes")
}
}
}
}
return nil
@ -233,7 +288,7 @@ func (cs *ControllerServer) CreateVolume(
defer parentVol.Destroy()
}
err = checkValidCreateVolumeRequest(volOptions, parentVol, pvID, sID)
err = checkValidCreateVolumeRequest(volOptions, parentVol, pvID, sID, req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
@ -249,7 +304,7 @@ func (cs *ControllerServer) CreateVolume(
// TODO return error message if requested vol size greater than found volume return error
if vID != nil {
if sID != nil || pvID != nil {
if sID != nil || pvID != nil && !volOptions.BackingSnapshot {
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
err = volClient.ExpandVolume(ctx, volOptions.Size)
if err != nil {
@ -321,26 +376,32 @@ func (cs *ControllerServer) CreateVolume(
return nil, err
}
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
volOptions.RootPath, err = volClient.GetVolumeRootPathCeph(ctx)
if err != nil {
purgeErr := volClient.PurgeVolume(ctx, true)
if purgeErr != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, purgeErr)
// All errors other than ErrVolumeNotFound should return an error back to the caller
if !errors.Is(purgeErr, cerrors.ErrVolumeNotFound) {
// If the subvolume deletion is failed, we should not cleanup
// the OMAP entry it will stale subvolume in cluster.
// set err=nil so that when we get the request again we can get
// the subvolume info.
err = nil
if !volOptions.BackingSnapshot {
// Get root path for the created subvolume.
// Note that root path for snapshot-backed volumes has been already set when
// building VolumeOptions.
return nil, status.Error(codes.Internal, purgeErr.Error())
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
volOptions.RootPath, err = volClient.GetVolumeRootPathCeph(ctx)
if err != nil {
purgeErr := volClient.PurgeVolume(ctx, true)
if purgeErr != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, purgeErr)
// All errors other than ErrVolumeNotFound should return an error back to the caller
if !errors.Is(purgeErr, cerrors.ErrVolumeNotFound) {
// If the subvolume deletion is failed, we should not cleanup
// the OMAP entry it will stale subvolume in cluster.
// set err=nil so that when we get the request again we can get
// the subvolume info.
err = nil
return nil, status.Error(codes.Internal, purgeErr.Error())
}
}
}
log.ErrorLog(ctx, "failed to get subvolume path %s: %v", vID.FsSubvolName, err)
log.ErrorLog(ctx, "failed to get subvolume path %s: %v", vID.FsSubvolName, err)
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
log.DebugLog(ctx, "cephfs: successfully created backing volume named %s for request name %s",
@ -452,16 +513,8 @@ func (cs *ControllerServer) DeleteVolume(
}
defer cr.DeleteCredentials()
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
if err = volClient.PurgeVolume(ctx, false); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err)
if errors.Is(err, cerrors.ErrVolumeHasSnapshots) {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
if !errors.Is(err, cerrors.ErrVolumeNotFound) {
return nil, status.Error(codes.Internal, err.Error())
}
if err := cleanUpBackingVolume(ctx, volOptions, vID, cr); err != nil {
return nil, err
}
if err := store.UndoVolReservation(ctx, volOptions, *vID, secrets); err != nil {
@ -473,6 +526,84 @@ func (cs *ControllerServer) DeleteVolume(
return &csi.DeleteVolumeResponse{}, nil
}
func cleanUpBackingVolume(
ctx context.Context,
volOptions *store.VolumeOptions,
volID *store.VolumeIdentifier,
cr *util.Credentials,
) error {
if !volOptions.BackingSnapshot {
// Regular volumes need to be purged.
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
if err := volClient.PurgeVolume(ctx, false); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err)
if errors.Is(err, cerrors.ErrVolumeHasSnapshots) {
return status.Error(codes.FailedPrecondition, err.Error())
}
if !errors.Is(err, cerrors.ErrVolumeNotFound) {
return status.Error(codes.Internal, err.Error())
}
}
return nil
}
// Snapshot-backed volumes need to un-reference the backing snapshot, and
// the snapshot itself may need to be deleted if its reftracker doesn't
// hold any references anymore.
backingSnapNeedsDelete, err := store.UnrefSnapshotBackedVolume(ctx, volOptions)
if err != nil {
if errors.Is(err, rterrors.ErrObjectOutOfDate) {
return status.Error(codes.Aborted, err.Error())
}
return status.Error(codes.Internal, err.Error())
}
if !backingSnapNeedsDelete {
return nil
}
snapParentVolOptions, _, snapID, err := store.NewSnapshotOptionsFromID(ctx, volOptions.BackingSnapshotID, cr)
if err != nil {
absorbErrs := []error{
util.ErrPoolNotFound,
util.ErrKeyNotFound,
cerrors.ErrSnapNotFound,
cerrors.ErrVolumeNotFound,
}
fatalErr := true
for i := range absorbErrs {
if errors.Is(err, absorbErrs[i]) {
fatalErr = false
break
}
}
if fatalErr {
return status.Error(codes.Internal, err.Error())
}
} else {
snapClient := core.NewSnapshot(
snapParentVolOptions.GetConnection(),
snapID.FsSnapshotName,
&snapParentVolOptions.SubVolume,
)
err = deleteSnapshotAndUndoReservation(ctx, snapClient, snapParentVolOptions, snapID, cr)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
}
return nil
}
// ValidateVolumeCapabilities checks whether the volume capabilities requested
// are supported.
func (cs *ControllerServer) ValidateVolumeCapabilities(
@ -537,6 +668,10 @@ func (cs *ControllerServer) ControllerExpandVolume(
}
defer volOptions.Destroy()
if volOptions.BackingSnapshot {
return nil, status.Error(codes.InvalidArgument, "cannot expand snapshot-backed volume")
}
RoundOffSize := util.RoundOffBytes(req.GetCapacityRange().GetRequiredBytes())
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
if err = volClient.ResizeVolume(ctx, RoundOffSize); err != nil {
@ -615,6 +750,10 @@ func (cs *ControllerServer) CreateSnapshot(
parentVolOptions.ClusterID)
}
if parentVolOptions.BackingSnapshot {
return nil, status.Error(codes.InvalidArgument, "cannot snapshot a snapshot-backed volume")
}
cephfsSnap, genSnapErr := store.GenSnapFromOptions(ctx, req)
if genSnapErr != nil {
return nil, status.Error(codes.Internal, genSnapErr.Error())
@ -780,6 +919,7 @@ func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.Cr
// DeleteSnapshot deletes the snapshot in backend and removes the
// snapshot metadata from store.
// nolint:gocyclo,cyclop // TODO: reduce complexity
func (cs *ControllerServer) DeleteSnapshot(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
@ -878,17 +1018,51 @@ func (cs *ControllerServer) DeleteSnapshot(
return nil, status.Error(codes.Internal, err.Error())
}
}
err = snapClient.DeleteSnapshot(ctx)
needsDelete, err := store.UnrefSelfInSnapshotBackedVolumes(ctx, volOpt, sid.SnapshotID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
err = store.UndoSnapReservation(ctx, volOpt, *sid, sid.RequestName, cr)
if err != nil {
log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)",
sid.RequestName, sid.FsSnapshotName, err)
if errors.Is(err, rterrors.ErrObjectOutOfDate) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
if needsDelete {
err = deleteSnapshotAndUndoReservation(
ctx,
snapClient,
volOpt,
sid,
cr,
)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
return &csi.DeleteSnapshotResponse{}, nil
}
func deleteSnapshotAndUndoReservation(
ctx context.Context,
snapClient core.SnapshotClient,
parentVolOptions *store.VolumeOptions,
snapID *store.SnapshotIdentifier,
cr *util.Credentials,
) error {
err := snapClient.DeleteSnapshot(ctx)
if err != nil {
return err
}
err = store.UndoSnapReservation(ctx, parentVolOptions, *snapID, snapID.RequestName, cr)
if err != nil {
log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)",
snapID.RequestName, snapID.RequestName, err)
return err
}
return nil
}