cephfs: Add 'pending' state for clone status

In certain cases, clone status can be 'pending'.
In that case, abort error message should be
returned similar to that during 'in-progress'
state.

Co-authored-by: Madhu Rajanna <madhupr007@gmail.com>
Signed-off-by: Yug <yuggupta27@gmail.com>
This commit is contained in:
Yug 2020-11-09 11:30:18 +05:30 committed by mergify[bot]
parent 565038fdfd
commit acbedc52bf
4 changed files with 17 additions and 4 deletions

View File

@ -27,6 +27,8 @@ import (
const ( const (
// cephFSCloneFailed indicates that clone is in failed state. // cephFSCloneFailed indicates that clone is in failed state.
cephFSCloneFailed = "failed" cephFSCloneFailed = "failed"
// cephFSClonePending indicates that clone is in pending state.
cephFSClonePending = "pending"
// cephFSCloneInprogress indicates that clone is in in-progress state. // cephFSCloneInprogress indicates that clone is in in-progress state.
cephFSCloneInprogress = "in-progress" cephFSCloneInprogress = "in-progress"
// cephFSCloneComplete indicates that clone is in complete state. // cephFSCloneComplete indicates that clone is in complete state.
@ -94,6 +96,9 @@ func createCloneFromSubvolume(ctx context.Context, volID, cloneID volumeID, volO
case cephFSCloneInprogress: case cephFSCloneInprogress:
util.ErrorLog(ctx, "clone is in progress for %v", cloneID) util.ErrorLog(ctx, "clone is in progress for %v", cloneID)
return ErrCloneInProgress return ErrCloneInProgress
case cephFSClonePending:
util.ErrorLog(ctx, "clone is pending for %v", cloneID)
return ErrClonePending
case cephFSCloneFailed: case cephFSCloneFailed:
util.ErrorLog(ctx, "clone failed for %v", cloneID) util.ErrorLog(ctx, "clone failed for %v", cloneID)
cloneFailedErr := fmt.Errorf("clone %s is in %s state", cloneID, clone.Status.State) cloneFailedErr := fmt.Errorf("clone %s is in %s state", cloneID, clone.Status.State)
@ -158,7 +163,7 @@ func createCloneFromSnapshot(ctx context.Context, parentVolOpt, volOptions *volu
} }
defer func() { defer func() {
if err != nil { if err != nil {
if !errors.Is(err, ErrCloneInProgress) { if !isCloneRetryError(err) {
if dErr := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions, true); dErr != nil { if dErr := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions, true); dErr != nil {
util.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, dErr) util.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, dErr)
} }
@ -173,6 +178,8 @@ func createCloneFromSnapshot(ctx context.Context, parentVolOpt, volOptions *volu
switch clone.Status.State { switch clone.Status.State {
case cephFSCloneInprogress: case cephFSCloneInprogress:
return ErrCloneInProgress return ErrCloneInProgress
case cephFSClonePending:
return ErrClonePending
case cephFSCloneFailed: case cephFSCloneFailed:
return fmt.Errorf("clone %s is in %s state", vID.FsSubvolName, clone.Status.State) return fmt.Errorf("clone %s is in %s state", vID.FsSubvolName, clone.Status.State)
case cephFSCloneComplete: case cephFSCloneComplete:

View File

@ -173,7 +173,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
vID, err := checkVolExists(ctx, volOptions, parentVol, pvID, sID, cr) vID, err := checkVolExists(ctx, volOptions, parentVol, pvID, sID, cr)
if err != nil { if err != nil {
if errors.Is(err, ErrCloneInProgress) { if isCloneRetryError(err) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())
} }
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -233,7 +233,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
defer func() { defer func() {
if err != nil { if err != nil {
if !errors.Is(err, ErrCloneInProgress) { if !isCloneRetryError(err) {
errDefer := undoVolReservation(ctx, volOptions, *vID, secret) errDefer := undoVolReservation(ctx, volOptions, *vID, secret)
if errDefer != nil { if errDefer != nil {
util.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)", util.WarningLog(ctx, "failed undoing reservation of volume: %s (%s)",
@ -246,7 +246,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// Create a volume // Create a volume
err = cs.createBackingVolume(ctx, volOptions, parentVol, vID, pvID, sID, cr) err = cs.createBackingVolume(ctx, volOptions, parentVol, vID, pvID, sID, cr)
if err != nil { if err != nil {
if errors.Is(err, ErrCloneInProgress) { if isCloneRetryError(err) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())
} }
return nil, err return nil, err

View File

@ -39,6 +39,9 @@ var (
// ErrCloneInProgress is returned when snapshot clone state is `in progress` // ErrCloneInProgress is returned when snapshot clone state is `in progress`
ErrCloneInProgress = errors.New("in progress") ErrCloneInProgress = errors.New("in progress")
// ErrClonePending is returned when snapshot clone state is `pending`
ErrClonePending = errors.New("pending")
// ErrInvalidVolID is returned when a CSI passed VolumeID is not conformant to any known volume ID // ErrInvalidVolID is returned when a CSI passed VolumeID is not conformant to any known volume ID
// formats. // formats.
ErrInvalidVolID = errors.New("invalid VolumeID") ErrInvalidVolID = errors.New("invalid VolumeID")

View File

@ -105,6 +105,9 @@ func checkVolExists(ctx context.Context,
if clone.Status.State == cephFSCloneInprogress { if clone.Status.State == cephFSCloneInprogress {
return nil, ErrCloneInProgress return nil, ErrCloneInProgress
} }
if clone.Status.State == cephFSClonePending {
return nil, ErrClonePending
}
if clone.Status.State == cephFSCloneFailed { if clone.Status.State == cephFSCloneFailed {
err = purgeVolume(ctx, volumeID(vid.FsSubvolName), cr, volOptions, true) err = purgeVolume(ctx, volumeID(vid.FsSubvolName), cr, volOptions, true)
if err != nil { if err != nil {