cleanup: move rbd errors to internal/rbd/errors

This commit includes:
1). Moves rbd errors from internal/util/errors to internal/rbd/errors.

2). Introduces the ShouldRetryVolumeGroupGeneration helper function to determine
whether volume group generation should continue based on specific error types.
The function returns true if the error is ErrPoolNotFound,
ErrRBDGroupNotFound, or ErrPermissionDenied (the Ceph user might not have
access to all the objects/pools where a mapping exists).

Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
Praveen M 2025-03-03 13:12:42 +05:30
parent 810690a9aa
commit c1b3325677
18 changed files with 180 additions and 99 deletions

View File

@ -21,6 +21,7 @@ import (
"errors" "errors"
"github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/rbd"
rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
@ -68,7 +69,7 @@ func (ekrs *EncryptionKeyRotationServer) EncryptionKeyRotate(
rbdVol, err := mgr.GetVolumeByID(ctx, volID) rbdVol, err := mgr.GetVolumeByID(ctx, volID)
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, util.ErrImageNotFound): case errors.Is(err, rbd_errors.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID) err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err) log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err)

View File

@ -652,7 +652,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
sts, err := mirror.GetGlobalMirroringStatus(ctx) sts, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil { if err != nil {
// the image gets recreated after issuing resync // the image gets recreated after issuing resync
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
// caller retries till RBD syncs an initial version of the image to // caller retries till RBD syncs an initial version of the image to
// report its status in the resync call. Ideally, this line will not // report its status in the resync call. Ideally, this line will not
// be executed as the error would get returned due to getMirroringInfo // be executed as the error would get returned due to getMirroringInfo
@ -786,7 +786,7 @@ func getGRPCError(err error) error {
} }
errorStatusMap := map[error]codes.Code{ errorStatusMap := map[error]codes.Code{
util.ErrImageNotFound: codes.NotFound, rbd_errors.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound, util.ErrPoolNotFound: codes.NotFound,
rbd_errors.ErrInvalidArgument: codes.InvalidArgument, rbd_errors.ErrInvalidArgument: codes.InvalidArgument,
rbd_errors.ErrFlattenInProgress: codes.Aborted, rbd_errors.ErrFlattenInProgress: codes.Aborted,
@ -836,7 +836,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
log.ErrorLog(ctx, "failed to get volume with id %q: %v", volumeID, err) log.ErrorLog(ctx, "failed to get volume with id %q: %v", volumeID, err)
switch { switch {
case errors.Is(err, util.ErrImageNotFound): case errors.Is(err, rbd_errors.ErrImageNotFound):
err = status.Error(codes.NotFound, err.Error()) err = status.Error(codes.NotFound, err.Error())
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
err = status.Error(codes.NotFound, err.Error()) err = status.Error(codes.NotFound, err.Error())
@ -873,7 +873,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed to get status for mirror %q: %v", mirror, err) log.ErrorLog(ctx, "failed to get status for mirror %q: %v", mirror, err)
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())
} }

View File

@ -598,8 +598,8 @@ func TestGetGRPCError(t *testing.T) {
}, },
{ {
name: "ErrImageNotFound", name: "ErrImageNotFound",
err: util.ErrImageNotFound, err: rbd_errors.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, util.ErrImageNotFound.Error()), expectedErr: status.Error(codes.NotFound, rbd_errors.ErrImageNotFound.Error()),
}, },
{ {
name: "ErrPoolNotFound", name: "ErrPoolNotFound",

View File

@ -23,7 +23,7 @@ import (
"slices" "slices"
"github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/group" rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
@ -194,7 +194,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
// resolve the volume group // resolve the volume group
vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId()) vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
if err != nil { if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) { if errors.Is(err, rbd_errors.ErrRBDGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId()) log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId())
return &volumegroup.DeleteVolumeGroupResponse{}, nil return &volumegroup.DeleteVolumeGroupResponse{}, nil
@ -433,7 +433,7 @@ func (vs *VolumeGroupServer) ControllerGetVolumeGroup(
// resolve the volume group // resolve the volume group
vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId()) vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
if err != nil { if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) { if errors.Is(err, rbd_errors.ErrRBDGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId()) log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId())
return nil, status.Errorf( return nil, status.Errorf(

View File

@ -22,7 +22,6 @@ import (
"fmt" "fmt"
rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors" rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
@ -68,7 +67,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
return true, nil return true, nil
case errors.Is(err, util.ErrImageNotFound): case errors.Is(err, rbd_errors.ErrImageNotFound):
// as the temp clone does not exist,check snapshot exists on parent volume // as the temp clone does not exist,check snapshot exists on parent volume
// snapshot name is same as temporary clone image // snapshot name is same as temporary clone image
snap.RbdImageName = tempClone.RbdImageName snap.RbdImageName = tempClone.RbdImageName

View File

@ -593,7 +593,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snaps, children, err := rbdVol.listSnapAndChildren() snaps, children, err := rbdVol.listSnapAndChildren()
if err != nil { if err != nil {
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
return status.Error(codes.InvalidArgument, err.Error()) return status.Error(codes.InvalidArgument, err.Error())
} }
@ -865,7 +865,7 @@ func checkContentSource(
rbdvol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets()) rbdvol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed to get backend image for %s: %v", volID, err) log.ErrorLog(ctx, "failed to get backend image for %s: %v", volID, err)
if !errors.Is(err, util.ErrImageNotFound) { if !errors.Is(err, rbd_errors.ErrImageNotFound) {
return nil, nil, status.Error(codes.Internal, err.Error()) return nil, nil, status.Error(codes.Internal, err.Error())
} }
@ -905,7 +905,7 @@ func (cs *ControllerServer) checkErrAndUndoReserve(
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
notFoundErr := rbdVol.ensureImageCleanup(ctx) notFoundErr := rbdVol.ensureImageCleanup(ctx)
if notFoundErr != nil { if notFoundErr != nil {
return nil, status.Errorf(codes.Internal, "failed to cleanup image %q: %v", rbdVol, notFoundErr) return nil, status.Errorf(codes.Internal, "failed to cleanup image %q: %v", rbdVol, notFoundErr)
@ -980,7 +980,7 @@ func (cs *ControllerServer) DeleteVolume(
return nil, status.Error(codes.InvalidArgument, pErr.Error()) return nil, status.Error(codes.InvalidArgument, pErr.Error())
} }
pErr = deleteMigratedVolume(ctx, pmVolID, cr) pErr = deleteMigratedVolume(ctx, pmVolID, cr)
if pErr != nil && !errors.Is(pErr, util.ErrImageNotFound) { if pErr != nil && !errors.Is(pErr, rbd_errors.ErrImageNotFound) {
return nil, status.Error(codes.Internal, pErr.Error()) return nil, status.Error(codes.Internal, pErr.Error())
} }
@ -1152,7 +1152,7 @@ func (cs *ControllerServer) CreateSnapshot(
}() }()
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, util.ErrImageNotFound): case errors.Is(err, rbd_errors.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId()) err = status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId())
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", req.GetSourceVolumeId(), err) log.ErrorLog(ctx, "failed to get backend volume for %s: %v", req.GetSourceVolumeId(), err)
@ -1497,7 +1497,7 @@ func (cs *ControllerServer) DeleteSnapshot(
// if the error is ErrImageNotFound, We need to cleanup the image from // if the error is ErrImageNotFound, We need to cleanup the image from
// trash and remove the metadata in OMAP. // trash and remove the metadata in OMAP.
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err) log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err)
err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr) err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr)
@ -1600,7 +1600,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
rbdVol, err := genVolFromVolIDWithMigration(ctx, volID, cr, req.GetSecrets()) rbdVol, err := genVolFromVolIDWithMigration(ctx, volID, cr, req.GetSecrets())
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, util.ErrImageNotFound): case errors.Is(err, rbd_errors.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID) err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err) log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err)

