Handle Delete operation if pool not found

If the backend rbd or cephfs pool is already deleted
we need to return success to the  DeleteVolume RPC
call to make it idempotent.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
(cherry picked from commit 8dcb6a6105)
This commit is contained in:
Madhu Rajanna 2020-01-31 14:19:11 +05:30 committed by Madhu Rajanna
parent c4b6bb90d6
commit edf79d1372
8 changed files with 146 additions and 26 deletions

View File

@ -207,6 +207,15 @@ var _ = Describe("cephfs", func() {
}) })
// Make sure this should be last testcase in this file, because
// it deletes pool
By("Create a PVC and Delete PVC when backend pool deleted", func() {
err := pvcDeleteWhenPoolNotFound(pvcPath, true, f)
if err != nil {
Fail(err.Error())
}
})
}) })
}) })

View File

@ -325,6 +325,15 @@ var _ = Describe("RBD", func() {
Fail(err.Error()) Fail(err.Error())
} }
}) })
// Make sure this should be last testcase in this file, because
// it deletes pool
By("Create a PVC and Delete PVC when backend pool deleted", func() {
err := pvcDeleteWhenPoolNotFound(pvcPath, false, f)
if err != nil {
Fail(err.Error())
}
})
}) })
}) })

View File

@ -557,7 +557,7 @@ func validatePVCAndAppBinding(pvcPath, appPath string, f *framework.Framework) {
} }
} }
func getImageIDFromPVC(pvcNamespace, pvcName string, f *framework.Framework) (string, error) { func getImageInfoFromPVC(pvcNamespace, pvcName string, f *framework.Framework) (string, error) {
c := f.ClientSet.CoreV1() c := f.ClientSet.CoreV1()
pvc, err := c.PersistentVolumeClaims(pvcNamespace).Get(pvcName, metav1.GetOptions{}) pvc, err := c.PersistentVolumeClaims(pvcNamespace).Get(pvcName, metav1.GetOptions{})
if err != nil { if err != nil {
@ -572,23 +572,6 @@ func getImageIDFromPVC(pvcNamespace, pvcName string, f *framework.Framework) (st
imageIDRegex := regexp.MustCompile(`(\w+\-?){5}$`) imageIDRegex := regexp.MustCompile(`(\w+\-?){5}$`)
imageID := imageIDRegex.FindString(pv.Spec.CSI.VolumeHandle) imageID := imageIDRegex.FindString(pv.Spec.CSI.VolumeHandle)
return imageID, nil
}
func getRBDImageSpec(pvcNamespace, pvcName string, f *framework.Framework) (string, error) {
imageID, err := getImageIDFromPVC(pvcNamespace, pvcName, f)
if err != nil {
return "", err
}
return fmt.Sprintf("replicapool/csi-vol-%s", imageID), nil
}
func getCephFSVolumeName(pvcNamespace, pvcName string, f *framework.Framework) (string, error) {
imageID, err := getImageIDFromPVC(pvcNamespace, pvcName, f)
if err != nil {
return "", err
}
return fmt.Sprintf("csi-vol-%s", imageID), nil return fmt.Sprintf("csi-vol-%s", imageID), nil
} }
@ -619,11 +602,11 @@ func getMountType(appName, appNamespace, mountPath string, f *framework.Framewor
func validateEncryptedPVCAndAppBinding(pvcPath, appPath string, f *framework.Framework) { func validateEncryptedPVCAndAppBinding(pvcPath, appPath string, f *framework.Framework) {
pvc, app := createPVCAndAppBinding(pvcPath, appPath, f) pvc, app := createPVCAndAppBinding(pvcPath, appPath, f)
rbdImageSpec, err := getRBDImageSpec(pvc.Namespace, pvc.Name, f) rbdImageID, err := getImageInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil { if err != nil {
Fail(err.Error()) Fail(err.Error())
} }
encryptedState, err := getImageMeta(rbdImageSpec, ".rbd.csi.ceph.com/encrypted", f) encryptedState, err := getImageMeta("replicapool/"+rbdImageID, ".rbd.csi.ceph.com/encrypted", f)
if err != nil { if err != nil {
Fail(err.Error()) Fail(err.Error())
} }
@ -794,7 +777,7 @@ func validateNormalUserPVCAccess(pvcPath string, f *framework.Framework) {
// } // }
func deleteBackingCephFSVolume(f *framework.Framework, pvc *v1.PersistentVolumeClaim) error { func deleteBackingCephFSVolume(f *framework.Framework, pvc *v1.PersistentVolumeClaim) error {
volname, err := getCephFSVolumeName(pvc.Namespace, pvc.Name, f) volname, err := getImageInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil { if err != nil {
return err return err
} }
@ -887,3 +870,76 @@ func checkDataPersist(pvcPath, appPath string, f *framework.Framework) error {
err = deletePVCAndApp("", f, pvc, app) err = deletePVCAndApp("", f, pvc, app)
return err return err
} }
func deleteBackingRBDImage(f *framework.Framework, pvc *v1.PersistentVolumeClaim) error {
rbdImage, err := getImageInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil {
return err
}
opt := metav1.ListOptions{
LabelSelector: "app=rook-ceph-tools",
}
cmd := fmt.Sprintf("rbd rm %s --pool=replicapool", rbdImage)
execCommandInPod(f, cmd, rookNS, &opt)
return nil
}
func deletePool(name string, cephfs bool, f *framework.Framework) {
opt := metav1.ListOptions{
LabelSelector: "app=rook-ceph-tools",
}
var cmds = []string{}
if cephfs {
// ceph fs fail
// ceph fs rm myfs --yes-i-really-mean-it
// ceph osd pool delete myfs-metadata myfs-metadata
// --yes-i-really-mean-it
// ceph osd pool delete myfs-data0 myfs-data0
// --yes-i-really-mean-it
cmds = append(cmds, fmt.Sprintf("ceph fs fail %s", name),
fmt.Sprintf("ceph fs rm %s --yes-i-really-mean-it", name),
fmt.Sprintf("ceph osd pool delete %s-metadata %s-metadata --yes-i-really-really-mean-it", name, name),
fmt.Sprintf("ceph osd pool delete %s-data0 %s-data0 --yes-i-really-really-mean-it", name, name))
} else {
// ceph osd pool delete replicapool replicapool
// --yes-i-really-mean-it
cmds = append(cmds, fmt.Sprintf("ceph osd pool delete %s %s --yes-i-really-really-mean-it", name, name))
}
for _, cmd := range cmds {
// discard stdErr as some commands prints warning in strErr
execCommandInPod(f, cmd, rookNS, &opt)
}
}
func pvcDeleteWhenPoolNotFound(pvcPath string, cephfs bool, f *framework.Framework) error {
pvc, err := loadPVC(pvcPath)
if err != nil {
return err
}
pvc.Namespace = f.UniqueName
err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout)
if err != nil {
return err
}
if cephfs {
err = deleteBackingCephFSVolume(f, pvc)
if err != nil {
return err
}
// delete cephfs filesystem
deletePool("myfs", cephfs, f)
} else {
err = deleteBackingRBDImage(f, pvc)
if err != nil {
return err
}
// delete rbd pool
deletePool("replicapool", cephfs, f)
}
err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout)
return err
}

View File

@ -84,10 +84,10 @@ func getMetadataPool(ctx context.Context, monitors string, cr *util.Credentials,
} }
} }
return "", fmt.Errorf("fsName (%s) not found in Ceph cluster", fsName) return "", util.ErrPoolNotFound{Pool: fsName, Err: fmt.Errorf("fsName (%s) not found in Ceph cluster", fsName)}
} }
// CephFilesystemDetails is a representation of the main json structure returned by 'ceph fs dump' // CephFilesystemDump is a representation of the main json structure returned by 'ceph fs dump'
type CephFilesystemDump struct { type CephFilesystemDump struct {
Filesystems []CephFilesystemDetails `json:"filesystems"` Filesystems []CephFilesystemDetails `json:"filesystems"`
} }
@ -114,5 +114,5 @@ func getFsName(ctx context.Context, monitors string, cr *util.Credentials, fscID
} }
} }
return "", fmt.Errorf("fscID (%d) not found in Ceph cluster", fscID) return "", util.ErrPoolNotFound{Pool: string(fscID), Err: fmt.Errorf("fscID (%d) not found in Ceph cluster", fscID)}
} }

