diff --git a/examples/rbd/pod-clone.yaml b/examples/rbd/pod-clone.yaml new file mode 100644 index 000000000..150e2f93e --- /dev/null +++ b/examples/rbd/pod-clone.yaml @@ -0,0 +1,17 @@ +--- +apiVersion: v1 +kind: Pod +metadata: + name: csi-rbd-clone-demo-app +spec: + containers: + - name: web-server + image: nginx + volumeMounts: + - name: mypvc + mountPath: /var/lib/www/html + volumes: + - name: mypvc + persistentVolumeClaim: + claimName: rbd-pvc-clone + readOnly: false diff --git a/examples/rbd/pvc-clone.yaml b/examples/rbd/pvc-clone.yaml new file mode 100644 index 000000000..129dd473a --- /dev/null +++ b/examples/rbd/pvc-clone.yaml @@ -0,0 +1,15 @@ +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: rbd-pvc-clone +spec: + storageClassName: csi-rbd-sc + dataSource: + name: rbd-pvc + kind: PersistentVolumeClaim + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi diff --git a/internal/rbd/clone.go b/internal/rbd/clone.go new file mode 100644 index 000000000..1b0ce9fb1 --- /dev/null +++ b/internal/rbd/clone.go @@ -0,0 +1,245 @@ +/* +Copyright 2020 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rbd + +import ( + "context" + "errors" + "fmt" + + "github.com/ceph/ceph-csi/internal/journal" + "github.com/ceph/ceph-csi/internal/util" + + librbd "github.com/ceph/go-ceph/rbd" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog" +) + +// checkCloneImage check the cloned image exists, if the cloned image is not +// found it will check the temporary cloned snapshot exists, and again it will +// check the snapshot exists on the temporary cloned image, if yes it will +// create a new cloned and delete the temporary snapshot and adds a task to +// flatten the temp cloned image and return success. +// +// if the temporary snapshot does not exists it creates a temporary snapshot on +// temporary cloned image and creates a new cloned with user-provided image +// features and delete the temporary snapshot and adds a task to flatten the +// temp cloned image and return success +// +// if the temporary clone does not exist and if there is a temporary snapshot +// present on the parent image it will delete the temporary snapshot and +// returns. +func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume) (bool, error) { + // generate temp cloned volume + tempClone := rv.generateTempClone() + snap := &rbdSnapshot{ + RbdSnapName: rv.RbdImageName, + Pool: rv.Pool, + } + var einf ErrImageNotFound + var esnf ErrSnapNotFound + // check if cloned image exists + err := rv.getImageInfo() + if err == nil { + // check if do we have temporary snapshot on temporary cloned image + sErr := tempClone.checkSnapExists(snap) + if sErr != nil { + if errors.As(err, &esnf) { + return true, nil + } + return false, err + } + err = tempClone.deleteSnapshot(ctx, snap) + if err == nil { + return true, nil + } + return false, err + } + if !errors.As(err, &einf) { + // return error if its not image not found + return false, err + } + + err = tempClone.checkSnapExists(snap) + if err != nil { + if errors.As(err, &esnf) { + // check temporary image needs flatten, if yes add task to flatten the + // temporary clone + err = tempClone.flattenRbdImage(ctx, rv.conn.Creds, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) + if err != nil { + return false, err + } + // as the snapshot is not present, create new snapshot,clone and + // delete the temporary snapshot + err = createRBDClone(ctx, tempClone, rv, snap, rv.conn.Creds) + if err != nil { + return false, err + } + return true, nil + } else if !errors.As(err, &einf) { + // any error other than image not found return error + return false, err + } + } else { + // snap will be create after we flatten the temporary cloned image,no + // need to check for flatten here. + // as the snap exists,create clone image and delete temporary snapshot + // and add task to flatten temporary cloned image + err = rv.cloneRbdImageFromSnapshot(ctx, snap) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to clone rbd image %s from snapshot %s: %v"), rv.RbdImageName, snap.RbdSnapName, err) + err = fmt.Errorf("failed to clone rbd image %s from snapshot %s: %v", rv.RbdImageName, snap.RbdSnapName, err) + return false, err + } + err = tempClone.deleteSnapshot(ctx, snap) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to delete snapshot: %v"), err) + return false, err + } + return true, nil + } + // as the temp clone doesnot exists,check snapshot exists on parent volume + err = parentVol.checkSnapExists(snap) + if err == nil { + // the temp clone exists, delete it lets reserve a new ID and + // create new resources for clearner approach + err = parentVol.deleteSnapshot(ctx, snap) + } + if errors.As(err, &esnf) { + return false, nil + } + return false, err +} + +func (rv *rbdVolume) generateTempClone() *rbdVolume { + tempClone := rbdVolume{} + tempClone.conn = rv.conn + // The temp clone image need to have deep flatten feature + f := []string{librbd.FeatureNameLayering, librbd.FeatureNameDeepFlatten} + tempClone.imageFeatureSet = librbd.FeatureSetFromNames(f) + tempClone.ClusterID = rv.ClusterID + tempClone.Monitors = rv.Monitors + tempClone.Pool = rv.Pool + // The temp cloned image name will be always (rbd image name + "-temp") + // this name will be always unique, as cephcsi never creates an image with + // this format for new rbd images + tempClone.RbdImageName = rv.RbdImageName + "-temp" + return &tempClone +} + +func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVolume) error { + // generate temp cloned volume + tempClone := rv.generateTempClone() + snap := &rbdSnapshot{ + RbdSnapName: rv.RbdImageName, + Pool: rv.Pool, + } + + var ( + errClone error + errFlatten error + err error + ) + var efip ErrFlattenInProgress + var j = &journal.Connection{} + + j, err = volJournal.Connect(rv.Monitors, rv.conn.Creds) + if err != nil { + return status.Error(codes.Internal, err.Error()) + } + defer j.Destroy() + + // create snapshot and temporary clone and delete snapshot + err = createRBDClone(ctx, parentVol, tempClone, snap, rv.conn.Creds) + if err != nil { + return err + } + + defer func() { + if err != nil || errFlatten != nil { + if !errors.Is(errFlatten, &efip) { + // cleanup snapshot + cErr := cleanUpSnapshot(ctx, parentVol, snap, tempClone, rv.conn.Creds) + if cErr != nil { + klog.Errorf(util.Log(ctx, "failed to cleanup image %s or snapshot %s: %v"), snap, tempClone, cErr) + } + } + } + if err != nil || errClone != nil { + cErr := cleanUpSnapshot(ctx, tempClone, snap, rv, rv.conn.Creds) + if cErr != nil { + klog.Errorf(util.Log(ctx, "failed to cleanup image %s or snapshot %s: %v"), snap, tempClone, cErr) + } + } + }() + // flatten clone + errFlatten = tempClone.flattenRbdImage(ctx, rv.conn.Creds, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) + if errFlatten != nil { + return err + } + // create snap of temp clone from temporary cloned image + // create final clone + // delete snap of temp clone + errClone = createRBDClone(ctx, tempClone, rv, snap, rv.conn.Creds) + if errClone != nil { + // set errFlatten error to cleanup temporary snapshot and temporary clone + errFlatten = errors.New("failed to create user requested cloned image") + return errClone + } + err = rv.getImageID() + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get volume id %s: %v"), rv, err) + return err + } + + err = j.StoreImageID(ctx, rv.JournalPool, rv.ReservedID, rv.ImageID, rv.conn.Creds) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to store volume %s: %v"), rv, err) + return err + } + return nil +} + +func (rv *rbdVolume) flattenCloneImage(ctx context.Context) error { + tempClone := rv.generateTempClone() + // reducing the limit for cloned images to make sure the limit is in range, + // If the intermediate clone reaches the depth we may need to return ABORT + // error message as it need to be flatten before continuing, this may leak + // omap entries and stale temporary snapshots in corner cases, if we reduce + // the limit and check for the depth of the parent image clain it self we + // can flatten the parent images before use to avoid the stale omap entries. + hardLimit := rbdHardMaxCloneDepth + softLimit := rbdSoftMaxCloneDepth + if rbdHardMaxCloneDepth < 2 { + hardLimit = rbdHardMaxCloneDepth - 2 + } + if rbdSoftMaxCloneDepth < 2 { + softLimit = rbdSoftMaxCloneDepth - 2 + } + err := tempClone.getImageInfo() + if err == nil { + return tempClone.flattenRbdImage(ctx, tempClone.conn.Creds, false, hardLimit, softLimit) + } + if err != nil { + var einf ErrImageNotFound + if !errors.As(err, &einf) { + return err + } + } + return rv.flattenRbdImage(ctx, rv.conn.Creds, false, hardLimit, softLimit) +} diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index a64d17854..85b8af0f8 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -161,6 +161,21 @@ func buildCreateVolumeResponse(ctx context.Context, req *csi.CreateVolumeRequest return &csi.CreateVolumeResponse{Volume: volume}, nil } +// getGRPCErrorForCreateVolume converts the returns the GRPC errors based on +// the input error types it expected to use only for CreateVolume as we need to +// return different GRPC codes for different functions based on the input. +func getGRPCErrorForCreateVolume(err error) error { + var evnc ErrVolNameConflict + if errors.As(err, &evnc) { + return status.Error(codes.AlreadyExists, err.Error()) + } + var efip ErrFlattenInProgress + if errors.As(err, &efip) { + return status.Error(codes.Aborted, err.Error()) + } + return status.Error(codes.Internal, err.Error()) +} + // CreateVolume creates the volume in backend func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { if err := cs.validateVolumeReq(ctx, req); err != nil { @@ -193,18 +208,14 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Error(codes.Internal, err.Error()) } - rbdSnap, err := cs.checkSnapshotSource(ctx, req, cr) + parentVol, rbdSnap, err := checkContentSource(ctx, req, cr) if err != nil { return nil, err } - found, err := rbdVol.Exists(ctx) + found, err := rbdVol.Exists(ctx, parentVol) if err != nil { - var evnc ErrVolNameConflict - if errors.As(err, &evnc) { - return nil, status.Error(codes.AlreadyExists, err.Error()) - } - return nil, status.Error(codes.Internal, err.Error()) + return nil, getGRPCErrorForCreateVolume(err) } if found { if rbdSnap != nil { @@ -217,6 +228,15 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return buildCreateVolumeResponse(ctx, req, rbdVol) } + if parentVol != nil { + // flatten the image or its parent before the reservation to avoid + // stale entries in post creation if we return ABORT error and the + // delete volume is not called + err = parentVol.flattenCloneImage(ctx) + if err != nil { + return nil, getGRPCErrorForCreateVolume(err) + } + } err = reserveVol(ctx, rbdVol, rbdSnap, cr) if err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -233,7 +253,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } }() - err = cs.createBackingImage(ctx, cr, rbdVol, rbdSnap) + err = cs.createBackingImage(ctx, cr, rbdVol, parentVol, rbdSnap) if err != nil { var efip ErrFlattenInProgress if errors.As(err, &efip) { @@ -268,7 +288,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol // return success,the hardlimit is reached it starts a task to flatten the // image and return Aborted func checkFlatten(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { - err := rbdVol.flattenRbdImage(ctx, cr, false) + err := rbdVol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) if err != nil { var efip ErrFlattenInProgress if errors.As(err, &efip) { @@ -318,7 +338,7 @@ func (cs *ControllerServer) createVolumeFromSnapshot(ctx context.Context, cr *ut return nil } -func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error { +func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol, parentVol *rbdVolume, rbdSnap *rbdSnapshot) error { var err error var j = &journal.Connection{} @@ -328,12 +348,20 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre } defer j.Destroy() + // nolint:gocritic // this ifElseChain can not be rewritten to a switch statement if rbdSnap != nil { err = cs.createVolumeFromSnapshot(ctx, cr, rbdVol, rbdSnap.SnapID) if err != nil { return err } util.DebugLog(ctx, "created volume %s from snapshot %s", rbdVol.RequestName, rbdSnap.RbdSnapName) + } else if parentVol != nil { + if acquired := cs.VolumeLocks.TryAcquire(parentVol.VolID); !acquired { + klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), parentVol.VolID) + return status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, parentVol.VolID) + } + defer cs.VolumeLocks.Release(parentVol.VolID) + return rbdVol.createCloneFromImage(ctx, parentVol) } else { err = createImage(ctx, rbdVol, cr) if err != nil { @@ -367,7 +395,7 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre } if rbdSnap != nil { - err = rbdVol.flattenRbdImage(ctx, cr, false) + err = rbdVol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) if err != nil { klog.Errorf(util.Log(ctx, "failed to flatten image %s: %v"), rbdVol, err) return err @@ -384,38 +412,53 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre return nil } -func (cs *ControllerServer) checkSnapshotSource(ctx context.Context, req *csi.CreateVolumeRequest, - cr *util.Credentials) (*rbdSnapshot, error) { +func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *util.Credentials) (*rbdVolume, *rbdSnapshot, error) { if req.VolumeContentSource == nil { - return nil, nil + return nil, nil, nil } - - snapshot := req.VolumeContentSource.GetSnapshot() - if snapshot == nil { - return nil, status.Error(codes.InvalidArgument, "volume Snapshot cannot be empty") - } - - snapshotID := snapshot.GetSnapshotId() - if snapshotID == "" { - return nil, status.Error(codes.InvalidArgument, "volume Snapshot ID cannot be empty") - } - - rbdSnap := &rbdSnapshot{} - if err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil { - var esnf ErrSnapNotFound - if !errors.As(err, &esnf) { - return nil, status.Error(codes.Internal, err.Error()) + volumeSource := req.VolumeContentSource + switch volumeSource.Type.(type) { + case *csi.VolumeContentSource_Snapshot: + snapshot := req.VolumeContentSource.GetSnapshot() + if snapshot == nil { + return nil, nil, status.Error(codes.NotFound, "volume Snapshot cannot be empty") } - - var epnf util.ErrPoolNotFound - if errors.As(err, &epnf) { + snapshotID := snapshot.GetSnapshotId() + if snapshotID == "" { + return nil, nil, status.Errorf(codes.NotFound, "volume Snapshot ID cannot be empty") + } + rbdSnap := &rbdSnapshot{} + if err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil { klog.Errorf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err) - return nil, status.Error(codes.InvalidArgument, err.Error()) + var esnf ErrSnapNotFound + if !errors.As(err, &esnf) { + return nil, nil, status.Error(codes.Internal, err.Error()) + } + return nil, nil, status.Errorf(codes.NotFound, "%s snapshot doesnot exists", snapshotID) } - - return nil, status.Error(codes.InvalidArgument, "missing requested Snapshot ID") + return nil, rbdSnap, nil + case *csi.VolumeContentSource_Volume: + vol := req.VolumeContentSource.GetVolume() + if vol == nil { + return nil, nil, status.Error(codes.NotFound, "volume cannot be empty") + } + volID := vol.GetVolumeId() + if volID == "" { + return nil, nil, status.Errorf(codes.NotFound, "volume ID cannot be empty") + } + // TODO need to support cloning for encrypted volume + rbdvol, err := genVolFromVolID(ctx, volID, cr, nil) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to get backend image for %s: %v"), volID, err) + var esnf ErrImageNotFound + if !errors.As(err, &esnf) { + return nil, nil, status.Error(codes.Internal, err.Error()) + } + return nil, nil, status.Errorf(codes.NotFound, "%s image doesnot exists", volID) + } + return rbdvol, nil, nil } - return rbdSnap, nil + return nil, nil, status.Errorf(codes.InvalidArgument, "not a proper volume source") } // DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin @@ -573,6 +616,20 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Errorf(codes.Internal, "rbd %s is still being used", rbdVol.RbdImageName) } + // delete the temporary rbd image created as part of volume clone during + // create volume + var einf ErrImageNotFound + tempClone := rbdVol.generateTempClone() + err = deleteImage(ctx, tempClone, cr) + if err != nil { + // return error if it is not ErrImageNotFound + if !errors.As(err, &einf) { + klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), + tempClone, err) + return nil, status.Error(codes.Internal, err.Error()) + } + } + // Deleting rbd image util.DebugLog(ctx, "deleting image %s", rbdVol.RbdImageName) if err = deleteImage(ctx, rbdVol, cr); err != nil { @@ -705,7 +762,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS } defer vol.Destroy() - err = vol.flattenRbdImage(ctx, cr, false) + err = vol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) var efip ErrFlattenInProgress if errors.As(err, &efip) { return &csi.CreateSnapshotResponse{ @@ -869,7 +926,7 @@ func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdV return ready, cloneRbd, err } - err = cloneRbd.flattenRbdImage(ctx, cr, false) + err = cloneRbd.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) if err != nil { var efip ErrFlattenInProgress if errors.As(err, &efip) { diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 85ab621ac..081d170eb 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -268,7 +268,7 @@ func (ns *NodeServer) stageTransaction(ctx context.Context, req *csi.NodeStageVo return transaction, err } if feature { - err = volOptions.flattenRbdImage(ctx, cr, true) + err = volOptions.flattenRbdImage(ctx, cr, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) if err != nil { return transaction, err } diff --git a/internal/rbd/rbd_journal.go b/internal/rbd/rbd_journal.go index 1a0093c38..1146b5f01 100644 --- a/internal/rbd/rbd_journal.go +++ b/internal/rbd/rbd_journal.go @@ -181,7 +181,7 @@ func checkSnapCloneExists(ctx context.Context, parentVol *rbdVolume, rbdSnap *rb } // check snapshot exists if not create it - _, err = vol.getSnapInfo(rbdSnap) + err = vol.checkSnapExists(rbdSnap) var esnf ErrSnapNotFound if errors.As(err, &esnf) { // create snapshot @@ -223,12 +223,18 @@ func checkSnapCloneExists(ctx context.Context, parentVol *rbdVolume, rbdSnap *rb /* Check comment on checkSnapExists, to understand how this function behaves -**NOTE:** These functions manipulate the rados omaps that hold information regarding -volume names as requested by the CSI drivers. Hence, these need to be invoked only when the -respective CSI snapshot or volume name based locks are held, as otherwise racy access to these -omaps may end up leaving the omaps in an inconsistent state. +**NOTE:** These functions manipulate the rados omaps that hold information +regarding volume names as requested by the CSI drivers. Hence, these need to be +invoked only when the respective CSI snapshot or volume name based locks are +held, as otherwise racy access to these omaps may end up leaving the omaps in +an inconsistent state. + +parentVol is required to check the clone is created from the requested parent +image or not, if temporary snapshots and clones created for the volume when the +content source is volume we need to recover from the stale entries or complete +the pending operations. */ -func (rv *rbdVolume) Exists(ctx context.Context) (bool, error) { +func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, error) { err := validateRbdVol(rv) if err != nil { return false, err @@ -277,6 +283,16 @@ func (rv *rbdVolume) Exists(ctx context.Context) (bool, error) { if err != nil { var einf ErrImageNotFound if errors.As(err, &einf) { + // Need to check cloned info here not on createvolume, + if parentVol != nil { + found, cErr := rv.checkCloneImage(ctx, parentVol) + if found && cErr == nil { + return true, nil + } + if cErr != nil { + return false, cErr + } + } err = j.UndoReservation(ctx, rv.JournalPool, rv.Pool, rv.RbdImageName, rv.RequestName) return false, err diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 70ae87f96..de71a51ae 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -433,7 +433,7 @@ func flattenClonedRbdImages(ctx context.Context, snaps []snapshotInfo, pool, mon for _, s := range snaps { if s.Namespace.Type == "trash" { rv.RbdImageName = s.Namespace.OriginalName - err = rv.flattenRbdImage(ctx, cr, true) + err = rv.flattenRbdImage(ctx, cr, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) if err != nil { klog.Errorf(util.Log(ctx, "failed to flatten %s; err %v"), rv, err) continue @@ -443,7 +443,7 @@ func flattenClonedRbdImages(ctx context.Context, snaps []snapshotInfo, pool, mon return nil } -func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, forceFlatten bool) error { +func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, forceFlatten bool, hardlimit, softlimit uint) error { var depth uint var err error @@ -453,10 +453,10 @@ func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, if err != nil { return err } - klog.Infof(util.Log(ctx, "clone depth is (%d), configured softlimit (%d) and hardlimit (%d) for %s"), depth, rbdSoftMaxCloneDepth, rbdHardMaxCloneDepth, rv) + klog.Infof(util.Log(ctx, "clone depth is (%d), configured softlimit (%d) and hardlimit (%d) for %s"), depth, softlimit, hardlimit, rv) } - if forceFlatten || (depth >= rbdHardMaxCloneDepth) || (depth >= rbdSoftMaxCloneDepth) { + if forceFlatten || (depth >= hardlimit) || (depth >= softlimit) { args := []string{"flatten", rv.Pool + "/" + rv.RbdImageName, "--id", cr.ID, "--keyfile=" + cr.KeyFile, "-m", rv.Monitors} supported, err := addRbdManagerTask(ctx, rv, args) if supported { @@ -464,13 +464,13 @@ func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, klog.Errorf(util.Log(ctx, "failed to add task flatten for %s : %v"), rv, err) return err } - if forceFlatten || depth >= rbdHardMaxCloneDepth { + if forceFlatten || depth >= hardlimit { return ErrFlattenInProgress{err: fmt.Errorf("flatten is in progress for image %s", rv.RbdImageName)} } } if !supported { klog.Errorf(util.Log(ctx, "task manager does not support flatten,image will be flattened once hardlimit is reached: %v"), err) - if forceFlatten || depth >= rbdHardMaxCloneDepth { + if forceFlatten || depth >= hardlimit { err = rv.Connect(cr) if err != nil { return err @@ -1008,30 +1008,29 @@ func (rv *rbdVolume) getImageInfo() error { } /* -getSnapInfo queries rbd about the snapshots of the given image and returns its metadata, and -returns ErrImageNotFound if provided image is not found, and ErrSnapNotFound if provided snap -is not found in the images snapshot list +checkSnapExists queries rbd about the snapshots of the given image and returns +ErrImageNotFound if provided image is not found, and ErrSnapNotFound if +provided snap is not found in the images snapshot list */ -func (rv *rbdVolume) getSnapInfo(rbdSnap *rbdSnapshot) (librbd.SnapInfo, error) { - invalidSnap := librbd.SnapInfo{} +func (rv *rbdVolume) checkSnapExists(rbdSnap *rbdSnapshot) error { image, err := rv.open() if err != nil { - return invalidSnap, err + return err } defer image.Close() snaps, err := image.GetSnapshotNames() if err != nil { - return invalidSnap, err + return err } for _, snap := range snaps { if snap.Name == rbdSnap.RbdSnapName { - return snap, nil + return nil } } - return invalidSnap, ErrSnapNotFound{rbdSnap.RbdSnapName, fmt.Errorf("snap %s not found", rbdSnap.String())} + return ErrSnapNotFound{rbdSnap.RbdSnapName, fmt.Errorf("snap %s not found", rbdSnap.String())} } // rbdImageMetadataStash strongly typed JSON spec for stashed RBD image metadata diff --git a/internal/rbd/snapshot.go b/internal/rbd/snapshot.go index 8af1f9c4d..0c0244177 100644 --- a/internal/rbd/snapshot.go +++ b/internal/rbd/snapshot.go @@ -33,6 +33,7 @@ func createRBDClone(ctx context.Context, parentVol, cloneRbdVol *rbdVolume, snap return err } + snap.RbdImageName = parentVol.RbdImageName // create clone image and delete snapshot err = cloneRbdVol.cloneRbdImageFromSnapshot(ctx, snap) if err != nil {