rbd: flatten datasource image before creating volume

This commit ensures that parent image is flattened before
creating volume.
- If the data source is a PVC, the underlying image's parent
  is flattened(which would be a temp clone or snapshot).
  hard & soft limit is reduced by 2 to account for depth that
  will be added by temp & final clone.

- If the data source is a Snapshot, the underlying image is
  itself flattened.
  hard & soft limit is reduced by 1 to account for depth that
  will be added by the clone which will be restored from the
  snapshot.

Flattening step for resulting PVC image restored from snapshot is removed.
Flattening step for temp clone & final image is removed when pvc clone is
being created.

Fixes: #2190

Signed-off-by: Rakshith R <rar@redhat.com>
This commit is contained in:
Rakshith R 2022-02-21 17:55:23 +05:30 committed by mergify[bot]
parent 6dd5fe9360
commit a56f9a0c05
4 changed files with 110 additions and 90 deletions

View File

@ -56,23 +56,12 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, ErrSnapNotFound): case errors.Is(err, ErrSnapNotFound):
// check temporary image needs flatten, if yes add task to flatten the
// temporary clone
err = tempClone.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
return false, err
}
// as the snapshot is not present, create new snapshot,clone and // as the snapshot is not present, create new snapshot,clone and
// delete the temporary snapshot // delete the temporary snapshot
err = createRBDClone(ctx, tempClone, rv, snap) err = createRBDClone(ctx, tempClone, rv, snap)
if err != nil { if err != nil {
return false, err return false, err
} }
// check image needs flatten, if yes add task to flatten the clone
err = rv.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
return false, err
}
return true, nil return true, nil
@ -114,11 +103,6 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
return false, err return false, err
} }
// check image needs flatten, if yes add task to flatten the clone
err = rv.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
return false, err
}
return true, nil return true, nil
} }
@ -186,10 +170,7 @@ func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVol
} }
func (rv *rbdVolume) doSnapClone(ctx context.Context, parentVol *rbdVolume) error { func (rv *rbdVolume) doSnapClone(ctx context.Context, parentVol *rbdVolume) error {
var ( var errClone error
errClone error
errFlatten error
)
// generate temp cloned volume // generate temp cloned volume
tempClone := rv.generateTempClone() tempClone := rv.generateTempClone()
@ -218,62 +199,22 @@ func (rv *rbdVolume) doSnapClone(ctx context.Context, parentVol *rbdVolume) erro
} }
} }
if err != nil || errFlatten != nil { if err != nil {
if !errors.Is(errFlatten, ErrFlattenInProgress) { // cleanup snapshot
// cleanup snapshot cErr := cleanUpSnapshot(ctx, parentVol, tempSnap, tempClone)
cErr := cleanUpSnapshot(ctx, parentVol, tempSnap, tempClone) if cErr != nil {
if cErr != nil { log.ErrorLog(ctx, "failed to cleanup image %s or snapshot %s: %v", tempClone, tempSnap, cErr)
log.ErrorLog(ctx, "failed to cleanup image %s or snapshot %s: %v", tempSnap, tempClone, cErr)
}
} }
} }
}() }()
// flatten clone
errFlatten = tempClone.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if errFlatten != nil {
return errFlatten
}
// create snap of temp clone from temporary cloned image // create snap of temp clone from temporary cloned image
// create final clone // create final clone
// delete snap of temp clone // delete snap of temp clone
errClone = createRBDClone(ctx, tempClone, rv, cloneSnap) errClone = createRBDClone(ctx, tempClone, rv, cloneSnap)
if errClone != nil { if errClone != nil {
// set errFlatten error to cleanup temporary snapshot and temporary clone
errFlatten = errors.New("failed to create user requested cloned image")
return errClone return errClone
} }
return nil return nil
} }
func (rv *rbdVolume) flattenCloneImage(ctx context.Context) error {
tempClone := rv.generateTempClone()
// reducing the limit for cloned images to make sure the limit is in range,
// If the intermediate clone reaches the depth we may need to return ABORT
// error message as it need to be flatten before continuing, this may leak
// omap entries and stale temporary snapshots in corner cases, if we reduce
// the limit and check for the depth of the parent image clain itself we
// can flatten the parent images before used to avoid the stale omap entries.
hardLimit := rbdHardMaxCloneDepth
softLimit := rbdSoftMaxCloneDepth
// choosing 2 so that we don't need to flatten the image in the request.
const depthToAvoidFlatten = 2
if rbdHardMaxCloneDepth > depthToAvoidFlatten {
hardLimit = rbdHardMaxCloneDepth - depthToAvoidFlatten
}
if rbdSoftMaxCloneDepth > depthToAvoidFlatten {
softLimit = rbdSoftMaxCloneDepth - depthToAvoidFlatten
}
err := tempClone.getImageInfo()
if err == nil {
return tempClone.flattenRbdImage(ctx, false, hardLimit, softLimit)
}
if !errors.Is(err, ErrImageNotFound) {
return err
}
return rv.flattenRbdImage(ctx, false, hardLimit, softLimit)
}

View File

