cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge

Signed-off-by: Praveen M <m.praveen@ibm.com>
(cherry picked from commit 5cbc14454a3df7bb6b3b3a419520e25230d9ab6e)

# Conflicts:
#	internal/csi-addons/rbd/encryptionkeyrotation.go
#	internal/csi-addons/rbd/reclaimspace.go
#	internal/csi-addons/rbd/replication.go
#	internal/csi-addons/rbd/replication_test.go
#	internal/rbd/clone.go
#	internal/rbd/controllerserver.go
#	internal/rbd/group/util.go
#	internal/rbd/group/util_test.go
#	internal/rbd/manager.go
#	internal/rbd/rbd_journal.go
#	internal/rbd/rbd_util.go
#	internal/rbd/rbd_util_test.go
#	internal/rbd/snapshot.go
#	internal/util/errors.go
This commit is contained in:
Praveen M 2025-03-03 11:45:43 +05:30 committed by Mergify
parent 7cff156d9a
commit 4dcb9d3a45
24 changed files with 632 additions and 63 deletions

View File

@ -21,6 +21,7 @@ import (
"errors"
"github.com/ceph/ceph-csi/internal/rbd"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -69,7 +70,11 @@ func (ekrs *EncryptionKeyRotationServer) EncryptionKeyRotate(
rbdVol, err := rbd.GenVolFromVolID(ctx, volID, creds, req.GetSecrets())
if err != nil {
switch {
<<<<<<< HEAD
case errors.Is(err, rbd.ErrImageNotFound):
=======
case errors.Is(err, rbderrors.ErrImageNotFound):
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err)

View File

@ -23,6 +23,7 @@ import (
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
rbdutil "github.com/ceph/ceph-csi/internal/rbd"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -78,8 +79,13 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace(
}
defer rbdVol.Destroy(ctx)
<<<<<<< HEAD
err = rbdVol.Sparsify()
if errors.Is(err, rbdutil.ErrImageInUse) {
=======
err = rbdVol.Sparsify(ctx)
if errors.Is(err, rbderrors.ErrImageInUse) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
// FIXME: https://github.com/csi-addons/kubernetes-csi-addons/issues/406.
// treat sparsify call as no-op if volume is in use.
log.DebugLog(ctx, fmt.Sprintf("volume with ID %q is in use, skipping sparsify operation", volumeID))

View File

@ -29,6 +29,7 @@ import (
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/rbd"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -651,7 +652,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
sts, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
// the image gets recreated after issuing resync
<<<<<<< HEAD
if errors.Is(err, corerbd.ErrImageNotFound) {
=======
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
// caller retries till RBD syncs an initial version of the image to
// report its status in the resync call. Ideally, this line will not
// be executed as the error would get returned due to getMirroringInfo
@ -785,6 +790,7 @@ func getGRPCError(err error) error {
}
errorStatusMap := map[error]codes.Code{
<<<<<<< HEAD
corerbd.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
@ -792,6 +798,15 @@ func getGRPCError(err error) error {
corerbd.ErrAborted: codes.Aborted,
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
corerbd.ErrUnavailable: codes.Unavailable,
=======
rbderrors.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
rbderrors.ErrInvalidArgument: codes.InvalidArgument,
rbderrors.ErrFlattenInProgress: codes.Aborted,
rbderrors.ErrAborted: codes.Aborted,
rbderrors.ErrFailedPrecondition: codes.FailedPrecondition,
rbderrors.ErrUnavailable: codes.Unavailable,
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
}
for e, code := range errorStatusMap {
@ -831,8 +846,13 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
switch {
<<<<<<< HEAD
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, err.Error())
=======
case errors.Is(err, rbderrors.ErrImageNotFound):
err = status.Error(codes.NotFound, err.Error())
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, err.Error())
default:
@ -864,7 +884,13 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
mirrorStatus, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil {
<<<<<<< HEAD
if errors.Is(err, corerbd.ErrImageNotFound) {
=======
log.ErrorLog(ctx, "failed to get status for mirror %q: %v", mirror, err)
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return nil, status.Error(codes.Aborted, err.Error())
}
log.ErrorLog(ctx, err.Error())
@ -886,7 +912,13 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
description := remoteStatus.GetDescription()
resp, err := getLastSyncInfo(ctx, description)
if err != nil {
<<<<<<< HEAD
if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) {
=======
log.ErrorLog(ctx, "failed to parse last sync info from %q: %v", description, err)
if errors.Is(err, rbderrors.ErrLastSyncTimeNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return nil, status.Errorf(codes.NotFound, "failed to get last sync info: %v", err)
}
log.ErrorLog(ctx, err.Error())
@ -916,12 +948,12 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV
var response replication.GetVolumeReplicationInfoResponse
if description == "" {
return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, fmt.Errorf("empty description: %w", rbderrors.ErrLastSyncTimeNotFound)
}
log.DebugLog(ctx, "description: %s", description)
splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 {
return nil, fmt.Errorf("no snapshot details: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, fmt.Errorf("no snapshot details: %w", rbderrors.ErrLastSyncTimeNotFound)
}
type localStatus struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
@ -938,7 +970,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV
// If the json unmarsal is successful but the local snapshot time is 0, we
// need to consider it as an error as the LastSyncTime is required.
if localSnapInfo.LocalSnapshotTime == 0 {
return nil, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, fmt.Errorf("empty local snapshot timestamp: %w", rbderrors.ErrLastSyncTimeNotFound)
}
if localSnapInfo.LastSnapshotDuration != nil {
// converts localSnapshotDuration of type int64 to string format with

View File

@ -26,6 +26,7 @@ import (
"time"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
@ -450,7 +451,7 @@ func TestValidateLastSyncInfo(t *testing.T) {
LastSyncDuration: nil,
LastSyncBytes: 0,
},
expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(),
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
{
name: "description without last_snapshot_bytes",
@ -472,7 +473,7 @@ func TestValidateLastSyncInfo(t *testing.T) {
LastSyncTime: nil,
LastSyncBytes: 0,
},
expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(),
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
{
name: "description without last_snapshot_sync_seconds",
@ -516,7 +517,7 @@ func TestValidateLastSyncInfo(t *testing.T) {
LastSyncTime: nil,
LastSyncBytes: 0,
},
expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(),
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
}
for _, tt := range tests {
@ -567,23 +568,23 @@ func TestGetGRPCError(t *testing.T) {
}{
{
name: "InvalidArgument",
err: corerbd.ErrInvalidArgument,
expectedErr: status.Error(codes.InvalidArgument, corerbd.ErrInvalidArgument.Error()),
err: rbderrors.ErrInvalidArgument,
expectedErr: status.Error(codes.InvalidArgument, rbderrors.ErrInvalidArgument.Error()),
},
{
name: "Aborted",
err: corerbd.ErrAborted,
expectedErr: status.Error(codes.Aborted, corerbd.ErrAborted.Error()),
err: rbderrors.ErrAborted,
expectedErr: status.Error(codes.Aborted, rbderrors.ErrAborted.Error()),
},
{
name: "FailedPrecondition",
err: corerbd.ErrFailedPrecondition,
expectedErr: status.Error(codes.FailedPrecondition, corerbd.ErrFailedPrecondition.Error()),
err: rbderrors.ErrFailedPrecondition,
expectedErr: status.Error(codes.FailedPrecondition, rbderrors.ErrFailedPrecondition.Error()),
},
{
name: "Unavailable",
err: corerbd.ErrUnavailable,
expectedErr: status.Error(codes.Unavailable, corerbd.ErrUnavailable.Error()),
err: rbderrors.ErrUnavailable,
expectedErr: status.Error(codes.Unavailable, rbderrors.ErrUnavailable.Error()),
},
{
name: "InvalidError",
@ -597,8 +598,13 @@ func TestGetGRPCError(t *testing.T) {
},
{
name: "ErrImageNotFound",
<<<<<<< HEAD
err: corerbd.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, corerbd.ErrImageNotFound.Error()),
=======
err: rbderrors.ErrImageNotFound,
expectedErr: status.Error(codes.NotFound, rbderrors.ErrImageNotFound.Error()),
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
},
{
name: "ErrPoolNotFound",

View File

@ -23,7 +23,7 @@ import (
"slices"
"github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/group"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util/log"
@ -194,7 +194,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
// resolve the volume group
vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId())
return &volumegroup.DeleteVolumeGroupResponse{}, nil
@ -433,7 +433,7 @@ func (vs *VolumeGroupServer) ControllerGetVolumeGroup(
// resolve the volume group
vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId())
if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId())
return nil, status.Errorf(

View File

@ -21,6 +21,10 @@ import (
"errors"
"fmt"
<<<<<<< HEAD
=======
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
@ -56,17 +60,28 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
err := tempClone.checkSnapExists(snap)
if err != nil {
switch {
<<<<<<< HEAD
case errors.Is(err, ErrSnapNotFound):
// as the snapshot is not present, create new snapshot,clone and
// delete the temporary snapshot
err = createRBDClone(ctx, tempClone, rv, snap)
=======
case errors.Is(err, rbderrors.ErrSnapNotFound):
// as the snapshot is not present, create new snapshot, clone and
// don't delete the temporary snapshot
err = createRBDClone(ctx, tempClone, rv, snap, false)
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
if err != nil {
return false, err
}
return true, nil
<<<<<<< HEAD
case errors.Is(err, ErrImageNotFound):
=======
case errors.Is(err, rbderrors.ErrImageNotFound):
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
// as the temp clone does not exist,check snapshot exists on parent volume
// snapshot name is same as temporary clone image
snap.RbdImageName = tempClone.RbdImageName
@ -76,7 +91,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume)
// create new resources for a cleaner approach
err = parentVol.deleteSnapshot(ctx, snap)
}
if errors.Is(err, ErrSnapNotFound) {
if errors.Is(err, rbderrors.ErrSnapNotFound) {
return false, nil
}

View File

@ -23,6 +23,7 @@ import (
"strconv"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
@ -310,10 +311,10 @@ func buildCreateVolumeResponse(
// the input error types it expected to use only for CreateVolume as we need to
// return different GRPC codes for different functions based on the input.
func getGRPCErrorForCreateVolume(err error) error {
if errors.Is(err, ErrVolNameConflict) {
if errors.Is(err, rbderrors.ErrVolNameConflict) {
return status.Error(codes.AlreadyExists, err.Error())
}
if errors.Is(err, ErrFlattenInProgress) {
if errors.Is(err, rbderrors.ErrFlattenInProgress) {
return status.Error(codes.Aborted, err.Error())
}
@ -428,7 +429,7 @@ func (cs *ControllerServer) CreateVolume(
err = cs.createBackingImage(ctx, cr, req.GetSecrets(), rbdVol, parentVol, rbdSnap)
if err != nil {
if errors.Is(err, ErrFlattenInProgress) {
if errors.Is(err, rbderrors.ErrFlattenInProgress) {
return nil, status.Error(codes.Aborted, err.Error())
}
@ -586,7 +587,11 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snaps, children, err := rbdVol.listSnapAndChildren()
if err != nil {
<<<<<<< HEAD
if errors.Is(err, ErrImageNotFound) {
=======
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return status.Error(codes.InvalidArgument, err.Error())
}
@ -822,7 +827,7 @@ func checkContentSource(
rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to get backend snapshot for %s: %v", snapshotID, err)
if !errors.Is(err, ErrSnapNotFound) {
if !errors.Is(err, rbderrors.ErrSnapNotFound) {
return nil, nil, status.Error(codes.Internal, err.Error())
}
@ -842,7 +847,11 @@ func checkContentSource(
rbdvol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to get backend image for %s: %v", volID, err)
<<<<<<< HEAD
if !errors.Is(err, ErrImageNotFound) {
=======
if !errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return nil, nil, status.Error(codes.Internal, err.Error())
}
@ -882,7 +891,11 @@ func (cs *ControllerServer) checkErrAndUndoReserve(
return &csi.DeleteVolumeResponse{}, nil
}
<<<<<<< HEAD
if errors.Is(err, ErrImageNotFound) {
=======
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
notFoundErr := rbdVol.ensureImageCleanup(ctx)
if notFoundErr != nil {
return nil, status.Errorf(codes.Internal, "failed to cleanup image %q: %v", rbdVol, notFoundErr)
@ -957,7 +970,11 @@ func (cs *ControllerServer) DeleteVolume(
return nil, status.Error(codes.InvalidArgument, pErr.Error())
}
pErr = deleteMigratedVolume(ctx, pmVolID, cr)
<<<<<<< HEAD
if pErr != nil && !errors.Is(pErr, ErrImageNotFound) {
=======
if pErr != nil && !errors.Is(pErr, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return nil, status.Error(codes.Internal, pErr.Error())
}
@ -1129,7 +1146,11 @@ func (cs *ControllerServer) CreateSnapshot(
}()
if err != nil {
switch {
<<<<<<< HEAD
case errors.Is(err, ErrImageNotFound):
=======
case errors.Is(err, rbderrors.ErrImageNotFound):
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
err = status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId())
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", req.GetSourceVolumeId(), err)
@ -1198,7 +1219,7 @@ func (cs *ControllerServer) CreateSnapshot(
return nil, status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil && !errors.Is(err, ErrFlattenInProgress) {
if err != nil && !errors.Is(err, rbderrors.ErrFlattenInProgress) {
errDefer := undoSnapReservation(ctx, rbdSnap, cr)
if errDefer != nil {
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", req.GetName(), errDefer)
@ -1278,7 +1299,7 @@ func cloneFromSnapshot(
}
err = vol.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if errors.Is(err, ErrFlattenInProgress) {
if errors.Is(err, rbderrors.ErrFlattenInProgress) {
// if flattening is in progress, return error and do not cleanup
return nil, status.Errorf(codes.Internal, err.Error())
} else if err != nil {
@ -1364,7 +1385,7 @@ func (cs *ControllerServer) doSnapshotClone(
defer func() {
if err != nil {
if !errors.Is(err, ErrFlattenInProgress) {
if !errors.Is(err, rbderrors.ErrFlattenInProgress) {
// cleanup clone and snapshot
errCleanUp := cleanUpSnapshot(ctx, cloneRbd, rbdSnap, cloneRbd)
if errCleanUp != nil {
@ -1474,7 +1495,11 @@ func (cs *ControllerServer) DeleteSnapshot(
// if the error is ErrImageNotFound, We need to cleanup the image from
// trash and remove the metadata in OMAP.
<<<<<<< HEAD
if errors.Is(err, ErrImageNotFound) {
=======
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err)
err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr)
@ -1577,7 +1602,11 @@ func (cs *ControllerServer) ControllerExpandVolume(
rbdVol, err := genVolFromVolIDWithMigration(ctx, volID, cr, req.GetSecrets())
if err != nil {
switch {
<<<<<<< HEAD
case errors.Is(err, ErrImageNotFound):
=======
case errors.Is(err, rbderrors.ErrImageNotFound):
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
case errors.Is(err, util.ErrPoolNotFound):
log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err)

View File

@ -18,6 +18,8 @@ package rbd
import (
"fmt"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
)
// Sparsify checks the size of the objects in the RBD image and calls
@ -32,7 +34,7 @@ func (ri *rbdImage) Sparsify() error {
}
if inUse {
// if the image is in use, we should not sparsify it, return ErrImageInUse.
return ErrImageInUse
return rbderrors.ErrImageInUse
}
image, err := ri.open()

View File

@ -14,9 +14,15 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
package rbderrors
import "errors"
import (
"errors"
"fmt"
librados "github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
)
var (
// ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace.
@ -57,4 +63,8 @@ var (
ErrInvalidArgument = errors.New("invalid arguments provided")
// ErrImageInUse is returned when the image is in use.
ErrImageInUse = errors.New("image is in use")
// ErrGroupNotConnected is returned when the RBD group is not connected.
ErrGroupNotConnected = fmt.Errorf("%w: RBD group is not connected", librados.ErrNotConnected)
// ErrGroupNotFound is returned when group is not found in the cluster on the given pool and/or namespace.
ErrGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound)
)

View File

@ -24,6 +24,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/timestamppb"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -70,7 +71,7 @@ func GetVolumeGroupSnapshot(
attrs, err := vgs.getVolumeGroupAttributes(ctx)
if err != nil {
if errors.Is(err, ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "%v, returning empty volume group snapshot %q", vgs, err)
return vgs, err

View File

@ -25,6 +25,7 @@ import (
"github.com/ceph/go-ceph/rados"
"github.com/ceph/ceph-csi/internal/journal"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
@ -87,7 +88,82 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
pool, err := util.GetPoolName(mons, creds, csiID.LocationID)
if err != nil {
<<<<<<< HEAD
return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err)
=======
return fmt.Errorf("failed to get pool for volume group id %q: %w", cvg.id, err)
}
cvg.monitors = mons
cvg.namespace = namespace
cvg.pool = pool
return nil
}
// generateVolumeGroupFromMapping checks the clusterID and poolID mapping and
// generates commonVolumeGroup structure for the mapped clusterID and poolID.
func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping(
ctx context.Context,
csiID util.CSIIdentifier,
mapping *[]util.ClusterMappingInfo,
) error {
mcsiID := csiID
existingClusterID := csiID.ClusterID
existingPoolID := strconv.FormatInt(csiID.LocationID, 10)
for _, cm := range *mapping {
for key, val := range cm.ClusterIDMapping {
mappedClusterID := util.GetMappedID(key, val, csiID.ClusterID)
if mappedClusterID == "" {
continue
}
log.DebugLog(ctx,
"found new clusterID mapping %s for existing clusterID %s", mappedClusterID, existingClusterID)
// Add mapped clusterID to Identifier
mcsiID.ClusterID = mappedClusterID
for _, pools := range cm.RBDpoolIDMappingInfo {
for key, val := range pools {
mappedPoolID := util.GetMappedID(key, val, existingPoolID)
if mappedPoolID == "" {
continue
}
log.DebugLog(ctx,
"found new poolID mapping %s for existing poolID %s", mappedPoolID, existingPoolID)
mPID, err := strconv.ParseInt(mappedPoolID, 10, 64)
if err != nil {
return err
}
mcsiID.LocationID = mPID
err = cvg.generateVolumeGroup(mcsiID)
if ShouldRetryVolumeGroupGeneration(err) {
continue
}
return err
}
}
}
}
return util.ErrPoolNotFound
}
func (cvg *commonVolumeGroup) initCommonVolumeGroup(
ctx context.Context,
id string,
csiDriver string,
creds *util.Credentials,
) error {
csiID := util.CSIIdentifier{}
err := csiID.DecomposeCSIID(id)
if err != nil {
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
}
cvg.csiDriver = csiDriver
@ -95,9 +171,34 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
cvg.id = id
cvg.clusterID = csiID.ClusterID
cvg.objectUUID = csiID.ObjectUUID
<<<<<<< HEAD
cvg.monitors = mons
cvg.pool = pool
cvg.namespace = namespace
=======
// cvg.monitors, cvg.namespace, cvg.pool are set in generateVolumeGroup
err = cvg.generateVolumeGroup(csiID)
// If the error is not a retryable error, return from here.
if err != nil && !ShouldRetryVolumeGroupGeneration(err) {
return err
}
// If the error is a retryable error, we should try to get the cluster mapping
// and generate the volume group from the mapping.
if ShouldRetryVolumeGroupGeneration(err) {
mapping, err := util.GetClusterMappingInfo(csiID.ClusterID)
if err != nil {
return err
}
if mapping != nil {
err = cvg.generateVolumeGroupFromMapping(ctx, csiID, mapping)
if err != nil {
return err
}
}
}
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
log.DebugLog(ctx, "object for volume group %q has been initialized", cvg.id)
@ -148,7 +249,7 @@ func (cvg *commonVolumeGroup) getVolumeGroupAttributes(ctx context.Context) (*jo
if attrs.GroupName == "" {
log.ErrorLog(ctx, "volume group with id %v not found", cvg.id)
return nil, ErrRBDGroupNotFound
return nil, rbderrors.ErrGroupNotFound
}
cvg.requestName = attrs.RequestName
@ -275,12 +376,12 @@ func (cvg *commonVolumeGroup) GetIOContext(ctx context.Context) (*rados.IOContex
conn, err := cvg.getConnection(ctx)
if err != nil {
return nil, fmt.Errorf("%w: failed to connect: %w", ErrRBDGroupNotConnected, err)
return nil, fmt.Errorf("%w: failed to connect: %w", rbderrors.ErrGroupNotConnected, err)
}
ioctx, err := conn.GetIoctx(cvg.pool)
if err != nil {
return nil, fmt.Errorf("%w: failed to get IOContext: %w", ErrRBDGroupNotConnected, err)
return nil, fmt.Errorf("%w: failed to get IOContext: %w", rbderrors.ErrGroupNotConnected, err)
}
if cvg.namespace != "" {
@ -336,3 +437,28 @@ func (cvg *commonVolumeGroup) GetCreationTime(ctx context.Context) (*time.Time,
return cvg.creationTime, nil
}
// ShouldRetryVolumeGroupGeneration determines whether the process of finding or generating
// volumegroups should continue based on the type of error encountered.
//
// It checks if the given error matches any of the following known errors:
// - ErrPoolNotFound: The rbd pool where the volumegroup/omap is expected doesn't exist.
// - ErrRBDGroupNotFound: The volumegroup doesn't exist in the rbd pool.
// - rados.ErrPermissionDenied: Permissions to access the pool is denied.
//
// If any of these errors are encountered, the function returns `true`, indicating
// that the volumegroup search should continue because of known error. Otherwise, it
// returns `false`, meaning the search should stop.
//
// This helper function is used in scenarios where multiple attempts may be made
// to retrieve or generate volumegroup information, and we want to gracefully handle
// specific failure cases while retrying for others.
func ShouldRetryVolumeGroupGeneration(err error) bool {
if err == nil {
return false // No error, do not retry
}
// Continue searching for specific known errors
return (errors.Is(err, util.ErrPoolNotFound) ||
errors.Is(err, rbderrors.ErrGroupNotFound) ||
errors.Is(err, rados.ErrPermissionDenied))
}

View File

@ -0,0 +1,73 @@
/*
Copyright 2025 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package group
import (
"errors"
"testing"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/go-ceph/rados"
)
func Test_shouldRetryVolumeGroupGeneration(t *testing.T) {
t.Parallel()
type args struct {
err error
}
tests := []struct {
name string
args args
want bool
}{
{
name: "No error (stop searching)",
args: args{err: nil},
want: false, // No error, stop searching
},
{
name: "ErrPoolNotFound (continue searching)",
args: args{err: util.ErrPoolNotFound},
want: true, // Known error, continue searching
},
{
name: "ErrRBDGroupNotFound (continue searching)",
args: args{err: rbderrors.ErrGroupNotFound},
want: true, // Known error, continue searching
},
{
name: "ErrPermissionDenied (continue searching)",
args: args{err: rados.ErrPermissionDenied},
want: true, // Known error, continue searching
},
{
name: "Different error (stop searching)",
args: args{err: errors.New("unknown error")},
want: false, // Unknown error, stop searching
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
if got := ShouldRetryVolumeGroupGeneration(tt.args.err); got != tt.want {
t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -22,21 +22,16 @@ import (
"fmt"
"github.com/ceph/go-ceph/rados"
librados "github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/csi-addons/spec/lib/go/volumegroup"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
var (
ErrRBDGroupNotConnected = fmt.Errorf("%w: RBD group is not connected", librados.ErrNotConnected)
ErrRBDGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound)
)
// volumeGroup handles all requests for 'rbd group' operations.
type volumeGroup struct {
commonVolumeGroup
@ -77,7 +72,7 @@ func GetVolumeGroup(
attrs, err := vg.getVolumeGroupAttributes(ctx)
if err != nil {
if errors.Is(err, ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "%v, returning empty volume group %q", vg, err)
return vg, err

View File

@ -24,7 +24,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/ceph/ceph-csi/internal/rbd/group"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -215,7 +215,7 @@ func (cs *ControllerServer) DeleteVolumeGroupSnapshot(
groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID)
if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID)
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
@ -261,7 +261,7 @@ func (cs *ControllerServer) GetVolumeGroupSnapshot(
groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID)
if err != nil {
if errors.Is(err, group.ErrRBDGroupNotFound) {
if errors.Is(err, rbderrors.ErrGroupNotFound) {
log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID)
return nil, status.Errorf(

View File

@ -22,6 +22,7 @@ import (
"fmt"
"github.com/ceph/ceph-csi/internal/journal"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
rbd_group "github.com/ceph/ceph-csi/internal/rbd/group"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
@ -174,7 +175,11 @@ func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volu
volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets)
if err != nil {
switch {
<<<<<<< HEAD
case errors.Is(err, ErrImageNotFound):
=======
case errors.Is(err, rbderrors.ErrImageNotFound):
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
err = fmt.Errorf("volume %s not found: %w", id, err)
return nil, err
@ -199,7 +204,11 @@ func (mgr *rbdManager) GetSnapshotByID(ctx context.Context, id string) (types.Sn
snapshot, err := genSnapFromSnapID(ctx, id, creds, mgr.secrets)
if err != nil {
switch {
<<<<<<< HEAD
case errors.Is(err, ErrImageNotFound):
=======
case errors.Is(err, rbderrors.ErrImageNotFound):
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
err = fmt.Errorf("volume %s not found: %w", id, err)
return nil, err
@ -467,7 +476,11 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot(
return vgs, nil
}
<<<<<<< HEAD
} else if err != nil && !errors.Is(ErrImageNotFound, err) {
=======
} else if err != nil && !errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
// ErrImageNotFound can be returned if the VolumeGroupSnapshot
// could not be found. It is expected that it does not exist
// yet, in which case it will be created below.
@ -503,3 +516,137 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot(
return vgs, nil
}
<<<<<<< HEAD
=======
// RegenerateVolumeGroupJournal regenerate the omap data for the volume group.
// This performs the following operations:
// - extracts clusterID and Mons from the cluster mapping
// - Retrieves pool and journalPool parameters from the VolumeGroupReplicationClass
// - Reserves omap data
// - Add volumeIDs mapping to the reserved volume group omap object
// - Generate new volume group handle
//
// Returns the generated volume group handle.
//
// Note: The new volume group handle will differ from the original as it includes
// poolID and clusterID, which vary between clusters.
func (mgr *rbdManager) RegenerateVolumeGroupJournal(
ctx context.Context,
groupID, requestName string,
volumeIds []string,
) (string, error) {
var (
clusterID string
monitors string
pool string
journalPool string
namePrefix string
groupUUID string
vgName string
gi util.CSIIdentifier
ok bool
err error
)
err = gi.DecomposeCSIID(groupID)
if err != nil {
return "", fmt.Errorf("%w: error decoding volume group ID (%w) (%s)", rbderrors.ErrInvalidVolID, err, groupID)
}
monitors, clusterID, err = util.FetchMappedClusterIDAndMons(ctx, gi.ClusterID)
if err != nil {
return "", err
}
pool, ok = mgr.parameters["pool"]
if !ok {
return "", errors.New("required 'pool' parameter missing in parameters")
}
journalPool, ok = mgr.parameters["journalPool"]
if !ok || journalPool == "" {
journalPool = pool
}
vgJournal, err := mgr.getVolumeGroupJournal(clusterID)
if err != nil {
return "", err
}
defer vgJournal.Destroy()
namePrefix = mgr.parameters["volumeGroupNamePrefix"]
vgData, err := vgJournal.CheckReservation(ctx, journalPool, requestName, namePrefix)
if err != nil {
return "", err
}
if vgData != nil {
groupUUID = vgData.GroupUUID
vgName = vgData.GroupName
} else {
log.DebugLog(ctx, "the journal does not contain a reservation for a volume group with name %q yet", requestName)
groupUUID, vgName, err = vgJournal.ReserveName(ctx, journalPool, requestName, gi.ObjectUUID, namePrefix)
if err != nil {
return "", fmt.Errorf("failed to reserve volume group for name %q: %w", requestName, err)
}
defer func() {
if err != nil {
undoError := vgJournal.UndoReservation(ctx, journalPool, vgName, requestName)
if undoError != nil {
log.ErrorLog(ctx, "failed to undo the reservation for volume group %q: %w", requestName, undoError)
}
}
}()
}
volumes := make([]types.Volume, len(volumeIds))
defer func() {
for _, v := range volumes {
v.Destroy(ctx)
}
}()
var volume types.Volume
for i, id := range volumeIds {
volume, err = mgr.GetVolumeByID(ctx, id)
if err != nil {
return "", fmt.Errorf("failed to find required volume %q for volume group id %q: %w", id, vgName, err)
}
volumes[i] = volume
}
var volID string
for _, vol := range volumes {
volID, err = vol.GetID(ctx)
if err != nil {
return "", fmt.Errorf("failed to get VolumeID for %q: %w", vol, err)
}
toAdd := map[string]string{
volID: "",
}
log.DebugLog(ctx, "adding volume mapping for volume %q to volume group %q", volID, vgName)
err = mgr.vgJournal.AddVolumesMapping(ctx, pool, gi.ObjectUUID, toAdd)
if err != nil {
return "", fmt.Errorf("failed to add mapping for volume %q to volume group %q: %w", volID, vgName, err)
}
}
_, poolID, err := util.GetPoolIDs(ctx, monitors, journalPool, pool, mgr.creds)
if err != nil {
return "", fmt.Errorf("failed to get poolID for %q: %w", groupUUID, err)
}
groupHandle, err := util.GenerateVolID(ctx, monitors, mgr.creds, poolID, pool, clusterID, groupUUID)
if err != nil {
return "", fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", groupUUID, err)
}
log.DebugLog(ctx, "re-generated Group ID (%s) and Group Name (%s) for request name (%s)",
groupHandle, vgName, requestName)
return groupHandle, nil
}
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)

View File

@ -21,6 +21,7 @@ import (
"encoding/hex"
"strings"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
@ -39,13 +40,13 @@ func parseMigrationVolID(vh string) (*migrationVolID, error) {
handSlice := strings.Split(vh, migVolIDFieldSep)
if len(handSlice) < migVolIDTotalLength {
// its short of length in this case, so return error
return nil, ErrInvalidVolID
return nil, rbderrors.ErrInvalidVolID
}
// Store pool
poolHash := strings.Join(handSlice[migVolIDSplitLength:], migVolIDFieldSep)
poolByte, dErr := hex.DecodeString(poolHash)
if dErr != nil {
return nil, ErrMissingPoolNameInVolID
return nil, rbderrors.ErrMissingPoolNameInVolID
}
mh.poolName = string(poolByte)
// Parse migration mons( for clusterID) and image
@ -62,13 +63,13 @@ func parseMigrationVolID(vh string) (*migrationVolID, error) {
}
}
if mh.imageName == "" {
return nil, ErrMissingImageNameInVolID
return nil, rbderrors.ErrMissingImageNameInVolID
}
if mh.poolName == "" {
return nil, ErrMissingPoolNameInVolID
return nil, rbderrors.ErrMissingPoolNameInVolID
}
if mh.clusterID == "" {
return nil, ErrDecodeClusterIDFromMonsInVolID
return nil, rbderrors.ErrDecodeClusterIDFromMonsInVolID
}
return mh, nil

View File

@ -20,6 +20,7 @@ import (
"fmt"
"time"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -56,7 +57,7 @@ func (rv *rbdVolume) HandleParentImageExistence(
if rv.ParentInTrash {
return fmt.Errorf("%w: failed to enable mirroring on image %q:"+
" parent is in trash",
ErrFailedPrecondition, rv)
rbderrors.ErrFailedPrecondition, rv)
}
parent, err := rv.getParent()
@ -72,7 +73,7 @@ func (rv *rbdVolume) HandleParentImageExistence(
if parentMirroringInfo.GetState() != librbd.MirrorImageEnabled.String() {
return fmt.Errorf("%w: failed to enable mirroring on image %q: "+
"parent image %q is not enabled for mirroring",
ErrFailedPrecondition, rv, parent)
rbderrors.ErrFailedPrecondition, rv, parent)
}
return nil
@ -204,7 +205,7 @@ func (ri *rbdImage) Resync(_ context.Context) error {
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
return fmt.Errorf("%w: awaiting initial resync due to split brain", rbderrors.ErrUnavailable)
}
// GetGlobalMirroringStatus get the mirroring status of an image.

View File

@ -25,6 +25,7 @@ import (
"strings"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
@ -1034,7 +1035,7 @@ func (ns *NodeServer) NodeUnstageVolume(
}
// If not mounted, and error is anything other than metadata file missing, it is an error
if !errors.Is(err, ErrMissingStash) {
if !errors.Is(err, rbderrors.ErrMissingStash) {
return nil, status.Error(codes.Internal, err.Error())
}

View File

@ -22,6 +22,7 @@ import (
"fmt"
"github.com/ceph/ceph-csi/internal/journal"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"
@ -172,10 +173,14 @@ func checkSnapCloneExists(
// Fetch on-disk image attributes
err = vol.getImageInfo()
if err != nil {
<<<<<<< HEAD
if errors.Is(err, ErrImageNotFound) {
=======
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
err = parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil {
if !errors.Is(err, ErrSnapNotFound) {
if !errors.Is(err, rbderrors.ErrSnapNotFound) {
log.ErrorLog(ctx, "failed to delete snapshot %s: %v", rbdSnap, err)
return false, err
@ -203,7 +208,7 @@ func checkSnapCloneExists(
// check snapshot exists if not create it
err = vol.checkSnapExists(rbdSnap)
if errors.Is(err, ErrSnapNotFound) {
if errors.Is(err, rbderrors.ErrSnapNotFound) {
// create snapshot
sErr := vol.createSnapshot(ctx, rbdSnap)
if sErr != nil {
@ -298,7 +303,11 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
// Fetch on-disk image attributes and compare against request
err = rv.getImageInfo()
switch {
<<<<<<< HEAD
case errors.Is(err, ErrImageNotFound) && parentVol != nil:
=======
case errors.Is(err, rbderrors.ErrImageNotFound) && parentVol != nil:
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
// Need to check cloned info here not on createvolume
found, cErr := rv.checkCloneImage(ctx, parentVol)
if cErr != nil {
@ -312,7 +321,11 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
return false, err
}
<<<<<<< HEAD
case errors.Is(err, ErrImageNotFound) && parentVol == nil:
=======
case errors.Is(err, rbderrors.ErrImageNotFound) && parentVol == nil:
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
// image not found, undo the reservation
err = j.UndoReservation(ctx, rv.JournalPool, rv.Pool, rv.RbdImageName, rv.RequestName)
@ -330,7 +343,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er
// size checks
if rv.VolSize < requestSize {
return false, fmt.Errorf("%w: image with the same name (%s) but with different size already exists",
ErrVolNameConflict, rv.RbdImageName)
rbderrors.ErrVolNameConflict, rv.RbdImageName)
}
// TODO: We should also ensure image features and format is the same
@ -598,7 +611,7 @@ func RegenerateJournal(
err = vi.DecomposeCSIID(rbdVol.VolID)
if err != nil {
return "", fmt.Errorf("%w: error decoding volume ID (%w) (%s)",
ErrInvalidVolID, err, rbdVol.VolID)
rbderrors.ErrInvalidVolID, err, rbdVol.VolID)
}
rbdVol.Owner = owner

View File

@ -28,6 +28,13 @@ import (
"strings"
"time"
<<<<<<< HEAD
=======
"github.com/ceph/ceph-csi/pkg/util/crypto"
"github.com/ceph/ceph-csi/pkg/util/kernel"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -524,7 +531,11 @@ func (ri *rbdImage) open() (*librbd.Image, error) {
image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot)
if err != nil {
if errors.Is(err, librbd.ErrNotFound) {
<<<<<<< HEAD
err = fmt.Errorf("Failed as %w (internal %w)", ErrImageNotFound, err)
=======
err = fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrImageNotFound, err)
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
}
return nil, err
@ -542,7 +553,11 @@ func (ri *rbdImage) open() (*librbd.Image, error) {
func (ri *rbdImage) isInUse() (bool, error) {
image, err := ri.open()
if err != nil {
<<<<<<< HEAD
if errors.Is(err, ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) {
=======
if errors.Is(err, rbderrors.ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return false, err
}
// any error should assume something else is using the image
@ -681,7 +696,11 @@ func (ri *rbdImage) Delete(ctx context.Context) error {
err = rbdImage.Trash(0)
if err != nil {
if errors.Is(err, librbd.ErrNotFound) {
<<<<<<< HEAD
return fmt.Errorf("Failed as %w (internal %w)", ErrImageNotFound, err)
=======
return fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrImageNotFound, err)
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
}
log.ErrorLog(ctx, "failed to delete rbd image: %s, error: %v", ri, err)
@ -731,7 +750,18 @@ func (rv *rbdVolume) DeleteTempImage(ctx context.Context) error {
tempClone := rv.generateTempClone()
err := tempClone.Delete(ctx)
if err != nil {
<<<<<<< HEAD
if errors.Is(err, ErrImageNotFound) {
=======
if !errors.Is(err, rbderrors.ErrImageNotFound) && !errors.Is(err, rbderrors.ErrSnapNotFound) {
return fmt.Errorf("failed to delete snapshot %q: %w", snap, err)
}
}
err = tempClone.Delete(ctx)
if err != nil {
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return tempClone.ensureImageCleanup(ctx)
} else {
// return error if it is not ErrImageNotFound
@ -770,7 +800,11 @@ func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) {
// if the parent image is moved to trash the name will be present
// in rbd image info but the image will be in trash, in that case
// return the found depth
<<<<<<< HEAD
if errors.Is(err, ErrImageNotFound) {
=======
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return depth, nil
}
log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err)
@ -865,7 +899,7 @@ func (ri *rbdImage) flattenRbdImage(
return err
}
if forceFlatten || depth >= hardlimit {
return fmt.Errorf("%w: flatten is in progress for image %s", ErrFlattenInProgress, ri.RbdImageName)
return fmt.Errorf("%w: flatten is in progress for image %s", rbderrors.ErrFlattenInProgress, ri.RbdImageName)
}
log.DebugLog(ctx, "successfully added task to flatten image %q", ri)
}
@ -956,7 +990,11 @@ func (ri *rbdImage) checkImageChainHasFeature(ctx context.Context, feature uint6
// is in the trash, when we try to open the parent image to get its
// information it fails because it is already in trash. We should
// treat error as nil if the parent is not found.
<<<<<<< HEAD
if errors.Is(err, ErrImageNotFound) {
=======
if errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return false, nil
}
log.ErrorLog(ctx, "failed to get image info for %s: %s", rbdImg.String(), err)
@ -1194,6 +1232,33 @@ func generateVolumeFromVolumeID(
return rbdVol, err
}
// ShouldRetryVolumeGeneration determines whether the process of finding or generating
// volumes should continue based on the type of error encountered.
//
// It checks if the given error matches any of the following known errors:
// - util.ErrKeyNotFound: The key required to locate the volume is missing in Rados omap.
// - util.ErrPoolNotFound: The rbd pool where the volume/omap is expected doesn't exist.
// - ErrImageNotFound: The image doesn't exist in the rbd pool.
// - rados.ErrPermissionDenied: Permissions to access the pool is denied.
//
// If any of these errors are encountered, the function returns `true`, indicating
// that the volume search should continue because of known error. Otherwise, it
// returns `false`, meaning the search should stop.
//
// This helper function is used in scenarios where multiple attempts may be made
// to retrieve or generate volume information, and we want to gracefully handle
// specific failure cases while retrying for others.
func ShouldRetryVolumeGeneration(err error) bool {
if err == nil {
return false // No error, do not retry
}
// Continue searching for specific known errors
return (errors.Is(err, util.ErrKeyNotFound) ||
errors.Is(err, util.ErrPoolNotFound) ||
errors.Is(err, rbderrors.ErrImageNotFound) ||
errors.Is(err, rados.ErrPermissionDenied))
}
// GenVolFromVolID generates a rbdVolume structure from the provided identifier, updating
// the structure with elements from on-disk image metadata as well.
func GenVolFromVolID(
@ -1210,11 +1275,15 @@ func GenVolFromVolID(
err := vi.DecomposeCSIID(volumeID)
if err != nil {
return vol, fmt.Errorf("%w: error decoding volume ID (%w) (%s)",
ErrInvalidVolID, err, volumeID)
rbderrors.ErrInvalidVolID, err, volumeID)
}
vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets)
<<<<<<< HEAD
if !shouldRetryVolumeGeneration(err) {
=======
if !ShouldRetryVolumeGeneration(err) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return vol, err
}
@ -1225,7 +1294,11 @@ func GenVolFromVolID(
}
if mapping != nil {
rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets)
<<<<<<< HEAD
if !shouldRetryVolumeGeneration(vErr) {
=======
if !ShouldRetryVolumeGeneration(vErr) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return rbdVol, vErr
}
}
@ -1278,7 +1351,11 @@ func generateVolumeFromMapping(
// Add mapping poolID to Identifier
nvi.LocationID = pID
vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets)
<<<<<<< HEAD
if !shouldRetryVolumeGeneration(err) {
=======
if !ShouldRetryVolumeGeneration(err) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
return vol, err
}
}
@ -1499,7 +1576,7 @@ func (ri *rbdImage) deleteSnapshot(ctx context.Context, pOpts *rbdSnapshot) erro
}
err = snap.Remove()
if errors.Is(err, librbd.ErrNotFound) {
return fmt.Errorf("Failed as %w (internal %w)", ErrSnapNotFound, err)
return fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrSnapNotFound, err)
}
return err
@ -1759,7 +1836,7 @@ func (ri *rbdImage) checkSnapExists(rbdSnap *rbdSnapshot) error {
}
}
return fmt.Errorf("%w: snap %s not found", ErrSnapNotFound, rbdSnap)
return fmt.Errorf("%w: snap %s not found", rbderrors.ErrSnapNotFound, rbdSnap)
}
// rbdImageMetadataStash strongly typed JSON spec for stashed RBD image metadata.
@ -1841,7 +1918,7 @@ func lookupRBDImageMetadataStash(metaDataPath string) (rbdImageMetadataStash, er
return imgMeta, fmt.Errorf("failed to read stashed JSON image metadata from path (%s): %w", fPath, err)
}
return imgMeta, fmt.Errorf("Failed as %w (internal %w)", ErrMissingStash, err)
return imgMeta, fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrMissingStash, err)
}
err = json.Unmarshal(encodedBytes, &imgMeta)

View File

@ -23,6 +23,12 @@ import (
"strings"
"testing"
<<<<<<< HEAD
=======
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/util"
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/stretchr/testify/require"
@ -418,7 +424,11 @@ func Test_shouldRetryVolumeGeneration(t *testing.T) {
},
{
name: "ErrImageNotFound (continue searching)",
<<<<<<< HEAD
args: args{err: ErrImageNotFound},
=======
args: args{err: rbderrors.ErrImageNotFound},
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
want: true, // Known error, continue searching
},
{
@ -435,8 +445,13 @@ func Test_shouldRetryVolumeGeneration(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
<<<<<<< HEAD
if got := shouldRetryVolumeGeneration(tt.args.err); got != tt.want {
t.Errorf("shouldRetryVolumeGeneration() = %v, want %v", got, tt.want)
=======
if got := ShouldRetryVolumeGeneration(tt.args.err); got != tt.want {
t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tt.want)
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
}
})
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
librbd "github.com/ceph/go-ceph/rbd"
@ -70,14 +71,14 @@ func DisableVolumeReplication(mirror types.Mirror,
localStatus, err := sts.GetLocalSiteStatus()
if err != nil {
return fmt.Errorf("failed to get local state: %w", ErrInvalidArgument)
return fmt.Errorf("failed to get local state: %w", rbderrors.ErrInvalidArgument)
}
if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() {
return nil
}
return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState())
rbderrors.ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState())
}
err := mirror.DisableMirroring(ctx, force)
if err != nil {
@ -92,7 +93,7 @@ func DisableVolumeReplication(mirror types.Mirror,
// error out if the image is not in disabled state.
if info.GetState() != librbd.MirrorImageDisabled.String() {
return fmt.Errorf("%w: image is in %q state, expected state %q", ErrAborted,
return fmt.Errorf("%w: image is in %q state, expected state %q", rbderrors.ErrAborted,
info.GetState(), librbd.MirrorImageDisabled.String())
}

View File

@ -24,6 +24,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/protobuf/types/known/timestamppb"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -82,7 +83,11 @@ func cleanUpSnapshot(
) error {
err := parentVol.deleteSnapshot(ctx, rbdSnap)
if err != nil {
<<<<<<< HEAD
if !errors.Is(err, ErrImageNotFound) && !errors.Is(err, ErrSnapNotFound) {
=======
if !errors.Is(err, rbderrors.ErrImageNotFound) && !errors.Is(err, rbderrors.ErrSnapNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
log.ErrorLog(ctx, "failed to delete snapshot %q: %v", rbdSnap, err)
return err
@ -92,7 +97,11 @@ func cleanUpSnapshot(
if rbdVol != nil {
err := rbdVol.Delete(ctx)
if err != nil {
<<<<<<< HEAD
if !errors.Is(err, ErrImageNotFound) {
=======
if !errors.Is(err, rbderrors.ErrImageNotFound) {
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
log.ErrorLog(ctx, "failed to delete rbd image %q with error: %v", rbdVol, err)
return err

View File

@ -16,9 +16,13 @@ limitations under the License.
package util
<<<<<<< HEAD
import (
"errors"
)
=======
import "errors"
>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge)
var (
// ErrKeyNotFound is returned when requested key in omap is not found.