rbd: Add support for smart cloning

Added support for RBD PVC to PVC cloning, below
commands are executed to create a PVC-PVC clone from
RBD side.

* Check the depth(n) of the cloned image if n>=(hard limit -2)
or ((soft limit-2) Add a task to flatten the image and return
about (to avoid image leak) **Note** will try to flatten the
temp clone image in the chain if available
* Reserve the key and values in omap (this will help us to
avoid the leak as it's not reserved earlier as we have returned
ABORT (the request may not come back))
* Create a snapshot of rbd image
* Clone the snapshot (temp clone)
* Delete the snapshot
* Snapshot the temp clone
* Clone the snapshot (final clone)
* Delete the snapshot

```bash
1) check the image depth of the parent image if flatten required
add a task to flatten image and return ABORT to avoid leak
(hardlimit-2 and softlimit-2 check will be done)
2) Reserve omap keys
2) rbd snap create <RBD image for src k8s volume>@<random snap name>
3) rbd clone --rbd-default-clone-format 2 --image-feature
layering,deep-flatten <RBD image for src k8s volume>@<random snap>
<RBD image for temporary snap image>
4) rbd snap rm <RBD image for src k8s volume>@<random snap name>
5) rbd snap create <cloned RBD image created in snapshot process>@<random snap name>
6) rbd clone --rbd-default-clone-format 2 --image-feature <k8s dst vol config>
 <RBD image for temporary snap image>@<random snap name> <RBD image for k8s dst vol>
7)rbd snap rm <RBD image for src k8s volume>@<random snap name>
```

* Delete temporary clone image created as part of clone(delete if present)
* Delete rbd image

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2020-07-07 17:44:19 +05:30 committed by mergify[bot]
parent 9077c25c15
commit a0fd805a8b
8 changed files with 411 additions and 61 deletions

View File

@ -0,0 +1,17 @@
---
apiVersion: v1
kind: Pod
metadata:
name: csi-rbd-clone-demo-app
spec:
containers:
- name: web-server
image: nginx
volumeMounts:
- name: mypvc
mountPath: /var/lib/www/html
volumes:
- name: mypvc
persistentVolumeClaim:
claimName: rbd-pvc-clone
readOnly: false

View File

@ -0,0 +1,15 @@
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: rbd-pvc-clone
spec:
storageClassName: csi-rbd-sc
dataSource:
name: rbd-pvc
kind: PersistentVolumeClaim
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi

245
internal/rbd/clone.go Normal file
View File

@ -0,0 +1,245 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"context"
"errors"
"fmt"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
librbd "github.com/ceph/go-ceph/rbd"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
)
// checkCloneImage check the cloned image exists, if the cloned image is not
// found it will check the temporary cloned snapshot exists, and again it will
// check the snapshot exists on the temporary cloned image, if yes it will
// create a new cloned and delete the temporary snapshot and adds a task to
// flatten the temp cloned image and return success.
//
// if the temporary snapshot does not exists it creates a temporary snapshot on
// temporary cloned image and creates a new cloned with user-provided image
// features and delete the temporary snapshot and adds a task to flatten the
// temp cloned image and return success
//
// if the temporary clone does not exist and if there is a temporary snapshot
// present on the parent image it will delete the temporary snapshot and
// returns.
func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume) (bool, error) {
// generate temp cloned volume
tempClone := rv.generateTempClone()
snap := &rbdSnapshot{
RbdSnapName: rv.RbdImageName,
Pool: rv.Pool,
}
var einf ErrImageNotFound
var esnf ErrSnapNotFound
// check if cloned image exists
err := rv.getImageInfo()
if err == nil {
// check if do we have temporary snapshot on temporary cloned image
sErr := tempClone.checkSnapExists(snap)
if sErr != nil {
if errors.As(err, &esnf) {
return true, nil
}
return false, err
}
err = tempClone.deleteSnapshot(ctx, snap)
if err == nil {
return true, nil
}
return false, err
}
if !errors.As(err, &einf) {
// return error if its not image not found
return false, err
}
err = tempClone.checkSnapExists(snap)
if err != nil {
if errors.As(err, &esnf) {
// check temporary image needs flatten, if yes add task to flatten the
// temporary clone
err = tempClone.flattenRbdImage(ctx, rv.conn.Creds, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
return false, err
}
// as the snapshot is not present, create new snapshot,clone and
// delete the temporary snapshot
err = createRBDClone(ctx, tempClone, rv, snap, rv.conn.Creds)
if err != nil {
return false, err
}
return true, nil
} else if !errors.As(err, &einf) {
// any error other than image not found return error
return false, err
}
} else {
// snap will be create after we flatten the temporary cloned image,no
// need to check for flatten here.
// as the snap exists,create clone image and delete temporary snapshot
// and add task to flatten temporary cloned image
err = rv.cloneRbdImageFromSnapshot(ctx, snap)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to clone rbd image %s from snapshot %s: %v"), rv.RbdImageName, snap.RbdSnapName, err)
err = fmt.Errorf("failed to clone rbd image %s from snapshot %s: %v", rv.RbdImageName, snap.RbdSnapName, err)
return false, err
}
err = tempClone.deleteSnapshot(ctx, snap)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to delete snapshot: %v"), err)
return false, err
}
return true, nil
}
// as the temp clone doesnot exists,check snapshot exists on parent volume
err = parentVol.checkSnapExists(snap)
if err == nil {
// the temp clone exists, delete it lets reserve a new ID and
// create new resources for clearner approach
err = parentVol.deleteSnapshot(ctx, snap)
}
if errors.As(err, &esnf) {
return false, nil
}
return false, err
}
func (rv *rbdVolume) generateTempClone() *rbdVolume {
tempClone := rbdVolume{}
tempClone.conn = rv.conn
// The temp clone image need to have deep flatten feature
f := []string{librbd.FeatureNameLayering, librbd.FeatureNameDeepFlatten}
tempClone.imageFeatureSet = librbd.FeatureSetFromNames(f)
tempClone.ClusterID = rv.ClusterID
tempClone.Monitors = rv.Monitors
tempClone.Pool = rv.Pool
// The temp cloned image name will be always (rbd image name + "-temp")
// this name will be always unique, as cephcsi never creates an image with
// this format for new rbd images
tempClone.RbdImageName = rv.RbdImageName + "-temp"
return &tempClone
}
func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVolume) error {
// generate temp cloned volume
tempClone := rv.generateTempClone()
snap := &rbdSnapshot{
RbdSnapName: rv.RbdImageName,
Pool: rv.Pool,
}
var (
errClone error
errFlatten error
err error
)
var efip ErrFlattenInProgress
var j = &journal.Connection{}
j, err = volJournal.Connect(rv.Monitors, rv.conn.Creds)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
defer j.Destroy()
// create snapshot and temporary clone and delete snapshot
err = createRBDClone(ctx, parentVol, tempClone, snap, rv.conn.Creds)
if err != nil {
return err
}
defer func() {
if err != nil || errFlatten != nil {
if !errors.Is(errFlatten, &efip) {
// cleanup snapshot
cErr := cleanUpSnapshot(ctx, parentVol, snap, tempClone, rv.conn.Creds)
if cErr != nil {
klog.Errorf(util.Log(ctx, "failed to cleanup image %s or snapshot %s: %v"), snap, tempClone, cErr)
}
}
}
if err != nil || errClone != nil {
cErr := cleanUpSnapshot(ctx, tempClone, snap, rv, rv.conn.Creds)
if cErr != nil {
klog.Errorf(util.Log(ctx, "failed to cleanup image %s or snapshot %s: %v"), snap, tempClone, cErr)
}
}
}()
// flatten clone
errFlatten = tempClone.flattenRbdImage(ctx, rv.conn.Creds, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if errFlatten != nil {
return err
}
// create snap of temp clone from temporary cloned image
// create final clone
// delete snap of temp clone
errClone = createRBDClone(ctx, tempClone, rv, snap, rv.conn.Creds)
if errClone != nil {
// set errFlatten error to cleanup temporary snapshot and temporary clone
errFlatten = errors.New("failed to create user requested cloned image")
return errClone
}
err = rv.getImageID()
if err != nil {
klog.Errorf(util.Log(ctx, "failed to get volume id %s: %v"), rv, err)
return err
}
err = j.StoreImageID(ctx, rv.JournalPool, rv.ReservedID, rv.ImageID, rv.conn.Creds)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to store volume %s: %v"), rv, err)
return err
}
return nil
}
func (rv *rbdVolume) flattenCloneImage(ctx context.Context) error {
tempClone := rv.generateTempClone()
// reducing the limit for cloned images to make sure the limit is in range,
// If the intermediate clone reaches the depth we may need to return ABORT
// error message as it need to be flatten before continuing, this may leak
// omap entries and stale temporary snapshots in corner cases, if we reduce
// the limit and check for the depth of the parent image clain it self we
// can flatten the parent images before use to avoid the stale omap entries.
hardLimit := rbdHardMaxCloneDepth
softLimit := rbdSoftMaxCloneDepth
if rbdHardMaxCloneDepth < 2 {
hardLimit = rbdHardMaxCloneDepth - 2
}
if rbdSoftMaxCloneDepth < 2 {
softLimit = rbdSoftMaxCloneDepth - 2
}
err := tempClone.getImageInfo()
if err == nil {
return tempClone.flattenRbdImage(ctx, tempClone.conn.Creds, false, hardLimit, softLimit)
}
if err != nil {
var einf ErrImageNotFound
if !errors.As(err, &einf) {
return err
}
}
return rv.flattenRbdImage(ctx, rv.conn.Creds, false, hardLimit, softLimit)
}

