From d166229d8f92eedcdddb44bc3193bbcfcd577ef6 Mon Sep 17 00:00:00 2001 From: Rakshith R Date: Thu, 6 Jun 2024 17:14:08 +0530 Subject: [PATCH] rbd: add support for flattenMode option for replication This commit adds support for flattenMode option for replication. If the flattenMode is set to "force" in volumereplicationclass parameters, cephcsi will add a task to flatten the image if it has parent. This enable cephcsi to then mirror such images after flattening them. The error message when the image's parent is in trash or unmirrored is improved as well. Signed-off-by: Rakshith R --- internal/csi-addons/rbd/replication.go | 39 ++++++++++++ internal/csi-addons/rbd/replication_test.go | 66 +++++++++++++++++++++ internal/rbd/controllerserver.go | 16 +---- internal/rbd/mirror.go | 63 ++++++++++++++++++++ internal/rbd/rbd_util.go | 16 +++++ 5 files changed, 187 insertions(+), 13 deletions(-) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 045203cc4..a949a383c 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -74,6 +74,12 @@ const ( // (optional) StartTime is the time the snapshot schedule // begins, can be specified using the ISO 8601 time format. schedulingStartTimeKey = "schedulingStartTime" + + // flattenModeKey to get the flattenMode from the parameters. + // (optional) flattenMode decides how to handle images with parent. + // (default) If set to "never", the image with parent will not be flattened. + // If set to "force", the image with parent will be flattened. + flattenModeKey = "flattenMode" ) // ReplicationServer struct of rbd CSI driver with supported methods of Replication @@ -115,6 +121,27 @@ func getForceOption(ctx context.Context, parameters map[string]string) (bool, er return force, nil } +// getFlattenMode gets flatten mode from the input GRPC request parameters. +// flattenMode is the key to check the mode in the parameters. +func getFlattenMode(ctx context.Context, parameters map[string]string) (corerbd.FlattenMode, error) { + val, ok := parameters[flattenModeKey] + if !ok { + log.DebugLog(ctx, "%q is not set in parameters, setting to default (%v)", + flattenModeKey, corerbd.FlattenModeNever) + + return corerbd.FlattenModeNever, nil + } + + mode := corerbd.FlattenMode(val) + switch mode { + case corerbd.FlattenModeForce, corerbd.FlattenModeNever: + return mode, nil + } + log.ErrorLog(ctx, "%q=%q is not supported", flattenModeKey, val) + + return mode, status.Errorf(codes.InvalidArgument, "%q=%q is not supported", flattenModeKey, val) +} + // getMirroringMode gets the mirroring mode from the input GRPC request parameters. // mirroringMode is the key to check the mode in the parameters. func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd.ImageMirrorMode, error) { @@ -265,6 +292,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, if err != nil { return nil, err } + // extract the flatten mode + flattenMode, err := getFlattenMode(ctx, req.GetParameters()) + if err != nil { + return nil, err + } mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { @@ -274,6 +306,12 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } if mirroringInfo.State != librbd.MirrorImageEnabled { + err = rbdVol.HandleParentImageExistence(ctx, flattenMode) + if err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, getGRPCError(err) + } err = rbdVol.EnableImageMirroring(mirroringMode) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -777,6 +815,7 @@ func getGRPCError(err error) error { errorStatusMap := map[error]codes.Code{ corerbd.ErrInvalidArgument: codes.InvalidArgument, + corerbd.ErrFlattenInProgress: codes.Aborted, corerbd.ErrAborted: codes.Aborted, corerbd.ErrFailedPrecondition: codes.FailedPrecondition, corerbd.ErrUnavailable: codes.Unavailable, diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index c00a7168d..c678c6377 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -641,3 +641,69 @@ func Test_timestampFromString(t *testing.T) { }) } } + +func Test_getFlattenMode(t *testing.T) { + t.Parallel() + type args struct { + ctx context.Context + parameters map[string]string + } + tests := []struct { + name string + args args + want corerbd.FlattenMode + wantErr bool + }{ + { + name: "flattenMode option not set", + args: args{ + ctx: context.TODO(), + parameters: map[string]string{}, + }, + want: corerbd.FlattenModeNever, + }, + { + name: "flattenMode option set to never", + args: args{ + ctx: context.TODO(), + parameters: map[string]string{ + flattenModeKey: string(corerbd.FlattenModeNever), + }, + }, + want: corerbd.FlattenModeNever, + }, + { + name: "flattenMode option set to force", + args: args{ + ctx: context.TODO(), + parameters: map[string]string{ + flattenModeKey: string(corerbd.FlattenModeForce), + }, + }, + want: corerbd.FlattenModeForce, + }, + + { + name: "flattenMode option set to invalid value", + args: args{ + ctx: context.TODO(), + parameters: map[string]string{ + flattenModeKey: "invalid123", + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := getFlattenMode(tt.args.ctx, tt.args.parameters) + if (err != nil) != tt.wantErr { + t.Errorf("getFlattenMode() error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr && !reflect.DeepEqual(got, tt.want) { + t.Errorf("getFlattenMode() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 3b6c58d06..71f0492c4 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -1020,21 +1020,11 @@ func cleanupRBDImage(ctx context.Context, // delete the temporary rbd image created as part of volume clone during // create volume - tempClone := rbdVol.generateTempClone() - err = tempClone.deleteImage(ctx) + err = rbdVol.DeleteTempImage(ctx) if err != nil { - if errors.Is(err, ErrImageNotFound) { - err = tempClone.ensureImageCleanup(ctx) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - } else { - // return error if it is not ErrImageNotFound - log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", - tempClone, err) + log.ErrorLog(ctx, "failed to delete temporary rbd image: %v", err) - return nil, status.Error(codes.Internal, err.Error()) - } + return nil, status.Error(codes.Internal, err.Error()) } // Deleting rbd image diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 4f6507614..106a9eb77 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -25,6 +25,69 @@ import ( librbd "github.com/ceph/go-ceph/rbd" ) +// FlattenMode is used to indicate the flatten mode for an RBD image. +type FlattenMode string + +const ( + // FlattenModeNever indicates that the image should never be flattened. + FlattenModeNever FlattenMode = "never" + // FlattenModeForce indicates that the image with the parent must be flattened. + FlattenModeForce FlattenMode = "force" +) + +// HandleParentImageExistence checks the image's parent. +// if the parent image does not exist and is not in trash, it returns nil. +// if the flattenMode is FlattenModeForce, it flattens the image itself. +// if the parent image is in trash, it returns an error. +// if the parent image exists and is not enabled for mirroring, it returns an error. +func (rv *rbdVolume) HandleParentImageExistence( + ctx context.Context, + flattenMode FlattenMode, +) error { + if rv.ParentName == "" && !rv.ParentInTrash { + return nil + } + + if flattenMode == FlattenModeForce { + // Delete temp image that exists for volume datasource since + // it is no longer required when the live image is flattened. + err := rv.DeleteTempImage(ctx) + if err != nil { + return fmt.Errorf("failed to delete temporary rbd image: %w", err) + } + + err = rv.flattenRbdImage(ctx, true, 0, 0) + if err != nil { + return err + } + } + + if rv.ParentInTrash { + return fmt.Errorf("%w: failed to enable mirroring on image %q:"+ + " parent is in trash", + ErrFailedPrecondition, rv) + } + + parent, err := rv.getParent() + if err != nil { + return err + } + parentMirroringInfo, err := parent.GetImageMirroringInfo() + if err != nil { + return fmt.Errorf( + "failed to get mirroring info of parent %q of image %q: %w", + parent, rv, err) + } + + if parentMirroringInfo.State != librbd.MirrorImageEnabled { + return fmt.Errorf("%w: failed to enable mirroring on image %q: "+ + "parent image %q is not enabled for mirroring", + ErrFailedPrecondition, rv, parent) + } + + return nil +} + // EnableImageMirroring enables mirroring on an image. func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { image, err := ri.open() diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index ee955d745..b84f64344 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -703,6 +703,22 @@ func (ri *rbdImage) trashRemoveImage(ctx context.Context) error { return nil } +// DeleteTempImage deletes the temporary image created for volume datasource. +func (rv *rbdVolume) DeleteTempImage(ctx context.Context) error { + tempClone := rv.generateTempClone() + err := tempClone.deleteImage(ctx) + if err != nil { + if errors.Is(err, ErrImageNotFound) { + return tempClone.ensureImageCleanup(ctx) + } else { + // return error if it is not ErrImageNotFound + return err + } + } + + return nil +} + func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) { var depth uint vol := rbdVolume{}