Merge pull request #206 from red-hat-storage/sync_us--devel

Syncing latest changes from devel for ceph-csi
This commit is contained in:
openshift-ci[bot] 2023-11-02 08:14:36 +00:00 committed by GitHub
commit ef69b843ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 20 additions and 196 deletions

View File

@ -750,7 +750,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
//
//nolint:gocognit,gocyclo,cyclop // golangci-lint did not catch this earlier, needs to get fixed late
//nolint:gocyclo,cyclop // golangci-lint did not catch this earlier, needs to get fixed late
func (cs *ControllerServer) CreateSnapshot(
ctx context.Context,
req *csi.CreateSnapshotRequest,
@ -830,29 +830,15 @@ func (cs *ControllerServer) CreateSnapshot(
}
defer cs.VolumeLocks.Release(sourceVolID)
snapName := req.GetName()
sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
sid, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// check are we able to retrieve the size of parent
// ceph fs subvolume info command got added in 14.2.10 and 15.+
// as we are not able to retrieve the parent size we are rejecting the
// request to create snapshot.
// TODO: For this purpose we could make use of cached clusterAdditionalInfo
// too.
volClient := core.NewSubVolume(parentVolOptions.GetConnection(), &parentVolOptions.SubVolume,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
info, err := volClient.GetSubVolumeInfo(ctx)
if err != nil {
// Check error code value against ErrInvalidCommand to understand the cluster
// support it or not, It's safe to evaluate as the filtering
// is already done from GetSubVolumeInfo() and send out the error here.
if errors.Is(err, cerrors.ErrInvalidCommand) {
return nil, status.Error(
codes.FailedPrecondition,
"subvolume info command not supported in current ceph cluster")
}
if sid != nil {
errDefer := store.UndoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr)
if errDefer != nil {
@ -866,22 +852,11 @@ func (cs *ControllerServer) CreateSnapshot(
metadata := k8s.GetSnapshotMetadata(req.GetParameters())
if sid != nil {
// check snapshot is protected
protected := true
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume)
if !(snapInfo.Protected == core.SnapshotIsProtected) {
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
protected = false
log.WarningLog(ctx, "failed to protect snapshot of snapshot: %s (%s)",
sid.FsSnapshotName, err)
}
}
// Update snapshot-name/snapshot-namespace/snapshotcontent-name details on
// subvolume snapshot as metadata in case snapshot already exist
if len(metadata) != 0 {
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume)
err = snapClient.SetAllSnapshotMetadata(metadata)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
@ -894,7 +869,7 @@ func (cs *ControllerServer) CreateSnapshot(
SnapshotId: sid.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: sid.CreationTime,
ReadyToUse: protected,
ReadyToUse: true,
},
}, nil
}
@ -968,10 +943,6 @@ func (cs *ControllerServer) doSnapshot(
return snap, fmt.Errorf("failed to get snapshot info for snapshot:%s", snapID)
}
snap.CreationTime = timestamppb.New(snap.CreatedAt)
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapID, err)
}
// Set snapshot-name/snapshot-namespace/snapshotcontent-name details
// on subvolume snapshot as metadata on create
@ -1006,8 +977,6 @@ func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.Cr
// DeleteSnapshot deletes the snapshot in backend and removes the
// snapshot metadata from store.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity
func (cs *ControllerServer) DeleteSnapshot(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
@ -1100,14 +1069,6 @@ func (cs *ControllerServer) DeleteSnapshot(
if snapInfo.HasPendingClones == "yes" {
return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID)
}
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
if snapInfo.Protected == core.SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
needsDelete, err := store.UnrefSelfInSnapshotBackedVolumes(ctx, volOpt, sid.SnapshotID)
if err != nil {
@ -1119,6 +1080,8 @@ func (cs *ControllerServer) DeleteSnapshot(
}
if needsDelete {
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
err = deleteSnapshotAndUndoReservation(
ctx,
snapClient,

View File

@ -18,7 +18,6 @@ package core
import (
"context"
"errors"
"fmt"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
@ -34,11 +33,6 @@ type cephFSCloneState struct {
errorMsg string
}
const (
// SnapshotIsProtected string indicates that the snapshot is currently protected.
SnapshotIsProtected = "yes"
)
// CephFSCloneError indicates that fetching the clone state returned an error.
var CephFSCloneError = cephFSCloneState{}
@ -73,43 +67,20 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
return err
}
var (
// if protectErr is not nil we will delete the snapshot as the protect fails
protectErr error
// if cloneErr is not nil we will unprotect the snapshot and delete the snapshot
cloneErr error
)
defer func() {
if protectErr != nil {
err = snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
// if cloneErr is not nil we will delete the snapshot
var cloneErr error
defer func() {
if cloneErr != nil {
if err = s.PurgeVolume(ctx, true); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, err)
}
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
}
}
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
}()
protectErr = snapClient.ProtectSnapshot(ctx)
if protectErr != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr)
return protectErr
}
cloneErr = snapClient.CloneSnapshot(ctx, s.SubVolume)
if cloneErr != nil {
log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, cloneErr)
@ -139,16 +110,6 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
}
// As we completed clone, remove the intermediate snap
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
return err
}
}
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
@ -166,24 +127,8 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume(
// identified during PVC-PVC cloning.
snapShotID := s.VolID
snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, s.enableMetadata, parentVol)
snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil
}
return err
}
if snapInfo.Protected == SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err)
return err
}
}
err = snapClient.DeleteSnapshot(ctx)
err := snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapShotID, err)

