mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-14 02:10:21 +00:00
CephFS: Added ENOENT checks for possible missing volumes
Added checks in DeleteVolume RPC, for image missing errors, and taking appropriate actions to cleanup the CSI reservations. Further removed forcing a volume purge, and instead added checks for missing volume errors in purgeVolume. This should now fix issues where an continuation of an interrupted DeleteVolume call, that only deleted the backing volume, will proceed and not error out. Signed-off-by: ShyamsundarR <srangana@redhat.com>
This commit is contained in:
parent
b93ed21fe8
commit
35e8c3b3a5
@ -243,9 +243,25 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
return cs.deleteVolumeDeprecated(ctx, req)
|
return cs.deleteVolumeDeprecated(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// All errors other than ErrVolumeNotFound should return an error back to the caller
|
||||||
|
if _, ok := err.(ErrVolumeNotFound); !ok {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If error is ErrImageNotFound then we failed to find the subvolume, but found the imageOMap
|
||||||
|
// to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
|
||||||
|
// unreserve for the same
|
||||||
|
if acquired := cs.VolumeLocks.TryAcquire(volOptions.RequestName); !acquired {
|
||||||
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volOptions.RequestName)
|
||||||
|
}
|
||||||
|
defer cs.VolumeLocks.Release(volOptions.RequestName)
|
||||||
|
|
||||||
|
if err = undoVolReservation(ctx, volOptions, *vID, secrets); err != nil {
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
return &csi.DeleteVolumeResponse{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// lock out parallel delete and create requests against the same volume name as we
|
// lock out parallel delete and create requests against the same volume name as we
|
||||||
// cleanup the subvolume and associated omaps for the same
|
// cleanup the subvolume and associated omaps for the same
|
||||||
if acquired := cs.VolumeLocks.TryAcquire(volOptions.RequestName); !acquired {
|
if acquired := cs.VolumeLocks.TryAcquire(volOptions.RequestName); !acquired {
|
||||||
@ -263,8 +279,11 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
|
|
||||||
if err = purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
|
if err = purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
|
||||||
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
|
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
|
||||||
|
// All errors other than ErrVolumeNotFound should return an error back to the caller
|
||||||
|
if _, ok := err.(ErrVolumeNotFound); !ok {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err := undoVolReservation(ctx, volOptions, *vID, secrets); err != nil {
|
if err := undoVolReservation(ctx, volOptions, *vID, secrets); err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
@ -35,3 +35,12 @@ type ErrNonStaticVolume struct {
|
|||||||
func (e ErrNonStaticVolume) Error() string {
|
func (e ErrNonStaticVolume) Error() string {
|
||||||
return e.err.Error()
|
return e.err.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrVolumeNotFound is returned when a subvolume is not found in CephFS
|
||||||
|
type ErrVolumeNotFound struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e ErrVolumeNotFound) Error() string {
|
||||||
|
return e.err.Error()
|
||||||
|
}
|
||||||
|
@ -52,8 +52,12 @@ func getCephRootPathLocalDeprecated(volID volumeID) string {
|
|||||||
return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID))
|
return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getVolumeNotFoundErrorString(volID volumeID) string {
|
||||||
|
return fmt.Sprintf("Error ENOENT: Subvolume '%s' not found", string(volID))
|
||||||
|
}
|
||||||
|
|
||||||
func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) {
|
func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) {
|
||||||
stdout, _, err := util.ExecCommand(
|
stdout, stderr, err := util.ExecCommand(
|
||||||
"ceph",
|
"ceph",
|
||||||
"fs",
|
"fs",
|
||||||
"subvolume",
|
"subvolume",
|
||||||
@ -69,6 +73,11 @@ func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *u
|
|||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf(util.Log(ctx, "failed to get the rootpath for the vol %s(%s)"), string(volID), err)
|
klog.Errorf(util.Log(ctx, "failed to get the rootpath for the vol %s(%s)"), string(volID), err)
|
||||||
|
|
||||||
|
if strings.Contains(string(stderr), getVolumeNotFoundErrorString(volID)) {
|
||||||
|
return "", ErrVolumeNotFound{err}
|
||||||
|
}
|
||||||
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
return strings.TrimSuffix(string(stdout), "\n"), nil
|
return strings.TrimSuffix(string(stdout), "\n"), nil
|
||||||
@ -204,13 +213,17 @@ func purgeVolume(ctx context.Context, volID volumeID, cr *util.Credentials, volO
|
|||||||
string(volID),
|
string(volID),
|
||||||
"--group_name",
|
"--group_name",
|
||||||
csiSubvolumeGroup,
|
csiSubvolumeGroup,
|
||||||
"--force",
|
|
||||||
"-m", volOptions.Monitors,
|
"-m", volOptions.Monitors,
|
||||||
"-c", util.CephConfigPath,
|
"-c", util.CephConfigPath,
|
||||||
"-n", cephEntityClientPrefix+cr.ID,
|
"-n", cephEntityClientPrefix+cr.ID,
|
||||||
"--keyfile="+cr.KeyFile)
|
"--keyfile="+cr.KeyFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf(util.Log(ctx, "failed to purge subvolume %s(%s) in fs %s"), string(volID), err, volOptions.FsName)
|
klog.Errorf(util.Log(ctx, "failed to purge subvolume %s(%s) in fs %s"), string(volID), err, volOptions.FsName)
|
||||||
|
|
||||||
|
if strings.Contains(err.Error(), getVolumeNotFoundErrorString(volID)) {
|
||||||
|
return ErrVolumeNotFound{err}
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,13 +245,13 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
volOptions.ProvisionVolume = true
|
||||||
|
|
||||||
volOptions.RootPath, err = getVolumeRootPathCeph(ctx, &volOptions, cr, volumeID(vid.FsSubvolName))
|
volOptions.RootPath, err = getVolumeRootPathCeph(ctx, &volOptions, cr, volumeID(vid.FsSubvolName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return &volOptions, &vid, err
|
||||||
}
|
}
|
||||||
|
|
||||||
volOptions.ProvisionVolume = true
|
|
||||||
|
|
||||||
return &volOptions, &vid, nil
|
return &volOptions, &vid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user