@ -286,7 +286,7 @@ func (cs *ControllerServer) CreateVolume(
return nil, err return nil, err
} }
err = flattenParentImage(ctx, parentVol, cr) err = flattenParentImage(ctx, parentVol, rbdSnap, cr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -297,11 +297,9 @@ func (cs *ControllerServer) CreateVolume(
} }
defer func() { defer func() {
if err != nil { if err != nil {
if !errors.Is(err, ErrFlattenInProgress) { errDefer := undoVolReservation(ctx, rbdVol, cr)
errDefer := undoVolReservation(ctx, rbdVol, cr) if errDefer != nil {
if errDefer != nil { log.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)", req.GetName(), errDefer)
log.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)", req.GetName(), errDefer)
}
} }
} }
}() }()
@ -318,12 +316,38 @@ func (cs *ControllerServer) CreateVolume(
return buildCreateVolumeResponse(req, rbdVol), nil return buildCreateVolumeResponse(req, rbdVol), nil
} }
func flattenParentImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { // flattenParentImage is to be called before proceeding with creating volume,
// with datasource. This function flattens the parent image accordingly to
// make sure no flattening is required during or after the new volume creation.
// For parent volume, it's parent(temp clone or snapshot) is flattened.
// For parent snapshot, the snapshot itself is flattened.
func flattenParentImage(
ctx context.Context,
rbdVol *rbdVolume,
rbdSnap *rbdSnapshot,
cr *util.Credentials) error {
// flatten the image's parent before the reservation to avoid
// stale entries in post creation if we return ABORT error and the
// DeleteVolume RPC is not called.
// reducing the limit for cloned images to make sure the limit is in range,
// If the intermediate clone reaches the depth we may need to return ABORT
// error message as it need to be flatten before continuing, this may leak
// omap entries and stale temporary snapshots in corner cases, if we reduce
// the limit and check for the depth of the parent image clain itself we
// can flatten the parent images before used to avoid the stale omap entries.
hardLimit := rbdHardMaxCloneDepth
softLimit := rbdSoftMaxCloneDepth
if rbdVol != nil { if rbdVol != nil {
// flatten the image or its parent before the reservation to avoid // choosing 2, since cloning image creates a temp clone and a final clone which
// stale entries in post creation if we return ABORT error and the // will add a total depth of 2.
// delete volume is not called const depthToAvoidFlatten = 2
err := rbdVol.flattenCloneImage(ctx) if rbdHardMaxCloneDepth > depthToAvoidFlatten {
hardLimit = rbdHardMaxCloneDepth - depthToAvoidFlatten
}
if rbdSoftMaxCloneDepth > depthToAvoidFlatten {
softLimit = rbdSoftMaxCloneDepth - depthToAvoidFlatten
}
err := rbdVol.flattenParent(ctx, hardLimit, softLimit)
if err != nil { if err != nil {
return getGRPCErrorForCreateVolume(err) return getGRPCErrorForCreateVolume(err)
} }
@ -335,6 +359,32 @@ func flattenParentImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
return err return err
} }
} }
if rbdSnap != nil {
err := rbdSnap.Connect(cr)
if err != nil {
return getGRPCErrorForCreateVolume(err)
}
// in case of any error call Destroy for cleanup.
defer func() {
if err != nil {
rbdSnap.Destroy()
}
}()
// choosing 1, since restore from snapshot adds one depth.
const depthToAvoidFlatten = 1
if rbdHardMaxCloneDepth > depthToAvoidFlatten {
hardLimit = rbdHardMaxCloneDepth - depthToAvoidFlatten
}
if rbdSoftMaxCloneDepth > depthToAvoidFlatten {
softLimit = rbdSoftMaxCloneDepth - depthToAvoidFlatten
}
err = rbdSnap.flattenRbdImage(ctx, false, hardLimit, softLimit)
if err != nil {
return getGRPCErrorForCreateVolume(err)
}
}
return nil return nil
} }
@ -574,10 +624,8 @@ func (cs *ControllerServer) createBackingImage(
defer func() { defer func() {
if err != nil { if err != nil {
if !errors.Is(err, ErrFlattenInProgress) { if deleteErr := rbdVol.deleteImage(ctx); deleteErr != nil {
if deleteErr := rbdVol.deleteImage(ctx); deleteErr != nil { log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", rbdVol, deleteErr)
log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", rbdVol, deleteErr)
}
} }
} }
}() }()
@ -586,15 +634,6 @@ func (cs *ControllerServer) createBackingImage(
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
if rbdSnap != nil {
err = rbdVol.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
log.ErrorLog(ctx, "failed to flatten image %s: %v", rbdVol, err)
return err
}
}
return nil return nil
} }

View File

@ -303,7 +303,6 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
return false, err return false, err
} }
// TODO: check image needs flattening and completed?
err = rv.repairImageID(ctx, j, false) err = rv.repairImageID(ctx, j, false)
if err != nil { if err != nil {

View File

@ -1440,6 +1440,47 @@ func (ri *rbdImage) getImageInfo() error {
return nil return nil
} }
// getParent returns parent image if it exists.
func (ri *rbdImage) getParent() (*rbdImage, error) {
err := ri.getImageInfo()
if err != nil {
return nil, err
}
if ri.ParentName == "" {
return nil, nil
}
parentImage := rbdImage{}
parentImage.conn = ri.conn.Copy()
parentImage.ClusterID = ri.ClusterID
parentImage.Monitors = ri.Monitors
parentImage.Pool = ri.ParentPool
parentImage.RadosNamespace = ri.RadosNamespace
parentImage.RbdImageName = ri.ParentName
err = parentImage.getImageInfo()
if err != nil {
return nil, err
}
return &parentImage, nil
}
// flattenParent flatten the given image's parent if it exists according to hard and soft
// limits.
func (ri *rbdImage) flattenParent(ctx context.Context, hardLimit, softLimit uint) error {
parentImage, err := ri.getParent()
if err != nil {
return err
}
if parentImage == nil {
return nil
}
return parentImage.flattenRbdImage(ctx, false, hardLimit, softLimit)
}
/* /*
checkSnapExists queries rbd about the snapshots of the given image and returns checkSnapExists queries rbd about the snapshots of the given image and returns
ErrImageNotFound if provided image is not found, and ErrSnapNotFound if ErrImageNotFound if provided image is not found, and ErrSnapNotFound if