From c773097f856f12ae0a6d7cd5059636e9501460cd Mon Sep 17 00:00:00 2001
From: Humble Chirammal
Date: Fri, 7 Aug 2020 12:36:45 +0530
Subject: [PATCH] cephfs: add snapshot and clone helper functions

Signed-off-by: Humble Chirammal
---
 internal/cephfs/clone.go    | 225 +++++++++++++++++++++++++++++++++
 internal/cephfs/snapshot.go | 240 ++++++++++++++++++++++++++++++++++++
 2 files changed, 465 insertions(+)
 create mode 100644 internal/cephfs/clone.go
 create mode 100644 internal/cephfs/snapshot.go

diff --git a/internal/cephfs/clone.go b/internal/cephfs/clone.go
new file mode 100644
index 000000000..b1bf46ea8
--- /dev/null
+++ b/internal/cephfs/clone.go
@@ -0,0 +1,225 @@
+/*
+Copyright 2020 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cephfs
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/ceph/ceph-csi/internal/util"
+
+	klog "k8s.io/klog/v2"
+)
+
+const (
+	// cephFSCloneFailed indicates that the clone is in the failed state.
+	cephFSCloneFailed = "failed"
+	// cephFSCloneInprogress indicates that the clone is in the in-progress state.
+	cephFSCloneInprogress = "in-progress"
+	// cephFSCloneComplete indicates that the clone is in the complete state.
+	cephFSCloneComplete = "complete"
+	// snapshotIsProtected indicates that the snapshot is currently protected.
+	snapshotIsProtected = "yes"
+)
+
+func createCloneFromSubvolume(ctx context.Context, volID, cloneID volumeID, volOpt, parentvolOpt *volumeOptions, cr *util.Credentials) error {
+	snapshotID := cloneID
+	err := createSnapshot(ctx, parentvolOpt, cr, snapshotID, volID)
+	if err != nil {
+		klog.Errorf(util.Log(ctx, "failed to create snapshot %s %v"), snapshotID, err)
+		return err
+	}
+	var (
+		// if protectErr is not nil, delete the snapshot as the protect operation failed
+		protectErr error
+		// if cloneErr is not nil, unprotect and delete the snapshot
+		cloneErr error
+	)
+	defer func() {
+		if protectErr != nil {
+			err = deleteSnapshot(ctx, parentvolOpt, cr, snapshotID, volID)
+			if err != nil {
+				klog.Errorf(util.Log(ctx, "failed to delete snapshot %s %v"), snapshotID, err)
+			}
+		}
+
+		if cloneErr != nil {
+			if err = purgeVolume(ctx, cloneID, cr, volOpt, true); err != nil {
+				klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), cloneID, err)
+			}
+			if err = unprotectSnapshot(ctx, parentvolOpt, cr, snapshotID, volID); err != nil {
+				// In case the snapshot is already unprotected we get the
+				// ErrSnapProtectionExist error code; it is safe to discard this
+				// error and go ahead with the deletion.
+				if !errors.Is(err, ErrSnapProtectionExist) {
+					klog.Errorf(util.Log(ctx, "failed to unprotect snapshot %s %v"), snapshotID, err)
+				}
+			}
+			if err = deleteSnapshot(ctx, parentvolOpt, cr, snapshotID, volID); err != nil {
+				klog.Errorf(util.Log(ctx, "failed to delete snapshot %s %v"), snapshotID, err)
+			}
+		}
+	}()
+	protectErr = protectSnapshot(ctx, parentvolOpt, cr, snapshotID, volID)
+	if protectErr != nil {
+		klog.Errorf(util.Log(ctx, "failed to protect snapshot %s %v"), snapshotID, protectErr)
+		return protectErr
+	}
+
+	cloneErr = cloneSnapshot(ctx, parentvolOpt, cr, volID, snapshotID, cloneID, volOpt)
+	if cloneErr != nil {
+		klog.Errorf(util.Log(ctx, "failed to clone snapshot %s %s to %s %v"), volID, snapshotID, cloneID, cloneErr)
+		return cloneErr
+	}
+	var clone CloneStatus
+	clone, cloneErr = getCloneInfo(ctx, volOpt, cr, cloneID)
+	if cloneErr != nil {
+		return cloneErr
+	}
+
+	switch clone.Status.State {
+	case cephFSCloneInprogress:
+		klog.Errorf(util.Log(ctx, "clone is in progress for %v"), cloneID)
+		return ErrCloneInProgress
+	case cephFSCloneFailed:
+		klog.Errorf(util.Log(ctx, "clone failed for %v"), cloneID)
+		cloneFailedErr := fmt.Errorf("clone %s is in %s state", cloneID, clone.Status.State)
+		return cloneFailedErr
+	case cephFSCloneComplete:
+		// This is a workaround to fix the sizing issue for cloned volumes.
+		err = resizeVolume(ctx, volOpt, cr, cloneID, volOpt.Size)
+		if err != nil {
+			klog.Errorf(util.Log(ctx, "failed to expand volume %s: %v"), cloneID, err)
+			return err
+		}
+		// As the clone is complete, remove the intermediate snapshot.
+		if err = unprotectSnapshot(ctx, parentvolOpt, cr, snapshotID, volID); err != nil {
+			// In case the snapshot is already unprotected we get the
+			// ErrSnapProtectionExist error code; it is safe to discard this
+			// error and go ahead with the deletion.
+			if !errors.Is(err, ErrSnapProtectionExist) {
+				klog.Errorf(util.Log(ctx, "failed to unprotect snapshot %s %v"), snapshotID, err)
+				return err
+			}
+		}
+		if err = deleteSnapshot(ctx, parentvolOpt, cr, snapshotID, volID); err != nil {
+			klog.Errorf(util.Log(ctx, "failed to delete snapshot %s %v"), snapshotID, err)
+			return err
+		}
+	}
+	return nil
+}
+
+func cleanupCloneFromSubvolumeSnapshot(ctx context.Context, volID, cloneID volumeID, parentVolOpt *volumeOptions, cr *util.Credentials) error {
+	// The snapshot name is the same as the clone name, as we need a name that
+	// can be identified during PVC-PVC cloning.
+	snapShotID := cloneID
+	snapInfo, err := getSnapshotInfo(ctx, parentVolOpt, cr, snapShotID, volID)
+	if err != nil {
+		if errors.Is(err, ErrSnapNotFound) {
+			return nil
+		}
+		return err
+	}
+
+	if snapInfo.Protected == snapshotIsProtected {
+		err = unprotectSnapshot(ctx, parentVolOpt, cr, snapShotID, volID)
+		if err != nil {
+			klog.Errorf(util.Log(ctx, "failed to unprotect snapshot %s %v"), snapShotID, err)
+			return err
+		}
+	}
+	err = deleteSnapshot(ctx, parentVolOpt, cr, snapShotID, volID)
+	if err != nil {
+		klog.Errorf(util.Log(ctx, "failed to delete snapshot %s %v"), snapShotID, err)
+		return err
+	}
+	return nil
+}
+
+func createCloneFromSnapshot(ctx context.Context, parentVolOpt, volOptions *volumeOptions, vID *volumeIdentifier, sID *snapshotIdentifier, cr *util.Credentials) error {
+	snapID := volumeID(sID.FsSnapshotName)
+	err := cloneSnapshot(ctx, parentVolOpt, cr, volumeID(sID.FsSubvolName), snapID, volumeID(vID.FsSubvolName), volOptions)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err != nil {
+			if !errors.Is(err, ErrCloneInProgress) {
+				if dErr := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions, true); dErr != nil {
+					klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), vID.FsSubvolName, dErr)
+				}
+			}
+		}
+	}()
+	var clone = CloneStatus{}
+	clone, err = getCloneInfo(ctx, volOptions, cr, volumeID(vID.FsSubvolName))
+	if err != nil {
+		return err
+	}
+	switch clone.Status.State {
+	case cephFSCloneInprogress:
+		return ErrCloneInProgress
+	case cephFSCloneFailed:
+		return fmt.Errorf("clone %s is in %s state", vID.FsSubvolName, clone.Status.State)
+	case cephFSCloneComplete:
+		// The cloned volume currently does not reflect the proper size due to an issue in
+		// CephFS; this is being addressed there, and the parent volume size will then be
+		// reflected in the new cloned volume too. Until then, we explicitly set the size.
+		err = resizeVolume(ctx, volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size)
+		if err != nil {
+			klog.Errorf(util.Log(ctx, "failed to expand volume %s with error: %v"), vID.FsSubvolName, err)
+			return err
+		}
+	}
+	return nil
+}
+
+type CloneStatus struct {
+	Status struct {
+		State string `json:"state"`
+	} `json:"status"`
+}
+
+func getCloneInfo(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (CloneStatus, error) {
+	clone := CloneStatus{}
+	args := []string{
+		"fs",
+		"clone",
+		"status",
+		volOptions.FsName,
+		string(volID),
+		"--group_name",
+		volOptions.SubvolumeGroup,
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix + cr.ID,
+		"--keyfile=" + cr.KeyFile,
+		"--format=json",
+	}
+	err := execCommandJSON(
+		ctx,
+		&clone,
+		"ceph",
+		args[:]...)
+	if err != nil {
+		klog.Errorf(util.Log(ctx, "failed to get subvolume clone info %s(%s) in fs %s"), string(volID), err, volOptions.FsName)
+		return clone, err
+	}
+	return clone, nil
+}
diff --git a/internal/cephfs/snapshot.go b/internal/cephfs/snapshot.go
new file mode 100644
index 000000000..c7bfafff9
--- /dev/null
+++ b/internal/cephfs/snapshot.go
@@ -0,0 +1,240 @@
+/*
+Copyright 2020 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cephfs
+
+import (
+	"context"
+	"strings"
+
+	"github.com/ceph/ceph-csi/internal/util"
+
+	"github.com/golang/protobuf/ptypes/timestamp"
+	klog "k8s.io/klog/v2"
+)
+
+// cephfsSnapshot represents a CSI snapshot and its cluster information.
+type cephfsSnapshot struct {
+	NamePrefix string
+	Monitors   string
+	// MetadataPool & Pool fields are not used at the moment, but they are
+	// good to have in this struct, so keeping them here.
+	MetadataPool string
+	Pool         string
+	ClusterID    string
+	RequestName  string
+}
+
+func createSnapshot(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, snapID, volID volumeID) error {
+	args := []string{
+		"fs",
+		"subvolume",
+		"snapshot",
+		"create",
+		volOptions.FsName,
+		string(volID),
+		string(snapID),
+		"--group_name",
+		volOptions.SubvolumeGroup,
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix + cr.ID,
+		"--keyfile=" + cr.KeyFile,
+	}
+
+	err := execCommandErr(
+		ctx,
+		"ceph",
+		args[:]...)
+	if err != nil {
+		klog.Errorf(util.Log(ctx, "failed to create subvolume snapshot %s %s(%s) in fs %s"), string(snapID), string(volID), err, volOptions.FsName)
+		return err
+	}
+	return nil
+}
+
+func deleteSnapshot(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, snapID, volID volumeID) error {
+	args := []string{
+		"fs",
+		"subvolume",
+		"snapshot",
+		"rm",
+		volOptions.FsName,
+		string(volID),
+		string(snapID),
+		"--group_name",
+		volOptions.SubvolumeGroup,
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix + cr.ID,
+		"--keyfile=" + cr.KeyFile,
+		"--force",
+	}
+
+	err := execCommandErr(
+		ctx,
+		"ceph",
+		args[:]...)
+	if err != nil {
+		klog.Errorf(util.Log(ctx, "failed to delete subvolume snapshot %s %s(%s) in fs %s"), string(snapID), string(volID), err, volOptions.FsName)
+		return err
+	}
+	return nil
+}
+
+type snapshotInfo struct {
+	CreatedAt        string `json:"created_at"`
+	CreationTime     *timestamp.Timestamp
+	DataPool         string `json:"data_pool"`
+	HasPendingClones string `json:"has_pending_clones"`
+	Protected        string `json:"protected"`
+	Size             int    `json:"size"`
+}
+
+func getSnapshotInfo(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, snapID, volID volumeID) (snapshotInfo, error) {
+	snap := snapshotInfo{}
+	args := []string{
+		"fs",
+		"subvolume",
+		"snapshot",
+		"info",
+		volOptions.FsName,
+		string(volID),
+		string(snapID),
+		"--group_name",
+		volOptions.SubvolumeGroup,
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix + cr.ID,
+		"--keyfile=" + cr.KeyFile,
+		"--format=json",
+	}
+	err := execCommandJSON(
+		ctx,
+		&snap,
+		"ceph",
+		args[:]...)
+	if err != nil {
+		if strings.Contains(err.Error(), ErrSnapNotFound.Error()) {
+			return snapshotInfo{}, err
+		}
+		klog.Errorf(util.Log(ctx, "failed to get subvolume snapshot info %s %s(%s) in fs %s"), string(snapID), string(volID), err, volOptions.FsName)
+		return snapshotInfo{}, err
+	}
+	return snap, nil
+}
+
+func protectSnapshot(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, snapID, volID volumeID) error {
+	args := []string{
+		"fs",
+		"subvolume",
+		"snapshot",
+		"protect",
+		volOptions.FsName,
+		string(volID),
+		string(snapID),
+		"--group_name",
+		volOptions.SubvolumeGroup,
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix + cr.ID,
+		"--keyfile=" + cr.KeyFile,
+	}
+
+	err := execCommandErr(
+		ctx,
+		"ceph",
+		args[:]...)
+	if err != nil {
+		if strings.Contains(err.Error(), ErrSnapProtectionExist.Error()) {
+			return nil
+		}
+		klog.Errorf(util.Log(ctx, "failed to protect subvolume snapshot %s %s(%s) in fs %s"), string(snapID), string(volID), err, volOptions.FsName)
+		return err
+	}
+	return nil
+}
+
+func unprotectSnapshot(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, snapID, volID volumeID) error {
+	args := []string{
+		"fs",
+		"subvolume",
+		"snapshot",
+		"unprotect",
+		volOptions.FsName,
+		string(volID),
+		string(snapID),
+		"--group_name",
+		volOptions.SubvolumeGroup,
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix + cr.ID,
+		"--keyfile=" + cr.KeyFile,
+	}
+
+	err := execCommandErr(
+		ctx,
+		"ceph",
+		args[:]...)
+	if err != nil {
+		// In case the snapshot is already unprotected we get the ErrSnapProtectionExist
+		// error code; it is safe to discard this error.
+		if strings.Contains(err.Error(), ErrSnapProtectionExist.Error()) {
+			return nil
+		}
+		klog.Errorf(util.Log(ctx, "failed to unprotect subvolume snapshot %s %s(%s) in fs %s"), string(snapID), string(volID), err, volOptions.FsName)
+		return err
+	}
+	return nil
+}
+
+func cloneSnapshot(ctx context.Context, parentVolOptions *volumeOptions, cr *util.Credentials, volID, snapID, cloneID volumeID, cloneVolOptions *volumeOptions) error {
+	args := []string{
+		"fs",
+		"subvolume",
+		"snapshot",
+		"clone",
+		parentVolOptions.FsName,
+		string(volID),
+		string(snapID),
+		string(cloneID),
+		"--group_name",
+		parentVolOptions.SubvolumeGroup,
+		"--target_group_name",
+		cloneVolOptions.SubvolumeGroup,
+		"-m", parentVolOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix + cr.ID,
+		"--keyfile=" + cr.KeyFile,
+	}
+	if cloneVolOptions.Pool != "" {
+		args = append(args, "--pool_layout", cloneVolOptions.Pool)
+	}
+
+	err := execCommandErr(
+		ctx,
+		"ceph",
+		args[:]...)
+
+	if err != nil {
+		klog.Errorf(util.Log(ctx, "failed to clone subvolume snapshot %s %s(%s) in fs %s"), string(cloneID), string(volID), err, parentVolOptions.FsName)
+		if strings.HasPrefix(err.Error(), ErrVolumeNotFound.Error()) {
+			return ErrVolumeNotFound
+		}
+		return err
+	}
+	return nil
+}
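
Note (outside the diff above): CephFS subvolume clones are asynchronous, so createCloneFromSubvolume surfaces the in-progress state as ErrCloneInProgress and expects the caller to retry. A minimal sketch of how a CreateVolume-style caller might handle that, assuming a gRPC handler using google.golang.org/grpc/status and codes; parentSubvolName, cloneSubvolName, volOpt, parentVolOpt and cr are illustrative placeholders, not identifiers introduced by this patch:

	// Kick off (or re-check) the clone of the parent subvolume.
	err := createCloneFromSubvolume(ctx, volumeID(parentSubvolName), volumeID(cloneSubvolName), volOpt, parentVolOpt, cr)
	if err != nil {
		if errors.Is(err, ErrCloneInProgress) {
			// The clone is still being materialized; return Aborted so the CO
			// retries CreateVolume and the clone state is checked again.
			return nil, status.Error(codes.Aborted, err.Error())
		}
		return nil, status.Error(codes.Internal, err.Error())
	}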