cephfs: Add clusterName as metadata on snapshots

Example:
sh-4.4$ ceph fs subvolume snapshot metadata ls myfs csi-vol-ba248f9e-0e75-11ed-b774-8e97192ff5ec \
			csi-snap-ce24e3bb-0e75-11ed-b774-8e97192ff5ec --group_name csi
{
    "csi.ceph.com/cluster/name": "\"K8s-cluster-1\"",
    "csi.storage.k8s.io/volumesnapshot/name": "cephfs-pvc-snapshot",
    "csi.storage.k8s.io/volumesnapshot/namespace": "rook-ceph",
    "csi.storage.k8s.io/volumesnapshotcontent/name": "snapcontent-2e89e1b2-e6e9-48fe-b365-edb493d7022e"
}

Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
This commit is contained in:
Prasanna Kumar Kalever 2022-07-28 14:14:52 +05:30 committed by mergify[bot]
parent 56d7d3cd15
commit de7128b3a2
6 changed files with 46 additions and 16 deletions

View File

@ -602,6 +602,7 @@ func cleanUpBackingVolume(
snapParentVolOptions.GetConnection(),
snapID.FsSnapshotName,
volOptions.ClusterID,
clusterName,
&snapParentVolOptions.SubVolume,
)
@ -780,7 +781,7 @@ func (cs *ControllerServer) CreateSnapshot(
}
defer cs.VolumeLocks.Release(sourceVolID)
snapName := req.GetName()
sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cr)
sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@ -824,6 +825,7 @@ func (cs *ControllerServer) CreateSnapshot(
parentVolOptions.GetConnection(),
sid.FsSnapshotName,
parentVolOptions.ClusterID,
cs.ClusterName,
&parentVolOptions.SubVolume)
if !(snapInfo.Protected == core.SnapshotIsProtected) {
err = snapClient.ProtectSnapshot(ctx)
@ -868,7 +870,7 @@ func (cs *ControllerServer) CreateSnapshot(
}
}
}()
snap, err := doSnapshot(ctx, parentVolOptions, sID.FsSnapshotName, metadata)
snap, err := doSnapshot(ctx, parentVolOptions, sID.FsSnapshotName, cs.ClusterName, metadata)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@ -887,12 +889,14 @@ func (cs *ControllerServer) CreateSnapshot(
func doSnapshot(
ctx context.Context,
volOpt *store.VolumeOptions,
snapshotName string,
snapshotName,
clusterName string,
metadata map[string]string,
) (core.SnapshotInfo, error) {
snapID := fsutil.VolumeID(snapshotName)
snap := core.SnapshotInfo{}
snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName, volOpt.ClusterID, &volOpt.SubVolume)
snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName,
volOpt.ClusterID, clusterName, &volOpt.SubVolume)
err := snapClient.CreateSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to create snapshot %s %v", snapID, err)
@ -1049,7 +1053,8 @@ func (cs *ControllerServer) DeleteSnapshot(
if snapInfo.HasPendingClones == "yes" {
return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID)
}
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName, volOpt.ClusterID, &volOpt.SubVolume)
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, &volOpt.SubVolume)
if snapInfo.Protected == core.SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {

View File

@ -66,7 +66,7 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
parentvolOpt *SubVolume,
) error {
snapshotID := s.VolID
snapClient := NewSnapshot(s.conn, snapshotID, s.clusterID, parentvolOpt)
snapClient := NewSnapshot(s.conn, snapshotID, s.clusterID, s.clusterName, parentvolOpt)
err := snapClient.CreateSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err)
@ -165,7 +165,7 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume(
// snapshot name is same as clone name as we need a name which can be
// identified during PVC-PVC cloning.
snapShotID := s.VolID
snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, parentVol)
snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, parentVol)
snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
@ -198,7 +198,7 @@ func (s *subVolumeClient) CreateCloneFromSnapshot(
ctx context.Context, snap Snapshot,
) error {
snapID := snap.SnapshotID
snapClient := NewSnapshot(s.conn, snapID, s.clusterID, snap.SubVolume)
snapClient := NewSnapshot(s.conn, snapID, s.clusterID, s.clusterName, snap.SubVolume)
err := snapClient.CloneSnapshot(ctx, s.SubVolume)
if err != nil {
return err

View File

@ -63,6 +63,7 @@ type SnapshotClient interface {
type snapshotClient struct {
*Snapshot // Embedded snapshot struct.
clusterID string // Cluster ID.
clusterName string // Cluster Name.
conn *util.ClusterConnection // Cluster connection.
}
@ -73,13 +74,20 @@ type Snapshot struct {
}
// NewSnapshot creates a new snapshot client.
func NewSnapshot(conn *util.ClusterConnection, snapshotID, clusterID string, vol *SubVolume) SnapshotClient {
func NewSnapshot(
conn *util.ClusterConnection,
snapshotID,
clusterID,
clusterName string,
vol *SubVolume,
) SnapshotClient {
return &snapshotClient{
Snapshot: &Snapshot{
SnapshotID: snapshotID,
SubVolume: vol,
},
clusterID: clusterID,
clusterName: clusterName,
conn: conn,
}
}

View File

@ -99,6 +99,14 @@ func (s *snapshotClient) SetAllSnapshotMetadata(parameters map[string]string) er
}
}
if s.clusterName != "" {
err := s.setSnapshotMetadata(clusterNameKey, s.clusterName)
if err != nil {
return fmt.Errorf("failed to set metadata key %q, value %q on subvolume snapshot %s %s in fs %s: %w",
clusterNameKey, s.clusterName, s.SnapshotID, s.VolID, s.FsName, err)
}
}
return nil
}
@ -114,5 +122,12 @@ func (s *snapshotClient) UnsetAllSnapshotMetadata(keys []string) error {
}
}
err := s.removeSnapshotMetadata(clusterNameKey)
// TODO: replace string comparison with errno.
if err != nil && !strings.Contains(err.Error(), "No such file or directory") {
return fmt.Errorf("failed to unset metadata key %q on subvolume snapshot %s %s in fs %s: %w",
clusterNameKey, s.SnapshotID, s.VolID, s.FsName, err)
}
return nil
}

View File

@ -377,6 +377,7 @@ func CheckSnapExists(
ctx context.Context,
volOptions *VolumeOptions,
snap *SnapshotOption,
clusterName string,
cr *util.Credentials,
) (*SnapshotIdentifier, *core.SnapshotInfo, error) {
// Connect to cephfs' default radosNamespace (csi)
@ -398,7 +399,7 @@ func CheckSnapExists(
snapUUID := snapData.ImageUUID
snapID := snapData.ImageAttributes.ImageName
sid.FsSnapshotName = snapData.ImageAttributes.ImageName
snapClient := core.NewSnapshot(volOptions.conn, snapID, volOptions.ClusterID, &volOptions.SubVolume)
snapClient := core.NewSnapshot(volOptions.conn, snapID, volOptions.ClusterID, clusterName, &volOptions.SubVolume)
snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {

View File

@ -735,7 +735,8 @@ func NewSnapshotOptionsFromID(
volOptions.Features = subvolInfo.Features
volOptions.Size = subvolInfo.BytesQuota
volOptions.RootPath = subvolInfo.Path
snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName, volOptions.ClusterID, &volOptions.SubVolume)
snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName,
volOptions.ClusterID, clusterName, &volOptions.SubVolume)
info, err := snap.GetSnapshotInfo(ctx)
if err != nil {
return &volOptions, nil, &sid, err