mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-11 14:29:29 +00:00
rbd: enabe journal based mirroring
Journal-based RADOS block device mirroring ensures point-in-time consistent replicas of all changes to an image, including reads and writes, block device resizing, snapshots, clones, and flattening. Journaling-based mirroring records all modifications to an image in the order in which they occur. This ensures that a crash-consistent mirror of an image is available. Mirroring when configured in journal mode, mirroring will utilize the RBD journaling image feature to replicate the image contents. If the RBD journaling image feature is not yet enabled on the image, it will be automatically enabled. Fixes: #2018 Co-authored-by: Madhu Rajanna <madhupr007@gmail.com> Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
This commit is contained in:
parent
ab76459e87
commit
e7d8834149
@ -43,6 +43,9 @@ const (
|
||||
// imageMirrorModeSnapshot uses snapshots to propagate RBD images between
|
||||
// ceph clusters.
|
||||
imageMirrorModeSnapshot imageMirroringMode = "snapshot"
|
||||
// imageMirrorModeJournal uses journaling to propagate RBD images between
|
||||
// ceph clusters.
|
||||
imageMirrorModeJournal imageMirroringMode = "journal"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -123,6 +126,8 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd
|
||||
switch imageMirroringMode(val) {
|
||||
case imageMirrorModeSnapshot:
|
||||
mirroringMode = librbd.ImageMirrorModeSnapshot
|
||||
case imageMirrorModeJournal:
|
||||
mirroringMode = librbd.ImageMirrorModeJournal
|
||||
default:
|
||||
return mirroringMode, status.Errorf(codes.InvalidArgument, "%s %s not supported", imageMirroringKey, val)
|
||||
}
|
||||
@ -133,12 +138,24 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd
|
||||
// validateSchedulingDetails gets the mirroring mode and scheduling details from the
|
||||
// input GRPC request parameters and validates the scheduling is only supported
|
||||
// for snapshot mirroring mode.
|
||||
func validateSchedulingDetails(parameters map[string]string) error {
|
||||
func validateSchedulingDetails(ctx context.Context, parameters map[string]string) error {
|
||||
var err error
|
||||
|
||||
val := parameters[imageMirroringKey]
|
||||
|
||||
switch imageMirroringMode(val) {
|
||||
case imageMirrorModeJournal:
|
||||
// journal mirror mode does not require scheduling parameters
|
||||
if _, ok := parameters[schedulingIntervalKey]; ok {
|
||||
log.WarningLog(ctx, "%s parameter cannot be used with %s mirror mode, ignoring it",
|
||||
schedulingIntervalKey, string(imageMirrorModeJournal))
|
||||
}
|
||||
if _, ok := parameters[schedulingStartTimeKey]; ok {
|
||||
log.WarningLog(ctx, "%s parameter cannot be used with %s mirror mode, ignoring it",
|
||||
schedulingStartTimeKey, string(imageMirrorModeJournal))
|
||||
}
|
||||
|
||||
return nil
|
||||
case imageMirrorModeSnapshot:
|
||||
// If mirroring mode is not set in parameters, we are defaulting mirroring
|
||||
// mode to snapshot. Discard empty mirroring mode from validation as it is
|
||||
@ -206,7 +223,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
err = validateSchedulingDetails(req.GetParameters())
|
||||
err = validateSchedulingDetails(ctx, req.GetParameters())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -325,9 +342,11 @@ func tickleMirroringOnDummyImage(rbdVol *rbdVolume, mirroringMode librbd.ImageMi
|
||||
return err
|
||||
}
|
||||
|
||||
err = dummyVol.addSnapshotScheduling(admin.Interval("1m"), admin.NoStartTime)
|
||||
if err != nil {
|
||||
return err
|
||||
if mirroringMode == librbd.ImageMirrorModeSnapshot {
|
||||
err = dummyVol.addSnapshotScheduling(admin.Interval("1m"), admin.NoStartTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package rbd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
@ -73,6 +74,7 @@ func TestValidateSchedulingInterval(t *testing.T) {
|
||||
|
||||
func TestValidateSchedulingDetails(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.TODO()
|
||||
tests := []struct {
|
||||
name string
|
||||
parameters map[string]string
|
||||
@ -98,10 +100,10 @@ func TestValidateSchedulingDetails(t *testing.T) {
|
||||
{
|
||||
"when mirroring mode is journal",
|
||||
map[string]string{
|
||||
imageMirroringKey: "journal",
|
||||
imageMirroringKey: string(imageMirrorModeJournal),
|
||||
schedulingIntervalKey: "1h",
|
||||
},
|
||||
true,
|
||||
false,
|
||||
},
|
||||
{
|
||||
"when startTime is specified without interval",
|
||||
@ -136,7 +138,7 @@ func TestValidateSchedulingDetails(t *testing.T) {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
err := validateSchedulingDetails(tt.parameters)
|
||||
err := validateSchedulingDetails(ctx, tt.parameters)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("getSchedulingDetails() error = %v, wantErr %v", err, tt.wantErr)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user