From 4dcb9d3a45bc333b84f4aca400101402466cff4f Mon Sep 17 00:00:00 2001 From: Praveen M Date: Mon, 3 Mar 2025 11:45:43 +0530 Subject: [PATCH] cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge Signed-off-by: Praveen M (cherry picked from commit 5cbc14454a3df7bb6b3b3a419520e25230d9ab6e) # Conflicts: # internal/csi-addons/rbd/encryptionkeyrotation.go # internal/csi-addons/rbd/reclaimspace.go # internal/csi-addons/rbd/replication.go # internal/csi-addons/rbd/replication_test.go # internal/rbd/clone.go # internal/rbd/controllerserver.go # internal/rbd/group/util.go # internal/rbd/group/util_test.go # internal/rbd/manager.go # internal/rbd/rbd_journal.go # internal/rbd/rbd_util.go # internal/rbd/rbd_util_test.go # internal/rbd/snapshot.go # internal/util/errors.go --- .../csi-addons/rbd/encryptionkeyrotation.go | 5 + internal/csi-addons/rbd/reclaimspace.go | 6 + internal/csi-addons/rbd/replication.go | 38 ++++- internal/csi-addons/rbd/replication_test.go | 28 ++-- internal/csi-addons/rbd/volumegroup.go | 6 +- internal/rbd/clone.go | 17 +- internal/rbd/controllerserver.go | 43 ++++- internal/rbd/diskusage.go | 4 +- internal/rbd/{ => errors}/errors.go | 14 +- internal/rbd/group/group_snapshot.go | 3 +- internal/rbd/group/util.go | 132 +++++++++++++++- internal/rbd/group/util_test.go | 73 +++++++++ internal/rbd/group/volume_group.go | 9 +- internal/rbd/group_controllerserver.go | 6 +- internal/rbd/manager.go | 147 ++++++++++++++++++ internal/rbd/migration.go | 11 +- internal/rbd/mirror.go | 7 +- internal/rbd/nodeserver.go | 3 +- internal/rbd/rbd_journal.go | 21 ++- internal/rbd/rbd_util.go | 87 ++++++++++- internal/rbd/rbd_util_test.go | 15 ++ internal/rbd/replication.go | 7 +- internal/rbd/snapshot.go | 9 ++ internal/util/errors.go | 4 + 24 files changed, 632 insertions(+), 63 deletions(-) rename internal/rbd/{ => errors}/errors.go (87%) create mode 100644 internal/rbd/group/util_test.go diff --git a/internal/csi-addons/rbd/encryptionkeyrotation.go b/internal/csi-addons/rbd/encryptionkeyrotation.go index 8af1fa0ea..d5d2da0b5 100644 --- a/internal/csi-addons/rbd/encryptionkeyrotation.go +++ b/internal/csi-addons/rbd/encryptionkeyrotation.go @@ -21,6 +21,7 @@ import ( "errors" "github.com/ceph/ceph-csi/internal/rbd" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -69,7 +70,11 @@ func (ekrs *EncryptionKeyRotationServer) EncryptionKeyRotate( rbdVol, err := rbd.GenVolFromVolID(ctx, volID, creds, req.GetSecrets()) if err != nil { switch { +<<<<<<< HEAD case errors.Is(err, rbd.ErrImageNotFound): +======= + case errors.Is(err, rbderrors.ErrImageNotFound): +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) err = status.Errorf(codes.NotFound, "volume ID %s not found", volID) case errors.Is(err, util.ErrPoolNotFound): log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err) diff --git a/internal/csi-addons/rbd/reclaimspace.go b/internal/csi-addons/rbd/reclaimspace.go index 1d3f4bbfa..3d6acdf43 100644 --- a/internal/csi-addons/rbd/reclaimspace.go +++ b/internal/csi-addons/rbd/reclaimspace.go @@ -23,6 +23,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" rbdutil "github.com/ceph/ceph-csi/internal/rbd" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -78,8 +79,13 @@ func (rscs *ReclaimSpaceControllerServer) 
ControllerReclaimSpace( } defer rbdVol.Destroy(ctx) +<<<<<<< HEAD err = rbdVol.Sparsify() if errors.Is(err, rbdutil.ErrImageInUse) { +======= + err = rbdVol.Sparsify(ctx) + if errors.Is(err, rbderrors.ErrImageInUse) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) // FIXME: https://github.com/csi-addons/kubernetes-csi-addons/issues/406. // treat sparsify call as no-op if volume is in use. log.DebugLog(ctx, fmt.Sprintf("volume with ID %q is in use, skipping sparsify operation", volumeID)) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index d0049b725..76ec1abf0 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -29,6 +29,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -651,7 +652,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, sts, err := mirror.GetGlobalMirroringStatus(ctx) if err != nil { // the image gets recreated after issuing resync +<<<<<<< HEAD if errors.Is(err, corerbd.ErrImageNotFound) { +======= + if errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) // caller retries till RBD syncs an initial version of the image to // report its status in the resync call. Ideally, this line will not // be executed as the error would get returned due to getMirroringInfo @@ -785,6 +790,7 @@ func getGRPCError(err error) error { } errorStatusMap := map[error]codes.Code{ +<<<<<<< HEAD corerbd.ErrImageNotFound: codes.NotFound, util.ErrPoolNotFound: codes.NotFound, corerbd.ErrInvalidArgument: codes.InvalidArgument, @@ -792,6 +798,15 @@ func getGRPCError(err error) error { corerbd.ErrAborted: codes.Aborted, corerbd.ErrFailedPrecondition: codes.FailedPrecondition, corerbd.ErrUnavailable: codes.Unavailable, +======= + rbderrors.ErrImageNotFound: codes.NotFound, + util.ErrPoolNotFound: codes.NotFound, + rbderrors.ErrInvalidArgument: codes.InvalidArgument, + rbderrors.ErrFlattenInProgress: codes.Aborted, + rbderrors.ErrAborted: codes.Aborted, + rbderrors.ErrFailedPrecondition: codes.FailedPrecondition, + rbderrors.ErrUnavailable: codes.Unavailable, +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) } for e, code := range errorStatusMap { @@ -831,8 +846,13 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) if err != nil { switch { +<<<<<<< HEAD case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, err.Error()) +======= + case errors.Is(err, rbderrors.ErrImageNotFound): + err = status.Error(codes.NotFound, err.Error()) +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) case errors.Is(err, util.ErrPoolNotFound): err = status.Errorf(codes.NotFound, err.Error()) default: @@ -864,7 +884,13 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, mirrorStatus, err := mirror.GetGlobalMirroringStatus(ctx) if err != nil { +<<<<<<< HEAD if errors.Is(err, corerbd.ErrImageNotFound) { +======= + log.ErrorLog(ctx, "failed to get status for mirror %q: %v", mirror, err) + + if errors.Is(err, 
rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return nil, status.Error(codes.Aborted, err.Error()) } log.ErrorLog(ctx, err.Error()) @@ -886,7 +912,13 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, description := remoteStatus.GetDescription() resp, err := getLastSyncInfo(ctx, description) if err != nil { +<<<<<<< HEAD if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) { +======= + log.ErrorLog(ctx, "failed to parse last sync info from %q: %v", description, err) + + if errors.Is(err, rbderrors.ErrLastSyncTimeNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return nil, status.Errorf(codes.NotFound, "failed to get last sync info: %v", err) } log.ErrorLog(ctx, err.Error()) @@ -916,12 +948,12 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV var response replication.GetVolumeReplicationInfoResponse if description == "" { - return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound) + return nil, fmt.Errorf("empty description: %w", rbderrors.ErrLastSyncTimeNotFound) } log.DebugLog(ctx, "description: %s", description) splittedString := strings.SplitN(description, ",", 2) if len(splittedString) == 1 { - return nil, fmt.Errorf("no snapshot details: %w", corerbd.ErrLastSyncTimeNotFound) + return nil, fmt.Errorf("no snapshot details: %w", rbderrors.ErrLastSyncTimeNotFound) } type localStatus struct { LocalSnapshotTime int64 `json:"local_snapshot_timestamp"` @@ -938,7 +970,7 @@ func getLastSyncInfo(ctx context.Context, description string) (*replication.GetV // If the json unmarsal is successful but the local snapshot time is 0, we // need to consider it as an error as the LastSyncTime is required. 
if localSnapInfo.LocalSnapshotTime == 0 { - return nil, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound) + return nil, fmt.Errorf("empty local snapshot timestamp: %w", rbderrors.ErrLastSyncTimeNotFound) } if localSnapInfo.LastSnapshotDuration != nil { // converts localSnapshotDuration of type int64 to string format with diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index ba7212483..03332f974 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -26,6 +26,7 @@ import ( "time" corerbd "github.com/ceph/ceph-csi/internal/rbd" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" @@ -450,7 +451,7 @@ func TestValidateLastSyncInfo(t *testing.T) { LastSyncDuration: nil, LastSyncBytes: 0, }, - expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(), + expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), }, { name: "description without last_snapshot_bytes", @@ -472,7 +473,7 @@ func TestValidateLastSyncInfo(t *testing.T) { LastSyncTime: nil, LastSyncBytes: 0, }, - expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(), + expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), }, { name: "description without last_snapshot_sync_seconds", @@ -516,7 +517,7 @@ func TestValidateLastSyncInfo(t *testing.T) { LastSyncTime: nil, LastSyncBytes: 0, }, - expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(), + expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), }, } for _, tt := range tests { @@ -567,23 +568,23 @@ func TestGetGRPCError(t *testing.T) { }{ { name: "InvalidArgument", - err: corerbd.ErrInvalidArgument, - expectedErr: status.Error(codes.InvalidArgument, corerbd.ErrInvalidArgument.Error()), + err: rbderrors.ErrInvalidArgument, + expectedErr: status.Error(codes.InvalidArgument, rbderrors.ErrInvalidArgument.Error()), }, { name: "Aborted", - err: corerbd.ErrAborted, - expectedErr: status.Error(codes.Aborted, corerbd.ErrAborted.Error()), + err: rbderrors.ErrAborted, + expectedErr: status.Error(codes.Aborted, rbderrors.ErrAborted.Error()), }, { name: "FailedPrecondition", - err: corerbd.ErrFailedPrecondition, - expectedErr: status.Error(codes.FailedPrecondition, corerbd.ErrFailedPrecondition.Error()), + err: rbderrors.ErrFailedPrecondition, + expectedErr: status.Error(codes.FailedPrecondition, rbderrors.ErrFailedPrecondition.Error()), }, { name: "Unavailable", - err: corerbd.ErrUnavailable, - expectedErr: status.Error(codes.Unavailable, corerbd.ErrUnavailable.Error()), + err: rbderrors.ErrUnavailable, + expectedErr: status.Error(codes.Unavailable, rbderrors.ErrUnavailable.Error()), }, { name: "InvalidError", @@ -597,8 +598,13 @@ func TestGetGRPCError(t *testing.T) { }, { name: "ErrImageNotFound", +<<<<<<< HEAD err: corerbd.ErrImageNotFound, expectedErr: status.Error(codes.NotFound, corerbd.ErrImageNotFound.Error()), +======= + err: rbderrors.ErrImageNotFound, + expectedErr: status.Error(codes.NotFound, rbderrors.ErrImageNotFound.Error()), +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) }, { name: "ErrPoolNotFound", diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index c069118b0..17ff361f0 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -23,7 +23,7 @@ import ( "slices" "github.com/ceph/ceph-csi/internal/rbd" - 
"github.com/ceph/ceph-csi/internal/rbd/group" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util/log" @@ -194,7 +194,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup( // resolve the volume group vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId()) if err != nil { - if errors.Is(err, group.ErrRBDGroupNotFound) { + if errors.Is(err, rbderrors.ErrGroupNotFound) { log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId()) return &volumegroup.DeleteVolumeGroupResponse{}, nil @@ -433,7 +433,7 @@ func (vs *VolumeGroupServer) ControllerGetVolumeGroup( // resolve the volume group vg, err := mgr.GetVolumeGroupByID(ctx, req.GetVolumeGroupId()) if err != nil { - if errors.Is(err, group.ErrRBDGroupNotFound) { + if errors.Is(err, rbderrors.ErrGroupNotFound) { log.ErrorLog(ctx, "VolumeGroup %q doesn't exists", req.GetVolumeGroupId()) return nil, status.Errorf( diff --git a/internal/rbd/clone.go b/internal/rbd/clone.go index 25452921a..4bfd0dcdd 100644 --- a/internal/rbd/clone.go +++ b/internal/rbd/clone.go @@ -21,6 +21,10 @@ import ( "errors" "fmt" +<<<<<<< HEAD +======= + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" @@ -56,17 +60,28 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume) err := tempClone.checkSnapExists(snap) if err != nil { switch { +<<<<<<< HEAD case errors.Is(err, ErrSnapNotFound): // as the snapshot is not present, create new snapshot,clone and // delete the temporary snapshot err = createRBDClone(ctx, tempClone, rv, snap) +======= + case errors.Is(err, rbderrors.ErrSnapNotFound): + // as the snapshot is not present, create new snapshot, clone and + // don't delete the temporary snapshot + err = createRBDClone(ctx, tempClone, rv, snap, false) +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) if err != nil { return false, err } return true, nil +<<<<<<< HEAD case errors.Is(err, ErrImageNotFound): +======= + case errors.Is(err, rbderrors.ErrImageNotFound): +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) // as the temp clone does not exist,check snapshot exists on parent volume // snapshot name is same as temporary clone image snap.RbdImageName = tempClone.RbdImageName @@ -76,7 +91,7 @@ func (rv *rbdVolume) checkCloneImage(ctx context.Context, parentVol *rbdVolume) // create new resources for a cleaner approach err = parentVol.deleteSnapshot(ctx, snap) } - if errors.Is(err, ErrSnapNotFound) { + if errors.Is(err, rbderrors.ErrSnapNotFound) { return false, nil } diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index bfb5edaab..d93dcf6e7 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -23,6 +23,7 @@ import ( "strconv" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" @@ -310,10 +311,10 @@ func buildCreateVolumeResponse( // the input error types it expected to use only for CreateVolume as we need to // return different GRPC codes for different functions based on the input. 
func getGRPCErrorForCreateVolume(err error) error { - if errors.Is(err, ErrVolNameConflict) { + if errors.Is(err, rbderrors.ErrVolNameConflict) { return status.Error(codes.AlreadyExists, err.Error()) } - if errors.Is(err, ErrFlattenInProgress) { + if errors.Is(err, rbderrors.ErrFlattenInProgress) { return status.Error(codes.Aborted, err.Error()) } @@ -428,7 +429,7 @@ func (cs *ControllerServer) CreateVolume( err = cs.createBackingImage(ctx, cr, req.GetSecrets(), rbdVol, parentVol, rbdSnap) if err != nil { - if errors.Is(err, ErrFlattenInProgress) { + if errors.Is(err, rbderrors.ErrFlattenInProgress) { return nil, status.Error(codes.Aborted, err.Error()) } @@ -586,7 +587,11 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { snaps, children, err := rbdVol.listSnapAndChildren() if err != nil { +<<<<<<< HEAD if errors.Is(err, ErrImageNotFound) { +======= + if errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return status.Error(codes.InvalidArgument, err.Error()) } @@ -822,7 +827,7 @@ func checkContentSource( rbdSnap, err := genSnapFromSnapID(ctx, snapshotID, cr, req.GetSecrets()) if err != nil { log.ErrorLog(ctx, "failed to get backend snapshot for %s: %v", snapshotID, err) - if !errors.Is(err, ErrSnapNotFound) { + if !errors.Is(err, rbderrors.ErrSnapNotFound) { return nil, nil, status.Error(codes.Internal, err.Error()) } @@ -842,7 +847,11 @@ func checkContentSource( rbdvol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets()) if err != nil { log.ErrorLog(ctx, "failed to get backend image for %s: %v", volID, err) +<<<<<<< HEAD if !errors.Is(err, ErrImageNotFound) { +======= + if !errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return nil, nil, status.Error(codes.Internal, err.Error()) } @@ -882,7 +891,11 @@ func (cs *ControllerServer) checkErrAndUndoReserve( return &csi.DeleteVolumeResponse{}, nil } +<<<<<<< HEAD if errors.Is(err, ErrImageNotFound) { +======= + if errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) notFoundErr := rbdVol.ensureImageCleanup(ctx) if notFoundErr != nil { return nil, status.Errorf(codes.Internal, "failed to cleanup image %q: %v", rbdVol, notFoundErr) @@ -957,7 +970,11 @@ func (cs *ControllerServer) DeleteVolume( return nil, status.Error(codes.InvalidArgument, pErr.Error()) } pErr = deleteMigratedVolume(ctx, pmVolID, cr) +<<<<<<< HEAD if pErr != nil && !errors.Is(pErr, ErrImageNotFound) { +======= + if pErr != nil && !errors.Is(pErr, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return nil, status.Error(codes.Internal, pErr.Error()) } @@ -1129,7 +1146,11 @@ func (cs *ControllerServer) CreateSnapshot( }() if err != nil { switch { +<<<<<<< HEAD case errors.Is(err, ErrImageNotFound): +======= + case errors.Is(err, rbderrors.ErrImageNotFound): +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) err = status.Errorf(codes.NotFound, "source Volume ID %s not found", req.GetSourceVolumeId()) case errors.Is(err, util.ErrPoolNotFound): log.ErrorLog(ctx, "failed to get backend volume for %s: %v", req.GetSourceVolumeId(), err) @@ -1198,7 +1219,7 @@ func (cs *ControllerServer) 
CreateSnapshot( return nil, status.Error(codes.Internal, err.Error()) } defer func() { - if err != nil && !errors.Is(err, ErrFlattenInProgress) { + if err != nil && !errors.Is(err, rbderrors.ErrFlattenInProgress) { errDefer := undoSnapReservation(ctx, rbdSnap, cr) if errDefer != nil { log.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", req.GetName(), errDefer) @@ -1278,7 +1299,7 @@ func cloneFromSnapshot( } err = vol.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth) - if errors.Is(err, ErrFlattenInProgress) { + if errors.Is(err, rbderrors.ErrFlattenInProgress) { // if flattening is in progress, return error and do not cleanup return nil, status.Errorf(codes.Internal, err.Error()) } else if err != nil { @@ -1364,7 +1385,7 @@ func (cs *ControllerServer) doSnapshotClone( defer func() { if err != nil { - if !errors.Is(err, ErrFlattenInProgress) { + if !errors.Is(err, rbderrors.ErrFlattenInProgress) { // cleanup clone and snapshot errCleanUp := cleanUpSnapshot(ctx, cloneRbd, rbdSnap, cloneRbd) if errCleanUp != nil { @@ -1474,7 +1495,11 @@ func (cs *ControllerServer) DeleteSnapshot( // if the error is ErrImageNotFound, We need to cleanup the image from // trash and remove the metadata in OMAP. +<<<<<<< HEAD if errors.Is(err, ErrImageNotFound) { +======= + if errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) log.UsefulLog(ctx, "cleaning up leftovers of snapshot %s: %v", snapshotID, err) err = cleanUpImageAndSnapReservation(ctx, rbdSnap, cr) @@ -1577,7 +1602,11 @@ func (cs *ControllerServer) ControllerExpandVolume( rbdVol, err := genVolFromVolIDWithMigration(ctx, volID, cr, req.GetSecrets()) if err != nil { switch { +<<<<<<< HEAD case errors.Is(err, ErrImageNotFound): +======= + case errors.Is(err, rbderrors.ErrImageNotFound): +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) err = status.Errorf(codes.NotFound, "volume ID %s not found", volID) case errors.Is(err, util.ErrPoolNotFound): log.ErrorLog(ctx, "failed to get backend volume for %s: %v", volID, err) diff --git a/internal/rbd/diskusage.go b/internal/rbd/diskusage.go index 444a5a604..aee2d8536 100644 --- a/internal/rbd/diskusage.go +++ b/internal/rbd/diskusage.go @@ -18,6 +18,8 @@ package rbd import ( "fmt" + + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" ) // Sparsify checks the size of the objects in the RBD image and calls @@ -32,7 +34,7 @@ func (ri *rbdImage) Sparsify() error { } if inUse { // if the image is in use, we should not sparsify it, return ErrImageInUse. - return ErrImageInUse + return rbderrors.ErrImageInUse } image, err := ri.open() diff --git a/internal/rbd/errors.go b/internal/rbd/errors/errors.go similarity index 87% rename from internal/rbd/errors.go rename to internal/rbd/errors/errors.go index 8248dd98c..887496116 100644 --- a/internal/rbd/errors.go +++ b/internal/rbd/errors/errors.go @@ -14,9 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package rbd +package rbderrors -import "errors" +import ( + "errors" + "fmt" + + librados "github.com/ceph/go-ceph/rados" + librbd "github.com/ceph/go-ceph/rbd" +) var ( // ErrImageNotFound is returned when image name is not found in the cluster on the given pool and/or namespace. @@ -57,4 +63,8 @@ var ( ErrInvalidArgument = errors.New("invalid arguments provided") // ErrImageInUse is returned when the image is in use. 
ErrImageInUse = errors.New("image is in use") + // ErrGroupNotConnected is returned when the RBD group is not connected. + ErrGroupNotConnected = fmt.Errorf("%w: RBD group is not connected", librados.ErrNotConnected) + // ErrGroupNotFound is returned when group is not found in the cluster on the given pool and/or namespace. + ErrGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound) ) diff --git a/internal/rbd/group/group_snapshot.go b/internal/rbd/group/group_snapshot.go index 81daaf010..086c1784c 100644 --- a/internal/rbd/group/group_snapshot.go +++ b/internal/rbd/group/group_snapshot.go @@ -24,6 +24,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/protobuf/types/known/timestamppb" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -70,7 +71,7 @@ func GetVolumeGroupSnapshot( attrs, err := vgs.getVolumeGroupAttributes(ctx) if err != nil { - if errors.Is(err, ErrRBDGroupNotFound) { + if errors.Is(err, rbderrors.ErrGroupNotFound) { log.ErrorLog(ctx, "%v, returning empty volume group snapshot %q", vgs, err) return vgs, err diff --git a/internal/rbd/group/util.go b/internal/rbd/group/util.go index 6c4e56d5d..d2e62bee3 100644 --- a/internal/rbd/group/util.go +++ b/internal/rbd/group/util.go @@ -25,6 +25,7 @@ import ( "github.com/ceph/go-ceph/rados" "github.com/ceph/ceph-csi/internal/journal" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" ) @@ -87,7 +88,82 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup( pool, err := util.GetPoolName(mons, creds, csiID.LocationID) if err != nil { +<<<<<<< HEAD return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err) +======= + return fmt.Errorf("failed to get pool for volume group id %q: %w", cvg.id, err) + } + + cvg.monitors = mons + cvg.namespace = namespace + cvg.pool = pool + + return nil +} + +// generateVolumeGroupFromMapping checks the clusterID and poolID mapping and +// generates commonVolumeGroup structure for the mapped clusterID and poolID. 
+func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping( + ctx context.Context, + csiID util.CSIIdentifier, + mapping *[]util.ClusterMappingInfo, +) error { + mcsiID := csiID + existingClusterID := csiID.ClusterID + existingPoolID := strconv.FormatInt(csiID.LocationID, 10) + + for _, cm := range *mapping { + for key, val := range cm.ClusterIDMapping { + mappedClusterID := util.GetMappedID(key, val, csiID.ClusterID) + if mappedClusterID == "" { + continue + } + + log.DebugLog(ctx, + "found new clusterID mapping %s for existing clusterID %s", mappedClusterID, existingClusterID) + + // Add mapped clusterID to Identifier + mcsiID.ClusterID = mappedClusterID + for _, pools := range cm.RBDpoolIDMappingInfo { + for key, val := range pools { + mappedPoolID := util.GetMappedID(key, val, existingPoolID) + if mappedPoolID == "" { + continue + } + log.DebugLog(ctx, + "found new poolID mapping %s for existing poolID %s", mappedPoolID, existingPoolID) + + mPID, err := strconv.ParseInt(mappedPoolID, 10, 64) + if err != nil { + return err + } + mcsiID.LocationID = mPID + err = cvg.generateVolumeGroup(mcsiID) + if ShouldRetryVolumeGroupGeneration(err) { + continue + } + + return err + } + } + } + } + + return util.ErrPoolNotFound +} + +func (cvg *commonVolumeGroup) initCommonVolumeGroup( + ctx context.Context, + id string, + csiDriver string, + creds *util.Credentials, +) error { + csiID := util.CSIIdentifier{} + + err := csiID.DecomposeCSIID(id) + if err != nil { + return fmt.Errorf("failed to decompose volume group id %q: %w", id, err) +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) } cvg.csiDriver = csiDriver @@ -95,9 +171,34 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup( cvg.id = id cvg.clusterID = csiID.ClusterID cvg.objectUUID = csiID.ObjectUUID +<<<<<<< HEAD cvg.monitors = mons cvg.pool = pool cvg.namespace = namespace +======= + // cvg.monitors, cvg.namespace, cvg.pool are set in generateVolumeGroup + + err = cvg.generateVolumeGroup(csiID) + // If the error is not a retryable error, return from here. + if err != nil && !ShouldRetryVolumeGroupGeneration(err) { + return err + } + + // If the error is a retryable error, we should try to get the cluster mapping + // and generate the volume group from the mapping. 
+ if ShouldRetryVolumeGroupGeneration(err) { + mapping, err := util.GetClusterMappingInfo(csiID.ClusterID) + if err != nil { + return err + } + if mapping != nil { + err = cvg.generateVolumeGroupFromMapping(ctx, csiID, mapping) + if err != nil { + return err + } + } + } +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) log.DebugLog(ctx, "object for volume group %q has been initialized", cvg.id) @@ -148,7 +249,7 @@ func (cvg *commonVolumeGroup) getVolumeGroupAttributes(ctx context.Context) (*jo if attrs.GroupName == "" { log.ErrorLog(ctx, "volume group with id %v not found", cvg.id) - return nil, ErrRBDGroupNotFound + return nil, rbderrors.ErrGroupNotFound } cvg.requestName = attrs.RequestName @@ -275,12 +376,12 @@ func (cvg *commonVolumeGroup) GetIOContext(ctx context.Context) (*rados.IOContex conn, err := cvg.getConnection(ctx) if err != nil { - return nil, fmt.Errorf("%w: failed to connect: %w", ErrRBDGroupNotConnected, err) + return nil, fmt.Errorf("%w: failed to connect: %w", rbderrors.ErrGroupNotConnected, err) } ioctx, err := conn.GetIoctx(cvg.pool) if err != nil { - return nil, fmt.Errorf("%w: failed to get IOContext: %w", ErrRBDGroupNotConnected, err) + return nil, fmt.Errorf("%w: failed to get IOContext: %w", rbderrors.ErrGroupNotConnected, err) } if cvg.namespace != "" { @@ -336,3 +437,28 @@ func (cvg *commonVolumeGroup) GetCreationTime(ctx context.Context) (*time.Time, return cvg.creationTime, nil } + +// ShouldRetryVolumeGroupGeneration determines whether the process of finding or generating +// volumegroups should continue based on the type of error encountered. +// +// It checks if the given error matches any of the following known errors: +// - ErrPoolNotFound: The rbd pool where the volumegroup/omap is expected doesn't exist. +// - ErrRBDGroupNotFound: The volumegroup doesn't exist in the rbd pool. +// - rados.ErrPermissionDenied: Permissions to access the pool is denied. +// +// If any of these errors are encountered, the function returns `true`, indicating +// that the volumegroup search should continue because of known error. Otherwise, it +// returns `false`, meaning the search should stop. +// +// This helper function is used in scenarios where multiple attempts may be made +// to retrieve or generate volumegroup information, and we want to gracefully handle +// specific failure cases while retrying for others. +func ShouldRetryVolumeGroupGeneration(err error) bool { + if err == nil { + return false // No error, do not retry + } + // Continue searching for specific known errors + return (errors.Is(err, util.ErrPoolNotFound) || + errors.Is(err, rbderrors.ErrGroupNotFound) || + errors.Is(err, rados.ErrPermissionDenied)) +} diff --git a/internal/rbd/group/util_test.go b/internal/rbd/group/util_test.go new file mode 100644 index 000000000..f0837a23e --- /dev/null +++ b/internal/rbd/group/util_test.go @@ -0,0 +1,73 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package group + +import ( + "errors" + "testing" + + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" + "github.com/ceph/ceph-csi/internal/util" + + "github.com/ceph/go-ceph/rados" +) + +func Test_shouldRetryVolumeGroupGeneration(t *testing.T) { + t.Parallel() + type args struct { + err error + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "No error (stop searching)", + args: args{err: nil}, + want: false, // No error, stop searching + }, + { + name: "ErrPoolNotFound (continue searching)", + args: args{err: util.ErrPoolNotFound}, + want: true, // Known error, continue searching + }, + { + name: "ErrRBDGroupNotFound (continue searching)", + args: args{err: rbderrors.ErrGroupNotFound}, + want: true, // Known error, continue searching + }, + { + name: "ErrPermissionDenied (continue searching)", + args: args{err: rados.ErrPermissionDenied}, + want: true, // Known error, continue searching + }, + { + name: "Different error (stop searching)", + args: args{err: errors.New("unknown error")}, + want: false, // Unknown error, stop searching + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + if got := ShouldRetryVolumeGroupGeneration(tt.args.err); got != tt.want { + t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index e8414f3dd..5ec8acf9b 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -22,21 +22,16 @@ import ( "fmt" "github.com/ceph/go-ceph/rados" - librados "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/csi-addons/spec/lib/go/volumegroup" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" ) -var ( - ErrRBDGroupNotConnected = fmt.Errorf("%w: RBD group is not connected", librados.ErrNotConnected) - ErrRBDGroupNotFound = fmt.Errorf("%w: RBD group not found", librbd.ErrNotFound) -) - // volumeGroup handles all requests for 'rbd group' operations. 
type volumeGroup struct { commonVolumeGroup @@ -77,7 +72,7 @@ func GetVolumeGroup( attrs, err := vg.getVolumeGroupAttributes(ctx) if err != nil { - if errors.Is(err, ErrRBDGroupNotFound) { + if errors.Is(err, rbderrors.ErrGroupNotFound) { log.ErrorLog(ctx, "%v, returning empty volume group %q", vg, err) return vg, err diff --git a/internal/rbd/group_controllerserver.go b/internal/rbd/group_controllerserver.go index a4b313ea7..2dc171b00 100644 --- a/internal/rbd/group_controllerserver.go +++ b/internal/rbd/group_controllerserver.go @@ -24,7 +24,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/ceph/ceph-csi/internal/rbd/group" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -215,7 +215,7 @@ func (cs *ControllerServer) DeleteVolumeGroupSnapshot( groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID) if err != nil { - if errors.Is(err, group.ErrRBDGroupNotFound) { + if errors.Is(err, rbderrors.ErrGroupNotFound) { log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID) return &csi.DeleteVolumeGroupSnapshotResponse{}, nil @@ -261,7 +261,7 @@ func (cs *ControllerServer) GetVolumeGroupSnapshot( groupSnapshot, err := mgr.GetVolumeGroupSnapshotByID(ctx, groupSnapshotID) if err != nil { - if errors.Is(err, group.ErrRBDGroupNotFound) { + if errors.Is(err, rbderrors.ErrGroupNotFound) { log.ErrorLog(ctx, "VolumeGroupSnapshot %q doesn't exists", groupSnapshotID) return nil, status.Errorf( diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index ced7026fc..717ea7ff8 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/ceph/ceph-csi/internal/journal" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" @@ -174,7 +175,11 @@ func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volu volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets) if err != nil { switch { +<<<<<<< HEAD case errors.Is(err, ErrImageNotFound): +======= + case errors.Is(err, rbderrors.ErrImageNotFound): +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) err = fmt.Errorf("volume %s not found: %w", id, err) return nil, err @@ -199,7 +204,11 @@ func (mgr *rbdManager) GetSnapshotByID(ctx context.Context, id string) (types.Sn snapshot, err := genSnapFromSnapID(ctx, id, creds, mgr.secrets) if err != nil { switch { +<<<<<<< HEAD case errors.Is(err, ErrImageNotFound): +======= + case errors.Is(err, rbderrors.ErrImageNotFound): +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) err = fmt.Errorf("volume %s not found: %w", id, err) return nil, err @@ -467,7 +476,11 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot( return vgs, nil } +<<<<<<< HEAD } else if err != nil && !errors.Is(ErrImageNotFound, err) { +======= + } else if err != nil && !errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) // ErrImageNotFound can be returned if the VolumeGroupSnapshot // could not be found. It is expected that it does not exist // yet, in which case it will be created below. 
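Besides the package move, the CreateVolumeGroupSnapshot hunk above also fixes the argument order of errors.Is: the HEAD side had errors.Is(ErrImageNotFound, err), while the incoming change uses errors.Is(err, rbderrors.ErrImageNotFound). A minimal, self-contained Go sketch (stand-in sentinel, not the repository's code) of why that order matters for wrapped errors:

package main

import (
    "errors"
    "fmt"
)

// errImageNotFound stands in for the sentinel this patch moves into the
// rbderrors package; it is not the repository's definition.
var errImageNotFound = errors.New("image not found")

func lookup() error {
    // callers wrap the sentinel with %w so errors.Is can match it later
    return fmt.Errorf("rbd: open image: %w", errImageNotFound)
}

func main() {
    err := lookup()

    // errors.Is(err, target) unwraps err and compares each layer to target;
    // this is the corrected order used by the incoming change.
    fmt.Println(errors.Is(err, errImageNotFound)) // true

    // With the arguments swapped (the HEAD side of the conflict) the sentinel
    // itself is unwrapped instead of err, so wrapped errors never match.
    fmt.Println(errors.Is(errImageNotFound, err)) // false
}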
@@ -503,3 +516,137 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot( return vgs, nil } +<<<<<<< HEAD +======= + +// RegenerateVolumeGroupJournal regenerate the omap data for the volume group. +// This performs the following operations: +// - extracts clusterID and Mons from the cluster mapping +// - Retrieves pool and journalPool parameters from the VolumeGroupReplicationClass +// - Reserves omap data +// - Add volumeIDs mapping to the reserved volume group omap object +// - Generate new volume group handle +// +// Returns the generated volume group handle. +// +// Note: The new volume group handle will differ from the original as it includes +// poolID and clusterID, which vary between clusters. +func (mgr *rbdManager) RegenerateVolumeGroupJournal( + ctx context.Context, + groupID, requestName string, + volumeIds []string, +) (string, error) { + var ( + clusterID string + monitors string + pool string + journalPool string + namePrefix string + groupUUID string + vgName string + + gi util.CSIIdentifier + ok bool + err error + ) + + err = gi.DecomposeCSIID(groupID) + if err != nil { + return "", fmt.Errorf("%w: error decoding volume group ID (%w) (%s)", rbderrors.ErrInvalidVolID, err, groupID) + } + + monitors, clusterID, err = util.FetchMappedClusterIDAndMons(ctx, gi.ClusterID) + if err != nil { + return "", err + } + + pool, ok = mgr.parameters["pool"] + if !ok { + return "", errors.New("required 'pool' parameter missing in parameters") + } + + journalPool, ok = mgr.parameters["journalPool"] + if !ok || journalPool == "" { + journalPool = pool + } + + vgJournal, err := mgr.getVolumeGroupJournal(clusterID) + if err != nil { + return "", err + } + defer vgJournal.Destroy() + + namePrefix = mgr.parameters["volumeGroupNamePrefix"] + vgData, err := vgJournal.CheckReservation(ctx, journalPool, requestName, namePrefix) + if err != nil { + return "", err + } + + if vgData != nil { + groupUUID = vgData.GroupUUID + vgName = vgData.GroupName + } else { + log.DebugLog(ctx, "the journal does not contain a reservation for a volume group with name %q yet", requestName) + groupUUID, vgName, err = vgJournal.ReserveName(ctx, journalPool, requestName, gi.ObjectUUID, namePrefix) + if err != nil { + return "", fmt.Errorf("failed to reserve volume group for name %q: %w", requestName, err) + } + defer func() { + if err != nil { + undoError := vgJournal.UndoReservation(ctx, journalPool, vgName, requestName) + if undoError != nil { + log.ErrorLog(ctx, "failed to undo the reservation for volume group %q: %w", requestName, undoError) + } + } + }() + } + + volumes := make([]types.Volume, len(volumeIds)) + defer func() { + for _, v := range volumes { + v.Destroy(ctx) + } + }() + var volume types.Volume + for i, id := range volumeIds { + volume, err = mgr.GetVolumeByID(ctx, id) + if err != nil { + return "", fmt.Errorf("failed to find required volume %q for volume group id %q: %w", id, vgName, err) + } + + volumes[i] = volume + } + + var volID string + for _, vol := range volumes { + volID, err = vol.GetID(ctx) + if err != nil { + return "", fmt.Errorf("failed to get VolumeID for %q: %w", vol, err) + } + + toAdd := map[string]string{ + volID: "", + } + log.DebugLog(ctx, "adding volume mapping for volume %q to volume group %q", volID, vgName) + err = mgr.vgJournal.AddVolumesMapping(ctx, pool, gi.ObjectUUID, toAdd) + if err != nil { + return "", fmt.Errorf("failed to add mapping for volume %q to volume group %q: %w", volID, vgName, err) + } + } + + _, poolID, err := util.GetPoolIDs(ctx, monitors, journalPool, pool, 
mgr.creds) + if err != nil { + return "", fmt.Errorf("failed to get poolID for %q: %w", groupUUID, err) + } + + groupHandle, err := util.GenerateVolID(ctx, monitors, mgr.creds, poolID, pool, clusterID, groupUUID) + if err != nil { + return "", fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", groupUUID, err) + } + + log.DebugLog(ctx, "re-generated Group ID (%s) and Group Name (%s) for request name (%s)", + groupHandle, vgName, requestName) + + return groupHandle, nil +} +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) diff --git a/internal/rbd/migration.go b/internal/rbd/migration.go index b7629251b..f357206fa 100644 --- a/internal/rbd/migration.go +++ b/internal/rbd/migration.go @@ -21,6 +21,7 @@ import ( "encoding/hex" "strings" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" ) @@ -39,13 +40,13 @@ func parseMigrationVolID(vh string) (*migrationVolID, error) { handSlice := strings.Split(vh, migVolIDFieldSep) if len(handSlice) < migVolIDTotalLength { // its short of length in this case, so return error - return nil, ErrInvalidVolID + return nil, rbderrors.ErrInvalidVolID } // Store pool poolHash := strings.Join(handSlice[migVolIDSplitLength:], migVolIDFieldSep) poolByte, dErr := hex.DecodeString(poolHash) if dErr != nil { - return nil, ErrMissingPoolNameInVolID + return nil, rbderrors.ErrMissingPoolNameInVolID } mh.poolName = string(poolByte) // Parse migration mons( for clusterID) and image @@ -62,13 +63,13 @@ func parseMigrationVolID(vh string) (*migrationVolID, error) { } } if mh.imageName == "" { - return nil, ErrMissingImageNameInVolID + return nil, rbderrors.ErrMissingImageNameInVolID } if mh.poolName == "" { - return nil, ErrMissingPoolNameInVolID + return nil, rbderrors.ErrMissingPoolNameInVolID } if mh.clusterID == "" { - return nil, ErrDecodeClusterIDFromMonsInVolID + return nil, rbderrors.ErrDecodeClusterIDFromMonsInVolID } return mh, nil diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index e20927d01..67f79a1a5 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -56,7 +57,7 @@ func (rv *rbdVolume) HandleParentImageExistence( if rv.ParentInTrash { return fmt.Errorf("%w: failed to enable mirroring on image %q:"+ " parent is in trash", - ErrFailedPrecondition, rv) + rbderrors.ErrFailedPrecondition, rv) } parent, err := rv.getParent() @@ -72,7 +73,7 @@ func (rv *rbdVolume) HandleParentImageExistence( if parentMirroringInfo.GetState() != librbd.MirrorImageEnabled.String() { return fmt.Errorf("%w: failed to enable mirroring on image %q: "+ "parent image %q is not enabled for mirroring", - ErrFailedPrecondition, rv, parent) + rbderrors.ErrFailedPrecondition, rv, parent) } return nil @@ -204,7 +205,7 @@ func (ri *rbdImage) Resync(_ context.Context) error { // If we issued a resync, return a non-final error as image needs to be recreated // locally. Caller retries till RBD syncs an initial version of the image to // report its status in the resync request. 
- return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable) + return fmt.Errorf("%w: awaiting initial resync due to split brain", rbderrors.ErrUnavailable) } // GetGlobalMirroringStatus get the mirroring status of an image. diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 905fcf8af..fc590c838 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -25,6 +25,7 @@ import ( "strings" csicommon "github.com/ceph/ceph-csi/internal/csi-common" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/fscrypt" "github.com/ceph/ceph-csi/internal/util/log" @@ -1034,7 +1035,7 @@ func (ns *NodeServer) NodeUnstageVolume( } // If not mounted, and error is anything other than metadata file missing, it is an error - if !errors.Is(err, ErrMissingStash) { + if !errors.Is(err, rbderrors.ErrMissingStash) { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/internal/rbd/rbd_journal.go b/internal/rbd/rbd_journal.go index 04d354889..17e8faa85 100644 --- a/internal/rbd/rbd_journal.go +++ b/internal/rbd/rbd_journal.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/ceph/ceph-csi/internal/journal" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" @@ -172,10 +173,14 @@ func checkSnapCloneExists( // Fetch on-disk image attributes err = vol.getImageInfo() if err != nil { +<<<<<<< HEAD if errors.Is(err, ErrImageNotFound) { +======= + if errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) err = parentVol.deleteSnapshot(ctx, rbdSnap) if err != nil { - if !errors.Is(err, ErrSnapNotFound) { + if !errors.Is(err, rbderrors.ErrSnapNotFound) { log.ErrorLog(ctx, "failed to delete snapshot %s: %v", rbdSnap, err) return false, err @@ -203,7 +208,7 @@ func checkSnapCloneExists( // check snapshot exists if not create it err = vol.checkSnapExists(rbdSnap) - if errors.Is(err, ErrSnapNotFound) { + if errors.Is(err, rbderrors.ErrSnapNotFound) { // create snapshot sErr := vol.createSnapshot(ctx, rbdSnap) if sErr != nil { @@ -298,7 +303,11 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er // Fetch on-disk image attributes and compare against request err = rv.getImageInfo() switch { +<<<<<<< HEAD case errors.Is(err, ErrImageNotFound) && parentVol != nil: +======= + case errors.Is(err, rbderrors.ErrImageNotFound) && parentVol != nil: +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) // Need to check cloned info here not on createvolume found, cErr := rv.checkCloneImage(ctx, parentVol) if cErr != nil { @@ -312,7 +321,11 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er return false, err } +<<<<<<< HEAD case errors.Is(err, ErrImageNotFound) && parentVol == nil: +======= + case errors.Is(err, rbderrors.ErrImageNotFound) && parentVol == nil: +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) // image not found, undo the reservation err = j.UndoReservation(ctx, rv.JournalPool, rv.Pool, rv.RbdImageName, rv.RequestName) @@ -330,7 +343,7 @@ func (rv *rbdVolume) Exists(ctx context.Context, parentVol *rbdVolume) (bool, er // size checks if rv.VolSize < requestSize { return false, fmt.Errorf("%w: image with the same name (%s) 
but with different size already exists", - ErrVolNameConflict, rv.RbdImageName) + rbderrors.ErrVolNameConflict, rv.RbdImageName) } // TODO: We should also ensure image features and format is the same @@ -598,7 +611,7 @@ func RegenerateJournal( err = vi.DecomposeCSIID(rbdVol.VolID) if err != nil { return "", fmt.Errorf("%w: error decoding volume ID (%w) (%s)", - ErrInvalidVolID, err, rbdVol.VolID) + rbderrors.ErrInvalidVolID, err, rbdVol.VolID) } rbdVol.Owner = owner diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index d1fc4a328..caa91d2b7 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -28,6 +28,13 @@ import ( "strings" "time" +<<<<<<< HEAD +======= + "github.com/ceph/ceph-csi/pkg/util/crypto" + "github.com/ceph/ceph-csi/pkg/util/kernel" + + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -524,7 +531,11 @@ func (ri *rbdImage) open() (*librbd.Image, error) { image, err := librbd.OpenImage(ri.ioctx, ri.RbdImageName, librbd.NoSnapshot) if err != nil { if errors.Is(err, librbd.ErrNotFound) { +<<<<<<< HEAD err = fmt.Errorf("Failed as %w (internal %w)", ErrImageNotFound, err) +======= + err = fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrImageNotFound, err) +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) } return nil, err @@ -542,7 +553,11 @@ func (ri *rbdImage) open() (*librbd.Image, error) { func (ri *rbdImage) isInUse() (bool, error) { image, err := ri.open() if err != nil { +<<<<<<< HEAD if errors.Is(err, ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) { +======= + if errors.Is(err, rbderrors.ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return false, err } // any error should assume something else is using the image @@ -681,7 +696,11 @@ func (ri *rbdImage) Delete(ctx context.Context) error { err = rbdImage.Trash(0) if err != nil { if errors.Is(err, librbd.ErrNotFound) { +<<<<<<< HEAD return fmt.Errorf("Failed as %w (internal %w)", ErrImageNotFound, err) +======= + return fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrImageNotFound, err) +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) } log.ErrorLog(ctx, "failed to delete rbd image: %s, error: %v", ri, err) @@ -731,7 +750,18 @@ func (rv *rbdVolume) DeleteTempImage(ctx context.Context) error { tempClone := rv.generateTempClone() err := tempClone.Delete(ctx) if err != nil { +<<<<<<< HEAD if errors.Is(err, ErrImageNotFound) { +======= + if !errors.Is(err, rbderrors.ErrImageNotFound) && !errors.Is(err, rbderrors.ErrSnapNotFound) { + return fmt.Errorf("failed to delete snapshot %q: %w", snap, err) + } + } + + err = tempClone.Delete(ctx) + if err != nil { + if errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return tempClone.ensureImageCleanup(ctx) } else { // return error if it is not ErrImageNotFound @@ -770,7 +800,11 @@ func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) { // if the parent image is moved to trash the name will be present // in rbd image info but the image will be in trash, in that case // return the found depth +<<<<<<< HEAD if errors.Is(err, 
ErrImageNotFound) { +======= + if errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return depth, nil } log.ErrorLog(ctx, "failed to check depth on image %s: %s", &vol, err) @@ -865,7 +899,7 @@ func (ri *rbdImage) flattenRbdImage( return err } if forceFlatten || depth >= hardlimit { - return fmt.Errorf("%w: flatten is in progress for image %s", ErrFlattenInProgress, ri.RbdImageName) + return fmt.Errorf("%w: flatten is in progress for image %s", rbderrors.ErrFlattenInProgress, ri.RbdImageName) } log.DebugLog(ctx, "successfully added task to flatten image %q", ri) } @@ -956,7 +990,11 @@ func (ri *rbdImage) checkImageChainHasFeature(ctx context.Context, feature uint6 // is in the trash, when we try to open the parent image to get its // information it fails because it is already in trash. We should // treat error as nil if the parent is not found. +<<<<<<< HEAD if errors.Is(err, ErrImageNotFound) { +======= + if errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return false, nil } log.ErrorLog(ctx, "failed to get image info for %s: %s", rbdImg.String(), err) @@ -1194,6 +1232,33 @@ func generateVolumeFromVolumeID( return rbdVol, err } +// ShouldRetryVolumeGeneration determines whether the process of finding or generating +// volumes should continue based on the type of error encountered. +// +// It checks if the given error matches any of the following known errors: +// - util.ErrKeyNotFound: The key required to locate the volume is missing in Rados omap. +// - util.ErrPoolNotFound: The rbd pool where the volume/omap is expected doesn't exist. +// - ErrImageNotFound: The image doesn't exist in the rbd pool. +// - rados.ErrPermissionDenied: Permissions to access the pool is denied. +// +// If any of these errors are encountered, the function returns `true`, indicating +// that the volume search should continue because of known error. Otherwise, it +// returns `false`, meaning the search should stop. +// +// This helper function is used in scenarios where multiple attempts may be made +// to retrieve or generate volume information, and we want to gracefully handle +// specific failure cases while retrying for others. +func ShouldRetryVolumeGeneration(err error) bool { + if err == nil { + return false // No error, do not retry + } + // Continue searching for specific known errors + return (errors.Is(err, util.ErrKeyNotFound) || + errors.Is(err, util.ErrPoolNotFound) || + errors.Is(err, rbderrors.ErrImageNotFound) || + errors.Is(err, rados.ErrPermissionDenied)) +} + // GenVolFromVolID generates a rbdVolume structure from the provided identifier, updating // the structure with elements from on-disk image metadata as well. 
func GenVolFromVolID( @@ -1210,11 +1275,15 @@ func GenVolFromVolID( err := vi.DecomposeCSIID(volumeID) if err != nil { return vol, fmt.Errorf("%w: error decoding volume ID (%w) (%s)", - ErrInvalidVolID, err, volumeID) + rbderrors.ErrInvalidVolID, err, volumeID) } vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets) +<<<<<<< HEAD if !shouldRetryVolumeGeneration(err) { +======= + if !ShouldRetryVolumeGeneration(err) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return vol, err } @@ -1225,7 +1294,11 @@ func GenVolFromVolID( } if mapping != nil { rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets) +<<<<<<< HEAD if !shouldRetryVolumeGeneration(vErr) { +======= + if !ShouldRetryVolumeGeneration(vErr) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return rbdVol, vErr } } @@ -1278,7 +1351,11 @@ func generateVolumeFromMapping( // Add mapping poolID to Identifier nvi.LocationID = pID vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets) +<<<<<<< HEAD if !shouldRetryVolumeGeneration(err) { +======= + if !ShouldRetryVolumeGeneration(err) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) return vol, err } } @@ -1499,7 +1576,7 @@ func (ri *rbdImage) deleteSnapshot(ctx context.Context, pOpts *rbdSnapshot) erro } err = snap.Remove() if errors.Is(err, librbd.ErrNotFound) { - return fmt.Errorf("Failed as %w (internal %w)", ErrSnapNotFound, err) + return fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrSnapNotFound, err) } return err @@ -1759,7 +1836,7 @@ func (ri *rbdImage) checkSnapExists(rbdSnap *rbdSnapshot) error { } } - return fmt.Errorf("%w: snap %s not found", ErrSnapNotFound, rbdSnap) + return fmt.Errorf("%w: snap %s not found", rbderrors.ErrSnapNotFound, rbdSnap) } // rbdImageMetadataStash strongly typed JSON spec for stashed RBD image metadata. 
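The ShouldRetryVolumeGeneration helper added above is what GenVolFromVolID uses to decide whether to keep walking the cluster/pool ID mappings after a failed lookup. A compressed sketch of that retry pattern, with hypothetical stand-ins for the sentinel and the per-pool lookup (not the repository's code):

package main

import (
    "errors"
    "fmt"
)

// errPoolNotFound stands in for util.ErrPoolNotFound and the other known
// "miss" sentinels checked by the real helper.
var errPoolNotFound = errors.New("pool not found")

// lookupInPool pretends to resolve a volume in one candidate pool.
func lookupInPool(pool string) (string, error) {
    if pool != "replica-3" {
        return "", fmt.Errorf("pool %q: %w", pool, errPoolNotFound)
    }

    return "csi-vol-1234", nil
}

// shouldRetry plays the role of ShouldRetryVolumeGeneration: only a known
// "miss" error keeps the search going, anything else stops it.
func shouldRetry(err error) bool {
    return err != nil && errors.Is(err, errPoolNotFound)
}

func main() {
    var (
        vol string
        err error
    )
    // Mirrors how GenVolFromVolID walks the mapped pool/cluster IDs: stop on
    // success or on an unexpected error, continue on a known miss.
    for _, pool := range []string{"ec-pool", "archive", "replica-3"} {
        vol, err = lookupInPool(pool)
        if !shouldRetry(err) {
            break
        }
    }

    fmt.Println(vol, err) // csi-vol-1234 <nil>
}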
@@ -1841,7 +1918,7 @@ func lookupRBDImageMetadataStash(metaDataPath string) (rbdImageMetadataStash, er return imgMeta, fmt.Errorf("failed to read stashed JSON image metadata from path (%s): %w", fPath, err) } - return imgMeta, fmt.Errorf("Failed as %w (internal %w)", ErrMissingStash, err) + return imgMeta, fmt.Errorf("Failed as %w (internal %w)", rbderrors.ErrMissingStash, err) } err = json.Unmarshal(encodedBytes, &imgMeta) diff --git a/internal/rbd/rbd_util_test.go b/internal/rbd/rbd_util_test.go index 5d14ed844..3d1ae27e6 100644 --- a/internal/rbd/rbd_util_test.go +++ b/internal/rbd/rbd_util_test.go @@ -23,6 +23,12 @@ import ( "strings" "testing" +<<<<<<< HEAD +======= + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" + "github.com/ceph/ceph-csi/internal/util" + +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" "github.com/stretchr/testify/require" @@ -418,7 +424,11 @@ func Test_shouldRetryVolumeGeneration(t *testing.T) { }, { name: "ErrImageNotFound (continue searching)", +<<<<<<< HEAD args: args{err: ErrImageNotFound}, +======= + args: args{err: rbderrors.ErrImageNotFound}, +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) want: true, // Known error, continue searching }, { @@ -435,8 +445,13 @@ func Test_shouldRetryVolumeGeneration(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { t.Parallel() +<<<<<<< HEAD if got := shouldRetryVolumeGeneration(tt.args.err); got != tt.want { t.Errorf("shouldRetryVolumeGeneration() = %v, want %v", got, tt.want) +======= + if got := ShouldRetryVolumeGeneration(tt.args.err); got != tt.want { + t.Errorf("ShouldRetryVolumeGeneration() = %v, want %v", got, tt.want) +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) } }) } diff --git a/internal/rbd/replication.go b/internal/rbd/replication.go index 624eabd33..e7046e179 100644 --- a/internal/rbd/replication.go +++ b/internal/rbd/replication.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" librbd "github.com/ceph/go-ceph/rbd" @@ -70,14 +71,14 @@ func DisableVolumeReplication(mirror types.Mirror, localStatus, err := sts.GetLocalSiteStatus() if err != nil { - return fmt.Errorf("failed to get local state: %w", ErrInvalidArgument) + return fmt.Errorf("failed to get local state: %w", rbderrors.ErrInvalidArgument) } if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorImageStatusStateReplaying.String() { return nil } return fmt.Errorf("%w: secondary image status is up=%t and state=%s", - ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState()) + rbderrors.ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState()) } err := mirror.DisableMirroring(ctx, force) if err != nil { @@ -92,7 +93,7 @@ func DisableVolumeReplication(mirror types.Mirror, // error out if the image is not in disabled state. 
if info.GetState() != librbd.MirrorImageDisabled.String() { - return fmt.Errorf("%w: image is in %q state, expected state %q", ErrAborted, + return fmt.Errorf("%w: image is in %q state, expected state %q", rbderrors.ErrAborted, info.GetState(), librbd.MirrorImageDisabled.String()) } diff --git a/internal/rbd/snapshot.go b/internal/rbd/snapshot.go index d883cba3b..27a596345 100644 --- a/internal/rbd/snapshot.go +++ b/internal/rbd/snapshot.go @@ -24,6 +24,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/protobuf/types/known/timestamppb" + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -82,7 +83,11 @@ func cleanUpSnapshot( ) error { err := parentVol.deleteSnapshot(ctx, rbdSnap) if err != nil { +<<<<<<< HEAD if !errors.Is(err, ErrImageNotFound) && !errors.Is(err, ErrSnapNotFound) { +======= + if !errors.Is(err, rbderrors.ErrImageNotFound) && !errors.Is(err, rbderrors.ErrSnapNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) log.ErrorLog(ctx, "failed to delete snapshot %q: %v", rbdSnap, err) return err @@ -92,7 +97,11 @@ func cleanUpSnapshot( if rbdVol != nil { err := rbdVol.Delete(ctx) if err != nil { +<<<<<<< HEAD if !errors.Is(err, ErrImageNotFound) { +======= + if !errors.Is(err, rbderrors.ErrImageNotFound) { +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) log.ErrorLog(ctx, "failed to delete rbd image %q with error: %v", rbdVol, err) return err diff --git a/internal/util/errors.go b/internal/util/errors.go index 118ed15e3..4c9f1c9fa 100644 --- a/internal/util/errors.go +++ b/internal/util/errors.go @@ -16,9 +16,13 @@ limitations under the License. package util +<<<<<<< HEAD import ( "errors" ) +======= +import "errors" +>>>>>>> 5cbc1445 (cleanup: move internal/rbd/errors.go to internal/rbd/errors pacakge) var ( // ErrKeyNotFound is returned when requested key in omap is not found.
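Most of the renamed sentinels surface to CSI callers through getGRPCError in the replication.go hunks earlier in this patch, which walks a sentinel-to-code table, matches with errors.Is, and falls back to codes.Internal. A self-contained sketch of that mapping pattern, using stand-in sentinels rather than the repository's:

package main

import (
    "errors"
    "fmt"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// Stand-in sentinels for illustration only.
var (
    errImageNotFound = errors.New("image not found")
    errAborted       = errors.New("operation aborted")
)

// toGRPCError mirrors the shape of getGRPCError: look the wrapped error up in
// a sentinel->code table, otherwise report codes.Internal. The nil handling
// here is this sketch's choice, not necessarily the real helper's.
func toGRPCError(err error) error {
    if err == nil {
        return nil
    }

    table := map[error]codes.Code{
        errImageNotFound: codes.NotFound,
        errAborted:       codes.Aborted,
    }

    for sentinel, code := range table {
        if errors.Is(err, sentinel) {
            return status.Error(code, err.Error())
        }
    }

    return status.Error(codes.Internal, err.Error())
}

func main() {
    fmt.Println(toGRPCError(fmt.Errorf("resync image: %w", errAborted)))
    fmt.Println(toGRPCError(errors.New("unexpected I/O failure")))
}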