From 856d7c264c2d4f89b3b7371595902697a76b24b0 Mon Sep 17 00:00:00 2001 From: Prasanna Kumar Kalever Date: Tue, 19 Jul 2022 16:02:58 +0530 Subject: [PATCH] cephfs: handle metadata op-failures with unsupported ceph versions Signed-off-by: Prasanna Kumar Kalever --- internal/cephfs/controllerserver.go | 11 +++-- internal/cephfs/core/clone.go | 6 +-- internal/cephfs/core/snapshot.go | 6 ++- internal/cephfs/core/snapshot_metadata.go | 49 ++++++++++++++++++++++- internal/cephfs/core/volume.go | 5 ++- internal/cephfs/store/fsjournal.go | 2 +- internal/cephfs/store/volumeoptions.go | 2 +- 7 files changed, 67 insertions(+), 14 deletions(-) diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index 6a2867c52..ea54f245a 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -601,6 +601,7 @@ func cleanUpBackingVolume( snapClient := core.NewSnapshot( snapParentVolOptions.GetConnection(), snapID.FsSnapshotName, + volOptions.ClusterID, &snapParentVolOptions.SubVolume, ) @@ -819,7 +820,11 @@ func (cs *ControllerServer) CreateSnapshot( if sid != nil { // check snapshot is protected protected := true - snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName, &parentVolOptions.SubVolume) + snapClient := core.NewSnapshot( + parentVolOptions.GetConnection(), + sid.FsSnapshotName, + parentVolOptions.ClusterID, + &parentVolOptions.SubVolume) if !(snapInfo.Protected == core.SnapshotIsProtected) { err = snapClient.ProtectSnapshot(ctx) if err != nil { @@ -887,7 +892,7 @@ func doSnapshot( ) (core.SnapshotInfo, error) { snapID := fsutil.VolumeID(snapshotName) snap := core.SnapshotInfo{} - snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName, &volOpt.SubVolume) + snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName, volOpt.ClusterID, &volOpt.SubVolume) err := snapClient.CreateSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to create snapshot %s %v", snapID, err) @@ -1044,7 +1049,7 @@ func (cs *ControllerServer) DeleteSnapshot( if snapInfo.HasPendingClones == "yes" { return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID) } - snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName, &volOpt.SubVolume) + snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName, volOpt.ClusterID, &volOpt.SubVolume) if snapInfo.Protected == core.SnapshotIsProtected { err = snapClient.UnprotectSnapshot(ctx) if err != nil { diff --git a/internal/cephfs/core/clone.go b/internal/cephfs/core/clone.go index ca88a4914..d2a7f6c87 100644 --- a/internal/cephfs/core/clone.go +++ b/internal/cephfs/core/clone.go @@ -66,7 +66,7 @@ func (s *subVolumeClient) CreateCloneFromSubvolume( parentvolOpt *SubVolume, ) error { snapshotID := s.VolID - snapClient := NewSnapshot(s.conn, snapshotID, parentvolOpt) + snapClient := NewSnapshot(s.conn, snapshotID, s.clusterID, parentvolOpt) err := snapClient.CreateSnapshot(ctx) if err != nil { log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err) @@ -165,7 +165,7 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume( // snapshot name is same as clone name as we need a name which can be // identified during PVC-PVC cloning. snapShotID := s.VolID - snapClient := NewSnapshot(s.conn, snapShotID, parentVol) + snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, parentVol) snapInfo, err := snapClient.GetSnapshotInfo(ctx) if err != nil { if errors.Is(err, cerrors.ErrSnapNotFound) { @@ -198,7 +198,7 @@ func (s *subVolumeClient) CreateCloneFromSnapshot( ctx context.Context, snap Snapshot, ) error { snapID := snap.SnapshotID - snapClient := NewSnapshot(s.conn, snapID, snap.SubVolume) + snapClient := NewSnapshot(s.conn, snapID, s.clusterID, snap.SubVolume) err := snapClient.CloneSnapshot(ctx, s.SubVolume) if err != nil { return err diff --git a/internal/cephfs/core/snapshot.go b/internal/cephfs/core/snapshot.go index 939b825d6..52b0146e9 100644 --- a/internal/cephfs/core/snapshot.go +++ b/internal/cephfs/core/snapshot.go @@ -62,6 +62,7 @@ type SnapshotClient interface { // snapshotClient is the implementation of SnapshotClient interface. type snapshotClient struct { *Snapshot // Embedded snapshot struct. + clusterID string // Cluster ID. conn *util.ClusterConnection // Cluster connection. } @@ -72,13 +73,14 @@ type Snapshot struct { } // NewSnapshot creates a new snapshot client. -func NewSnapshot(conn *util.ClusterConnection, snapshotID string, vol *SubVolume) SnapshotClient { +func NewSnapshot(conn *util.ClusterConnection, snapshotID, clusterID string, vol *SubVolume) SnapshotClient { return &snapshotClient{ Snapshot: &Snapshot{ SnapshotID: snapshotID, SubVolume: vol, }, - conn: conn, + clusterID: clusterID, + conn: conn, } } diff --git a/internal/cephfs/core/snapshot_metadata.go b/internal/cephfs/core/snapshot_metadata.go index ed893ffb1..f4f2c2cfc 100644 --- a/internal/cephfs/core/snapshot_metadata.go +++ b/internal/cephfs/core/snapshot_metadata.go @@ -17,30 +17,75 @@ limitations under the License. package core import ( + "errors" "fmt" "strings" + + fsAdmin "github.com/ceph/go-ceph/cephfs/admin" ) +// ErrSubVolSnapMetadataNotSupported is returned when set/get/list/remove +// subvolume snapshot metadata options are not supported. +var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata operations are not supported") + +func (s *snapshotClient) supportsSubVolSnapMetadata() bool { + if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent { + clusterAdditionalInfo[s.clusterID] = &localClusterState{} + } + + return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported +} + +func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool { + var invalid fsAdmin.NotImplementedError + if err != nil && errors.Is(err, &invalid) { + // In case the error is other than invalid command return error to + // the caller. + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported + + return false + } + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported + + return true +} + // setSnapshotMetadata sets custom metadata on the subvolume snapshot in a // volume as a key-value pair. func (s *snapshotClient) setSnapshotMetadata(key, value string) error { + if !s.supportsSubVolSnapMetadata() { + return ErrSubVolSnapMetadataNotSupported + } fsa, err := s.conn.GetFSAdmin() if err != nil { return err } - return fsa.SetSnapshotMetadata(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, key, value) + err = fsa.SetSnapshotMetadata(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, key, value) + if !s.isUnsupportedSubVolSnapMetadata(err) { + return ErrSubVolSnapMetadataNotSupported + } + + return err } // removeSnapshotMetadata removes custom metadata set on the subvolume // snapshot in a volume using the metadata key. func (s *snapshotClient) removeSnapshotMetadata(key string) error { + if !s.supportsSubVolSnapMetadata() { + return ErrSubVolSnapMetadataNotSupported + } fsa, err := s.conn.GetFSAdmin() if err != nil { return err } - return fsa.RemoveSnapshotMetadata(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, key) + err = fsa.RemoveSnapshotMetadata(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, key) + if !s.isUnsupportedSubVolSnapMetadata(err) { + return ErrSubVolSnapMetadataNotSupported + } + + return err } // SetAllSnapshotMetadata set all the metadata from arg parameters on diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index 9cdec5e00..61e32c3bb 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -191,8 +191,9 @@ const ( type localClusterState struct { // set the enum value i.e., unknown, supported, // unsupported as per the state of the cluster. - resizeState operationState - subVolMetadataState operationState + resizeState operationState + subVolMetadataState operationState + subVolSnapshotMetadataState operationState // set true once a subvolumegroup is created // for corresponding cluster. subVolumeGroupCreated bool diff --git a/internal/cephfs/store/fsjournal.go b/internal/cephfs/store/fsjournal.go index 82c226de0..e5fab689d 100644 --- a/internal/cephfs/store/fsjournal.go +++ b/internal/cephfs/store/fsjournal.go @@ -398,7 +398,7 @@ func CheckSnapExists( snapUUID := snapData.ImageUUID snapID := snapData.ImageAttributes.ImageName sid.FsSnapshotName = snapData.ImageAttributes.ImageName - snapClient := core.NewSnapshot(volOptions.conn, snapID, &volOptions.SubVolume) + snapClient := core.NewSnapshot(volOptions.conn, snapID, volOptions.ClusterID, &volOptions.SubVolume) snapInfo, err := snapClient.GetSnapshotInfo(ctx) if err != nil { if errors.Is(err, cerrors.ErrSnapNotFound) { diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go index 70ace8cb1..8830ae733 100644 --- a/internal/cephfs/store/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -735,7 +735,7 @@ func NewSnapshotOptionsFromID( volOptions.Features = subvolInfo.Features volOptions.Size = subvolInfo.BytesQuota volOptions.RootPath = subvolInfo.Path - snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName, &volOptions.SubVolume) + snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName, volOptions.ClusterID, &volOptions.SubVolume) info, err := snap.GetSnapshotInfo(ctx) if err != nil { return &volOptions, nil, &sid, err