View File

@ -16,9 +16,17 @@ limitations under the License.
package rbd_errors package rbd_errors
import "errors" import (
"errors"
"fmt"
librados "github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
)
var ( var (
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace.
ErrImageNotFound = errors.New("image not found")
// ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the // ErrSnapNotFound is returned when snap name passed is not found in the list of snapshots for the
// given image. // given image.
ErrSnapNotFound = errors.New("snapshot not found") ErrSnapNotFound = errors.New("snapshot not found")
@ -55,4 +63,8 @@ var (
ErrInvalidArgument = errors.New("invalid arguments provided") ErrInvalidArgument = errors.New("invalid arguments provided")
// ErrImageInUse is returned when the image is in use. // ErrImageInUse is returned when the image is in use.
ErrImageInUse = errors.New("image is in use") ErrImageInUse = errors.New("image is in use")
// ErrRBDGroupNotConnected is returned when the RBD group is not connected.
ErrRBDGroupNotConnected = fmt.Errorf("%w: RBD group is not connected", librados.ErrNotConnected)
// ErrRBDGroupNotFound is returned when group name is not found in the cluster on the given pool and/or namespace.
ErrRBDGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound)
) )

View File

@ -24,6 +24,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
@ -70,7 +71,7 @@ func GetVolumeGroupSnapshot(
attrs, err := vgs.getVolumeGroupAttributes(ctx) attrs, err := vgs.getVolumeGroupAttributes(ctx)
if err != nil { if err != nil {
if errors.Is(err, ErrRBDGroupNotFound) { if errors.Is(err, rbd_errors.ErrRBDGroupNotFound) {
log.ErrorLog(ctx, "%v, returning empty volume group snapshot %q", vgs, err) log.ErrorLog(ctx, "%v, returning empty volume group snapshot %q", vgs, err)
return vgs, err return vgs, err

View File

@ -26,6 +26,7 @@ import (
"github.com/ceph/go-ceph/rados" "github.com/ceph/go-ceph/rados"
"github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/journal"
rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
) )
@ -126,7 +127,7 @@ func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping(
} }
mcsiID.LocationID = mPID mcsiID.LocationID = mPID
err = cvg.generateVolumeGroup(mcsiID) err = cvg.generateVolumeGroup(mcsiID)
if util.ShouldRetryVolumeGeneration(err) { if ShouldRetryVolumeGroupGeneration(err) {
continue continue
} }
@ -161,13 +162,13 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
err = cvg.generateVolumeGroup(csiID) err = cvg.generateVolumeGroup(csiID)
// If the error is not a retryable error, return from here. // If the error is not a retryable error, return from here.
if err != nil && !util.ShouldRetryVolumeGeneration(err) { if err != nil && !ShouldRetryVolumeGroupGeneration(err) {
return err return err
} }
// If the error is a retryable error, we should try to get the cluster mapping // If the error is a retryable error, we should try to get the cluster mapping
// and generate the volume group from the mapping. // and generate the volume group from the mapping.
if util.ShouldRetryVolumeGeneration(err) { if ShouldRetryVolumeGroupGeneration(err) {
mapping, err := util.GetClusterMappingInfo(csiID.ClusterID) mapping, err := util.GetClusterMappingInfo(csiID.ClusterID)
if err != nil { if err != nil {
return err return err
@ -229,7 +230,7 @@ func (cvg *commonVolumeGroup) getVolumeGroupAttributes(ctx context.Context) (*jo
if attrs.GroupName == "" { if attrs.GroupName == "" {
log.ErrorLog(ctx, "volume group with id %v not found", cvg.id) log.ErrorLog(ctx, "volume group with id %v not found", cvg.id)
return nil, ErrRBDGroupNotFound return nil, rbd_errors.ErrRBDGroupNotFound
} }
cvg.requestName = attrs.RequestName cvg.requestName = attrs.RequestName
@ -356,12 +357,12 @@ func (cvg *commonVolumeGroup) GetIOContext(ctx context.Context) (*rados.IOContex
conn, err := cvg.getConnection(ctx) conn, err := cvg.getConnection(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("%w: failed to connect: %w", ErrRBDGroupNotConnected, err) return nil, fmt.Errorf("%w: failed to connect: %w", rbd_errors.ErrRBDGroupNotConnected, err)
} }
ioctx, err := conn.GetIoctx(cvg.pool) ioctx, err := conn.GetIoctx(cvg.pool)
if err != nil { if err != nil {
return nil, fmt.Errorf("%w: failed to get IOContext: %w", ErrRBDGroupNotConnected, err) return nil, fmt.Errorf("%w: failed to get IOContext: %w", rbd_errors.ErrRBDGroupNotConnected, err)
} }
if cvg.namespace != "" { if cvg.namespace != "" {
@ -417,3 +418,28 @@ func (cvg *commonVolumeGroup) GetCreationTime(ctx context.Context) (*time.Time,
return cvg.creationTime, nil return cvg.creationTime, nil
} }
// ShouldRetryVolumeGroupGeneration reports whether err is one of the known,
// recoverable failures after which the search for a volumegroup should move
// on to the next candidate instead of aborting.
//
// The following errors (matched with errors.Is, so wrapped errors count too)
// yield true:
//   - util.ErrPoolNotFound: the rbd pool where the volumegroup/omap is
//     expected doesn't exist.
//   - rbd_errors.ErrRBDGroupNotFound: the volumegroup doesn't exist in the
//     rbd pool.
//   - rados.ErrPermissionDenied: permission to access the pool is denied.
//
// A nil error and every other error yield false, meaning the search should
// stop: either the volumegroup was generated successfully or the failure is
// not one that trying elsewhere is expected to resolve.
func ShouldRetryVolumeGroupGeneration(err error) bool {
	// Nothing went wrong, so there is nothing to retry.
	if err == nil {
		return false
	}

	switch {
	case errors.Is(err, util.ErrPoolNotFound),
		errors.Is(err, rbd_errors.ErrRBDGroupNotFound),
		errors.Is(err, rados.ErrPermissionDenied):
		// Known error: keep searching.
		return true
	default:
		return false
	}
}

View File

@ -14,16 +14,19 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package util package group
import ( import (
"errors" "errors"
"testing" "testing"
rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/go-ceph/rados" "github.com/ceph/go-ceph/rados"
) )
func Test_shouldRetryVolumeGeneration(t *testing.T) { func Test_shouldRetryVolumeGroupGeneration(t *testing.T) {
t.Parallel() t.Parallel()
type args struct { type args struct {
err error err error
@ -38,19 +41,14 @@ func Test_shouldRetryVolumeGeneration(t *testing.T) {
args: args{err: nil}, args: args{err: nil},
want: false, // No error, stop searching want: false, // No error, stop searching
}, },
{
name: "ErrKeyNotFound (continue searching)",
args: args{err: ErrKeyNotFound},
want: true, // Known error, continue searching
},
{ {
name: "ErrPoolNotFound (continue searching)", name: "ErrPoolNotFound (continue searching)",
args: args{err: ErrPoolNotFound}, args: args{err: util.ErrPoolNotFound},
want: true, // Known error, continue searching want: true, // Known error, continue searching
}, },
{ {
name: "ErrImageNotFound (continue searching)", name: "ErrRBDGroupNotFound (continue searching)",
args: args{err: ErrImageNotFound}, args: args{err: rbd_errors.ErrRBDGroupNotFound},
want: true, // Known error, continue searching want: true, // Known error, continue searching
}, },
{ {
@ -67,7 +65,7 @@ func Test_shouldRetryVolumeGeneration(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
t.Parallel() t.Parallel()
if got := ShouldRetryVolumeGeneration(tt.args.err); got != tt.want { if got := ShouldRetryVolumeGroupGeneration(tt.args.err); got != tt.want {
t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tt.want) t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tt.want)
} }
}) })

View File

@ -22,21 +22,16 @@ import (
"fmt" "fmt"
"github.com/ceph/go-ceph/rados" "github.com/ceph/go-ceph/rados"
librados "github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd" librbd "github.com/ceph/go-ceph/rbd"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/csi-addons/spec/lib/go/volumegroup" "github.com/csi-addons/spec/lib/go/volumegroup"
rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
) )
var (
ErrRBDGroupNotConnected = fmt.Errorf("%w: RBD group is not connected", librados.ErrNotConnected)
ErrRBDGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound)
)
// volumeGroup handles all requests for 'rbd group' operations. // volumeGroup handles all requests for 'rbd group' operations.
type volumeGroup struct { type volumeGroup struct {
commonVolumeGroup commonVolumeGroup
@ -77,7 +72,7 @@ func GetVolumeGroup(
attrs, err := vg.getVolumeGroupAttributes(ctx) attrs, err := vg.getVolumeGroupAttributes(ctx)
if err != nil { if err != nil {
if errors.Is(err, ErrRBDGroupNotFound) { if errors.Is(err, rbd_errors.ErrRBDGroupNotFound) {
log.ErrorLog(ctx, "%v, returning empty volume group %q", vg, err) log.ErrorLog(ctx, "%v, returning empty volume group %q", vg, err)
return vg, err return vg, err

View File

@ -24,7 +24,7 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"github.com/ceph/ceph-csi/internal/rbd/group" rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
@ -215,7 +215,7 @@ func (cs *ControllerServer) DeleteVolumeGroupSnapshot(
groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID) groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID)
if err != nil { if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) { if errors.Is(err, rbd_errors.ErrRBDGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID) log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID)
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
@ -261,7 +261,7 @@ func (cs *ControllerServer) GetVolumeGroupSnapshot(
groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID) groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID)
if err != nil { if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) { if errors.Is(err, rbd_errors.ErrRBDGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID) log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID)
return nil, status.Errorf( return nil, status.Errorf(

View File

@ -175,7 +175,7 @@ func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volu
volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets) volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets)
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, util.ErrImageNotFound): case errors.Is(err, rbd_errors.ErrImageNotFound):
err = fmt.Errorf("volume %s not found: %w", id, err) err = fmt.Errorf("volume %s not found: %w", id, err)
return nil, err return nil, err
@ -200,7 +200,7 @@ func (mgr *rbdManager) GetSnapshotByID(ctx context.Context, id string) (types.Sn
snapshot, err := genSnapFromSnapID(ctx, id, creds, mgr.secrets) snapshot, err := genSnapFromSnapID(ctx, id, creds, mgr.secrets)
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, util.ErrImageNotFound): case errors.Is(err, rbd_errors.ErrImageNotFound):
err = fmt.Errorf("volume %s not found: %w", id, err) err = fmt.Errorf("volume %s not found: %w", id, err)
return nil, err return nil, err
@ -468,7 +468,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot(
return vgs, nil return vgs, nil
} }
} else if err != nil && !errors.Is(err, util.ErrImageNotFound) { } else if err != nil && !errors.Is(err, rbd_errors.ErrImageNotFound) {
// ErrImageNotFound can be returned if the VolumeGroupSnapshot // ErrImageNotFound can be returned if the VolumeGroupSnapshot
// could not be found. It is expected that it does not exist // could not be found. It is expected that it does not exist
// yet, in which case it will be created below. // yet, in which case it will be created below.

View File

@ -173,7 +173,7 @@ func checkSnapCloneExists(
// Fetch on-disk image attributes // Fetch on-disk image attributes
err = vol.getImageInfo() err = vol.getImageInfo()
if err != nil { if err != nil {
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
err = parentVol.deleteSnapshot(ctx, rbdSnap) err = parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil { if err != nil {
if !errors.Is(err, rbd_errors.ErrSnapNotFound) { if !errors.Is(err, rbd_errors.ErrSnapNotFound) {
@ -299,7 +299,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
// Fetch on-disk image attributes and compare against request // Fetch on-disk image attributes and compare against request
err = rv.getImageInfo() err = rv.getImageInfo()
switch { switch {
case errors.Is(err, util.ErrImageNotFound) && parentVol != nil: case errors.Is(err, rbd_errors.ErrImageNotFound) && parentVol != nil:
// Need to check cloned info here not on createvolume // Need to check cloned info here not on createvolume
found, cErr := rv.checkCloneImage(ctx, parentVol) found, cErr := rv.checkCloneImage(ctx, parentVol)
if cErr != nil { if cErr != nil {
@ -313,7 +313,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
return false, err return false, err
} }
case errors.Is(err, util.ErrImageNotFound) && parentVol == nil: case errors.Is(err, rbd_errors.ErrImageNotFound) && parentVol == nil:
// image not found, undo the reservation // image not found, undo the reservation
err = j.UndoReservation(ctx, rv.JournalPool, rv.Pool, rv.RbdImageName, rv.RequestName) err = j.UndoReservation(ctx, rv.JournalPool, rv.Pool, rv.RbdImageName, rv.RequestName)

View File

@ -531,7 +531,7 @@ func (ri *rbdImage) open() (*librbd.Image, error) {
image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot) image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot)
if err != nil { if err != nil {
if errors.Is(err, librbd.ErrNotFound) { if errors.Is(err, librbd.ErrNotFound) {
err = fmt.Errorf("Failed as %w (internal %w)", util.ErrImageNotFound, err) err = fmt.Errorf("Failed as %w (internal %w)", rbd_errors.ErrImageNotFound, err)
} }
return nil, err return nil, err
@ -549,7 +549,7 @@ func (ri *rbdImage) open() (*librbd.Image, error) {
func (ri *rbdImage) isInUse() (bool, error) { func (ri *rbdImage) isInUse() (bool, error) {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
if errors.Is(err, util.ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) {
return false, err return false, err
} }
// any error should assume something else is using the image // any error should assume something else is using the image
@ -692,7 +692,7 @@ func (ri *rbdImage) Delete(ctx context.Context) error {
err = rbdImage.Trash(0) err = rbdImage.Trash(0)
if err != nil { if err != nil {
if errors.Is(err, librbd.ErrNotFound) { if errors.Is(err, librbd.ErrNotFound) {
return fmt.Errorf("Failed as %w (internal %w)", util.ErrImageNotFound, err) return fmt.Errorf("Failed as %w (internal %w)", rbd_errors.ErrImageNotFound, err)
} }
log.ErrorLog(ctx, "failed to delete rbd image: %s, error: %v", ri, err) log.ErrorLog(ctx, "failed to delete rbd image: %s, error: %v", ri, err)
@ -746,7 +746,7 @@ func (rv *rbdVolume) DeleteTempImage(ctx context.Context) error {
tempClone := rv.generateTempClone() tempClone := rv.generateTempClone()
err := tempClone.Delete(ctx) err := tempClone.Delete(ctx)
if err != nil { if err != nil {
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
return tempClone.ensureImageCleanup(ctx) return tempClone.ensureImageCleanup(ctx)
} else { } else {
// return error if it is not ErrImageNotFound // return error if it is not ErrImageNotFound
@ -785,7 +785,7 @@ func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) {
// if the parent image is moved to trash the name will be present // if the parent image is moved to trash the name will be present
// in rbd image info but the image will be in trash, in that case // in rbd image info but the image will be in trash, in that case
// return the found depth // return the found depth
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
return depth, nil return depth, nil
} }
log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err) log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err)
@ -975,7 +975,7 @@ func (ri *rbdImage) checkImageChainHasFeature(ctx context.Context, feature uint6
// is in the trash, when we try to open the parent image to get its // is in the trash, when we try to open the parent image to get its
// information it fails because it is already in trash. We should // information it fails because it is already in trash. We should
// treat error as nil if the parent is not found. // treat error as nil if the parent is not found.
if errors.Is(err, util.ErrImageNotFound) { if errors.Is(err, rbd_errors.ErrImageNotFound) {
return false, nil return false, nil
} }
log.ErrorLog(ctx, "failed to get image info for %s: %s", rbdImg.String(), err) log.ErrorLog(ctx, "failed to get image info for %s: %s", rbdImg.String(), err)
@ -1213,6 +1213,33 @@ func generateVolumeFromVolumeID(
return rbdVol, err return rbdVol, err
} }
// ShouldRetryVolumeGeneration reports whether err is one of the known,
// recoverable failures after which the search for a volume should move on to
// the next candidate instead of aborting.
//
// The following errors (matched with errors.Is, so wrapped errors count too)
// yield true:
//   - util.ErrKeyNotFound: the key required to locate the volume is missing
//     in the Rados omap.
//   - util.ErrPoolNotFound: the rbd pool where the volume/omap is expected
//     doesn't exist.
//   - rbd_errors.ErrImageNotFound: the image doesn't exist in the rbd pool.
//   - rados.ErrPermissionDenied: permission to access the pool is denied.
//
// A nil error and every other error yield false, meaning the search should
// stop: either the volume was generated successfully or the failure is not
// one that trying elsewhere is expected to resolve.
func ShouldRetryVolumeGeneration(err error) bool {
	// Nothing went wrong, so there is nothing to retry.
	if err == nil {
		return false
	}

	switch {
	case errors.Is(err, util.ErrKeyNotFound),
		errors.Is(err, util.ErrPoolNotFound),
		errors.Is(err, rbd_errors.ErrImageNotFound),
		errors.Is(err, rados.ErrPermissionDenied):
		// Known error: keep searching.
		return true
	default:
		return false
	}
}
// GenVolFromVolID generates a rbdVolume structure from the provided identifier, updating // GenVolFromVolID generates a rbdVolume structure from the provided identifier, updating
// the structure with elements from on-disk image metadata as well. // the structure with elements from on-disk image metadata as well.
func GenVolFromVolID( func GenVolFromVolID(
@ -1233,7 +1260,7 @@ func GenVolFromVolID(
} }
vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets) vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets)
if !util.ShouldRetryVolumeGeneration(err) { if !ShouldRetryVolumeGeneration(err) {
return vol, err return vol, err
} }
@ -1244,7 +1271,7 @@ func GenVolFromVolID(
} }
if mapping != nil { if mapping != nil {
rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets) rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets)
if !util.ShouldRetryVolumeGeneration(vErr) { if !ShouldRetryVolumeGeneration(vErr) {
return rbdVol, vErr return rbdVol, vErr
} }
} }
@ -1297,7 +1324,7 @@ func generateVolumeFromMapping(
// Add mapping poolID to Identifier // Add mapping poolID to Identifier
nvi.LocationID = pID nvi.LocationID = pID
vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets) vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets)
if !util.ShouldRetryVolumeGeneration(err) { if !ShouldRetryVolumeGeneration(err) {
return vol, err return vol, err
} }
} }

View File

@ -23,6 +23,10 @@ import (
"strings" "strings"
"testing" "testing"
rbd_errors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd" librbd "github.com/ceph/go-ceph/rbd"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -387,3 +391,54 @@ func Test_checkValidImageFeatures(t *testing.T) {
}) })
} }
} }
// Test_shouldRetryVolumeGeneration checks that ShouldRetryVolumeGeneration
// returns true for each of the known lookup errors and false for a nil error
// or an unrecognised one.
func Test_shouldRetryVolumeGeneration(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name string
		err  error
		want bool
	}{
		// No error, stop searching.
		{name: "No error (stop searching)", err: nil, want: false},
		// Known errors, continue searching.
		{name: "ErrKeyNotFound (continue searching)", err: util.ErrKeyNotFound, want: true},
		{name: "ErrPoolNotFound (continue searching)", err: util.ErrPoolNotFound, want: true},
		{name: "ErrImageNotFound (continue searching)", err: rbd_errors.ErrImageNotFound, want: true},
		{name: "ErrPermissionDenied (continue searching)", err: rados.ErrPermissionDenied, want: true},
		// Unknown error, stop searching.
		{name: "Different error (stop searching)", err: errors.New("unknown error"), want: false},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			got := ShouldRetryVolumeGeneration(tc.err)
			if got != tc.want {
				t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tc.want)
			}
		})
	}
}

View File

@ -83,7 +83,7 @@ func cleanUpSnapshot(
) error { ) error {
err := parentVol.deleteSnapshot(ctx, rbdSnap) err := parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil { if err != nil {
if !errors.Is(err, util.ErrImageNotFound) && !errors.Is(err, rbd_errors.ErrSnapNotFound) { if !errors.Is(err, rbd_errors.ErrImageNotFound) && !errors.Is(err, rbd_errors.ErrSnapNotFound) {
log.ErrorLog(ctx, "failed to delete snapshot %q: %v", rbdSnap, err) log.ErrorLog(ctx, "failed to delete snapshot %q: %v", rbdSnap, err)
return err return err
@ -93,7 +93,7 @@ func cleanUpSnapshot(
if rbdVol != nil { if rbdVol != nil {
err := rbdVol.Delete(ctx) err := rbdVol.Delete(ctx)
if err != nil { if err != nil {
if !errors.Is(err, util.ErrImageNotFound) { if !errors.Is(err, rbd_errors.ErrImageNotFound) {
log.ErrorLog(ctx, "failed to delete rbd image %q with error: %v", rbdVol, err) log.ErrorLog(ctx, "failed to delete rbd image %q with error: %v", rbdVol, err)
return err return err

View File

@ -16,15 +16,9 @@ limitations under the License.
package util package util
import ( import "errors"
"errors"
"github.com/ceph/go-ceph/rados"
)
var ( var (
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace.
ErrImageNotFound = errors.New("image not found")
// ErrKeyNotFound is returned when requested key in omap is not found. // ErrKeyNotFound is returned when requested key in omap is not found.
ErrKeyNotFound = errors.New("key not found") ErrKeyNotFound = errors.New("key not found")
// ErrObjectExists is returned when named omap is already present in rados. // ErrObjectExists is returned when named omap is already present in rados.
@ -41,30 +35,3 @@ var (
// ErrMissingConfigForMonitor is returned when clusterID is not found for the mon. // ErrMissingConfigForMonitor is returned when clusterID is not found for the mon.
ErrMissingConfigForMonitor = errors.New("missing configuration of cluster ID for monitor") ErrMissingConfigForMonitor = errors.New("missing configuration of cluster ID for monitor")
) )
// ShouldRetryVolumeGeneration reports whether the search for (or generation
// of) a volume should carry on after it failed with err.
//
// It returns true when err matches one of these known errors:
//   - ErrKeyNotFound: the key required to locate the volume is missing in the
//     Rados omap.
//   - ErrPoolNotFound: the rbd pool where the volume/omap is expected does
//     not exist.
//   - ErrImageNotFound: the image does not exist in the rbd pool.
//   - rados.ErrPermissionDenied: permission to access the pool is denied.
//
// It returns false for a nil error and for every other error, meaning the
// search should stop.
func ShouldRetryVolumeGeneration(err error) bool {
	if err == nil {
		// No error, do not retry.
		return false
	}

	// Errors for which the search is allowed to continue, checked in order.
	retryable := [...]error{
		ErrKeyNotFound,
		ErrPoolNotFound,
		ErrImageNotFound,
		rados.ErrPermissionDenied,
	}
	for _, target := range retryable {
		if errors.Is(err, target) {
			return true
		}
	}

	return false
}