cephfs: implement snapshot protect and unprotect to go-ceph

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2020-12-02 15:58:31 +05:30 committed by mergify[bot]
parent 78968f76e8
commit 814bf4459a
4 changed files with 29 additions and 53 deletions

View File

@ -68,7 +68,7 @@ func createCloneFromSubvolume(ctx context.Context, volID, cloneID volumeID, volO
if err = purgeVolume(ctx, cloneID, cr, volOpt, true); err != nil { if err = purgeVolume(ctx, cloneID, cr, volOpt, true); err != nil {
util.ErrorLog(ctx, "failed to delete volume %s: %v", cloneID, err) util.ErrorLog(ctx, "failed to delete volume %s: %v", cloneID, err)
} }
if err = unprotectSnapshot(ctx, parentvolOpt, cr, snapshotID, volID); err != nil { if err = parentvolOpt.unprotectSnapshot(ctx, snapshotID, volID); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code // In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go // in that case we are safe and we could discard this error and we are good to go
// ahead with deletion // ahead with deletion
@ -81,7 +81,7 @@ func createCloneFromSubvolume(ctx context.Context, volID, cloneID volumeID, volO
} }
} }
}() }()
protectErr = protectSnapshot(ctx, parentvolOpt, cr, snapshotID, volID) protectErr = parentvolOpt.protectSnapshot(ctx, snapshotID, volID)
if protectErr != nil { if protectErr != nil {
util.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr) util.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr)
return protectErr return protectErr
@ -117,7 +117,7 @@ func createCloneFromSubvolume(ctx context.Context, volID, cloneID volumeID, volO
return err return err
} }
// As we completed clone, remove the intermediate snap // As we completed clone, remove the intermediate snap
if err = unprotectSnapshot(ctx, parentvolOpt, cr, snapshotID, volID); err != nil { if err = parentvolOpt.unprotectSnapshot(ctx, snapshotID, volID); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code // In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go // in that case we are safe and we could discard this error and we are good to go
// ahead with deletion // ahead with deletion
@ -147,7 +147,7 @@ func cleanupCloneFromSubvolumeSnapshot(ctx context.Context, volID, cloneID volum
} }
if snapInfo.Protected == snapshotIsProtected { if snapInfo.Protected == snapshotIsProtected {
err = unprotectSnapshot(ctx, parentVolOpt, cr, snapShotID, volID) err = parentVolOpt.unprotectSnapshot(ctx, snapShotID, volID)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err) util.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err)
return err return err

View File

@ -534,7 +534,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
// check snapshot is protected // check snapshot is protected
protected := true protected := true
if !(snapInfo.Protected == snapshotIsProtected) { if !(snapInfo.Protected == snapshotIsProtected) {
err = protectSnapshot(ctx, parentVolOptions, cr, volumeID(sid.FsSnapshotName), volumeID(vid.FsSubvolName)) err = parentVolOptions.protectSnapshot(ctx, volumeID(sid.FsSnapshotName), volumeID(vid.FsSubvolName))
if err != nil { if err != nil {
protected = false protected = false
util.WarningLog(ctx, "failed to protect snapshot of snapshot: %s (%s)", util.WarningLog(ctx, "failed to protect snapshot of snapshot: %s (%s)",
@ -611,7 +611,7 @@ func doSnapshot(ctx context.Context, volOpt *volumeOptions, subvolumeName, snaps
return snap, err return snap, err
} }
snap.CreationTime = t snap.CreationTime = t
err = protectSnapshot(ctx, volOpt, cr, snapID, volID) err = volOpt.protectSnapshot(ctx, snapID, volID)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed to protect snapshot %s %v", snapID, err) util.ErrorLog(ctx, "failed to protect snapshot %s %v", snapID, err)
} }
@ -706,7 +706,7 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID) return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID)
} }
if snapInfo.Protected == snapshotIsProtected { if snapInfo.Protected == snapshotIsProtected {
err = unprotectSnapshot(ctx, volOpt, cr, volumeID(sid.FsSnapshotName), volumeID(sid.FsSubvolName)) err = volOpt.unprotectSnapshot(ctx, volumeID(sid.FsSnapshotName), volumeID(sid.FsSubvolName))
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }

View File

@ -27,8 +27,6 @@ const (
// snapNotFound is returned when snap name passed is not found in the list // snapNotFound is returned when snap name passed is not found in the list
// of snapshots for the given image. // of snapshots for the given image.
snapNotFound = "Error ENOENT" snapNotFound = "Error ENOENT"
// snapProtectionExist is returned when the snapshot is already protected
snapProtectionExist = "Error EEXIST"
// volumeNotEmpty is returned when the volume is not empty. // volumeNotEmpty is returned when the volume is not empty.
volumeNotEmpty = "Error ENOTEMPTY" volumeNotEmpty = "Error ENOTEMPTY"
) )