View File

@ -161,6 +161,21 @@ func buildCreateVolumeResponse(ctx context.Context, req *csi.CreateVolumeRequest
return &csi.CreateVolumeResponse{Volume: volume}, nil
}
// getGRPCErrorForCreateVolume converts the returns the GRPC errors based on
// the input error types it expected to use only for CreateVolume as we need to
// return different GRPC codes for different functions based on the input.
func getGRPCErrorForCreateVolume(err error) error {
var evnc ErrVolNameConflict
if errors.As(err, &evnc) {
return status.Error(codes.AlreadyExists, err.Error())
}
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
return status.Error(codes.Aborted, err.Error())
}
return status.Error(codes.Internal, err.Error())
}
// CreateVolume creates the volume in backend
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateVolumeReq(ctx, req); err != nil {
@ -193,18 +208,14 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.Internal, err.Error())
}
rbdSnap, err := cs.checkSnapshotSource(ctx, req, cr)
parentVol, rbdSnap, err := checkContentSource(ctx, req, cr)
if err != nil {
return nil, err
}
found, err := rbdVol.Exists(ctx)
found, err := rbdVol.Exists(ctx, parentVol)
if err != nil {
var evnc ErrVolNameConflict
if errors.As(err, &evnc) {
return nil, status.Error(codes.AlreadyExists, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
return nil, getGRPCErrorForCreateVolume(err)
}
if found {
if rbdSnap != nil {
@ -217,6 +228,15 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return buildCreateVolumeResponse(ctx, req, rbdVol)
}
if parentVol != nil {
// flatten the image or its parent before the reservation to avoid
// stale entries in post creation if we return ABORT error and the
// delete volume is not called
err = parentVol.flattenCloneImage(ctx)
if err != nil {
return nil, getGRPCErrorForCreateVolume(err)
}
}
err = reserveVol(ctx, rbdVol, rbdSnap, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
@ -233,7 +253,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
}()
err = cs.createBackingImage(ctx, cr, rbdVol, rbdSnap)
err = cs.createBackingImage(ctx, cr, rbdVol, parentVol, rbdSnap)
if err != nil {
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
@ -268,7 +288,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// return success,the hardlimit is reached it starts a task to flatten the
// image and return Aborted
func checkFlatten(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
err := rbdVol.flattenRbdImage(ctx, cr, false)
err := rbdVol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
@ -318,7 +338,7 @@ func (cs *ControllerServer) createVolumeFromSnapshot(ctx context.Context, cr *ut
return nil
}
func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error {
func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol, parentVol *rbdVolume, rbdSnap *rbdSnapshot) error {
var err error
var j = &journal.Connection{}
@ -328,12 +348,20 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre
}
defer j.Destroy()
// nolint:gocritic // this ifElseChain can not be rewritten to a switch statement
if rbdSnap != nil {
err = cs.createVolumeFromSnapshot(ctx, cr, rbdVol, rbdSnap.SnapID)
if err != nil {
return err
}
util.DebugLog(ctx, "created volume %s from snapshot %s", rbdVol.RequestName, rbdSnap.RbdSnapName)
} else if parentVol != nil {
if acquired := cs.VolumeLocks.TryAcquire(parentVol.VolID); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), parentVol.VolID)
return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, parentVol.VolID)
}
defer cs.VolumeLocks.Release(parentVol.VolID)
return rbdVol.createCloneFromImage(ctx, parentVol)
} else {
err = createImage(ctx, rbdVol, cr)
if err != nil {
@ -367,7 +395,7 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre
}
if rbdSnap != nil {
err = rbdVol.flattenRbdImage(ctx, cr, false)
err = rbdVol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to flatten image %s: %v"), rbdVol, err)
return err
@ -384,38 +412,53 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre
return nil
}
func (cs *ControllerServer) checkSnapshotSource(ctx context.Context, req *csi.CreateVolumeRequest,
cr *util.Credentials) (*rbdSnapshot, error) {
func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *util.Credentials) (*rbdVolume, *rbdSnapshot, error) {
if req.VolumeContentSource == nil {
return nil, nil
return nil, nil, nil
}
snapshot := req.VolumeContentSource.GetSnapshot()
if snapshot == nil {
return nil, status.Error(codes.InvalidArgument, "volume Snapshot cannot be empty")
}
snapshotID := snapshot.GetSnapshotId()
if snapshotID == "" {
return nil, status.Error(codes.InvalidArgument, "volume Snapshot ID cannot be empty")
}
rbdSnap := &rbdSnapshot{}
if err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
var esnf ErrSnapNotFound
if !errors.As(err, &esnf) {
return nil, status.Error(codes.Internal, err.Error())
volumeSource := req.VolumeContentSource
switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
snapshot := req.VolumeContentSource.GetSnapshot()
if snapshot == nil {
return nil, nil, status.Error(codes.NotFound, "volume Snapshot cannot be empty")
}
var epnf util.ErrPoolNotFound
if errors.As(err, &epnf) {
snapshotID := snapshot.GetSnapshotId()
if snapshotID == "" {
return nil, nil, status.Errorf(codes.NotFound, "volume Snapshot ID cannot be empty")
}
rbdSnap := &rbdSnapshot{}
if err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
klog.Errorf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err)
return nil, status.Error(codes.InvalidArgument, err.Error())
var esnf ErrSnapNotFound
if !errors.As(err, &esnf) {
return nil, nil, status.Error(codes.Internal, err.Error())
}
return nil, nil, status.Errorf(codes.NotFound, "%s snapshot doesnot exists", snapshotID)
}
return nil, status.Error(codes.InvalidArgument, "missing requested Snapshot ID")
return nil, rbdSnap, nil
case *csi.VolumeContentSource_Volume:
vol := req.VolumeContentSource.GetVolume()
if vol == nil {
return nil, nil, status.Error(codes.NotFound, "volume cannot be empty")
}
volID := vol.GetVolumeId()
if volID == "" {
return nil, nil, status.Errorf(codes.NotFound, "volume ID cannot be empty")
}
// TODO need to support cloning for encrypted volume
rbdvol, err := genVolFromVolID(ctx, volID, cr, nil)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to get backend image for %s: %v"), volID, err)
var esnf ErrImageNotFound
if !errors.As(err, &esnf) {
return nil, nil, status.Error(codes.Internal, err.Error())
}
return nil, nil, status.Errorf(codes.NotFound, "%s image doesnot exists", volID)
}
return rbdvol, nil, nil
}
return rbdSnap, nil
return nil, nil, status.Errorf(codes.InvalidArgument, "not a proper volume source")
}
// DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin
@ -573,6 +616,20 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Errorf(codes.Internal, "rbd %s is still being used", rbdVol.RbdImageName)
}
// delete the temporary rbd image created as part of volume clone during
// create volume
var einf ErrImageNotFound
tempClone := rbdVol.generateTempClone()
err = deleteImage(ctx, tempClone, cr)
if err != nil {
// return error if it is not ErrImageNotFound
if !errors.As(err, &einf) {
klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"),
tempClone, err)
return nil, status.Error(codes.Internal, err.Error())
}
}
// Deleting rbd image
util.DebugLog(ctx, "deleting image %s", rbdVol.RbdImageName)
if err = deleteImage(ctx, rbdVol, cr); err != nil {
@ -705,7 +762,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
}
defer vol.Destroy()
err = vol.flattenRbdImage(ctx, cr, false)
err = vol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
return &csi.CreateSnapshotResponse{
@ -869,7 +926,7 @@ func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdV
return ready, cloneRbd, err
}
err = cloneRbd.flattenRbdImage(ctx, cr, false)
err = cloneRbd.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
var efip ErrFlattenInProgress
if errors.As(err, &efip) {

View File

@ -268,7 +268,7 @@ func (ns *NodeServer) stageTransaction(ctx context.Context, req *csi.NodeStageVo
return transaction, err
}
if feature {
err = volOptions.flattenRbdImage(ctx, cr, true)
err = volOptions.flattenRbdImage(ctx, cr, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
return transaction, err
}

View File

@ -181,7 +181,7 @@ func checkSnapCloneExists(ctx context.Context, parentVol *rbdVolume, rbdSnap *rb
}
// check snapshot exists if not create it
_, err = vol.getSnapInfo(rbdSnap)
err = vol.checkSnapExists(rbdSnap)
var esnf ErrSnapNotFound
if errors.As(err, &esnf) {
// create snapshot
@ -223,12 +223,18 @@ func checkSnapCloneExists(ctx context.Context, parentVol *rbdVolume, rbdSnap *rb
/*
Check comment on checkSnapExists, to understand how this function behaves
**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, these need to be invoked only when the
respective CSI snapshot or volume name based locks are held, as otherwise racy access to these
omaps may end up leaving the omaps in an inconsistent state.
**NOTE:** These functions manipulate the rados omaps that hold information
regarding volume names as requested by the CSI drivers. Hence, these need to be
invoked only when the respective CSI snapshot or volume name based locks are
held, as otherwise racy access to these omaps may end up leaving the omaps in
an inconsistent state.
parentVol is required to check the clone is created from the requested parent
image or not, if temporary snapshots and clones created for the volume when the
content source is volume we need to recover from the stale entries or complete
the pending operations.
*/
func (rv *rbdVolume) Exists(ctx context.Context) (bool, error) {
func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, error) {
err := validateRbdVol(rv)
if err != nil {
return false, err
@ -277,6 +283,16 @@ func (rv *rbdVolume) Exists(ctx context.Context) (bool, error) {
if err != nil {
var einf ErrImageNotFound
if errors.As(err, &einf) {
// Need to check cloned info here not on createvolume,
if parentVol != nil {
found, cErr := rv.checkCloneImage(ctx, parentVol)
if found && cErr == nil {
return true, nil
}
if cErr != nil {
return false, cErr
}
}
err = j.UndoReservation(ctx, rv.JournalPool, rv.Pool,
rv.RbdImageName, rv.RequestName)
return false, err

View File

@ -433,7 +433,7 @@ func flattenClonedRbdImages(ctx context.Context, snaps []snapshotInfo, pool, mon
for _, s := range snaps {
if s.Namespace.Type == "trash" {
rv.RbdImageName = s.Namespace.OriginalName
err = rv.flattenRbdImage(ctx, cr, true)
err = rv.flattenRbdImage(ctx, cr, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to flatten %s; err %v"), rv, err)
continue
@ -443,7 +443,7 @@ func flattenClonedRbdImages(ctx context.Context, snaps []snapshotInfo, pool, mon
return nil
}
func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, forceFlatten bool) error {
func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, forceFlatten bool, hardlimit, softlimit uint) error {
var depth uint
var err error
@ -453,10 +453,10 @@ func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials,
if err != nil {
return err
}
klog.Infof(util.Log(ctx, "clone depth is (%d), configured softlimit (%d) and hardlimit (%d) for %s"), depth, rbdSoftMaxCloneDepth, rbdHardMaxCloneDepth, rv)
klog.Infof(util.Log(ctx, "clone depth is (%d), configured softlimit (%d) and hardlimit (%d) for %s"), depth, softlimit, hardlimit, rv)
}
if forceFlatten || (depth >= rbdHardMaxCloneDepth) || (depth >= rbdSoftMaxCloneDepth) {
if forceFlatten || (depth >= hardlimit) || (depth >= softlimit) {
args := []string{"flatten", rv.Pool + "/" + rv.RbdImageName, "--id", cr.ID, "--keyfile=" + cr.KeyFile, "-m", rv.Monitors}
supported, err := addRbdManagerTask(ctx, rv, args)
if supported {
@ -464,13 +464,13 @@ func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials,
klog.Errorf(util.Log(ctx, "failed to add task flatten for %s : %v"), rv, err)
return err
}
if forceFlatten || depth >= rbdHardMaxCloneDepth {
if forceFlatten || depth >= hardlimit {
return ErrFlattenInProgress{err: fmt.Errorf("flatten is in progress for image %s", rv.RbdImageName)}
}
}
if !supported {
klog.Errorf(util.Log(ctx, "task manager does not support flatten,image will be flattened once hardlimit is reached: %v"), err)
if forceFlatten || depth >= rbdHardMaxCloneDepth {
if forceFlatten || depth >= hardlimit {
err = rv.Connect(cr)
if err != nil {
return err
@ -1008,30 +1008,29 @@ func (rv *rbdVolume) getImageInfo() error {
}
/*
getSnapInfo queries rbd about the snapshots of the given image and returns its metadata, and
returns ErrImageNotFound if provided image is not found, and ErrSnapNotFound if provided snap
is not found in the images snapshot list
checkSnapExists queries rbd about the snapshots of the given image and returns
ErrImageNotFound if provided image is not found, and ErrSnapNotFound if
provided snap is not found in the images snapshot list
*/
func (rv *rbdVolume) getSnapInfo(rbdSnap *rbdSnapshot) (librbd.SnapInfo, error) {
invalidSnap := librbd.SnapInfo{}
func (rv *rbdVolume) checkSnapExists(rbdSnap *rbdSnapshot) error {
image, err := rv.open()
if err != nil {
return invalidSnap, err
return err
}
defer image.Close()
snaps, err := image.GetSnapshotNames()
if err != nil {
return invalidSnap, err
return err
}
for _, snap := range snaps {
if snap.Name == rbdSnap.RbdSnapName {
return snap, nil
return nil
}
}
return invalidSnap, ErrSnapNotFound{rbdSnap.RbdSnapName, fmt.Errorf("snap %s not found", rbdSnap.String())}
return ErrSnapNotFound{rbdSnap.RbdSnapName, fmt.Errorf("snap %s not found", rbdSnap.String())}
}
// rbdImageMetadataStash strongly typed JSON spec for stashed RBD image metadata

View File

@ -33,6 +33,7 @@ func createRBDClone(ctx context.Context, parentVol, cloneRbdVol *rbdVolume, snap
return err
}
snap.RbdImageName = parentVol.RbdImageName
// create clone image and delete snapshot
err = cloneRbdVol.cloneRbdImageFromSnapshot(ctx, snap)
if err != nil {