cephfs: handle metadata op-failures with unsupported ceph versions

Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
This commit is contained in:
Prasanna Kumar Kalever 2022-07-19 16:02:58 +05:30 committed by mergify[bot]
parent c32e71b31c
commit 856d7c264c
7 changed files with 67 additions and 14 deletions

View File

@ -601,6 +601,7 @@ func cleanUpBackingVolume(
snapClient := core.NewSnapshot( snapClient := core.NewSnapshot(
snapParentVolOptions.GetConnection(), snapParentVolOptions.GetConnection(),
snapID.FsSnapshotName, snapID.FsSnapshotName,
volOptions.ClusterID,
&snapParentVolOptions.SubVolume, &snapParentVolOptions.SubVolume,
) )
@ -819,7 +820,11 @@ func (cs *ControllerServer) CreateSnapshot(
if sid != nil { if sid != nil {
// check snapshot is protected // check snapshot is protected
protected := true protected := true
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName, &parentVolOptions.SubVolume) snapClient := core.NewSnapshot(
parentVolOptions.GetConnection(),
sid.FsSnapshotName,
parentVolOptions.ClusterID,
&parentVolOptions.SubVolume)
if !(snapInfo.Protected == core.SnapshotIsProtected) { if !(snapInfo.Protected == core.SnapshotIsProtected) {
err = snapClient.ProtectSnapshot(ctx) err = snapClient.ProtectSnapshot(ctx)
if err != nil { if err != nil {
@ -887,7 +892,7 @@ func doSnapshot(
) (core.SnapshotInfo, error) { ) (core.SnapshotInfo, error) {
snapID := fsutil.VolumeID(snapshotName) snapID := fsutil.VolumeID(snapshotName)
snap := core.SnapshotInfo{} snap := core.SnapshotInfo{}
snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName, &volOpt.SubVolume) snapClient := core.NewSnapshot(volOpt.GetConnection(), snapshotName, volOpt.ClusterID, &volOpt.SubVolume)
err := snapClient.CreateSnapshot(ctx) err := snapClient.CreateSnapshot(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed to create snapshot %s %v", snapID, err) log.ErrorLog(ctx, "failed to create snapshot %s %v", snapID, err)
@ -1044,7 +1049,7 @@ func (cs *ControllerServer) DeleteSnapshot(
if snapInfo.HasPendingClones == "yes" { if snapInfo.HasPendingClones == "yes" {
return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID) return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID)
} }
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName, &volOpt.SubVolume) snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName, volOpt.ClusterID, &volOpt.SubVolume)
if snapInfo.Protected == core.SnapshotIsProtected { if snapInfo.Protected == core.SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx) err = snapClient.UnprotectSnapshot(ctx)
if err != nil { if err != nil {

View File

@ -66,7 +66,7 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
parentvolOpt *SubVolume, parentvolOpt *SubVolume,
) error { ) error {
snapshotID := s.VolID snapshotID := s.VolID
snapClient := NewSnapshot(s.conn, snapshotID, parentvolOpt) snapClient := NewSnapshot(s.conn, snapshotID, s.clusterID, parentvolOpt)
err := snapClient.CreateSnapshot(ctx) err := snapClient.CreateSnapshot(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err) log.ErrorLog(ctx, "failed to create snapshot %s %v", snapshotID, err)
@ -165,7 +165,7 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume(
// snapshot name is same as clone name as we need a name which can be // snapshot name is same as clone name as we need a name which can be
// identified during PVC-PVC cloning. // identified during PVC-PVC cloning.
snapShotID := s.VolID snapShotID := s.VolID
snapClient := NewSnapshot(s.conn, snapShotID, parentVol) snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, parentVol)
snapInfo, err := snapClient.GetSnapshotInfo(ctx) snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil { if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) { if errors.Is(err, cerrors.ErrSnapNotFound) {
@ -198,7 +198,7 @@ func (s *subVolumeClient) CreateCloneFromSnapshot(
ctx context.Context, snap Snapshot, ctx context.Context, snap Snapshot,
) error { ) error {
snapID := snap.SnapshotID snapID := snap.SnapshotID
snapClient := NewSnapshot(s.conn, snapID, snap.SubVolume) snapClient := NewSnapshot(s.conn, snapID, s.clusterID, snap.SubVolume)
err := snapClient.CloneSnapshot(ctx, s.SubVolume) err := snapClient.CloneSnapshot(ctx, s.SubVolume)
if err != nil { if err != nil {
return err return err

View File

@ -62,6 +62,7 @@ type SnapshotClient interface {
// snapshotClient is the implementation of SnapshotClient interface. // snapshotClient is the implementation of SnapshotClient interface.
type snapshotClient struct { type snapshotClient struct {
*Snapshot // Embedded snapshot struct. *Snapshot // Embedded snapshot struct.
clusterID string // Cluster ID.
conn *util.ClusterConnection // Cluster connection. conn *util.ClusterConnection // Cluster connection.
} }
@ -72,13 +73,14 @@ type Snapshot struct {
} }
// NewSnapshot creates a new snapshot client. // NewSnapshot creates a new snapshot client.
func NewSnapshot(conn *util.ClusterConnection, snapshotID string, vol *SubVolume) SnapshotClient { func NewSnapshot(conn *util.ClusterConnection, snapshotID, clusterID string, vol *SubVolume) SnapshotClient {
return &snapshotClient{ return &snapshotClient{
Snapshot: &Snapshot{ Snapshot: &Snapshot{
SnapshotID: snapshotID, SnapshotID: snapshotID,
SubVolume: vol, SubVolume: vol,
}, },
conn: conn, clusterID: clusterID,
conn: conn,
} }
} }

View File

@ -17,30 +17,75 @@ limitations under the License.
package core package core
import ( import (
"errors"
"fmt" "fmt"
"strings" "strings"
fsAdmin "github.com/ceph/go-ceph/cephfs/admin"
) )
// ErrSubVolSnapMetadataNotSupported is returned when set/get/list/remove
// subvolume snapshot metadata options are not supported.
var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata operations are not supported")
func (s *snapshotClient) supportsSubVolSnapMetadata() bool {
if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent {
clusterAdditionalInfo[s.clusterID] = &localClusterState{}
}
return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported
}
func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool {
var invalid fsAdmin.NotImplementedError
if err != nil && errors.Is(err, &invalid) {
// In case the error is other than invalid command return error to
// the caller.
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported
return false
}
clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported
return true
}
// setSnapshotMetadata sets custom metadata on the subvolume snapshot in a // setSnapshotMetadata sets custom metadata on the subvolume snapshot in a
// volume as a key-value pair. // volume as a key-value pair.
func (s *snapshotClient) setSnapshotMetadata(key, value string) error { func (s *snapshotClient) setSnapshotMetadata(key, value string) error {
if !s.supportsSubVolSnapMetadata() {
return ErrSubVolSnapMetadataNotSupported
}
fsa, err := s.conn.GetFSAdmin() fsa, err := s.conn.GetFSAdmin()
if err != nil { if err != nil {
return err return err
} }
return fsa.SetSnapshotMetadata(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, key, value) err = fsa.SetSnapshotMetadata(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, key, value)
if !s.isUnsupportedSubVolSnapMetadata(err) {
return ErrSubVolSnapMetadataNotSupported
}
return err
} }
// removeSnapshotMetadata removes custom metadata set on the subvolume // removeSnapshotMetadata removes custom metadata set on the subvolume
// snapshot in a volume using the metadata key. // snapshot in a volume using the metadata key.
func (s *snapshotClient) removeSnapshotMetadata(key string) error { func (s *snapshotClient) removeSnapshotMetadata(key string) error {
if !s.supportsSubVolSnapMetadata() {
return ErrSubVolSnapMetadataNotSupported
}
fsa, err := s.conn.GetFSAdmin() fsa, err := s.conn.GetFSAdmin()
if err != nil { if err != nil {
return err return err
} }
return fsa.RemoveSnapshotMetadata(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, key) err = fsa.RemoveSnapshotMetadata(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID, key)
if !s.isUnsupportedSubVolSnapMetadata(err) {
return ErrSubVolSnapMetadataNotSupported
}
return err
} }
// SetAllSnapshotMetadata set all the metadata from arg parameters on // SetAllSnapshotMetadata set all the metadata from arg parameters on

View File

@ -191,8 +191,9 @@ const (
type localClusterState struct { type localClusterState struct {
// set the enum value i.e., unknown, supported, // set the enum value i.e., unknown, supported,
// unsupported as per the state of the cluster. // unsupported as per the state of the cluster.
resizeState operationState resizeState operationState
subVolMetadataState operationState subVolMetadataState operationState
subVolSnapshotMetadataState operationState
// set true once a subvolumegroup is created // set true once a subvolumegroup is created
// for corresponding cluster. // for corresponding cluster.
subVolumeGroupCreated bool subVolumeGroupCreated bool

View File

@ -398,7 +398,7 @@ func CheckSnapExists(
snapUUID := snapData.ImageUUID snapUUID := snapData.ImageUUID
snapID := snapData.ImageAttributes.ImageName snapID := snapData.ImageAttributes.ImageName
sid.FsSnapshotName = snapData.ImageAttributes.ImageName sid.FsSnapshotName = snapData.ImageAttributes.ImageName
snapClient := core.NewSnapshot(volOptions.conn, snapID, &volOptions.SubVolume) snapClient := core.NewSnapshot(volOptions.conn, snapID, volOptions.ClusterID, &volOptions.SubVolume)
snapInfo, err := snapClient.GetSnapshotInfo(ctx) snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil { if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) { if errors.Is(err, cerrors.ErrSnapNotFound) {

View File

@ -735,7 +735,7 @@ func NewSnapshotOptionsFromID(
volOptions.Features = subvolInfo.Features volOptions.Features = subvolInfo.Features
volOptions.Size = subvolInfo.BytesQuota volOptions.Size = subvolInfo.BytesQuota
volOptions.RootPath = subvolInfo.Path volOptions.RootPath = subvolInfo.Path
snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName, &volOptions.SubVolume) snap := core.NewSnapshot(volOptions.conn, sid.FsSnapshotName, volOptions.ClusterID, &volOptions.SubVolume)
info, err := snap.GetSnapshotInfo(ctx) info, err := snap.GetSnapshotInfo(ctx)
if err != nil { if err != nil {
return &volOptions, nil, &sid, err return &volOptions, nil, &sid, err