View File

@ -18,9 +18,11 @@ package cephfs
import ( import (
"context" "context"
"errors"
"strings" "strings"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/go-ceph/rados"
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
) )
@ -119,75 +121,51 @@ func getSnapshotInfo(ctx context.Context, volOptions *volumeOptions, cr *util.Cr
return snap, nil return snap, nil
} }
func protectSnapshot(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, snapID, volID volumeID) error { func (vo *volumeOptions) protectSnapshot(ctx context.Context, snapID, volID volumeID) error {
// If "snapshot-autoprotect" feature is present, The ProtectSnapshot // If "snapshot-autoprotect" feature is present, The ProtectSnapshot
// call should be treated as a no-op. // call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, volOptions.Features) { if checkSubvolumeHasFeature(autoProtect, vo.Features) {
return nil return nil
} }
args := []string{ fsa, err := vo.conn.GetFSAdmin()
"fs", if err != nil {
"subvolume", util.ErrorLog(ctx, "could not get FSAdmin: %s", err)
"snapshot", return err
"protect",
volOptions.FsName,
string(volID),
string(snapID),
"--group_name",
volOptions.SubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix + cr.ID,
"--keyfile=" + cr.KeyFile,
} }
err := execCommandErr( err = fsa.ProtectSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID),
ctx, string(snapID))
"ceph",
args[:]...)
if err != nil { if err != nil {
if strings.Contains(err.Error(), snapProtectionExist) { if errors.Is(err, rados.ErrObjectExists) {
return nil return nil
} }
util.ErrorLog(ctx, "failed to protect subvolume snapshot %s %s(%s) in fs %s", string(snapID), string(volID), err, volOptions.FsName) util.ErrorLog(ctx, "failed to protect subvolume snapshot %s %s in fs %s with error: %s", string(volID), string(snapID), vo.FsName, err)
return err return err
} }
return nil return nil
} }
func unprotectSnapshot(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, snapID, volID volumeID) error { func (vo *volumeOptions) unprotectSnapshot(ctx context.Context, snapID, volID volumeID) error {
// If "snapshot-autoprotect" feature is present, The UnprotectSnapshot // If "snapshot-autoprotect" feature is present, The UnprotectSnapshot
// call should be treated as a no-op. // call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, volOptions.Features) { if checkSubvolumeHasFeature(autoProtect, vo.Features) {
return nil return nil
} }
args := []string{ fsa, err := vo.conn.GetFSAdmin()
"fs", if err != nil {
"subvolume", util.ErrorLog(ctx, "could not get FSAdmin: %s", err)
"snapshot", return err
"unprotect",
volOptions.FsName,
string(volID),
string(snapID),
"--group_name",
volOptions.SubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix + cr.ID,
"--keyfile=" + cr.KeyFile,
} }
err := execCommandErr( err = fsa.UnprotectSubVolumeSnapshot(vo.FsName, vo.SubvolumeGroup, string(volID),
ctx, string(snapID))
"ceph",
args[:]...)
if err != nil { if err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code // In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error. // in that case we are safe and we could discard this error.
if strings.Contains(err.Error(), snapProtectionExist) { if errors.Is(err, rados.ErrObjectExists) {
return nil return nil
} }
util.ErrorLog(ctx, "failed to unprotect subvolume snapshot %s %s(%s) in fs %s", string(snapID), string(volID), err, volOptions.FsName) util.ErrorLog(ctx, "failed to unprotect subvolume snapshot %s %s in fs %s with error: %s", string(volID), string(snapID), vo.FsName, err)
return err return err
} }
return nil return nil