mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 10:33:35 +00:00
rbd: Implement snapshot and clone from snapshot
This Adds a support for create,delete snapshot and creating a new rbd image from the snapshot. * Create a snapshot * Create a temporary snapshot from the parent volume * Clone a new image from a temporary snapshot with options --rbd-default-clone-format 2 --image-feature layering,deep-flatten * Delete temporary snapshot created * Create a new snapshot from cloned image * Check the image chain depth, if the Softlimit is reached Add a task Flatten the cloned image and return success. if the depth is reached hard limit Add a task Flatten the cloned image and return snapshot status ready as false ```bash 1) rbd snap create <RBD image for src k8s volume>@<random snap name> 2) rbd clone --rbd-default-clone-format 2 --image-feature layering,deep-flatten <RBD image for src k8s volume>@<random snap> <RBD image for temporary snap image> 3) rbd snap rm <RBD image for src k8s volume>@<random snap name> 4) rbd snap rm <RBD image for temporary snap image>@<random snap name> 5) check the depth, if the depth is greater than configured hard limit add a task to flatten the cloned image return snapshot status ready as false if the depth is greater than soft limit add a task to flatten the image and return success ``` * Create a clone from snapshot * Clone a new image from the snapshot with user-provided options * Check the depth(n) of the cloned image if n>=(hard limit) Add task to flatten the image and return ABORT (to avoid image leak) ```bash 1) rbd clone --rbd-default-clone-format 2 --image-feature <k8s dst vol config> <RBD image for temporary snap image>@<random snap name> <RBD image for k8s dst vol> 2) check the depth, if the depth is greater than configured hard limit add a task to flatten the cloned image return ABORT error if the depth is greater than soft limit add a task to flatten the image and return success ``` * Delete snapshot or pvc * Move the temporary cloned image to the trash * Add task to remove the image from the trash ```bash 1) rbd trash mv <cloned image> 2) ceph rbd task trash remove <cloned image> ``` With earlier implementation to delete the image, we used to add a task to remove the image with new changes this cannot be done as the image may contain snapshots or linking.so we will be doing below steps to delete an image(this will be applicable for both normal image and cloned image) * Move the rbd image to the trash * Add task to remove the image from the trash ```bash 1) rbd trash mv <image> 2) ceph rbd task trash remove <image> ``` Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
committed by
mergify[bot]
parent
d95b07e39b
commit
e3a63029a3
@ -33,6 +33,7 @@ import (
|
||||
"github.com/ceph/go-ceph/rados"
|
||||
librbd "github.com/ceph/go-ceph/rbd"
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
"github.com/pborman/uuid"
|
||||
"github.com/pkg/errors"
|
||||
@ -62,6 +63,9 @@ const (
|
||||
rbdImageRequiresEncryption = "requiresEncryption"
|
||||
// image metadata key for encryption
|
||||
encryptionMetaKey = ".rbd.csi.ceph.com/encrypted"
|
||||
|
||||
// go-ceph will provide rbd.ImageOptionCloneFormat
|
||||
imageOptionCloneFormat = librbd.RbdImageOption(12)
|
||||
)
|
||||
|
||||
// rbdVolume represents a CSI volume and its RBD image specifics
|
||||
@ -89,6 +93,7 @@ type rbdVolume struct {
|
||||
Pool string `json:"pool"`
|
||||
DataPool string
|
||||
ImageID string
|
||||
ParentName string
|
||||
imageFeatureSet librbd.FeatureSet
|
||||
AdminID string `json:"adminId"`
|
||||
UserID string `json:"userId"`
|
||||
@ -103,7 +108,7 @@ type rbdVolume struct {
|
||||
Encrypted bool
|
||||
readOnly bool
|
||||
KMS util.EncryptionKMS
|
||||
|
||||
CreatedAt *timestamp.Timestamp
|
||||
// conn is a connection to the Ceph cluster obtained from a ConnPool
|
||||
conn *util.ClusterConnection
|
||||
// an opened IOContext, call .openIoctx() before using
|
||||
@ -391,24 +396,113 @@ func deleteImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rv *rbdVolume) removeImageFromTrash() error {
|
||||
err := rv.openIoctx()
|
||||
func (rv *rbdVolume) getCloneDepth(ctx context.Context) (uint, error) {
|
||||
var depth uint
|
||||
vol := rbdVolume{
|
||||
Pool: rv.Pool,
|
||||
Monitors: rv.Monitors,
|
||||
RbdImageName: rv.RbdImageName,
|
||||
conn: rv.conn,
|
||||
}
|
||||
|
||||
err := vol.openIoctx()
|
||||
if err != nil {
|
||||
return depth, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
vol.ioctx.Destroy()
|
||||
}()
|
||||
for {
|
||||
if vol.RbdImageName == "" {
|
||||
return depth, nil
|
||||
}
|
||||
err = vol.getImageInfo()
|
||||
if err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to check depth on image %s: %s"), vol, err)
|
||||
return depth, err
|
||||
}
|
||||
if vol.ParentName != "" {
|
||||
depth++
|
||||
}
|
||||
vol.RbdImageName = vol.ParentName
|
||||
}
|
||||
}
|
||||
|
||||
func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials, forceFlatten bool) error {
|
||||
depth, err := rv.getCloneDepth(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
list, err := librbd.GetTrashList(rv.ioctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, l := range list {
|
||||
if l.Name == rv.RbdImageName {
|
||||
err = librbd.TrashRemove(rv.ioctx, l.Id, true)
|
||||
return err
|
||||
klog.Infof(util.Log(ctx, "clone depth is (%d), configured softlimit (%d) and hardlimit (%d) for %s"), depth, rbdSoftMaxCloneDepth, rbdHardMaxCloneDepth, rv)
|
||||
|
||||
if forceFlatten || (depth >= rbdHardMaxCloneDepth) || (depth >= rbdSoftMaxCloneDepth) {
|
||||
args := []string{"flatten", rv.Pool + "/" + rv.RbdImageName, "--id", cr.ID, "--keyfile=" + cr.KeyFile, "-m", rv.Monitors}
|
||||
supported, err := addRbdManagerTask(ctx, rv, args)
|
||||
if supported {
|
||||
if err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to add task flatten for %s : %v"), rv, err)
|
||||
return err
|
||||
}
|
||||
if forceFlatten || depth >= rbdHardMaxCloneDepth {
|
||||
return ErrFlattenInProgress{err: fmt.Errorf("flatten is in progress for image %s", rv.RbdImageName)}
|
||||
}
|
||||
}
|
||||
if !supported {
|
||||
klog.Errorf(util.Log(ctx, "task manager does not support flatten,image will be flattened once hardlimit is reached: %v"), err)
|
||||
if forceFlatten || depth >= rbdHardMaxCloneDepth {
|
||||
err = rv.Connect(cr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rbdImage, err := rv.open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rbdImage.Close()
|
||||
if err = rbdImage.Flatten(); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "rbd failed to flatten image %s %s: %v"), rv.Pool, rv.RbdImageName, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rv *rbdVolume) hasFeature(feature uint64) bool {
|
||||
return (uint64(rv.imageFeatureSet) & feature) == feature
|
||||
}
|
||||
|
||||
func (rv *rbdVolume) checkImageChainHasFeature(ctx context.Context, feature uint64) (bool, error) {
|
||||
vol := rbdVolume{
|
||||
Pool: rv.Pool,
|
||||
Monitors: rv.Monitors,
|
||||
RbdImageName: rv.RbdImageName,
|
||||
conn: rv.conn,
|
||||
}
|
||||
err := vol.openIoctx()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer vol.ioctx.Destroy()
|
||||
|
||||
for {
|
||||
if vol.RbdImageName == "" {
|
||||
return false, nil
|
||||
}
|
||||
err = vol.getImageInfo()
|
||||
if err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to get image info for %s: %s"), vol, err)
|
||||
return false, err
|
||||
}
|
||||
if f := vol.hasFeature(feature); f {
|
||||
return true, nil
|
||||
}
|
||||
vol.RbdImageName = vol.ParentName
|
||||
}
|
||||
}
|
||||
|
||||
// genSnapFromSnapID generates a rbdSnapshot structure from the provided identifier, updating
|
||||
// the structure with elements from on-disk snapshot metadata as well
|
||||
func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID string, cr *util.Credentials) error {
|
||||
@ -454,7 +548,7 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str
|
||||
rbdSnap.RequestName = imageAttributes.RequestName
|
||||
rbdSnap.RbdImageName = imageAttributes.SourceName
|
||||
rbdSnap.RbdSnapName = imageAttributes.ImageName
|
||||
|
||||
rbdSnap.ReservedID = vi.ObjectUUID
|
||||
// convert the journal pool ID to name, for use in DeleteSnapshot cases
|
||||
if imageAttributes.JournalPoolID != util.InvalidPoolID {
|
||||
rbdSnap.JournalPool, err = util.GetPoolName(rbdSnap.Monitors, cr, imageAttributes.JournalPoolID)
|
||||
@ -478,13 +572,13 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
|
||||
options = make(map[string]string)
|
||||
|
||||
// rbdVolume fields that are not filled up in this function are:
|
||||
// Mounter, MultiNodeWritable
|
||||
// Mounter, MultiNodeWritable
|
||||
rbdVol = &rbdVolume{VolID: volumeID}
|
||||
|
||||
err := vi.DecomposeCSIID(rbdVol.VolID)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, rbdVol.VolID)
|
||||
return nil, ErrInvalidVolID{err}
|
||||
return rbdVol, ErrInvalidVolID{err}
|
||||
}
|
||||
|
||||
rbdVol.ClusterID = vi.ClusterID
|
||||
@ -492,53 +586,71 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
|
||||
|
||||
rbdVol.Monitors, _, err = getMonsAndClusterID(ctx, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return rbdVol, err
|
||||
}
|
||||
|
||||
rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, cr, vi.LocationID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return rbdVol, err
|
||||
}
|
||||
|
||||
err = rbdVol.Connect(cr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return rbdVol, err
|
||||
}
|
||||
rbdVol.JournalPool = rbdVol.Pool
|
||||
|
||||
j, err := volJournal.Connect(rbdVol.Monitors, cr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return rbdVol, err
|
||||
}
|
||||
defer j.Destroy()
|
||||
|
||||
imageAttributes, err := j.GetImageAttributes(
|
||||
ctx, rbdVol.Pool, vi.ObjectUUID, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return rbdVol, err
|
||||
}
|
||||
|
||||
rbdVol.RequestName = imageAttributes.RequestName
|
||||
rbdVol.RbdImageName = imageAttributes.ImageName
|
||||
rbdVol.ReservedID = vi.ObjectUUID
|
||||
|
||||
if imageAttributes.KmsID != "" {
|
||||
rbdVol.Encrypted = true
|
||||
rbdVol.KMS, err = util.GetKMS(imageAttributes.KmsID, secrets)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return rbdVol, err
|
||||
}
|
||||
}
|
||||
rbdVol.RequestName = imageAttributes.RequestName
|
||||
rbdVol.RbdImageName = imageAttributes.ImageName
|
||||
|
||||
// convert the journal pool ID to name, for use in DeleteVolume cases
|
||||
if imageAttributes.JournalPoolID >= 0 {
|
||||
rbdVol.JournalPool, err = util.GetPoolName(rbdVol.Monitors, cr, imageAttributes.JournalPoolID)
|
||||
if err != nil {
|
||||
// TODO: If pool is not found we may leak the image (as DeleteVolume will return success)
|
||||
return nil, err
|
||||
return rbdVol, err
|
||||
}
|
||||
}
|
||||
|
||||
err = rbdVol.getImageInfo()
|
||||
rbdVol.ImageID, err = j.GetStoredImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, cr)
|
||||
if _, ok := err.(util.ErrKeyNotFound); ok {
|
||||
err = rbdVol.getImageID()
|
||||
if err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to get image id %s: %v"), rbdVol, err)
|
||||
return rbdVol, err
|
||||
}
|
||||
err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID, cr)
|
||||
if err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to store volume id %s: %v"), rbdVol, err)
|
||||
return rbdVol, err
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to get stored image id: %v"), err)
|
||||
return rbdVol, err
|
||||
}
|
||||
|
||||
err = rbdVol.getImageInfo()
|
||||
return rbdVol, err
|
||||
}
|
||||
|
||||
@ -789,6 +901,62 @@ func (rv *rbdVolume) cloneRbdImageFromSnapshot(ctx context.Context, pSnapOpts *r
|
||||
return nil
|
||||
}
|
||||
|
||||
// imageInfo strongly typed JSON spec for image info
|
||||
type imageInfo struct {
|
||||
ObjectUUID string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
Features []string `json:"features"`
|
||||
CreatedAt string `json:"create_timestamp"`
|
||||
Parent parentInfo `json:"parent"`
|
||||
}
|
||||
|
||||
// parentInfo spec for parent volume info
|
||||
type parentInfo struct {
|
||||
Image string `json:"image"`
|
||||
Pool string `json:"pool"`
|
||||
Snapshot string `json:"snapshost"`
|
||||
}
|
||||
|
||||
// updateVolWithImageInfo updates provided rbdVolume with information from on-disk data
|
||||
// regarding the same
|
||||
func (rv *rbdVolume) updateVolWithImageInfo(cr *util.Credentials) error {
|
||||
// rbd --format=json info [image-spec | snap-spec]
|
||||
var imgInfo imageInfo
|
||||
|
||||
stdout, stderr, err := util.ExecCommand("rbd",
|
||||
"-m", rv.Monitors,
|
||||
"--id", cr.ID,
|
||||
"--keyfile="+cr.KeyFile,
|
||||
"-c", util.CephConfigPath,
|
||||
"--format="+"json",
|
||||
"info", rv.String())
|
||||
if err != nil {
|
||||
klog.Errorf("failed getting information for image (%s): (%s)", rv, err)
|
||||
if strings.Contains(string(stderr), "rbd: error opening image "+rv.RbdImageName+
|
||||
": (2) No such file or directory") {
|
||||
return ErrImageNotFound{rv.String(), err}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = json.Unmarshal(stdout, &imgInfo)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to parse JSON output of image info (%s): (%s)", rv, err)
|
||||
return fmt.Errorf("unmarshal failed: %+v. raw buffer response: %s", err, string(stdout))
|
||||
}
|
||||
|
||||
rv.VolSize = imgInfo.Size
|
||||
rv.ParentName = imgInfo.Parent.Image
|
||||
|
||||
tm, err := time.Parse(time.ANSIC, imgInfo.CreatedAt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rv.CreatedAt, err = ptypes.TimestampProto(tm)
|
||||
return err
|
||||
}
|
||||
|
||||
// getImageInfo queries rbd about the given image and returns its metadata, and returns
|
||||
// ErrImageNotFound if provided image is not found
|
||||
func (rv *rbdVolume) getImageInfo() error {
|
||||
@ -810,6 +978,10 @@ func (rv *rbdVolume) getImageInfo() error {
|
||||
return err
|
||||
}
|
||||
rv.imageFeatureSet = librbd.FeatureSet(features)
|
||||
err = rv.updateVolWithImageInfo(rv.conn.Creds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user