diff --git a/internal/rbd/errors.go b/internal/rbd/errors.go index 2443b8797..32937c058 100644 --- a/internal/rbd/errors.go +++ b/internal/rbd/errors.go @@ -42,4 +42,6 @@ var ( ErrMissingImageNameInVolID = errors.New("rbd image name information can not be empty in volID") // ErrDecodeClusterIDFromMonsInVolID is returned when mons hash decoding on migration volID. ErrDecodeClusterIDFromMonsInVolID = errors.New("failed to get clusterID from monitors hash in volID") + // ErrUnHealthyMirroredImage is returned when mirrored image is not healthy. + ErrUnHealthyMirroredImage = errors.New("mirrored image is not healthy") ) diff --git a/internal/rbd/replicationcontrollerserver.go b/internal/rbd/replicationcontrollerserver.go index 29f4bc3c4..11a8bd906 100644 --- a/internal/rbd/replicationcontrollerserver.go +++ b/internal/rbd/replicationcontrollerserver.go @@ -43,6 +43,9 @@ const ( // imageMirrorModeSnapshot uses snapshots to propagate RBD images between // ceph clusters. imageMirrorModeSnapshot imageMirroringMode = "snapshot" + // imageMirrorModeJournal uses journaling to propagate RBD images between + // ceph clusters. + imageMirrorModeJournal imageMirroringMode = "journal" ) const ( @@ -123,6 +126,8 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd switch imageMirroringMode(val) { case imageMirrorModeSnapshot: mirroringMode = librbd.ImageMirrorModeSnapshot + case imageMirrorModeJournal: + mirroringMode = librbd.ImageMirrorModeJournal default: return mirroringMode, status.Errorf(codes.InvalidArgument, "%s %s not supported", imageMirroringKey, val) } @@ -133,12 +138,24 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd // validateSchedulingDetails gets the mirroring mode and scheduling details from the // input GRPC request parameters and validates the scheduling is only supported // for snapshot mirroring mode. 
-func validateSchedulingDetails(parameters map[string]string) error { +func validateSchedulingDetails(ctx context.Context, parameters map[string]string) error { var err error val := parameters[imageMirroringKey] switch imageMirroringMode(val) { + case imageMirrorModeJournal: + // journal mirror mode does not require scheduling parameters + if _, ok := parameters[schedulingIntervalKey]; ok { + log.WarningLog(ctx, "%s parameter cannot be used with %s mirror mode, ignoring it", + schedulingIntervalKey, string(imageMirrorModeJournal)) + } + if _, ok := parameters[schedulingStartTimeKey]; ok { + log.WarningLog(ctx, "%s parameter cannot be used with %s mirror mode, ignoring it", + schedulingStartTimeKey, string(imageMirrorModeJournal)) + } + + return nil case imageMirrorModeSnapshot: // If mirroring mode is not set in parameters, we are defaulting mirroring // mode to snapshot. Discard empty mirroring mode from validation as it is @@ -206,7 +223,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } defer cr.DeleteCredentials() - err = validateSchedulingDetails(req.GetParameters()) + err = validateSchedulingDetails(ctx, req.GetParameters()) if err != nil { return nil, err } @@ -325,9 +342,11 @@ func tickleMirroringOnDummyImage(rbdVol *rbdVolume, mirroringMode librbd.ImageMi return err } - err = dummyVol.addSnapshotScheduling(admin.Interval("1m"), admin.NoStartTime) - if err != nil { - return err + if mirroringMode == librbd.ImageMirrorModeSnapshot { + err = dummyVol.addSnapshotScheduling(admin.Interval("1m"), admin.NoStartTime) + if err != nil { + return err + } } return nil @@ -517,6 +536,11 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } } + err = checkHealthyPrimary(ctx, rbdVol) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + var mode librbd.ImageMirrorMode mode, err = getMirroringMode(ctx, req.GetParameters()) if err != nil { @@ -546,6 +570,35 @@ func (rs *ReplicationServer) 
PromoteVolume(ctx context.Context,
 	return &replication.PromoteVolumeResponse{}, nil
 }
 
+// checkHealthyPrimary reports whether the image is a healthy primary. Only
+// the up+stopped local state is healthy; any other combination is rejected
+// with an error wrapping ErrUnHealthyMirroredImage.
+func checkHealthyPrimary(ctx context.Context, rbdVol *rbdVolume) error {
+	mirrorStatus, err := rbdVol.getImageMirroringStatus()
+	if err != nil {
+		return err
+	}
+	localStatus, err := mirrorStatus.LocalStatus()
+	if err != nil {
+		// LocalStatus can fail if the local site status is not found in
+		// mirroring status. Log complete sites status to debug why getting
+		// local status failed
+		log.ErrorLog(ctx, "mirroring status is %+v", mirrorStatus)
+
+		return fmt.Errorf("failed to get local status: %w", err)
+	}
+
+	if !localStatus.Up || localStatus.State != librbd.MirrorImageStatusStateStopped {
+		return fmt.Errorf("%s %w. State is up=%t, state=%q",
+			rbdVol,
+			ErrUnHealthyMirroredImage,
+			localStatus.Up,
+			localStatus.State)
+	}
+
+	return nil
+}
+
 // DemoteVolume extracts the RBD volume information from the
 // volumeID, If the image is present, mirroring is enabled and the
 // image is in promoted state it will demote the volume as secondary.
diff --git a/internal/rbd/replicationcontrollerserver_test.go b/internal/rbd/replicationcontrollerserver_test.go
index cf306710a..401292587 100644
--- a/internal/rbd/replicationcontrollerserver_test.go
+++ b/internal/rbd/replicationcontrollerserver_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package rbd import ( + "context" "reflect" "testing" @@ -73,6 +74,7 @@ func TestValidateSchedulingInterval(t *testing.T) { func TestValidateSchedulingDetails(t *testing.T) { t.Parallel() + ctx := context.TODO() tests := []struct { name string parameters map[string]string @@ -98,10 +100,10 @@ func TestValidateSchedulingDetails(t *testing.T) { { "when mirroring mode is journal", map[string]string{ - imageMirroringKey: "journal", + imageMirroringKey: string(imageMirrorModeJournal), schedulingIntervalKey: "1h", }, - true, + false, }, { "when startTime is specified without interval", @@ -136,7 +138,7 @@ func TestValidateSchedulingDetails(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - err := validateSchedulingDetails(tt.parameters) + err := validateSchedulingDetails(ctx, tt.parameters) if (err != nil) != tt.wantErr { t.Errorf("getSchedulingDetails() error = %v, wantErr %v", err, tt.wantErr)