cephfs: snapshots honor --setmetadata option

`--setmetadata` is false by default, honoring it
will keep the metadata disabled by default

Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
This commit is contained in:
Prasanna Kumar Kalever 2022-07-28 17:56:55 +05:30 committed by mergify[bot]
parent 14d6211d6d
commit 30244bf11b
6 changed files with 35 additions and 32 deletions

View File

@ -601,13 +601,8 @@ func (cs *ControllerServer) cleanUpBackingVolume(
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
} else { } else {
snapClient := core.NewSnapshot( snapClient := core.NewSnapshot(snapParentVolOptions.GetConnection(), snapID.FsSnapshotName,
snapParentVolOptions.GetConnection(), volOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &snapParentVolOptions.SubVolume)
snapID.FsSnapshotName,
volOptions.ClusterID,
cs.ClusterName,
&snapParentVolOptions.SubVolume,
)
err = deleteSnapshotAndUndoReservation(ctx, snapClient, snapParentVolOptions, snapID, cr) err = deleteSnapshotAndUndoReservation(ctx, snapClient, snapParentVolOptions, snapID, cr)
if err != nil { if err != nil {
@ -785,7 +780,7 @@ func (cs *ControllerServer) CreateSnapshot(
} }
defer cs.VolumeLocks.Release(sourceVolID) defer cs.VolumeLocks.Release(sourceVolID)
snapName := req.GetName() snapName := req.GetName()
sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cr) sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
@ -823,12 +818,8 @@ func (cs *ControllerServer) CreateSnapshot(
if sid != nil { if sid != nil {
// check snapshot is protected // check snapshot is protected
protected := true protected := true
snapClient := core.NewSnapshot( snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName,
parentVolOptions.GetConnection(), parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume)
sid.FsSnapshotName,
parentVolOptions.ClusterID,
cs.ClusterName,
&parentVolOptions.SubVolume)
if !(snapInfo.Protected == core.SnapshotIsProtected) { if !(snapInfo.Protected == core.SnapshotIsProtected) {
err = snapClient.ProtectSnapshot(ctx) err = snapClient.ProtectSnapshot(ctx)
if err != nil { if err != nil {
@ -872,7 +863,7 @@ func (cs *ControllerServer) CreateSnapshot(
} }
} }
}() }()
snap, err := doSnapshot(ctx, parentVolOptions, sID.FsSnapshotName, cs.ClusterName, metadata) snap, err := cs.doSnapshot(ctx, parentVolOptions, sID.FsSnapshotName, metadata)
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
@ -888,17 +879,16 @@ func (cs *ControllerServer) CreateSnapshot(
}, nil }, nil
} }
func doSnapshot( func (cs *ControllerServer) doSnapshot(
ctx context.Context, ctx context.Context,
volOpt *store.VolumeOptions, volOpt *store.VolumeOptions,
snapshotName, snapshotName string,
clusterName string,
metadata map[string]string, metadata map[string]string,
) (core.SnapshotInfo, error) { ) (core.SnapshotInfo, error) {
snapID := fsutil.VolumeID(snapshotName) snapID := fsutil.VolumeID(snapshotName)
snap := core.SnapshotInfo{} snap := core.SnapshotInfo{}
snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName, snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName,
volOpt.ClusterID, clusterName, &volOpt.SubVolume) volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
err := snapClient.CreateSnapshot(ctx) err := snapClient.CreateSnapshot(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed to create snapshot %s %v", snapID, err) log.ErrorLog(ctx, "failed to create snapshot %s %v", snapID, err)
@ -1056,7 +1046,7 @@ func (cs *ControllerServer) DeleteSnapshot(
return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID) return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID)
} }
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName, snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, &volOpt.SubVolume) volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
if snapInfo.Protected == core.SnapshotIsProtected { if snapInfo.Protected == core.SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx) err = snapClient.UnprotectSnapshot(ctx)
if err != nil { if err != nil {

View File

@ -66,7 +66,7 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
parentvolOpt *SubVolume, parentvolOpt *SubVolume,
) error { ) error {
snapshotID := s.VolID snapshotID := s.VolID
snapClient := NewSnapshot(s.conn, snapshotID, s.clusterID, s.clusterName, parentvolOpt) snapClient := NewSnapshot(s.conn, snapshotID, s.clusterID, s.clusterName, s.enableMetadata, parentvolOpt)
err := snapClient.CreateSnapshot(ctx) err := snapClient.CreateSnapshot(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err) log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err)
@ -165,7 +165,7 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume(
// snapshot name is same as clone name as we need a name which can be // snapshot name is same as clone name as we need a name which can be
// identified during PVC-PVC cloning. // identified during PVC-PVC cloning.
snapShotID := s.VolID snapShotID := s.VolID
snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, parentVol) snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, s.enableMetadata, parentVol)
snapInfo, err := snapClient.GetSnapshotInfo(ctx) snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil { if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) { if errors.Is(err, cerrors.ErrSnapNotFound) {
@ -198,7 +198,7 @@ func (s *subVolumeClient) CreateCloneFromSnapshot(
ctx context.Context, snap Snapshot, ctx context.Context, snap Snapshot,
) error { ) error {
snapID := snap.SnapshotID snapID := snap.SnapshotID
snapClient := NewSnapshot(s.conn, snapID, s.clusterID, s.clusterName, snap.SubVolume) snapClient := NewSnapshot(s.conn, snapID, s.clusterID, s.clusterName, s.enableMetadata, snap.SubVolume)
err := snapClient.CloneSnapshot(ctx, s.SubVolume) err := snapClient.CloneSnapshot(ctx, s.SubVolume)
if err != nil { if err != nil {
return err return err

View File

@ -64,6 +64,7 @@ type snapshotClient struct {
*Snapshot // Embedded snapshot struct. *Snapshot // Embedded snapshot struct.
clusterID string // Cluster ID. clusterID string // Cluster ID.
clusterName string // Cluster Name. clusterName string // Cluster Name.
enableMetadata bool // Set metadata on volume
conn *util.ClusterConnection // Cluster connection. conn *util.ClusterConnection // Cluster connection.
} }
@ -79,6 +80,7 @@ func NewSnapshot(
snapshotID, snapshotID,
clusterID, clusterID,
clusterName string, clusterName string,
setMetadata bool,
vol *SubVolume, vol *SubVolume,
) SnapshotClient { ) SnapshotClient {
return &snapshotClient{ return &snapshotClient{
@ -88,6 +90,7 @@ func NewSnapshot(
}, },
clusterID: clusterID, clusterID: clusterID,
clusterName: clusterName, clusterName: clusterName,
enableMetadata: setMetadata,
conn: conn, conn: conn,
} }
} }

View File

@ -91,6 +91,10 @@ func (s *snapshotClient) removeSnapshotMetadata(key string) error {
// SetAllSnapshotMetadata set all the metadata from arg parameters on // SetAllSnapshotMetadata set all the metadata from arg parameters on
// subvolume snapshot. // subvolume snapshot.
func (s *snapshotClient) SetAllSnapshotMetadata(parameters map[string]string) error { func (s *snapshotClient) SetAllSnapshotMetadata(parameters map[string]string) error {
if !s.enableMetadata {
return nil
}
for k, v := range parameters { for k, v := range parameters {
err := s.setSnapshotMetadata(k, v) err := s.setSnapshotMetadata(k, v)
if err != nil { if err != nil {
@ -113,6 +117,10 @@ func (s *snapshotClient) SetAllSnapshotMetadata(parameters map[string]string) er
// UnsetAllSnapshotMetadata unset all the metadata from arg keys on subvolume // UnsetAllSnapshotMetadata unset all the metadata from arg keys on subvolume
// snapshot. // snapshot.
func (s *snapshotClient) UnsetAllSnapshotMetadata(keys []string) error { func (s *snapshotClient) UnsetAllSnapshotMetadata(keys []string) error {
if !s.enableMetadata {
return nil
}
for _, key := range keys { for _, key := range keys {
err := s.removeSnapshotMetadata(key) err := s.removeSnapshotMetadata(key)
// TODO: replace string comparison with errno. // TODO: replace string comparison with errno.

View File

@ -379,6 +379,7 @@ func CheckSnapExists(
volOptions *VolumeOptions, volOptions *VolumeOptions,
snap *SnapshotOption, snap *SnapshotOption,
clusterName string, clusterName string,
setMetadata bool,
cr *util.Credentials, cr *util.Credentials,
) (*SnapshotIdentifier, *core.SnapshotInfo, error) { ) (*SnapshotIdentifier, *core.SnapshotInfo, error) {
// Connect to cephfs' default radosNamespace (csi) // Connect to cephfs' default radosNamespace (csi)
@ -400,7 +401,8 @@ func CheckSnapExists(
snapUUID := snapData.ImageUUID snapUUID := snapData.ImageUUID
snapID := snapData.ImageAttributes.ImageName snapID := snapData.ImageAttributes.ImageName
sid.FsSnapshotName = snapData.ImageAttributes.ImageName sid.FsSnapshotName = snapData.ImageAttributes.ImageName
snapClient := core.NewSnapshot(volOptions.conn, snapID, volOptions.ClusterID, clusterName, &volOptions.SubVolume) snapClient := core.NewSnapshot(volOptions.conn, snapID,
volOptions.ClusterID, clusterName, setMetadata, &volOptions.SubVolume)
snapInfo, err := snapClient.GetSnapshotInfo(ctx) snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil { if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) { if errors.Is(err, cerrors.ErrSnapNotFound) {

View File

@ -749,7 +749,7 @@ func NewSnapshotOptionsFromID(
volOptions.Size = subvolInfo.BytesQuota volOptions.Size = subvolInfo.BytesQuota
volOptions.RootPath = subvolInfo.Path volOptions.RootPath = subvolInfo.Path
snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName, snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName,
volOptions.ClusterID, clusterName, &volOptions.SubVolume) volOptions.ClusterID, clusterName, setMetadata, &volOptions.SubVolume)
info, err := snap.GetSnapshotInfo(ctx) info, err := snap.GetSnapshotInfo(ctx)
if err != nil { if err != nil {
return &volOptions, nil, &sid, err return &volOptions, nil, &sid, err