diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 045203cc4..a949a383c 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -74,6 +74,12 @@ const ( // (optional) StartTime is the time the snapshot schedule // begins, can be specified using the ISO 8601 time format. schedulingStartTimeKey = "schedulingStartTime" + + // flattenModeKey to get the flattenMode from the parameters. + // (optional) flattenMode decides how to handle images with parent. + // (default) If set to "never", the image with parent will not be flattened. + // If set to "force", the image with parent will be flattened. + flattenModeKey = "flattenMode" ) // ReplicationServer struct of rbd CSI driver with supported methods of Replication @@ -115,6 +121,27 @@ func getForceOption(ctx context.Context, parameters map[string]string) (bool, er return force, nil } +// getFlattenMode gets flatten mode from the input GRPC request parameters. +// flattenMode is the key to check the mode in the parameters. +func getFlattenMode(ctx context.Context, parameters map[string]string) (corerbd.FlattenMode, error) { + val, ok := parameters[flattenModeKey] + if !ok { + log.DebugLog(ctx, "%q is not set in parameters, setting to default (%v)", + flattenModeKey, corerbd.FlattenModeNever) + + return corerbd.FlattenModeNever, nil + } + + mode := corerbd.FlattenMode(val) + switch mode { + case corerbd.FlattenModeForce, corerbd.FlattenModeNever: + return mode, nil + } + log.ErrorLog(ctx, "%q=%q is not supported", flattenModeKey, val) + + return mode, status.Errorf(codes.InvalidArgument, "%q=%q is not supported", flattenModeKey, val) +} + // getMirroringMode gets the mirroring mode from the input GRPC request parameters. // mirroringMode is the key to check the mode in the parameters. func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd.ImageMirrorMode, error) { @@ -265,6 +292,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, if err != nil { return nil, err } + // extract the flatten mode + flattenMode, err := getFlattenMode(ctx, req.GetParameters()) + if err != nil { + return nil, err + } mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { @@ -274,6 +306,12 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } if mirroringInfo.State != librbd.MirrorImageEnabled { + err = rbdVol.HandleParentImageExistence(ctx, flattenMode) + if err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, getGRPCError(err) + } err = rbdVol.EnableImageMirroring(mirroringMode) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -777,6 +815,7 @@ func getGRPCError(err error) error { errorStatusMap := map[error]codes.Code{ corerbd.ErrInvalidArgument: codes.InvalidArgument, + corerbd.ErrFlattenInProgress: codes.Aborted, corerbd.ErrAborted: codes.Aborted, corerbd.ErrFailedPrecondition: codes.FailedPrecondition, corerbd.ErrUnavailable: codes.Unavailable, diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index c00a7168d..c678c6377 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -641,3 +641,69 @@ func Test_timestampFromString(t *testing.T) { }) } } + +func Test_getFlattenMode(t *testing.T) { + t.Parallel() + type args struct { + ctx context.Context + parameters map[string]string + } + tests := []struct { + name string + args args + want corerbd.FlattenMode + wantErr bool + }{ + { + name: "flattenMode option not set", + args: args{ + ctx: context.TODO(), + parameters: map[string]string{}, + }, + want: corerbd.FlattenModeNever, + }, + { + name: "flattenMode option set to never", + args: args{ + ctx: context.TODO(), + parameters: map[string]string{ + flattenModeKey: string(corerbd.FlattenModeNever), + }, + }, + want: corerbd.FlattenModeNever, + }, + { + name: "flattenMode option set to force", + args: args{ + ctx: context.TODO(), + parameters: map[string]string{ + flattenModeKey: string(corerbd.FlattenModeForce), + }, + }, + want: corerbd.FlattenModeForce, + }, + + { + name: "flattenMode option set to invalid value", + args: args{ + ctx: context.TODO(), + parameters: map[string]string{ + flattenModeKey: "invalid123", + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := getFlattenMode(tt.args.ctx, tt.args.parameters) + if (err != nil) != tt.wantErr { + t.Errorf("getFlattenMode() error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr && !reflect.DeepEqual(got, tt.want) { + t.Errorf("getFlattenMode() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index 3b6c58d06..71f0492c4 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -1020,21 +1020,11 @@ func cleanupRBDImage(ctx context.Context, // delete the temporary rbd image created as part of volume clone during // create volume - tempClone := rbdVol.generateTempClone() - err = tempClone.deleteImage(ctx) + err = rbdVol.DeleteTempImage(ctx) if err != nil { - if errors.Is(err, ErrImageNotFound) { - err = tempClone.ensureImageCleanup(ctx) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - } else { - // return error if it is not ErrImageNotFound - log.ErrorLog(ctx, "failed to delete rbd image: %s with error: %v", - tempClone, err) + log.ErrorLog(ctx, "failed to delete temporary rbd image: %v", err) - return nil, status.Error(codes.Internal, err.Error()) - } + return nil, status.Error(codes.Internal, err.Error()) } // Deleting rbd image diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 4f6507614..106a9eb77 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -25,6 +25,69 @@ import ( librbd "github.com/ceph/go-ceph/rbd" ) +// FlattenMode is used to indicate the flatten mode for an RBD image. +type FlattenMode string + +const ( + // FlattenModeNever indicates that the image should never be flattened. + FlattenModeNever FlattenMode = "never" + // FlattenModeForce indicates that the image with the parent must be flattened. + FlattenModeForce FlattenMode = "force" +) + +// HandleParentImageExistence checks the image's parent. +// if the parent image does not exist and is not in trash, it returns nil. +// if the flattenMode is FlattenModeForce, it flattens the image itself. +// if the parent image is in trash, it returns an error. +// if the parent image exists and is not enabled for mirroring, it returns an error. +func (rv *rbdVolume) HandleParentImageExistence( + ctx context.Context, + flattenMode FlattenMode, +) error { + if rv.ParentName == "" && !rv.ParentInTrash { + return nil + } + + if flattenMode == FlattenModeForce { + // Delete temp image that exists for volume datasource since + // it is no longer required when the live image is flattened. + err := rv.DeleteTempImage(ctx) + if err != nil { + return fmt.Errorf("failed to delete temporary rbd image: %w", err) + } + + err = rv.flattenRbdImage(ctx, true, 0, 0) + if err != nil { + return err + } + } + + if rv.ParentInTrash { + return fmt.Errorf("%w: failed to enable mirroring on image %q:"+ + " parent is in trash", + ErrFailedPrecondition, rv) + } + + parent, err := rv.getParent() + if err != nil { + return err + } + parentMirroringInfo, err := parent.GetImageMirroringInfo() + if err != nil { + return fmt.Errorf( + "failed to get mirroring info of parent %q of image %q: %w", + parent, rv, err) + } + + if parentMirroringInfo.State != librbd.MirrorImageEnabled { + return fmt.Errorf("%w: failed to enable mirroring on image %q: "+ + "parent image %q is not enabled for mirroring", + ErrFailedPrecondition, rv, parent) + } + + return nil +} + // EnableImageMirroring enables mirroring on an image. func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { image, err := ri.open() diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index ee955d745..b84f64344 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -703,6 +703,22 @@ func (ri *rbdImage) trashRemoveImage(ctx context.Context) error { return nil } +// DeleteTempImage deletes the temporary image created for volume datasource. +func (rv *rbdVolume) DeleteTempImage(ctx context.Context) error { + tempClone := rv.generateTempClone() + err := tempClone.deleteImage(ctx) + if err != nil { + if errors.Is(err, ErrImageNotFound) { + return tempClone.ensureImageCleanup(ctx) + } else { + // return error if it is not ErrImageNotFound + return err + } + } + + return nil +} + func (ri *rbdImage) getCloneDepth(ctx context.Context) (uint, error) { var depth uint vol := rbdVolume{}