cephfs: Change checkVolExist for snapshot and clone workflow

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal 2020-08-07 12:37:36 +05:30 committed by mergify[bot]
parent c773097f85
commit 9c000add29

View File

@ -19,8 +19,12 @@ package cephfs
import (
"context"
"errors"
"fmt"
"github.com/ceph/ceph-csi/internal/util"
"github.com/golang/protobuf/ptypes/timestamp"
klog "k8s.io/klog/v2"
)
// volumeIdentifier structure contains an association between the CSI VolumeID to its subvolume
@ -30,6 +34,14 @@ type volumeIdentifier struct {
VolumeID string
}
type snapshotIdentifier struct {
FsSnapshotName string
SnapshotID string
RequestName string
CreationTime *timestamp.Timestamp
FsSubvolName string
}
/*
checkVolExists checks to determine if passed in RequestName in volOptions exists on the backend.
@ -44,15 +56,15 @@ because, the order of omap creation and deletion are inverse of each other, and
request name lock, and hence any stale omaps are leftovers from incomplete transactions and are
hence safe to garbage collect.
*/
func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
// nolint:gocognit:gocyclo // TODO: reduce complexity
func checkVolExists(ctx context.Context,
volOptions,
parentVolOpt *volumeOptions,
pvID *volumeIdentifier,
sID *snapshotIdentifier,
cr *util.Credentials) (*volumeIdentifier, error) {
var vid volumeIdentifier
cr, err := util.NewAdminCredentials(secret)
if err != nil {
return nil, err
}
defer cr.DeleteCredentials()
j, err := volJournal.Connect(volOptions.Monitors, cr)
if err != nil {
return nil, err
@ -70,14 +82,62 @@ func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[s
imageUUID := imageData.ImageUUID
vid.FsSubvolName = imageData.ImageAttributes.ImageName
_, err = getVolumeRootPathCeph(ctx, volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
if errors.Is(err, ErrVolumeNotFound) {
if sID != nil || pvID != nil {
clone, cloneInfoErr := getCloneInfo(ctx, volOptions, cr, volumeID(vid.FsSubvolName))
if cloneInfoErr != nil {
if errors.Is(cloneInfoErr, ErrVolumeNotFound) {
if pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx, volumeID(pvID.FsSubvolName),
volumeID(vid.FsSubvolName),
parentVolOpt,
cr)
if err != nil {
return nil, err
}
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
return nil, err
}
if clone.Status.State == cephFSCloneInprogress {
return nil, ErrCloneInProgress
}
if clone.Status.State == cephFSCloneFailed {
err = purgeVolume(ctx, volumeID(vid.FsSubvolName), cr, volOptions, true)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), vid.FsSubvolName, err)
return nil, err
}
if pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx, volumeID(pvID.FsSubvolName),
volumeID(vid.FsSubvolName),
parentVolOpt,
cr)
if err != nil {
return nil, err
}
}
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
return nil, err
if clone.Status.State != cephFSCloneComplete {
return nil, fmt.Errorf("clone is not in complete state for %s", vid.FsSubvolName)
}
} else {
_, err = getVolumeRootPathCeph(ctx, volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
if errors.Is(err, ErrVolumeNotFound) {
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
return nil, err
}
}
// check if topology constraints match what is found
@ -96,6 +156,13 @@ func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[s
util.DebugLog(ctx, "Found existing volume (%s) with subvolume name (%s) for request (%s)",
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
if parentVolOpt != nil && pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(ctx, volumeID(pvID.FsSubvolName), volumeID(vid.FsSubvolName), parentVolOpt, cr)
if err != nil {
return nil, err
}
}
return &vid, nil
}