View File

@ -231,6 +231,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
// Find the volume using the provided VolumeID // Find the volume using the provided VolumeID
volOptions, vID, err := newVolumeOptionsFromVolID(ctx, string(volID), nil, secrets) volOptions, vID, err := newVolumeOptionsFromVolID(ctx, string(volID), nil, secrets)
if err != nil { if err != nil {
// if error is ErrPoolNotFound, the pool is already deleted we dont
// need to worry about deleting subvolume or omap data, return success
if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), string(volID), err)
return &csi.DeleteVolumeResponse{}, nil
}
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete // if error is ErrKeyNotFound, then a previous attempt at deletion was complete
// or partially complete (subvolume and imageOMap are garbage collected already), hence // or partially complete (subvolume and imageOMap are garbage collected already), hence
// return success as deletion is complete // return success as deletion is complete

View File

@ -258,6 +258,12 @@ func (cs *ControllerServer) checkSnapshot(ctx context.Context, req *csi.CreateVo
if _, ok := err.(ErrSnapNotFound); !ok { if _, ok := err.(ErrSnapNotFound); !ok {
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Errorf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err)
return status.Error(codes.InvalidArgument, err.Error())
}
return status.Error(codes.InvalidArgument, "missing requested Snapshot ID") return status.Error(codes.InvalidArgument, "missing requested Snapshot ID")
} }
@ -343,7 +349,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
defer cs.VolumeLocks.Release(volumeID) defer cs.VolumeLocks.Release(volumeID)
rbdVol := &rbdVolume{} rbdVol := &rbdVolume{}
if err := genVolFromVolID(ctx, rbdVol, volumeID, cr); err != nil { if err = genVolFromVolID(ctx, rbdVol, volumeID, cr); err != nil {
if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), volumeID, err)
return &csi.DeleteVolumeResponse{}, nil
}
// If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt // If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt
// to process it as such // to process it as such
if _, ok := err.(ErrInvalidVolID); ok { if _, ok := err.(ErrInvalidVolID); ok {
@ -360,6 +371,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
// or partially complete (image and imageOMap are garbage collected already), hence return // or partially complete (image and imageOMap are garbage collected already), hence return
// success as deletion is complete // success as deletion is complete
if _, ok := err.(util.ErrKeyNotFound); ok { if _, ok := err.(util.ErrKeyNotFound); ok {
klog.Warningf(util.Log(ctx, "Failed to volume options for %s: %v"), volumeID, err)
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }
@ -452,6 +464,11 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
if _, ok := err.(ErrImageNotFound); ok { if _, ok := err.(ErrImageNotFound); ok {
return nil, status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId()) return nil, status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId())
} }
if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Errorf(util.Log(ctx, "failed to get backend volume for %s: %v"), req.GetSourceVolumeId(), err)
return nil, status.Errorf(codes.Internal, err.Error())
}
return nil, status.Errorf(codes.Internal, err.Error()) return nil, status.Errorf(codes.Internal, err.Error())
} }
@ -613,6 +630,13 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
rbdSnap := &rbdSnapshot{} rbdSnap := &rbdSnapshot{}
if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil { if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
// if error is ErrPoolNotFound, the pool is already deleted we dont
// need to worry about deleting snapshot or omap data, return success
if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Warningf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err)
return &csi.DeleteSnapshotResponse{}, nil
}
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete // if error is ErrKeyNotFound, then a previous attempt at deletion was complete
// or partially complete (snap and snapOMap are garbage collected already), hence return // or partially complete (snap and snapOMap are garbage collected already), hence return
// success as deletion is complete // success as deletion is complete
@ -703,6 +727,12 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
if _, ok := err.(ErrImageNotFound); ok { if _, ok := err.(ErrImageNotFound); ok {
return nil, status.Errorf(codes.NotFound, "volume ID %s not found", volID) return nil, status.Errorf(codes.NotFound, "volume ID %s not found", volID)
} }
if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Errorf(util.Log(ctx, "failed to get backend volume for %s: %v"), volID, err)
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
return nil, status.Errorf(codes.Internal, err.Error()) return nil, status.Errorf(codes.Internal, err.Error())
} }

View File

@ -114,7 +114,7 @@ func GetPoolName(ctx context.Context, monitors string, cr *Credentials, poolID i
} }
} }
return "", fmt.Errorf("pool ID (%d) not found in Ceph cluster", poolID) return "", ErrPoolNotFound{string(poolID), fmt.Errorf("pool ID (%d) not found in Ceph cluster", poolID)}
} }
// SetOMapKeyValue sets the given key and value into the provided Ceph omap name // SetOMapKeyValue sets the given key and value into the provided Ceph omap name

View File

@ -56,3 +56,13 @@ type ErrSnapNameConflict struct {
func (e ErrSnapNameConflict) Error() string { func (e ErrSnapNameConflict) Error() string {
return e.err.Error() return e.err.Error()
} }
// ErrPoolNotFound is returned when pool is not found
type ErrPoolNotFound struct {
Pool string
Err error
}
func (e ErrPoolNotFound) Error() string {
return e.Err.Error()
}