cleanup: move internal/rbd/errors.go to internal/rbd/errors package

Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
Praveen M 2025-03-03 11:45:43 +05:30 committed by mergify[bot]
parent 9db0600941
commit 5cbc14454a
24 changed files with 251 additions and 155 deletions

View File

@ -21,6 +21,7 @@ import (
"errors"
"github.com/ceph/ceph-csi/internal/rbd"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -68,7 +69,7 @@ func (ekrs *EncryptionKeyRotationServer) EncryptionKeyRotate(
rbdVol, err := mgr.GetVolumeByID(ctx, volID)
if err != nil {
switch {
case errors.Is(err, util.ErrImageNotFound):
case errors.Is(err, rbderrors.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err)

View File

@ -23,6 +23,7 @@ import (
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
rbdutil "github.com/ceph/ceph-csi/internal/rbd"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -84,7 +85,7 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace(
defer rbdVol.Destroy(ctx)
err = rbdVol.Sparsify(ctx)
if errors.Is(err, rbdutil.ErrImageInUse) {
if errors.Is(err, rbderrors.ErrImageInUse) {
// FIXME: https://github.com/csi-addons/kubernetes-csi-addons/issues/406.
// treat sparsify call as no-op if volume is in use.
log.DebugLog(ctx, fmt.Sprintf("volume with ID %q is in use, skipping sparsify operation", volumeID))

View File

@ -29,6 +29,7 @@ import (
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/rbd"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -651,7 +652,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
sts, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
// the image gets recreated after issuing resync
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
// caller retries till RBD syncs an initial version of the image to
// report its status in the resync call. Ideally, this line will not
// be executed as the error would get returned due to getMirroringInfo
@ -785,13 +786,13 @@ func getGRPCError(err error) error {
}
errorStatusMap := map[error]codes.Code{
util.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrFlattenInProgress: codes.Aborted,
corerbd.ErrAborted: codes.Aborted,
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
corerbd.ErrUnavailable: codes.Unavailable,
rbderrors.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
rbderrors.ErrInvalidArgument: codes.InvalidArgument,
rbderrors.ErrFlattenInProgress: codes.Aborted,
rbderrors.ErrAborted: codes.Aborted,
rbderrors.ErrFailedPrecondition: codes.FailedPrecondition,
rbderrors.ErrUnavailable: codes.Unavailable,
}
for e, code := range errorStatusMap {
@ -835,7 +836,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
log.ErrorLog(ctx, "failed to get volume with id %q: %v", volumeID, err)
switch {
case errors.Is(err, util.ErrImageNotFound):
case errors.Is(err, rbderrors.ErrImageNotFound):
err = status.Error(codes.NotFound, err.Error())
case errors.Is(err, util.ErrPoolNotFound):
err = status.Error(codes.NotFound, err.Error())
@ -872,7 +873,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
if err != nil {
log.ErrorLog(ctx, "failed to get status for mirror %q: %v", mirror, err)
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error())
}
@ -895,7 +896,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
if err != nil {
log.ErrorLog(ctx, "failed to parse last sync info from %q: %v", description, err)
if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) {
if errors.Is(err, rbderrors.ErrLastSyncTimeNotFound) {
return nil, status.Errorf(codes.NotFound, "failed to get last sync info: %v", err)
}
@ -924,12 +925,12 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV
var response replication.GetVolumeReplicationInfoResponse
if description == "" {
return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, fmt.Errorf("empty description: %w", rbderrors.ErrLastSyncTimeNotFound)
}
log.DebugLog(ctx, "description: %s", description)
splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 {
return nil, fmt.Errorf("no snapshot details: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, fmt.Errorf("no snapshot details: %w", rbderrors.ErrLastSyncTimeNotFound)
}
type localStatus struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
@ -946,7 +947,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV
// If the json unmarshal is successful but the local snapshot time is 0, we
// need to consider it as an error as the LastSyncTime is required.
if localSnapInfo.LocalSnapshotTime == 0 {
return nil, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, fmt.Errorf("empty local snapshot timestamp: %w", rbderrors.ErrLastSyncTimeNotFound)
}
if localSnapInfo.LastSnapshotDuration != nil {
// converts localSnapshotDuration of type int64 to string format with

View File

@ -26,6 +26,7 @@ import (
"time"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
@ -450,7 +451,7 @@ func TestValidateLastSyncInfo(t *testing.T) {
LastSyncDuration: nil,
LastSyncBytes: 0,
},
expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(),
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
{
name: "description without last_snapshot_bytes",
@ -472,7 +473,7 @@ func TestValidateLastSyncInfo(t *testing.T) {
LastSyncTime: nil,
LastSyncBytes: 0,
},
expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(),
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
{
name: "description without last_snapshot_sync_seconds",
@ -516,7 +517,7 @@ func TestValidateLastSyncInfo(t *testing.T) {
LastSyncTime: nil,
LastSyncBytes: 0,
},
expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(),
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
}
for _, tt := range tests {
@ -567,23 +568,23 @@ func TestGetGRPCError(t *testing.T) {
}{
{
name: "InvalidArgument",
err: corerbd.ErrInvalidArgument,
expectedErr: status.Error(codes.InvalidArgument, corerbd.ErrInvalidArgument.Error()),
err: rbderrors.ErrInvalidArgument,
expectedErr: status.Error(codes.InvalidArgument, rbderrors.ErrInvalidArgument.Error()),
},
{
name: "Aborted",
err: corerbd.ErrAborted,
expectedErr: status.Error(codes.Aborted, corerbd.ErrAborted.Error()),
err: rbderrors.ErrAborted,
expectedErr: status.Error(codes.Aborted, rbderrors.ErrAborted.Error()),
},
{
name: "FailedPrecondition",
err: corerbd.ErrFailedPrecondition,
expectedErr: status.Error(codes.FailedPrecondition, corerbd.ErrFailedPrecondition.Error()),
err: rbderrors.ErrFailedPrecondition,
expectedErr: status.Error(codes.FailedPrecondition, rbderrors.ErrFailedPrecondition.Error()),
},
{
name: "Unavailable",
err: corerbd.ErrUnavailable,
expectedErr: status.Error(codes.Unavailable, corerbd.ErrUnavailable.Error()),
err: rbderrors.ErrUnavailable,
expectedErr: status.Error(codes.Unavailable, rbderrors.ErrUnavailable.Error()),
},
{
name: "InvalidError",
@ -597,8 +598,8 @@ func TestGetGRPCError(t *testing.T) {
},
{
name: "ErrImageNotFound",
err: util.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, util.ErrImageNotFound.Error()),
err: rbderrors.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, rbderrors.ErrImageNotFound.Error()),
},
{
name: "ErrPoolNotFound",

View File

@ -23,7 +23,7 @@ import (
"slices"
"github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/group"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util/log"
@ -194,7 +194,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
// resolve the volume group
vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId())
return &volumegroup.DeleteVolumeGroupResponse{}, nil
@ -433,7 +433,7 @@ func (vs *VolumeGroupServer) ControllerGetVolumeGroup(
// resolve the volume group
vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId())
return nil, status.Errorf(

View File

@ -22,7 +22,7 @@ import (
"fmt"
"strings"
"github.com/ceph/ceph-csi/internal/util"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
@ -56,7 +56,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
err := tempClone.checkSnapExists(snap)
if err != nil {
switch {
case errors.Is(err, ErrSnapNotFound):
case errors.Is(err, rbderrors.ErrSnapNotFound):
// as the snapshot is not present, create new snapshot, clone and
// don't delete the temporary snapshot
err = createRBDClone(ctx, tempClone, rv, snap, false)
@ -66,7 +66,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
return true, nil
case errors.Is(err, util.ErrImageNotFound):
case errors.Is(err, rbderrors.ErrImageNotFound):
// as the temp clone does not exist,check snapshot exists on parent volume
// snapshot name is same as temporary clone image
snap.RbdImageName = tempClone.RbdImageName
@ -76,7 +76,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
// create new resources for a cleaner approach
err = parentVol.deleteSnapshot(ctx, snap)
}
if errors.Is(err, ErrSnapNotFound) {
if errors.Is(err, rbderrors.ErrSnapNotFound) {
return false, nil
}

View File

@ -23,6 +23,7 @@ import (
"strconv"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
@ -316,10 +317,10 @@ func buildCreateVolumeResponse(
// the input error types it expected to use only for CreateVolume as we need to
// return different GRPC codes for different functions based on the input.
func getGRPCErrorForCreateVolume(err error) error {
if errors.Is(err, ErrVolNameConflict) {
if errors.Is(err, rbderrors.ErrVolNameConflict) {
return status.Error(codes.AlreadyExists, err.Error())
}
if errors.Is(err, ErrFlattenInProgress) {
if errors.Is(err, rbderrors.ErrFlattenInProgress) {
return status.Error(codes.Aborted, err.Error())
}
@ -434,7 +435,7 @@ func (cs *ControllerServer) CreateVolume(
err = cs.createBackingImage(ctx, cr, req.GetSecrets(), rbdVol, parentVol, rbdSnap, req.GetParameters())
if err != nil {
if errors.Is(err, ErrFlattenInProgress) {
if errors.Is(err, rbderrors.ErrFlattenInProgress) {
return nil, status.Error(codes.Aborted, err.Error())
}
@ -592,7 +593,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snapAndChildrenInfo, err := rbdVol.listSnapAndChildren()
if err != nil {
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
return status.Error(codes.InvalidArgument, err.Error())
}
@ -855,7 +856,7 @@ func checkContentSource(
rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to get backend snapshot for %s: %v", snapshotID, err)
if !errors.Is(err, ErrSnapNotFound) {
if !errors.Is(err, rbderrors.ErrSnapNotFound) {
return nil, nil, status.Error(codes.Internal, err.Error())
}
@ -875,7 +876,7 @@ func checkContentSource(
rbdvol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to get backend image for %s: %v", volID, err)
if !errors.Is(err, util.ErrImageNotFound) {
if !errors.Is(err, rbderrors.ErrImageNotFound) {
return nil, nil, status.Error(codes.Internal, err.Error())
}
@ -915,7 +916,7 @@ func (cs *ControllerServer) checkErrAndUndoReserve(
return &csi.DeleteVolumeResponse{}, nil
}
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
notFoundErr := rbdVol.ensureImageCleanup(ctx)
if notFoundErr != nil {
return nil, status.Errorf(codes.Internal, "failed to cleanup image %q: %v", rbdVol, notFoundErr)
@ -990,7 +991,7 @@ func (cs *ControllerServer) DeleteVolume(
return nil, status.Error(codes.InvalidArgument, pErr.Error())
}
pErr = deleteMigratedVolume(ctx, pmVolID, cr)
if pErr != nil && !errors.Is(pErr, util.ErrImageNotFound) {
if pErr != nil && !errors.Is(pErr, rbderrors.ErrImageNotFound) {
return nil, status.Error(codes.Internal, pErr.Error())
}
@ -1162,7 +1163,7 @@ func (cs *ControllerServer) CreateSnapshot(
}()
if err != nil {
switch {
case errors.Is(err, util.ErrImageNotFound):
case errors.Is(err, rbderrors.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId())
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", req.GetSourceVolumeId(), err)
@ -1231,7 +1232,7 @@ func (cs *ControllerServer) CreateSnapshot(
return nil, status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil && !errors.Is(err, ErrFlattenInProgress) {
if err != nil && !errors.Is(err, rbderrors.ErrFlattenInProgress) {
errDefer := undoSnapReservation(ctx, rbdSnap, cr)
if errDefer != nil {
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", req.GetName(), errDefer)
@ -1311,7 +1312,7 @@ func cloneFromSnapshot(
}
err = vol.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if errors.Is(err, ErrFlattenInProgress) {
if errors.Is(err, rbderrors.ErrFlattenInProgress) {
// if flattening is in progress, return error and do not cleanup
return nil, status.Error(codes.Internal, err.Error())
} else if err != nil {
@ -1398,7 +1399,7 @@ func (cs *ControllerServer) doSnapshotClone(
defer func() {
if err != nil {
if !errors.Is(err, ErrFlattenInProgress) {
if !errors.Is(err, rbderrors.ErrFlattenInProgress) {
// cleanup clone and snapshot
errCleanUp := cleanUpSnapshot(ctx, cloneRbd, rbdSnap, cloneRbd)
if errCleanUp != nil {
@ -1508,7 +1509,7 @@ func (cs *ControllerServer) DeleteSnapshot(
// if the error is ErrImageNotFound, We need to cleanup the image from
// trash and remove the metadata in OMAP.
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err)
err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr)
@ -1611,7 +1612,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
rbdVol, err := genVolFromVolIDWithMigration(ctx, volID, cr, req.GetSecrets())
if err != nil {
switch {
case errors.Is(err, util.ErrImageNotFound):
case errors.Is(err, rbderrors.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err)

View File

@ -19,6 +19,8 @@ package rbd
import (
"context"
"fmt"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
)
// Sparsify checks the size of the objects in the RBD image and calls
@ -33,7 +35,7 @@ func (ri *rbdImage) Sparsify(_ context.Context) error {
}
if inUse {
// if the image is in use, we should not sparsify it, return ErrImageInUse.
return ErrImageInUse
return rbderrors.ErrImageInUse
}
image, err := ri.open()

View File

@ -14,11 +14,19 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
package rbderrors
import "errors"
import (
"errors"
"fmt"
librados "github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
)
var (
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace.
ErrImageNotFound = errors.New("image not found")
// ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the
// given image.
ErrSnapNotFound = errors.New("snapshot not found")
@ -55,4 +63,8 @@ var (
ErrInvalidArgument = errors.New("invalid arguments provided")
// ErrImageInUse is returned when the image is in use.
ErrImageInUse = errors.New("image is in use")
// ErrGroupNotConnected is returned when the RBD group is not connected.
ErrGroupNotConnected = fmt.Errorf("%w: RBD group is not connected", librados.ErrNotConnected)
// ErrGroupNotFound is returned when group is not found in the cluster on the given pool and/or namespace.
ErrGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound)
)

View File

@ -24,6 +24,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/timestamppb"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -70,7 +71,7 @@ func GetVolumeGroupSnapshot(
attrs, err := vgs.getVolumeGroupAttributes(ctx)
if err != nil {
if errors.Is(err, ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "%v, returning empty volume group snapshot %q", vgs, err)
return vgs, err

View File

@ -26,6 +26,7 @@ import (
"github.com/ceph/go-ceph/rados"
"github.com/ceph/ceph-csi/internal/journal"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
@ -126,7 +127,7 @@ func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping(
}
mcsiID.LocationID = mPID
err = cvg.generateVolumeGroup(mcsiID)
if util.ShouldRetryVolumeGeneration(err) {
if ShouldRetryVolumeGroupGeneration(err) {
continue
}
@ -161,13 +162,13 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
err = cvg.generateVolumeGroup(csiID)
// If the error is not a retryable error, return from here.
if err != nil && !util.ShouldRetryVolumeGeneration(err) {
if err != nil && !ShouldRetryVolumeGroupGeneration(err) {
return err
}
// If the error is a retryable error, we should try to get the cluster mapping
// and generate the volume group from the mapping.
if util.ShouldRetryVolumeGeneration(err) {
if ShouldRetryVolumeGroupGeneration(err) {
mapping, err := util.GetClusterMappingInfo(csiID.ClusterID)
if err != nil {
return err
@ -229,7 +230,7 @@ func (cvg *commonVolumeGroup) getVolumeGroupAttributes(ctx context.Context) (*jo
if attrs.GroupName == "" {
log.ErrorLog(ctx, "volume group with id %v not found", cvg.id)
return nil, ErrRBDGroupNotFound
return nil, rbderrors.ErrGroupNotFound
}
cvg.requestName = attrs.RequestName
@ -356,12 +357,12 @@ func (cvg *commonVolumeGroup) GetIOContext(ctx context.Context) (*rados.IOContex
conn, err := cvg.getConnection(ctx)
if err != nil {
return nil, fmt.Errorf("%w: failed to connect: %w", ErrRBDGroupNotConnected, err)
return nil, fmt.Errorf("%w: failed to connect: %w", rbderrors.ErrGroupNotConnected, err)
}
ioctx, err := conn.GetIoctx(cvg.pool)
if err != nil {
return nil, fmt.Errorf("%w: failed to get IOContext: %w", ErrRBDGroupNotConnected, err)
return nil, fmt.Errorf("%w: failed to get IOContext: %w", rbderrors.ErrGroupNotConnected, err)
}
if cvg.namespace != "" {
@ -417,3 +418,28 @@ func (cvg *commonVolumeGroup) GetCreationTime(ctx context.Context) (*time.Time,
return cvg.creationTime, nil
}
// ShouldRetryVolumeGroupGeneration reports whether the search for, or the
// generation of, a volumegroup should carry on after err was encountered.
//
// The following known errors are treated as "keep searching":
//   - util.ErrPoolNotFound: The rbd pool where the volumegroup/omap is expected doesn't exist.
//   - rbderrors.ErrGroupNotFound: The volumegroup doesn't exist in the rbd pool.
//   - rados.ErrPermissionDenied: Permissions to access the pool is denied.
//
// It returns `true` when err matches (via errors.Is) any of the errors listed
// above. It returns `false` when err is nil or is any other error, meaning the
// search should stop.
//
// Callers use this helper when several attempts may be made to locate or
// generate volumegroup information, so that the known failures above lead to
// another attempt while everything else is reported.
func ShouldRetryVolumeGroupGeneration(err error) bool {
	if err == nil {
		// Nothing failed, so there is nothing to retry.
		return false
	}

	// Sentinel errors for which the search is allowed to continue.
	retryable := [...]error{
		util.ErrPoolNotFound,
		rbderrors.ErrGroupNotFound,
		rados.ErrPermissionDenied,
	}
	for _, target := range retryable {
		if errors.Is(err, target) {
			return true
		}
	}

	return false
}

View File

@ -14,16 +14,19 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package group
import (
"errors"
"testing"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/go-ceph/rados"
)
func Test_shouldRetryVolumeGeneration(t *testing.T) {
func Test_shouldRetryVolumeGroupGeneration(t *testing.T) {
t.Parallel()
type args struct {
err error
@ -38,19 +41,14 @@ func Test_shouldRetryVolumeGeneration(t *testing.T) {
args: args{err: nil},
want: false, // No error, stop searching
},
{
name: "ErrKeyNotFound (continue searching)",
args: args{err: ErrKeyNotFound},
want: true, // Known error, continue searching
},
{
name: "ErrPoolNotFound (continue searching)",
args: args{err: ErrPoolNotFound},
args: args{err: util.ErrPoolNotFound},
want: true, // Known error, continue searching
},
{
name: "ErrImageNotFound (continue searching)",
args: args{err: ErrImageNotFound},
name: "ErrRBDGroupNotFound (continue searching)",
args: args{err: rbderrors.ErrGroupNotFound},
want: true, // Known error, continue searching
},
{
@ -67,7 +65,7 @@ func Test_shouldRetryVolumeGeneration(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
if got := ShouldRetryVolumeGeneration(tt.args.err); got != tt.want {
if got := ShouldRetryVolumeGroupGeneration(tt.args.err); got != tt.want {
t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tt.want)
}
})

View File

@ -22,21 +22,16 @@ import (
"fmt"
"github.com/ceph/go-ceph/rados"
librados "github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/csi-addons/spec/lib/go/volumegroup"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
var (
ErrRBDGroupNotConnected = fmt.Errorf("%w: RBD group is not connected", librados.ErrNotConnected)
ErrRBDGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound)
)
// volumeGroup handles all requests for 'rbd group' operations.
type volumeGroup struct {
commonVolumeGroup
@ -77,7 +72,7 @@ func GetVolumeGroup(
attrs, err := vg.getVolumeGroupAttributes(ctx)
if err != nil {
if errors.Is(err, ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "%v, returning empty volume group %q", vg, err)
return vg, err

View File

@ -24,7 +24,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/ceph/ceph-csi/internal/rbd/group"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -215,7 +215,7 @@ func (cs *ControllerServer) DeleteVolumeGroupSnapshot(
groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID)
if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID)
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
@ -261,7 +261,7 @@ func (cs *ControllerServer) GetVolumeGroupSnapshot(
groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID)
if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID)
return nil, status.Errorf(

View File

@ -22,6 +22,7 @@ import (
"fmt"
"github.com/ceph/ceph-csi/internal/journal"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
rbd_group "github.com/ceph/ceph-csi/internal/rbd/group"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
@ -174,7 +175,7 @@ func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volu
volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets)
if err != nil {
switch {
case errors.Is(err, util.ErrImageNotFound):
case errors.Is(err, rbderrors.ErrImageNotFound):
err = fmt.Errorf("volume %s not found: %w", id, err)
return nil, err
@ -199,7 +200,7 @@ func (mgr *rbdManager) GetSnapshotByID(ctx context.Context, id string) (types.Sn
snapshot, err := genSnapFromSnapID(ctx, id, creds, mgr.secrets)
if err != nil {
switch {
case errors.Is(err, util.ErrImageNotFound):
case errors.Is(err, rbderrors.ErrImageNotFound):
err = fmt.Errorf("volume %s not found: %w", id, err)
return nil, err
@ -467,7 +468,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot(
return vgs, nil
}
} else if err != nil && !errors.Is(err, util.ErrImageNotFound) {
} else if err != nil && !errors.Is(err, rbderrors.ErrImageNotFound) {
// ErrImageNotFound can be returned if the VolumeGroupSnapshot
// could not be found. It is expected that it does not exist
// yet, in which case it will be created below.
@ -537,7 +538,7 @@ func (mgr *rbdManager) RegenerateVolumeGroupJournal(
err = gi.DecomposeCSIID(groupID)
if err != nil {
return "", fmt.Errorf("%w: error decoding volume group ID (%w) (%s)", ErrInvalidVolID, err, groupID)
return "", fmt.Errorf("%w: error decoding volume group ID (%w) (%s)", rbderrors.ErrInvalidVolID, err, groupID)
}
monitors, clusterID, err = util.FetchMappedClusterIDAndMons(ctx, gi.ClusterID)

View File

@ -21,6 +21,7 @@ import (
"encoding/hex"
"strings"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
@ -39,13 +40,13 @@ func parseMigrationVolID(vh string) (*migrationVolID, error) {
handSlice := strings.Split(vh, migVolIDFieldSep)
if len(handSlice) < migVolIDTotalLength {
// its short of length in this case, so return error
return nil, ErrInvalidVolID
return nil, rbderrors.ErrInvalidVolID
}
// Store pool
poolHash := strings.Join(handSlice[migVolIDSplitLength:], migVolIDFieldSep)
poolByte, dErr := hex.DecodeString(poolHash)
if dErr != nil {
return nil, ErrMissingPoolNameInVolID
return nil, rbderrors.ErrMissingPoolNameInVolID
}
mh.poolName = string(poolByte)
// Parse migration mons( for clusterID) and image
@ -62,13 +63,13 @@ func parseMigrationVolID(vh string) (*migrationVolID, error) {
}
}
if mh.imageName == "" {
return nil, ErrMissingImageNameInVolID
return nil, rbderrors.ErrMissingImageNameInVolID
}
if mh.poolName == "" {
return nil, ErrMissingPoolNameInVolID
return nil, rbderrors.ErrMissingPoolNameInVolID
}
if mh.clusterID == "" {
return nil, ErrDecodeClusterIDFromMonsInVolID
return nil, rbderrors.ErrDecodeClusterIDFromMonsInVolID
}
return mh, nil

View File

@ -20,6 +20,7 @@ import (
"fmt"
"time"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -56,7 +57,7 @@ func (rv *rbdVolume) HandleParentImageExistence(
if rv.ParentInTrash {
return fmt.Errorf("%w: failed to enable mirroring on image %q:"+
" parent is in trash",
ErrFailedPrecondition, rv)
rbderrors.ErrFailedPrecondition, rv)
}
parent, err := rv.getParent()
@ -72,7 +73,7 @@ func (rv *rbdVolume) HandleParentImageExistence(
if parentMirroringInfo.GetState() != librbd.MirrorImageEnabled.String() {
return fmt.Errorf("%w: failed to enable mirroring on image %q: "+
"parent image %q is not enabled for mirroring",
ErrFailedPrecondition, rv, parent)
rbderrors.ErrFailedPrecondition, rv, parent)
}
return nil
@ -204,7 +205,7 @@ func (ri *rbdImage) Resync(_ context.Context) error {
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
return fmt.Errorf("%w: awaiting initial resync due to split brain", rbderrors.ErrUnavailable)
}
// GetGlobalMirroringStatus get the mirroring status of an image.

View File

@ -27,6 +27,7 @@ import (
"github.com/ceph/ceph-csi/pkg/util/kernel"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/file"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
@ -1046,7 +1047,7 @@ func (ns *NodeServer) NodeUnstageVolume(
}
// If not mounted, and error is anything other than metadata file missing, it is an error
if !errors.Is(err, ErrMissingStash) {
if !errors.Is(err, rbderrors.ErrMissingStash) {
return nil, status.Error(codes.Internal, err.Error())
}

View File

@ -24,6 +24,7 @@ import (
"github.com/ceph/ceph-csi/pkg/util/crypto"
"github.com/ceph/ceph-csi/internal/journal"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
@ -174,10 +175,10 @@ func checkSnapCloneExists(
// Fetch on-disk image attributes
err = vol.getImageInfo()
if err != nil {
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
err = parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil {
if !errors.Is(err, ErrSnapNotFound) {
if !errors.Is(err, rbderrors.ErrSnapNotFound) {
log.ErrorLog(ctx, "failed to delete snapshot %s: %v", rbdSnap, err)
return false, err
@ -205,7 +206,7 @@ func checkSnapCloneExists(
// check snapshot exists if not create it
err = vol.checkSnapExists(rbdSnap)
if errors.Is(err, ErrSnapNotFound) {
if errors.Is(err, rbderrors.ErrSnapNotFound) {
// create snapshot
sErr := vol.createSnapshot(ctx, rbdSnap)
if sErr != nil {
@ -300,7 +301,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
// Fetch on-disk image attributes and compare against request
err = rv.getImageInfo()
switch {
case errors.Is(err, util.ErrImageNotFound) && parentVol != nil:
case errors.Is(err, rbderrors.ErrImageNotFound) && parentVol != nil:
// Need to check cloned info here not on createvolume
found, cErr := rv.checkCloneImage(ctx, parentVol)
if cErr != nil {
@ -314,7 +315,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
return false, err
}
case errors.Is(err, util.ErrImageNotFound) && parentVol == nil:
case errors.Is(err, rbderrors.ErrImageNotFound) && parentVol == nil:
// image not found, undo the reservation
err = j.UndoReservation(ctx, rv.JournalPool, rv.Pool, rv.RbdImageName, rv.RequestName)
@ -332,7 +333,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
// size checks
if rv.VolSize < requestSize {
return false, fmt.Errorf("%w: image with the same name (%s) but with different size already exists",
ErrVolNameConflict, rv.RbdImageName)
rbderrors.ErrVolNameConflict, rv.RbdImageName)
}
// TODO: We should also ensure image features and format is the same
@ -600,7 +601,7 @@ func RegenerateJournal(
err = vi.DecomposeCSIID(rbdVol.VolID)
if err != nil {
return "", fmt.Errorf("%w: error decoding volume ID (%w) (%s)",
ErrInvalidVolID, err, rbdVol.VolID)
rbderrors.ErrInvalidVolID, err, rbdVol.VolID)
}
rbdVol.Owner = owner

View File

@ -31,6 +31,7 @@ import (
"github.com/ceph/ceph-csi/pkg/util/crypto"
"github.com/ceph/ceph-csi/pkg/util/kernel"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -537,7 +538,7 @@ func (ri *rbdImage) open() (*librbd.Image, error) {
image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot)
if err != nil {
if errors.Is(err, librbd.ErrNotFound) {
err = fmt.Errorf("Failed as %w (internal %w)", util.ErrImageNotFound, err)
err = fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrImageNotFound, err)
}
return nil, err
@ -555,7 +556,7 @@ func (ri *rbdImage) open() (*librbd.Image, error) {
func (ri *rbdImage) isInUse() (bool, error) {
image, err := ri.open()
if err != nil {
if errors.Is(err, util.ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) {
return false, err
}
// any error should assume something else is using the image
@ -698,7 +699,7 @@ func (ri *rbdImage) Delete(ctx context.Context) error {
err = rbdImage.Trash(0)
if err != nil {
if errors.Is(err, librbd.ErrNotFound) {
return fmt.Errorf("Failed as %w (internal %w)", util.ErrImageNotFound, err)
return fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrImageNotFound, err)
}
log.ErrorLog(ctx, "failed to delete rbd image: %s, error: %v", ri, err)
@ -759,14 +760,14 @@ func (rv *rbdVolume) DeleteTempImage(ctx context.Context) error {
snap.RadosNamespace = rv.RadosNamespace
err := tempClone.deleteSnapshot(ctx, snap)
if err != nil {
if !errors.Is(err, util.ErrImageNotFound) && !errors.Is(err, ErrSnapNotFound) {
if !errors.Is(err, rbderrors.ErrImageNotFound) && !errors.Is(err, rbderrors.ErrSnapNotFound) {
return fmt.Errorf("failed to delete snapshot %q: %w", snap, err)
}
}
err = tempClone.Delete(ctx)
if err != nil {
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
return tempClone.ensureImageCleanup(ctx)
} else {
// return error if it is not ErrImageNotFound
@ -805,7 +806,7 @@ func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) {
// if the parent image is moved to trash the name will be present
// in rbd image info but the image will be in trash, in that case
// return the found depth
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
return depth, nil
}
log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err)
@ -900,7 +901,7 @@ func (ri *rbdImage) flattenRbdImage(
return err
}
if forceFlatten || depth >= hardlimit {
return fmt.Errorf("%w: flatten is in progress for image %s", ErrFlattenInProgress, ri.RbdImageName)
return fmt.Errorf("%w: flatten is in progress for image %s", rbderrors.ErrFlattenInProgress, ri.RbdImageName)
}
log.DebugLog(ctx, "successfully added task to flatten image %q", ri)
}
@ -995,7 +996,7 @@ func (ri *rbdImage) checkImageChainHasFeature(ctx context.Context, feature uint6
// is in the trash, when we try to open the parent image to get its
// information it fails because it is already in trash. We should
// treat error as nil if the parent is not found.
if errors.Is(err, util.ErrImageNotFound) {
if errors.Is(err, rbderrors.ErrImageNotFound) {
return false, nil
}
log.ErrorLog(ctx, "failed to get image info for %s: %s", rbdImg.String(), err)
@ -1233,6 +1234,33 @@ func generateVolumeFromVolumeID(
return rbdVol, err
}
// ShouldRetryVolumeGeneration reports whether err is one of the known lookup
// failures after which the search for a volume should move on to the next
// attempt instead of being aborted.
//
// It returns true when err matches (via errors.Is) any of:
//   - util.ErrKeyNotFound: the requested key is not present in the Rados omap.
//   - util.ErrPoolNotFound: the rbd pool expected to hold the volume/omap
//     doesn't exist.
//   - rbderrors.ErrImageNotFound: the image doesn't exist in the rbd pool.
//   - rados.ErrPermissionDenied: access to the pool is denied.
//
// A nil error and every error outside this list yield false, which tells the
// caller to stop and return whatever result it currently holds.
func ShouldRetryVolumeGeneration(err error) bool {
	switch {
	case err == nil:
		// Nothing failed, so there is nothing to retry.
		return false
	case errors.Is(err, util.ErrKeyNotFound),
		errors.Is(err, util.ErrPoolNotFound),
		errors.Is(err, rbderrors.ErrImageNotFound),
		errors.Is(err, rados.ErrPermissionDenied):
		// Known failure: keep searching.
		return true
	default:
		// Unknown failure: stop searching.
		return false
	}
}
// GenVolFromVolID generates a rbdVolume structure from the provided identifier, updating
// the structure with elements from on-disk image metadata as well.
func GenVolFromVolID(
@ -1249,11 +1277,11 @@ func GenVolFromVolID(
err := vi.DecomposeCSIID(volumeID)
if err != nil {
return vol, fmt.Errorf("%w: error decoding volume ID (%w) (%s)",
ErrInvalidVolID, err, volumeID)
rbderrors.ErrInvalidVolID, err, volumeID)
}
vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets)
if !util.ShouldRetryVolumeGeneration(err) {
if !ShouldRetryVolumeGeneration(err) {
return vol, err
}
@ -1264,7 +1292,7 @@ func GenVolFromVolID(
}
if mapping != nil {
rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets)
if !util.ShouldRetryVolumeGeneration(vErr) {
if !ShouldRetryVolumeGeneration(vErr) {
return rbdVol, vErr
}
}
@ -1317,7 +1345,7 @@ func generateVolumeFromMapping(
// Add mapping poolID to Identifier
nvi.LocationID = pID
vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets)
if !util.ShouldRetryVolumeGeneration(err) {
if !ShouldRetryVolumeGeneration(err) {
return vol, err
}
}
@ -1511,7 +1539,7 @@ func (ri *rbdImage) deleteSnapshot(ctx context.Context, pOpts *rbdSnapshot) erro
}
err = snap.Remove()
if errors.Is(err, librbd.ErrNotFound) {
return fmt.Errorf("Failed as %w (internal %w)", ErrSnapNotFound, err)
return fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrSnapNotFound, err)
}
return err
@ -1771,7 +1799,7 @@ func (ri *rbdImage) checkSnapExists(rbdSnap *rbdSnapshot) error {
}
}
return fmt.Errorf("%w: snap %s not found", ErrSnapNotFound, rbdSnap)
return fmt.Errorf("%w: snap %s not found", rbderrors.ErrSnapNotFound, rbdSnap)
}
// rbdImageMetadataStash strongly typed JSON spec for stashed RBD image metadata.
@ -1853,7 +1881,7 @@ func lookupRBDImageMetadataStash(metaDataPath string) (rbdImageMetadataStash, er
return imgMeta, fmt.Errorf("failed to read stashed JSON image metadata from path (%s): %w", fPath, err)
}
return imgMeta, fmt.Errorf("Failed as %w (internal %w)", ErrMissingStash, err)
return imgMeta, fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrMissingStash, err)
}
err = json.Unmarshal(encodedBytes, &imgMeta)

View File

@ -23,6 +23,10 @@ import (
"strings"
"testing"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/stretchr/testify/require"
)
@ -387,3 +391,54 @@ func Test_checkValidImageFeatures(t *testing.T) {
})
}
}
// Test_shouldRetryVolumeGeneration verifies that ShouldRetryVolumeGeneration
// returns true only for the known "keep searching" errors, and false for a nil
// error or an unrelated error.
func Test_shouldRetryVolumeGeneration(t *testing.T) {
	t.Parallel()

	cases := []struct {
		name  string
		err   error
		retry bool
	}{
		// No error at all: stop searching.
		{name: "No error (stop searching)", err: nil, retry: false},
		// Known errors: continue searching.
		{name: "ErrKeyNotFound (continue searching)", err: util.ErrKeyNotFound, retry: true},
		{name: "ErrPoolNotFound (continue searching)", err: util.ErrPoolNotFound, retry: true},
		{name: "ErrImageNotFound (continue searching)", err: rbderrors.ErrImageNotFound, retry: true},
		{name: "ErrPermissionDenied (continue searching)", err: rados.ErrPermissionDenied, retry: true},
		// Unknown error: stop searching.
		{name: "Different error (stop searching)", err: errors.New("unknown error"), retry: false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			got := ShouldRetryVolumeGeneration(tc.err)
			if got != tc.retry {
				t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tc.retry)
			}
		})
	}
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
librbd "github.com/ceph/go-ceph/rbd"
@ -70,14 +71,14 @@ func DisableVolumeReplication(mirror types.Mirror,
localStatus, err := sts.GetLocalSiteStatus()
if err != nil {
return fmt.Errorf("failed to get local state: %w", ErrInvalidArgument)
return fmt.Errorf("failed to get local state: %w", rbderrors.ErrInvalidArgument)
}
if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() {
return nil
}
return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState())
rbderrors.ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState())
}
err := mirror.DisableMirroring(ctx, force)
if err != nil {
@ -92,7 +93,7 @@ func DisableVolumeReplication(mirror types.Mirror,
// error out if the image is not in disabled state.
if info.GetState() != librbd.MirrorImageDisabled.String() {
return fmt.Errorf("%w: image is in %q state, expected state %q", ErrAborted,
return fmt.Errorf("%w: image is in %q state, expected state %q", rbderrors.ErrAborted,
info.GetState(), librbd.MirrorImageDisabled.String())
}

View File

@ -24,6 +24,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/timestamppb"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -90,7 +91,7 @@ func cleanUpSnapshot(
) error {
err := parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil {
if !errors.Is(err, util.ErrImageNotFound) && !errors.Is(err, ErrSnapNotFound) {
if !errors.Is(err, rbderrors.ErrImageNotFound) && !errors.Is(err, rbderrors.ErrSnapNotFound) {
log.ErrorLog(ctx, "failed to delete snapshot %q: %v", rbdSnap, err)
return err
@ -100,7 +101,7 @@ func cleanUpSnapshot(
if rbdVol != nil {
err := rbdVol.Delete(ctx)
if err != nil {
if !errors.Is(err, util.ErrImageNotFound) {
if !errors.Is(err, rbderrors.ErrImageNotFound) {
log.ErrorLog(ctx, "failed to delete rbd image %q with error: %v", rbdVol, err)
return err

View File

@ -16,15 +16,9 @@ limitations under the License.
package util
import (
"errors"
"github.com/ceph/go-ceph/rados"
)
import "errors"
var (
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace.
ErrImageNotFound = errors.New("image not found")
// ErrKeyNotFound is returned when requested key in omap is not found.
ErrKeyNotFound = errors.New("key not found")
// ErrObjectExists is returned when named omap is already present in rados.
@ -41,30 +35,3 @@ var (
// ErrMissingConfigForMonitor is returned when clusterID is not found for the mon.
ErrMissingConfigForMonitor = errors.New("missing configuration of cluster ID for monitor")
)
// ShouldRetryVolumeGeneration reports whether err is one of the known lookup
// failures after which the search for a volume should move on to the next
// attempt instead of being aborted.
//
// It returns true when err matches (via errors.Is) any of:
//   - ErrKeyNotFound: the requested key is not present in the Rados omap.
//   - ErrPoolNotFound: the rbd pool expected to hold the volume/omap doesn't
//     exist.
//   - ErrImageNotFound: the image doesn't exist in the rbd pool.
//   - rados.ErrPermissionDenied: access to the pool is denied.
//
// A nil error and every error outside this list yield false, meaning the
// search should stop.
func ShouldRetryVolumeGeneration(err error) bool {
	if err == nil {
		return false // No error, do not retry
	}

	// Errors for which the search is allowed to continue.
	retryable := []error{
		ErrKeyNotFound,
		ErrPoolNotFound,
		ErrImageNotFound,
		rados.ErrPermissionDenied,
	}
	for _, known := range retryable {
		if errors.Is(err, known) {
			return true
		}
	}

	return false
}