cleanup: move cephfs errors to new errors package

As part of the refactoring, the cephfs errors file is moved to a new
dedicated package (internal/cephfs/errors), and all callers are updated
to reference the errors through the new package.

Updates: #852
Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2021-08-25 12:16:03 +05:30 committed by mergify[bot]
parent aeebd5d03b
commit b383af20b4
10 changed files with 69 additions and 59 deletions

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
@ -47,7 +48,7 @@ func (vo *volumeOptions) getFscID(ctx context.Context) (int64, error) {
log.ErrorLog(ctx, "failed to list volume %s", vo.FsName)
return 0, ErrVolumeNotFound
return 0, cerrors.ErrVolumeNotFound
}
func (vo *volumeOptions) getMetadataPool(ctx context.Context) (string, error) {

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/util/log"
)
@ -48,13 +49,13 @@ func (cs cephFSCloneState) toError() error {
case cephFSCloneComplete:
return nil
case cephFSCloneError:
return ErrInvalidClone
return cerrors.ErrInvalidClone
case cephFSCloneInprogress:
return ErrCloneInProgress
return cerrors.ErrCloneInProgress
case cephFSClonePending:
return ErrClonePending
return cerrors.ErrClonePending
case cephFSCloneFailed:
return ErrCloneFailed
return cerrors.ErrCloneFailed
}
return nil
@ -90,7 +91,7 @@ func createCloneFromSubvolume(ctx context.Context, volID, cloneID volumeID, volO
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, ErrSnapProtectionExist) {
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
}
}
@ -137,7 +138,7 @@ func createCloneFromSubvolume(ctx context.Context, volID, cloneID volumeID, volO
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, ErrSnapProtectionExist) {
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
return err
@ -161,7 +162,7 @@ func cleanupCloneFromSubvolumeSnapshot(
snapShotID := cloneID
snapInfo, err := parentVolOpt.getSnapshotInfo(ctx, snapShotID, volID)
if err != nil {
if errors.Is(err, ErrSnapNotFound) {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil
}
@ -189,7 +190,7 @@ func cleanupCloneFromSubvolumeSnapshot(
// isCloneRetryError returns true if the clone error is pending,in-progress
// error.
func isCloneRetryError(err error) bool {
return errors.Is(err, ErrCloneInProgress) || errors.Is(err, ErrClonePending)
return errors.Is(err, cerrors.ErrCloneInProgress) || errors.Is(err, cerrors.ErrClonePending)
}
func createCloneFromSnapshot(

View File

@ -19,6 +19,8 @@ package cephfs
import (
"testing"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/stretchr/testify/assert"
)
@ -26,10 +28,10 @@ func TestCloneStateToError(t *testing.T) {
t.Parallel()
errorState := make(map[cephFSCloneState]error)
errorState[cephFSCloneComplete] = nil
errorState[cephFSCloneError] = ErrInvalidClone
errorState[cephFSCloneInprogress] = ErrCloneInProgress
errorState[cephFSClonePending] = ErrClonePending
errorState[cephFSCloneFailed] = ErrCloneFailed
errorState[cephFSCloneError] = cerrors.ErrInvalidClone
errorState[cephFSCloneInprogress] = cerrors.ErrCloneInProgress
errorState[cephFSClonePending] = cerrors.ErrClonePending
errorState[cephFSCloneFailed] = cerrors.ErrCloneFailed
for state, err := range errorState {
assert.Equal(t, state.toError(), err)

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -119,7 +120,7 @@ func checkContentSource(
snapshotID := req.VolumeContentSource.GetSnapshot().GetSnapshotId()
volOpt, _, sid, err := newSnapshotOptionsFromID(ctx, snapshotID, cr)
if err != nil {
if errors.Is(err, ErrSnapNotFound) {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil, nil, nil, status.Error(codes.NotFound, err.Error())
}
@ -132,7 +133,7 @@ func checkContentSource(
volID := req.VolumeContentSource.GetVolume().GetVolumeId()
parentVol, pvID, err := newVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets)
if err != nil {
if !errors.Is(err, ErrVolumeNotFound) {
if !errors.Is(err, cerrors.ErrVolumeNotFound) {
return nil, nil, nil, status.Error(codes.NotFound, err.Error())
}
@ -219,7 +220,7 @@ func (cs *ControllerServer) CreateVolume(
if purgeErr != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", requestName, purgeErr)
// All errors other than ErrVolumeNotFound should return an error back to the caller
if !errors.Is(purgeErr, ErrVolumeNotFound) {
if !errors.Is(purgeErr, cerrors.ErrVolumeNotFound) {
return nil, status.Error(codes.Internal, purgeErr.Error())
}
}
@ -288,7 +289,7 @@ func (cs *ControllerServer) CreateVolume(
if purgeErr != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", vID.FsSubvolName, purgeErr)
// All errors other than ErrVolumeNotFound should return an error back to the caller
if !errors.Is(purgeErr, ErrVolumeNotFound) {
if !errors.Is(purgeErr, cerrors.ErrVolumeNotFound) {
// If the subvolume deletion is failed, we should not cleanup
// the OMAP entry it will stale subvolume in cluster.
// set err=nil so that when we get the request again we can get
@ -375,7 +376,7 @@ func (cs *ControllerServer) DeleteVolume(
log.ErrorLog(ctx, "Error returned from newVolumeOptionsFromVolID: %v", err)
// All errors other than ErrVolumeNotFound should return an error back to the caller
if !errors.Is(err, ErrVolumeNotFound) {
if !errors.Is(err, cerrors.ErrVolumeNotFound) {
return nil, status.Error(codes.Internal, err.Error())
}
@ -413,11 +414,11 @@ func (cs *ControllerServer) DeleteVolume(
if err = volOptions.purgeVolume(ctx, volumeID(vID.FsSubvolName), false); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err)
if errors.Is(err, ErrVolumeHasSnapshots) {
if errors.Is(err, cerrors.ErrVolumeHasSnapshots) {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
if !errors.Is(err, ErrVolumeNotFound) {
if !errors.Is(err, cerrors.ErrVolumeNotFound) {
return nil, status.Error(codes.Internal, err.Error())
}
}
@ -554,7 +555,7 @@ func (cs *ControllerServer) CreateSnapshot(
return nil, status.Error(codes.NotFound, err.Error())
}
if errors.Is(err, ErrVolumeNotFound) {
if errors.Is(err, cerrors.ErrVolumeNotFound) {
return nil, status.Error(codes.NotFound, err.Error())
}
@ -598,7 +599,7 @@ func (cs *ControllerServer) CreateSnapshot(
// Check error code value against ErrInvalidCommand to understand the cluster
// support it or not, It's safe to evaluate as the filtering
// is already done from getSubVolumeInfo() and send out the error here.
if errors.Is(err, ErrInvalidCommand) {
if errors.Is(err, cerrors.ErrInvalidCommand) {
return nil, status.Error(
codes.FailedPrecondition,
"subvolume info command not supported in current ceph cluster")
@ -775,7 +776,7 @@ func (cs *ControllerServer) DeleteSnapshot(
// or partially complete (snap and snapOMap are garbage collected already), hence return
// success as deletion is complete
return &csi.DeleteSnapshotResponse{}, nil
case errors.Is(err, ErrSnapNotFound):
case errors.Is(err, cerrors.ErrSnapNotFound):
err = undoSnapReservation(ctx, volOpt, *sid, sid.FsSnapshotName, cr)
if err != nil {
log.ErrorLog(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)",
@ -785,7 +786,7 @@ func (cs *ControllerServer) DeleteSnapshot(
}
return &csi.DeleteSnapshotResponse{}, nil
case errors.Is(err, ErrVolumeNotFound):
case errors.Is(err, cerrors.ErrVolumeNotFound):
// if the error is ErrVolumeNotFound, the subvolume is already deleted
// from backend, Hence undo the omap entries and return success
log.ErrorLog(ctx, "Volume not present")

View File

@ -14,51 +14,51 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
package errors
import (
"errors"
coreError "errors"
)
// Error strings for comparison with CLI errors.
const (
// volumeNotEmpty is returned when the volume is not empty.
volumeNotEmpty = "Directory not empty"
// VolumeNotEmpty is returned when the volume is not empty.
VolumeNotEmpty = "Directory not empty"
)
var (
// ErrCloneInProgress is returned when snapshot clone state is `in progress`.
ErrCloneInProgress = errors.New("clone from snapshot is already in progress")
ErrCloneInProgress = coreError.New("clone from snapshot is already in progress")
// ErrClonePending is returned when snapshot clone state is `pending`.
ErrClonePending = errors.New("clone from snapshot is pending")
ErrClonePending = coreError.New("clone from snapshot is pending")
// ErrInvalidClone is returned when the clone state is invalid.
ErrInvalidClone = errors.New("invalid clone state")
ErrInvalidClone = coreError.New("invalid clone state")
// ErrCloneFailed is returned when the clone state is failed.
ErrCloneFailed = errors.New("clone from snapshot failed")
ErrCloneFailed = coreError.New("clone from snapshot failed")
// ErrInvalidVolID is returned when a CSI passed VolumeID is not conformant to any known volume ID
// formats.
ErrInvalidVolID = errors.New("invalid VolumeID")
ErrInvalidVolID = coreError.New("invalid VolumeID")
// ErrNonStaticVolume is returned when a volume is detected as not being
// statically provisioned.
ErrNonStaticVolume = errors.New("volume not static")
ErrNonStaticVolume = coreError.New("volume not static")
// ErrSnapProtectionExist is returned when the snapshot is already protected.
ErrSnapProtectionExist = errors.New("snapshot protection already exists")
ErrSnapProtectionExist = coreError.New("snapshot protection already exists")
// ErrSnapNotFound is returned when snap name passed is not found in the list
// of snapshots for the given image.
ErrSnapNotFound = errors.New("snapshot not found")
ErrSnapNotFound = coreError.New("snapshot not found")
// ErrVolumeNotFound is returned when a subvolume is not found in CephFS.
ErrVolumeNotFound = errors.New("volume not found")
ErrVolumeNotFound = coreError.New("volume not found")
// ErrInvalidCommand is returned when a command is not known to the cluster.
ErrInvalidCommand = errors.New("invalid command")
ErrInvalidCommand = coreError.New("invalid command")
// ErrVolumeHasSnapshots is returned when a subvolume has snapshots.
ErrVolumeHasSnapshots = errors.New("volume has snapshots")
ErrVolumeHasSnapshots = coreError.New("volume has snapshots")
)

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -86,7 +87,7 @@ func checkVolExists(ctx context.Context,
if sID != nil || pvID != nil {
cloneState, cloneStateErr := volOptions.getCloneState(ctx, volumeID(vid.FsSubvolName))
if cloneStateErr != nil {
if errors.Is(cloneStateErr, ErrVolumeNotFound) {
if errors.Is(cloneStateErr, cerrors.ErrVolumeNotFound) {
if pvID != nil {
err = cleanupCloneFromSubvolumeSnapshot(
ctx, volumeID(pvID.FsSubvolName),
@ -105,10 +106,10 @@ func checkVolExists(ctx context.Context,
return nil, err
}
if cloneState == cephFSCloneInprogress {
return nil, ErrCloneInProgress
return nil, cerrors.ErrCloneInProgress
}
if cloneState == cephFSClonePending {
return nil, ErrClonePending
return nil, cerrors.ErrClonePending
}
if cloneState == cephFSCloneFailed {
err = volOptions.purgeVolume(ctx, volumeID(vid.FsSubvolName), true)
@ -137,7 +138,7 @@ func checkVolExists(ctx context.Context,
}
volOptions.RootPath, err = volOptions.getVolumeRootPathCeph(ctx, volumeID(vid.FsSubvolName))
if err != nil {
if errors.Is(err, ErrVolumeNotFound) {
if errors.Is(err, cerrors.ErrVolumeNotFound) {
// If the subvolume is not present, cleanup the stale snapshot
// created for clone.
if parentVolOpt != nil && pvID != nil {
@ -379,7 +380,7 @@ func checkSnapExists(
sid.FsSnapshotName = snapData.ImageAttributes.ImageName
snapInfo, err := volOptions.getSnapshotInfo(ctx, volumeID(snapID), volumeID(parentSubVolName))
if err != nil {
if errors.Is(err, ErrSnapNotFound) {
if errors.Is(err, cerrors.ErrSnapNotFound) {
err = j.UndoReservation(ctx, volOptions.MetadataPool,
volOptions.MetadataPool, snapID, snap.RequestName)

View File

@ -23,6 +23,7 @@ import (
"os"
"strings"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -90,14 +91,14 @@ func (ns *NodeServer) NodeStageVolume(
volOptions, _, err := newVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets())
if err != nil {
if !errors.Is(err, ErrInvalidVolID) {
if !errors.Is(err, cerrors.ErrInvalidVolID) {
return nil, status.Error(codes.Internal, err.Error())
}
// gets mon IPs from the supplied cluster info
volOptions, _, err = newVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext())
if err != nil {
if !errors.Is(err, ErrNonStaticVolume) {
if !errors.Is(err, cerrors.ErrNonStaticVolume) {
return nil, status.Error(codes.Internal, err.Error())
}

View File

@ -21,6 +21,7 @@ import (
"errors"
"time"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/ceph/go-ceph/cephfs/admin"
@ -105,7 +106,7 @@ func (vo *volumeOptions) getSnapshotInfo(ctx context.Context, snapID, volID volu
info, err := fsa.SubVolumeSnapshotInfo(vo.FsName, vo.SubvolumeGroup, string(volID), string(snapID))
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
return snap, ErrSnapNotFound
return snap, cerrors.ErrSnapNotFound
}
log.ErrorLog(
ctx,
@ -221,7 +222,7 @@ func (vo *volumeOptions) cloneSnapshot(
vo.FsName,
err)
if errors.Is(err, rados.ErrNotFound) {
return ErrVolumeNotFound
return cerrors.ErrVolumeNotFound
}
return err

View File

@ -23,6 +23,7 @@ import (
"path"
"strings"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -68,7 +69,7 @@ func (vo *volumeOptions) getVolumeRootPathCeph(ctx context.Context, volID volume
if err != nil {
log.ErrorLog(ctx, "failed to get the rootpath for the vol %s: %s", string(volID), err)
if errors.Is(err, rados.ErrNotFound) {
return "", util.JoinErrors(ErrVolumeNotFound, err)
return "", util.JoinErrors(cerrors.ErrVolumeNotFound, err)
}
return "", err
@ -89,12 +90,12 @@ func (vo *volumeOptions) getSubVolumeInfo(ctx context.Context, volID volumeID) (
if err != nil {
log.ErrorLog(ctx, "failed to get subvolume info for the vol %s: %s", string(volID), err)
if errors.Is(err, rados.ErrNotFound) {
return nil, ErrVolumeNotFound
return nil, cerrors.ErrVolumeNotFound
}
// In case the error is invalid command return error to the caller.
var invalid fsAdmin.NotImplementedError
if errors.As(err, &invalid) {
return nil, ErrInvalidCommand
return nil, cerrors.ErrInvalidCommand
}
return nil, err
@ -249,11 +250,11 @@ func (vo *volumeOptions) purgeVolume(ctx context.Context, volID volumeID, force
err = fsa.RemoveSubVolumeWithFlags(vo.FsName, vo.SubvolumeGroup, string(volID), opt)
if err != nil {
log.ErrorLog(ctx, "failed to purge subvolume %s in fs %s: %s", string(volID), vo.FsName, err)
if strings.Contains(err.Error(), volumeNotEmpty) {
return util.JoinErrors(ErrVolumeHasSnapshots, err)
if strings.Contains(err.Error(), cerrors.VolumeNotEmpty) {
return util.JoinErrors(cerrors.ErrVolumeHasSnapshots, err)
}
if errors.Is(err, rados.ErrNotFound) {
return util.JoinErrors(ErrVolumeNotFound, err)
return util.JoinErrors(cerrors.ErrVolumeNotFound, err)
}
return err

View File

@ -25,6 +25,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/util"
)
@ -273,7 +274,7 @@ func newVolumeOptionsFromVolID(
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err)
return nil, nil, util.JoinErrors(ErrInvalidVolID, err)
return nil, nil, util.JoinErrors(cerrors.ErrInvalidVolID, err)
}
volOptions.ClusterID = vi.ClusterID
vid.VolumeID = volID
@ -360,7 +361,7 @@ func newVolumeOptionsFromVolID(
volOptions.Features = info.Features
}
if errors.Is(err, ErrInvalidCommand) {
if errors.Is(err, cerrors.ErrInvalidCommand) {
volOptions.RootPath, err = volOptions.getVolumeRootPathCeph(ctx, volumeID(vid.FsSubvolName))
}
@ -444,7 +445,7 @@ func newVolumeOptionsFromStaticVolume(
val, ok := options["staticVolume"]
if !ok {
return nil, nil, ErrNonStaticVolume
return nil, nil, cerrors.ErrNonStaticVolume
}
if staticVol, err = strconv.ParseBool(val); err != nil {
@ -452,7 +453,7 @@ func newVolumeOptionsFromStaticVolume(
}
if !staticVol {
return nil, nil, ErrNonStaticVolume
return nil, nil, cerrors.ErrNonStaticVolume
}
// Volume is static, and ProvisionVolume carries bool stating if it was provisioned, hence
@ -512,7 +513,7 @@ func newSnapshotOptionsFromID(
// Decode the snapID first, to detect pre-provisioned snapshot before other errors
err := vi.DecomposeCSIID(snapID)
if err != nil {
return &volOptions, nil, &sid, ErrInvalidVolID
return &volOptions, nil, &sid, cerrors.ErrInvalidVolID
}
volOptions.ClusterID = vi.ClusterID
sid.SnapshotID = snapID