cephfs: check clone pending state

sometimes, cephfs returns pending as clone
status if we request for the subvolume clone.
cephcsi need to consider the pending state
when checking the clone status and it should
return abort error message if the clone is in
pending state or in-progress state.

Co-authored-by: Yug <yuggupta27@gmail.com>
Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2020-11-06 08:42:55 +05:30 committed by Madhu Rajanna
parent 75d190f430
commit 09c303c2cb
4 changed files with 22 additions and 5 deletions

View File

@ -27,6 +27,8 @@ import (
const ( const (
// cephFSCloneFailed indicates that clone is in failed state. // cephFSCloneFailed indicates that clone is in failed state.
cephFSCloneFailed = "failed" cephFSCloneFailed = "failed"
// cephFSClonePending indicates that clone is in pending state.
cephFSClonePending = "pending"
// cephFSCloneCompleted indicates that clone is in in-progress state. // cephFSCloneCompleted indicates that clone is in in-progress state.
cephFSCloneInprogress = "in-progress" cephFSCloneInprogress = "in-progress"
// cephFSCloneComplete indicates that clone is in complete state. // cephFSCloneComplete indicates that clone is in complete state.
@ -94,6 +96,9 @@ func createCloneFromSubvolume(ctx context.Context, volID, cloneID volumeID, volO
case cephFSCloneInprogress: case cephFSCloneInprogress:
util.ErrorLog(ctx, "clone is in progress for %v", cloneID) util.ErrorLog(ctx, "clone is in progress for %v", cloneID)
return ErrCloneInProgress return ErrCloneInProgress
case cephFSClonePending:
util.ErrorLog(ctx, "clone is pending for %v", cloneID)
return ErrClonePending
case cephFSCloneFailed: case cephFSCloneFailed:
util.ErrorLog(ctx, "clone failed for %v", cloneID) util.ErrorLog(ctx, "clone failed for %v", cloneID)
cloneFailedErr := fmt.Errorf("clone %s is in %s state", cloneID, clone.Status.State) cloneFailedErr := fmt.Errorf("clone %s is in %s state", cloneID, clone.Status.State)
@ -150,6 +155,12 @@ func cleanupCloneFromSubvolumeSnapshot(ctx context.Context, volID, cloneID volum
return nil return nil
} }
// isCloneRetryError returns true if the clone error is pending,in-progress
// error.
func isCloneRetryError(err error) bool {
return errors.Is(err, ErrCloneInProgress) || errors.Is(err, ErrClonePending)
}
func createCloneFromSnapshot(ctx context.Context, parentVolOpt, volOptions *volumeOptions, vID *volumeIdentifier, sID *snapshotIdentifier, cr *util.Credentials) error { func createCloneFromSnapshot(ctx context.Context, parentVolOpt, volOptions *volumeOptions, vID *volumeIdentifier, sID *snapshotIdentifier, cr *util.Credentials) error {
snapID := volumeID(sID.FsSnapshotName) snapID := volumeID(sID.FsSnapshotName)
err := cloneSnapshot(ctx, parentVolOpt, cr, volumeID(sID.FsSubvolName), snapID, volumeID(vID.FsSubvolName), volOptions) err := cloneSnapshot(ctx, parentVolOpt, cr, volumeID(sID.FsSubvolName), snapID, volumeID(vID.FsSubvolName), volOptions)
@ -158,7 +169,7 @@ func createCloneFromSnapshot(ctx context.Context, parentVolOpt, volOptions *volu
} }
defer func() { defer func() {
if err != nil { if err != nil {
if !errors.Is(err, ErrCloneInProgress) { if !isCloneRetryError(err) {
if dErr := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions, true); dErr != nil { if dErr := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions, true); dErr != nil {
util.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, dErr) util.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, dErr)
} }
@ -173,6 +184,8 @@ func createCloneFromSnapshot(ctx context.Context, parentVolOpt, volOptions *volu
switch clone.Status.State { switch clone.Status.State {
case cephFSCloneInprogress: case cephFSCloneInprogress:
return ErrCloneInProgress return ErrCloneInProgress
case cephFSClonePending:
return ErrClonePending
case cephFSCloneFailed: case cephFSCloneFailed:
return fmt.Errorf("clone %s is in %s state", vID.FsSubvolName, clone.Status.State) return fmt.Errorf("clone %s is in %s state", vID.FsSubvolName, clone.Status.State)
case cephFSCloneComplete: case cephFSCloneComplete:

View File

@ -168,7 +168,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
} }
vID, err := checkVolExists(ctx, volOptions, parentVol, pvID, sID, cr) vID, err := checkVolExists(ctx, volOptions, parentVol, pvID, sID, cr)
if err != nil { if err != nil {
if errors.Is(err, ErrCloneInProgress) { if isCloneRetryError(err) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())
} }
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -228,7 +228,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
defer func() { defer func() {
if err != nil { if err != nil {
if !errors.Is(err, ErrCloneInProgress) { if !isCloneRetryError(err) {
errDefer := undoVolReservation(ctx, volOptions, *vID, secret) errDefer := undoVolReservation(ctx, volOptions, *vID, secret)
if errDefer != nil { if errDefer != nil {
util.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)", util.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)",
@ -241,7 +241,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// Create a volume // Create a volume
err = cs.createBackingVolume(ctx, volOptions, parentVol, vID, pvID, sID, cr) err = cs.createBackingVolume(ctx, volOptions, parentVol, vID, pvID, sID, cr)
if err != nil { if err != nil {
if errors.Is(err, ErrCloneInProgress) { if isCloneRetryError(err) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())
} }
return nil, err return nil, err

View File

@ -38,7 +38,8 @@ const (
var ( var (
// ErrCloneInProgress is returned when snapshot clone state is `in progress` // ErrCloneInProgress is returned when snapshot clone state is `in progress`
ErrCloneInProgress = errors.New("in progress") ErrCloneInProgress = errors.New("in progress")
// ErrClonePending is returned when snapshot clone state is `pending`
ErrClonePending = errors.New("pending")
// ErrInvalidVolID is returned when a CSI passed VolumeID is not conformant to any known volume ID // ErrInvalidVolID is returned when a CSI passed VolumeID is not conformant to any known volume ID
// formats. // formats.
ErrInvalidVolID = errors.New("invalid VolumeID") ErrInvalidVolID = errors.New("invalid VolumeID")

View File

@ -105,6 +105,9 @@ func checkVolExists(ctx context.Context,
if clone.Status.State == cephFSCloneInprogress { if clone.Status.State == cephFSCloneInprogress {
return nil, ErrCloneInProgress return nil, ErrCloneInProgress
} }
if clone.Status.State == cephFSClonePending {
return nil, ErrClonePending
}
if clone.Status.State == cephFSCloneFailed { if clone.Status.State == cephFSCloneFailed {
err = purgeVolume(ctx, volumeID(vid.FsSubvolName), cr, volOptions, true) err = purgeVolume(ctx, volumeID(vid.FsSubvolName), cr, volOptions, true)
if err != nil { if err != nil {