diff --git a/internal/rbd/clone.go b/internal/rbd/clone.go index ef82c714a..7788d4add 100644 --- a/internal/rbd/clone.go +++ b/internal/rbd/clone.go @@ -56,23 +56,12 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume) if err != nil { switch { case errors.Is(err, ErrSnapNotFound): - // check temporary image needs flatten, if yes add task to flatten the - // temporary clone - err = tempClone.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) - if err != nil { - return false, err - } // as the snapshot is not present, create new snapshot,clone and // delete the temporary snapshot err = createRBDClone(ctx, tempClone, rv, snap) if err != nil { return false, err } - // check image needs flatten, if yes add task to flatten the clone - err = rv.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) - if err != nil { - return false, err - } return true, nil @@ -114,11 +103,6 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume) return false, err } - // check image needs flatten, if yes add task to flatten the clone - err = rv.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) - if err != nil { - return false, err - } return true, nil } @@ -186,10 +170,7 @@ func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVol } func (rv *rbdVolume) doSnapClone(ctx context.Context, parentVol *rbdVolume) error { - var ( - errClone error - errFlatten error - ) + var errClone error // generate temp cloned volume tempClone := rv.generateTempClone() @@ -218,62 +199,22 @@ func (rv *rbdVolume) doSnapClone(ctx context.Context, parentVol *rbdVolume) erro } } - if err != nil || errFlatten != nil { - if !errors.Is(errFlatten, ErrFlattenInProgress) { - // cleanup snapshot - cErr := cleanUpSnapshot(ctx, parentVol, tempSnap, tempClone) - if cErr != nil { - log.ErrorLog(ctx, "failed to cleanup image %s or snapshot %s: %v", tempSnap, tempClone, cErr) - } + if err != nil { + // cleanup snapshot + cErr := cleanUpSnapshot(ctx, parentVol, tempSnap, tempClone) + if cErr != nil { + log.ErrorLog(ctx, "failed to cleanup image %s or snapshot %s: %v", tempClone, tempSnap, cErr) } } }() - // flatten clone - errFlatten = tempClone.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) - if errFlatten != nil { - return errFlatten - } - // create snap of temp clone from temporary cloned image // create final clone // delete snap of temp clone errClone = createRBDClone(ctx, tempClone, rv, cloneSnap) if errClone != nil { - // set errFlatten error to cleanup temporary snapshot and temporary clone - errFlatten = errors.New("failed to create user requested cloned image") - return errClone } return nil } - -func (rv *rbdVolume) flattenCloneImage(ctx context.Context) error { - tempClone := rv.generateTempClone() - // reducing the limit for cloned images to make sure the limit is in range, - // If the intermediate clone reaches the depth we may need to return ABORT - // error message as it need to be flatten before continuing, this may leak - // omap entries and stale temporary snapshots in corner cases, if we reduce - // the limit and check for the depth of the parent image clain itself we - // can flatten the parent images before used to avoid the stale omap entries. - hardLimit := rbdHardMaxCloneDepth - softLimit := rbdSoftMaxCloneDepth - // choosing 2 so that we don't need to flatten the image in the request. - const depthToAvoidFlatten = 2 - if rbdHardMaxCloneDepth > depthToAvoidFlatten { - hardLimit = rbdHardMaxCloneDepth - depthToAvoidFlatten - } - if rbdSoftMaxCloneDepth > depthToAvoidFlatten { - softLimit = rbdSoftMaxCloneDepth - depthToAvoidFlatten - } - err := tempClone.getImageInfo() - if err == nil { - return tempClone.flattenRbdImage(ctx, false, hardLimit, softLimit) - } - if !errors.Is(err, ErrImageNotFound) { - return err - } - - return rv.flattenRbdImage(ctx, false, hardLimit, softLimit) -} diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index c6451dd92..0adc51377 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -286,7 +286,7 @@ func (cs *ControllerServer) CreateVolume( return nil, err } - err = flattenParentImage(ctx, parentVol, cr) + err = flattenParentImage(ctx, parentVol, rbdSnap, cr) if err != nil { return nil, err } @@ -297,11 +297,9 @@ func (cs *ControllerServer) CreateVolume( } defer func() { if err != nil { - if !errors.Is(err, ErrFlattenInProgress) { - errDefer := undoVolReservation(ctx, rbdVol, cr) - if errDefer != nil { - log.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)", req.GetName(), errDefer) - } + errDefer := undoVolReservation(ctx, rbdVol, cr) + if errDefer != nil { + log.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)", req.GetName(), errDefer) } } }() @@ -318,12 +316,38 @@ func (cs *ControllerServer) CreateVolume( return buildCreateVolumeResponse(req, rbdVol), nil } -func flattenParentImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { +// flattenParentImage is to be called before proceeding with creating volume, +// with datasource. This function flattens the parent image accordingly to +// make sure no flattening is required during or after the new volume creation. +// For parent volume, it's parent(temp clone or snapshot) is flattened. +// For parent snapshot, the snapshot itself is flattened. +func flattenParentImage( + ctx context.Context, + rbdVol *rbdVolume, + rbdSnap *rbdSnapshot, + cr *util.Credentials) error { + // flatten the image's parent before the reservation to avoid + // stale entries in post creation if we return ABORT error and the + // DeleteVolume RPC is not called. + // reducing the limit for cloned images to make sure the limit is in range, + // If the intermediate clone reaches the depth we may need to return ABORT + // error message as it need to be flatten before continuing, this may leak + // omap entries and stale temporary snapshots in corner cases, if we reduce + // the limit and check for the depth of the parent image clain itself we + // can flatten the parent images before used to avoid the stale omap entries. + hardLimit := rbdHardMaxCloneDepth + softLimit := rbdSoftMaxCloneDepth if rbdVol != nil { - // flatten the image or its parent before the reservation to avoid - // stale entries in post creation if we return ABORT error and the - // delete volume is not called - err := rbdVol.flattenCloneImage(ctx) + // choosing 2, since cloning image creates a temp clone and a final clone which + // will add a total depth of 2. + const depthToAvoidFlatten = 2 + if rbdHardMaxCloneDepth > depthToAvoidFlatten { + hardLimit = rbdHardMaxCloneDepth - depthToAvoidFlatten + } + if rbdSoftMaxCloneDepth > depthToAvoidFlatten { + softLimit = rbdSoftMaxCloneDepth - depthToAvoidFlatten + } + err := rbdVol.flattenParent(ctx, hardLimit, softLimit) if err != nil { return getGRPCErrorForCreateVolume(err) } @@ -335,6 +359,32 @@ func flattenParentImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent return err } } + if rbdSnap != nil { + err := rbdSnap.Connect(cr) + if err != nil { + return getGRPCErrorForCreateVolume(err) + } + // in case of any error call Destroy for cleanup. + defer func() { + if err != nil { + rbdSnap.Destroy() + } + }() + + // choosing 1, since restore from snapshot adds one depth. + const depthToAvoidFlatten = 1 + if rbdHardMaxCloneDepth > depthToAvoidFlatten { + hardLimit = rbdHardMaxCloneDepth - depthToAvoidFlatten + } + if rbdSoftMaxCloneDepth > depthToAvoidFlatten { + softLimit = rbdSoftMaxCloneDepth - depthToAvoidFlatten + } + + err = rbdSnap.flattenRbdImage(ctx, false, hardLimit, softLimit) + if err != nil { + return getGRPCErrorForCreateVolume(err) + } + } return nil } @@ -574,10 +624,8 @@ func (cs *ControllerServer) createBackingImage( defer func() { if err != nil { - if !errors.Is(err, ErrFlattenInProgress) { - if deleteErr := rbdVol.deleteImage(ctx); deleteErr != nil { - log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", rbdVol, deleteErr) - } + if deleteErr := rbdVol.deleteImage(ctx); deleteErr != nil { + log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", rbdVol, deleteErr) } } }() @@ -586,15 +634,6 @@ func (cs *ControllerServer) createBackingImage( return status.Error(codes.Internal, err.Error()) } - if rbdSnap != nil { - err = rbdVol.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) - if err != nil { - log.ErrorLog(ctx, "failed to flatten image %s: %v", rbdVol, err) - - return err - } - } - return nil } diff --git a/internal/rbd/rbd_journal.go b/internal/rbd/rbd_journal.go index 5f81fafc5..bace5e0ff 100644 --- a/internal/rbd/rbd_journal.go +++ b/internal/rbd/rbd_journal.go @@ -303,7 +303,6 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er return false, err } - // TODO: check image needs flattening and completed? err = rv.repairImageID(ctx, j, false) if err != nil { diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 562d22815..a64c29870 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -1440,6 +1440,47 @@ func (ri *rbdImage) getImageInfo() error { return nil } +// getParent returns parent image if it exists. +func (ri *rbdImage) getParent() (*rbdImage, error) { + err := ri.getImageInfo() + if err != nil { + return nil, err + } + if ri.ParentName == "" { + return nil, nil + } + + parentImage := rbdImage{} + parentImage.conn = ri.conn.Copy() + parentImage.ClusterID = ri.ClusterID + parentImage.Monitors = ri.Monitors + parentImage.Pool = ri.ParentPool + parentImage.RadosNamespace = ri.RadosNamespace + parentImage.RbdImageName = ri.ParentName + + err = parentImage.getImageInfo() + if err != nil { + return nil, err + } + + return &parentImage, nil +} + +// flattenParent flatten the given image's parent if it exists according to hard and soft +// limits. +func (ri *rbdImage) flattenParent(ctx context.Context, hardLimit, softLimit uint) error { + parentImage, err := ri.getParent() + if err != nil { + return err + } + + if parentImage == nil { + return nil + } + + return parentImage.flattenRbdImage(ctx, false, hardLimit, softLimit) +} + /* checkSnapExists queries rbd about the snapshots of the given image and returns ErrImageNotFound if provided image is not found, and ErrSnapNotFound if