rbd: simplify error handling

This change replaces the sentinel errors in rbd module with
standard errors created with errors.New().

Related: #1203

Signed-off-by: Sven Anderson <sven@redhat.com>
This commit is contained in:
Sven Anderson 2020-07-10 03:05:42 +02:00 committed by mergify[bot]
parent dba2c27bcb
commit 92884f56f4
7 changed files with 61 additions and 167 deletions

View File

@ -51,15 +51,13 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
RbdSnapName: rv.RbdImageName,
Pool: rv.Pool,
}
var einf ErrImageNotFound
var esnf ErrSnapNotFound
// check if cloned image exists
err := rv.getImageInfo()
if err == nil {
// check if do we have temporary snapshot on temporary cloned image
sErr := tempClone.checkSnapExists(snap)
if sErr != nil {
if errors.As(err, &esnf) {
if errors.Is(err, ErrSnapNotFound) {
return true, nil
}
return false, err
@ -70,14 +68,14 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
}
return false, err
}
if !errors.As(err, &einf) {
if !errors.Is(err, ErrImageNotFound) {
// return error if its not image not found
return false, err
}
err = tempClone.checkSnapExists(snap)
if err != nil {
if errors.As(err, &esnf) {
if errors.Is(err, ErrSnapNotFound) {
// check temporary image needs flatten, if yes add task to flatten the
// temporary clone
err = tempClone.flattenRbdImage(ctx, rv.conn.Creds, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
@ -91,7 +89,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
return false, err
}
return true, nil
} else if !errors.As(err, &einf) {
} else if !errors.Is(err, ErrImageNotFound) {
// any error other than image not found return error
return false, err
}
@ -122,7 +120,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
// create new resources for clearner approach
err = parentVol.deleteSnapshot(ctx, snap)
}
if errors.As(err, &esnf) {
if errors.Is(err, ErrSnapNotFound) {
return false, nil
}
return false, err
@ -164,7 +162,6 @@ func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVol
errFlatten error
err error
)
var efip ErrFlattenInProgress
var j = &journal.Connection{}
j, err = volJournal.Connect(rv.Monitors, rv.conn.Creds)
@ -181,7 +178,7 @@ func (rv *rbdVolume) createCloneFromImage(ctx context.Context, parentVol *rbdVol
defer func() {
if err != nil || errFlatten != nil {
if !errors.Is(errFlatten, &efip) {
if !errors.Is(errFlatten, ErrFlattenInProgress) {
// cleanup snapshot
cErr := cleanUpSnapshot(ctx, parentVol, tempSnap, tempClone, rv.conn.Creds)
if cErr != nil {
@ -247,8 +244,7 @@ func (rv *rbdVolume) flattenCloneImage(ctx context.Context) error {
return tempClone.flattenRbdImage(ctx, tempClone.conn.Creds, false, hardLimit, softLimit)
}
if err != nil {
var einf ErrImageNotFound
if !errors.As(err, &einf) {
if !errors.Is(err, ErrImageNotFound) {
return err
}
}

View File

@ -167,12 +167,10 @@ func buildCreateVolumeResponse(ctx context.Context, req *csi.CreateVolumeRequest
// the input error types it expected to use only for CreateVolume as we need to
// return different GRPC codes for different functions based on the input.
func getGRPCErrorForCreateVolume(err error) error {
var evnc ErrVolNameConflict
if errors.As(err, &evnc) {
if errors.Is(err, ErrVolNameConflict) {
return status.Error(codes.AlreadyExists, err.Error())
}
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
if errors.Is(err, ErrFlattenInProgress) {
return status.Error(codes.Aborted, err.Error())
}
return status.Error(codes.Internal, err.Error())
@ -273,8 +271,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
defer func() {
if err != nil {
var efip ErrFlattenInProgress
if !errors.As(err, &efip) {
if !errors.Is(err, ErrFlattenInProgress) {
errDefer := undoVolReservation(ctx, rbdVol, cr)
if errDefer != nil {
klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), req.GetName(), errDefer)
@ -285,8 +282,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
err = cs.createBackingImage(ctx, cr, rbdVol, parentVol, rbdSnap)
if err != nil {
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
if errors.Is(err, ErrFlattenInProgress) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, err
@ -340,8 +336,7 @@ func flattenParentImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snaps, err := rbdVol.listSnapshots(ctx, cr)
if err != nil {
var einf ErrImageNotFound
if errors.As(err, &einf) {
if errors.Is(err, ErrImageNotFound) {
return status.Error(codes.InvalidArgument, err.Error())
}
return status.Error(codes.Internal, err.Error())
@ -364,8 +359,7 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
func checkFlatten(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
err := rbdVol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
if errors.Is(err, ErrFlattenInProgress) {
return status.Error(codes.Aborted, err.Error())
}
if errDefer := deleteImage(ctx, rbdVol, cr); errDefer != nil {
@ -453,8 +447,7 @@ func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Cre
defer func() {
if err != nil {
var efip ErrFlattenInProgress
if !errors.As(err, &efip) {
if !errors.Is(err, ErrFlattenInProgress) {
if deleteErr := deleteImage(ctx, rbdVol, cr); deleteErr != nil {
klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), rbdVol, deleteErr)
}
@ -509,8 +502,7 @@ func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *u
rbdSnap := &rbdSnapshot{}
if err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
klog.Errorf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err)
var esnf ErrSnapNotFound
if !errors.As(err, &esnf) {
if !errors.Is(err, ErrSnapNotFound) {
return nil, nil, status.Error(codes.Internal, err.Error())
}
return nil, nil, status.Errorf(codes.NotFound, "%s snapshot doesnot exists", snapshotID)
@ -529,8 +521,7 @@ func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *u
rbdvol, err := genVolFromVolID(ctx, volID, cr, nil)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to get backend image for %s: %v"), volID, err)
var esnf ErrImageNotFound
if !errors.As(err, &esnf) {
if !errors.Is(err, ErrImageNotFound) {
return nil, nil, status.Error(codes.Internal, err.Error())
}
return nil, nil, status.Errorf(codes.NotFound, "%s image doesnot exists", volID)
@ -594,8 +585,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
// All errors other than ErrImageNotFound should return an error back to the caller
var einf ErrImageNotFound
if !errors.As(err, &einf) {
if !errors.Is(err, ErrImageNotFound) {
return nil, status.Error(codes.Internal, err.Error())
}
@ -635,12 +625,11 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
// delete the temporary rbd image created as part of volume clone during
// create volume
var einf ErrImageNotFound
tempClone := rbdVol.generateTempClone()
err = deleteImage(ctx, tempClone, cr)
if err != nil {
// return error if it is not ErrImageNotFound
if !errors.As(err, &einf) {
if !errors.Is(err, ErrImageNotFound) {
klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"),
tempClone, err)
return nil, status.Error(codes.Internal, err.Error())
@ -713,9 +702,8 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
// Fetch source volume information
rbdVol, err = genVolFromVolID(ctx, req.GetSourceVolumeId(), cr, req.GetSecrets())
if err != nil {
var einf ErrImageNotFound
// nolint:gocritic // this ifElseChain can not be rewritten to a switch statement
if errors.As(err, &einf) {
if errors.Is(err, ErrImageNotFound) {
err = status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId())
} else if errors.Is(err, util.ErrPoolNotFound) {
klog.Errorf(util.Log(ctx, "failed to get backend volume for %s: %v"), req.GetSourceVolumeId(), err)
@ -779,8 +767,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
defer vol.Destroy()
err = vol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
if errors.Is(err, ErrFlattenInProgress) {
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: rbdSnap.SizeBytes,
@ -891,8 +878,7 @@ func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdV
defer func() {
if err != nil {
var efip ErrFlattenInProgress
if !errors.As(err, &efip) {
if !errors.Is(err, ErrFlattenInProgress) {
// cleanup clone and snapshot
errCleanUp := cleanUpSnapshot(ctx, cloneRbd, rbdSnap, cloneRbd, cr)
if errCleanUp != nil {
@ -932,8 +918,7 @@ func (cs *ControllerServer) doSnapshotClone(ctx context.Context, parentVol *rbdV
err = cloneRbd.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
var efip ErrFlattenInProgress
if errors.As(err, &efip) {
if errors.Is(err, ErrFlattenInProgress) {
return ready, cloneRbd, nil
}
return ready, cloneRbd, err
@ -1014,8 +999,7 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
err = rbdVol.getImageInfo()
if err != nil {
var einf ErrImageNotFound
if !errors.As(err, &einf) {
if !errors.Is(err, ErrImageNotFound) {
klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s/%s with error: %v"), rbdVol.Pool, rbdVol.VolName, err)
return nil, status.Error(codes.Internal, err.Error())
}
@ -1081,9 +1065,8 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
rbdVol, err = genVolFromVolID(ctx, volID, cr, req.GetSecrets())
if err != nil {
var einf ErrImageNotFound
// nolint:gocritic // this ifElseChain can not be rewritten to a switch statement
if errors.As(err, &einf) {
if errors.Is(err, ErrImageNotFound) {
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
} else if errors.Is(err, util.ErrPoolNotFound) {
klog.Errorf(util.Log(ctx, "failed to get backend volume for %s: %v"), volID, err)

View File

@ -16,98 +16,22 @@ limitations under the License.
package rbd
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool.
type ErrImageNotFound struct {
imageName string
err error
}
import "errors"
// Error returns a user presentable string of the error.
func (e ErrImageNotFound) Error() string {
return e.err.Error()
}
// Unwrap returns the encapsulated error of ErrImageNotFound.
func (e ErrImageNotFound) Unwrap() error {
return e.err
}
// ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the
// given image.
type ErrSnapNotFound struct {
snapName string
err error
}
// Error returns a user presentable string of the error.
func (e ErrSnapNotFound) Error() string {
return e.err.Error()
}
// Unwrap returns the encapsulated error of ErrSnapNotFound.
func (e ErrSnapNotFound) Unwrap() error {
return e.err
}
// ErrVolNameConflict is generated when a requested CSI volume name already exists on RBD but with
// different properties, and hence is in conflict with the passed in CSI volume name.
type ErrVolNameConflict struct {
requestName string
err error
}
// Error returns a user presentable string of the error.
func (e ErrVolNameConflict) Error() string {
return e.err.Error()
}
// Unwrap returns the encapsulated error of ErrVolNameConflict.
func (e ErrVolNameConflict) Unwrap() error {
return e.err
}
// ErrInvalidVolID is returned when a CSI passed VolumeID does not conform to any known volume ID
// formats.
type ErrInvalidVolID struct {
err error
}
// Error returns a user presentable string of the error.
func (e ErrInvalidVolID) Error() string {
return e.err.Error()
}
// Unwrap returns the encapsulated error of ErrInvalidVolID.
func (e ErrInvalidVolID) Unwrap() error {
return e.err
}
// ErrMissingStash is returned when the image metadata stash file is not found.
type ErrMissingStash struct {
err error
}
// Error returns a user presentable string of the error.
func (e ErrMissingStash) Error() string {
return e.err.Error()
}
// Unwrap returns the encapsulated error of ErrMissingStash.
func (e ErrMissingStash) Unwrap() error {
return e.err
}
// ErrFlattenInProgress is returned when flatten is inprogess for an image.
type ErrFlattenInProgress struct {
err error
}
// Error returns a user presentable string of the error.
func (e ErrFlattenInProgress) Error() string {
return e.err.Error()
}
// Unwrap returns the encapsulated error of ErrFlattenInProgress.
func (e ErrFlattenInProgress) Unwrap() error {
return e.err
}
var (
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool.
ErrImageNotFound = errors.New("image not found")
// ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the
// given image.
ErrSnapNotFound = errors.New("snapshot not found")
// ErrVolNameConflict is generated when a requested CSI volume name already exists on RBD but with
// different properties, and hence is in conflict with the passed in CSI volume name.
ErrVolNameConflict = errors.New("volume name conflict")
// ErrInvalidVolID is returned when a CSI passed VolumeID does not conform to any known volume ID
// formats.
ErrInvalidVolID = errors.New("invalid VolumeID")
// ErrMissingStash is returned when the image metadata stash file is not found.
ErrMissingStash = errors.New("missing stash")
// ErrFlattenInProgress is returned when flatten is in progess for an image.
ErrFlattenInProgress = errors.New("flatten in progress")
)

View File

@ -654,8 +654,7 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}
// If not mounted, and error is anything other than metadata file missing, it is an error
var ems ErrMissingStash
if !errors.As(err, &ems) {
if !errors.Is(err, ErrMissingStash) {
return nil, status.Error(codes.Internal, err.Error())
}

View File

@ -151,12 +151,10 @@ func checkSnapCloneExists(ctx context.Context, parentVol *rbdVolume, rbdSnap *rb
// Fetch on-disk image attributes
err = vol.getImageInfo()
if err != nil {
var einf ErrImageNotFound
if errors.As(err, &einf) {
if errors.Is(err, ErrImageNotFound) {
err = parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil {
var esnf ErrSnapNotFound
if !errors.As(err, &esnf) {
if !errors.Is(err, ErrSnapNotFound) {
klog.Errorf(util.Log(ctx, "failed to delete snapshot %s: %v"), rbdSnap, err)
return false, err
}
@ -182,8 +180,7 @@ func checkSnapCloneExists(ctx context.Context, parentVol *rbdVolume, rbdSnap *rb
// check snapshot exists if not create it
err = vol.checkSnapExists(rbdSnap)
var esnf ErrSnapNotFound
if errors.As(err, &esnf) {
if errors.Is(err, ErrSnapNotFound) {
// create snapshot
sErr := vol.createSnapshot(ctx, rbdSnap)
if sErr != nil {
@ -281,8 +278,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
// Fetch on-disk image attributes and compare against request
err = rv.getImageInfo()
if err != nil {
var einf ErrImageNotFound
if errors.As(err, &einf) {
if errors.Is(err, ErrImageNotFound) {
// Need to check cloned info here not on createvolume,
if parentVol != nil {
found, cErr := rv.checkCloneImage(ctx, parentVol)
@ -319,9 +315,8 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
// size checks
if rv.VolSize < requestSize {
err = fmt.Errorf("image with the same name (%s) but with different size already exists",
rv.RbdImageName)
return false, ErrVolNameConflict{rv.RbdImageName, err}
return false, fmt.Errorf("%w: image with the same name (%s) but with different size already exists",
ErrVolNameConflict, rv.RbdImageName)
}
// TODO: We should also ensure image features and format is the same

View File

@ -268,7 +268,7 @@ func (rv *rbdVolume) open() (*librbd.Image, error) {
image, err := librbd.OpenImage(rv.ioctx, rv.RbdImageName, librbd.NoSnapshot)
if err != nil {
if errors.Is(err, librbd.ErrNotFound) {
err = ErrImageNotFound{rv.RbdImageName, err}
err = util.JoinErrors(ErrImageNotFound, err)
}
return nil, err
}
@ -408,8 +408,7 @@ func (rv *rbdVolume) getCloneDepth(ctx context.Context) (uint, error) {
// if the parent image is moved to trash the name will be present
// in rbd image info but the image will be in trash, in that case
// return the found depth
var einf ErrImageNotFound
if errors.As(err, &einf) {
if errors.Is(err, ErrImageNotFound) {
return depth, nil
}
klog.Errorf(util.Log(ctx, "failed to check depth on image %s: %s"), vol, err)
@ -468,7 +467,7 @@ func (rv *rbdVolume) flattenRbdImage(ctx context.Context, cr *util.Credentials,
return err
}
if forceFlatten || depth >= hardlimit {
return ErrFlattenInProgress{err: fmt.Errorf("flatten is in progress for image %s", rv.RbdImageName)}
return fmt.Errorf("%w: flatten is in progress for image %s", ErrFlattenInProgress, rv.RbdImageName)
}
}
if !supported {
@ -601,8 +600,8 @@ func genVolFromVolID(ctx context.Context, volumeID string, cr *util.Credentials,
err := vi.DecomposeCSIID(rbdVol.VolID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, rbdVol.VolID)
return rbdVol, ErrInvalidVolID{err}
return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
ErrInvalidVolID, err, rbdVol.VolID)
}
rbdVol.ClusterID = vi.ClusterID
@ -822,7 +821,7 @@ func (rv *rbdVolume) deleteSnapshot(ctx context.Context, pOpts *rbdSnapshot) err
}
err = snap.Remove()
if errors.Is(err, librbd.ErrNotFound) {
return ErrSnapNotFound{snapName: pOpts.RbdSnapName, err: err}
return util.JoinErrors(ErrSnapNotFound, err)
}
return err
}
@ -904,7 +903,7 @@ func (rv *rbdVolume) updateVolWithImageInfo(cr *util.Credentials) error {
klog.Errorf("failed getting information for image (%s): (%s)", rv, err)
if strings.Contains(string(stderr), "rbd: error opening image "+rv.RbdImageName+
": (2) No such file or directory") {
return ErrImageNotFound{rv.String(), err}
return util.JoinErrors(ErrImageNotFound, err)
}
return err
}
@ -979,7 +978,7 @@ func (rv *rbdVolume) checkSnapExists(rbdSnap *rbdSnapshot) error {
}
}
return ErrSnapNotFound{rbdSnap.RbdSnapName, fmt.Errorf("snap %s not found", rbdSnap.String())}
return fmt.Errorf("%w: snap %s not found", ErrSnapNotFound, rbdSnap.String())
}
// rbdImageMetadataStash strongly typed JSON spec for stashed RBD image metadata.
@ -1040,7 +1039,7 @@ func lookupRBDImageMetadataStash(path string) (rbdImageMetadataStash, error) {
return imgMeta, fmt.Errorf("failed to read stashed JSON image metadata from path (%s): (%v)", fPath, err)
}
return imgMeta, ErrMissingStash{err}
return imgMeta, util.JoinErrors(ErrMissingStash, err)
}
err = json.Unmarshal(encodedBytes, &imgMeta)
@ -1150,7 +1149,7 @@ func (rv *rbdVolume) listSnapshots(ctx context.Context, cr *util.Credentials) ([
klog.Errorf(util.Log(ctx, "failed getting information for image (%s): (%s)"), rv, err)
if strings.Contains(string(stderr), "rbd: error opening image "+rv.RbdImageName+
": (2) No such file or directory") {
return snapInfo, ErrImageNotFound{rv.String(), err}
return snapInfo, util.JoinErrors(ErrImageNotFound, err)
}
return snapInfo, err
}

View File

@ -66,16 +66,14 @@ func createRBDClone(ctx context.Context, parentVol, cloneRbdVol *rbdVolume, snap
func cleanUpSnapshot(ctx context.Context, parentVol *rbdVolume, rbdSnap *rbdSnapshot, rbdVol *rbdVolume, cr *util.Credentials) error {
err := parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil {
var esnf ErrSnapNotFound
if !errors.As(err, &esnf) {
if !errors.Is(err, ErrSnapNotFound) {
klog.Errorf(util.Log(ctx, "failed to delete snapshot: %v"), err)
return err
}
}
err = deleteImage(ctx, rbdVol, cr)
if err != nil {
var einf ErrImageNotFound
if !errors.As(err, &einf) {
if !errors.Is(err, ErrImageNotFound) {
klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s/%s with error: %v"), rbdVol.Pool, rbdVol.VolName, err)
return err
}