View File

@ -30,12 +30,6 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
)
// autoProtect points to the snapshot auto-protect feature of
// the subvolume.
const (
autoProtect = "snapshot-autoprotect"
)
// SnapshotClient is the interface that holds the signature of snapshot methods
// that interacts with CephFS snapshot API's.
type SnapshotClient interface {
@ -45,10 +39,6 @@ type SnapshotClient interface {
DeleteSnapshot(ctx context.Context) error
// GetSnapshotInfo returns the snapshot info of the subvolume.
GetSnapshotInfo(ctx context.Context) (SnapshotInfo, error)
// ProtectSnapshot protects the snapshot of the subvolume.
ProtectSnapshot(ctx context.Context) error
// UnprotectSnapshot unprotects the snapshot of the subvolume.
UnprotectSnapshot(ctx context.Context) error
// CloneSnapshot clones the snapshot of the subvolume.
CloneSnapshot(ctx context.Context, cloneVolOptions *SubVolume) error
// SetAllSnapshotMetadata set all the metadata from arg parameters on
@ -139,7 +129,6 @@ type SnapshotInfo struct {
CreatedAt time.Time
CreationTime *timestamp.Timestamp
HasPendingClones string
Protected string
}
// GetSnapshotInfo returns the snapshot info of the subvolume.
@ -169,80 +158,10 @@ func (s *snapshotClient) GetSnapshotInfo(ctx context.Context) (SnapshotInfo, err
}
snap.CreatedAt = info.CreatedAt.Time
snap.HasPendingClones = info.HasPendingClones
snap.Protected = info.Protected
return snap, nil
}
// ProtectSnapshot protects the snapshot of the subvolume.
func (s *snapshotClient) ProtectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The ProtectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.ProtectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID)
if err != nil {
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to protect subvolume snapshot %s %s in fs %s with error: %s",
s.VolID,
s.SnapshotID,
s.FsName,
err)
return err
}
return nil
}
// UnprotectSnapshot unprotects the snapshot of the subvolume.
func (s *snapshotClient) UnprotectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The UnprotectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)
return err
}
err = fsa.UnprotectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID,
s.SnapshotID)
if err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error.
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to unprotect subvolume snapshot %s %s in fs %s with error: %s",
s.VolID,
s.SnapshotID,
s.FsName,
err)
return err
}
return nil
}
// CloneSnapshot clones the snapshot of the subvolume.
func (s *snapshotClient) CloneSnapshot(
ctx context.Context,

View File

@ -46,9 +46,6 @@ var (
// statically provisioned.
ErrNonStaticVolume = coreError.New("volume not static")
// ErrSnapProtectionExist is returned when the snapshot is already protected.
ErrSnapProtectionExist = coreError.New("snapshot protection already exists")
// ErrSnapNotFound is returned when snap name passed is not found in the list
// of snapshots for the given image.
ErrSnapNotFound = coreError.New("snapshot not found")

View File

@ -398,11 +398,11 @@ func CheckSnapExists(
clusterName string,
setMetadata bool,
cr *util.Credentials,
) (*SnapshotIdentifier, *core.SnapshotInfo, error) {
) (*SnapshotIdentifier, error) {
// Connect to cephfs' default radosNamespace (csi)
j, err := SnapJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, nil, err
return nil, err
}
defer j.Destroy()
@ -411,10 +411,10 @@ func CheckSnapExists(
snapData, err := j.CheckReservation(
ctx, volOptions.MetadataPool, snap.RequestName, snap.NamePrefix, volOptions.VolID, kmsID, encryptionType)
if err != nil {
return nil, nil, err
return nil, err
}
if snapData == nil {
return nil, nil, nil
return nil, nil
}
sid := &SnapshotIdentifier{}
snapUUID := snapData.ImageUUID
@ -428,10 +428,10 @@ func CheckSnapExists(
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, snapID, snap.RequestName)
return nil, nil, err
return nil, err
}
return nil, nil, err
return nil, err
}
defer func() {
@ -455,10 +455,10 @@ func CheckSnapExists(
sid.SnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, snapUUID, fsutil.VolIDVersion)
if err != nil {
return nil, nil, err
return nil, err
}
log.DebugLog(ctx, "Found existing snapshot (%s) with subvolume name (%s) for request (%s)",
snapData.ImageAttributes.RequestName, volOptions.VolID, sid.FsSnapshotName)
return sid, &snapInfo, nil
return sid, nil
}