Merge pull request #51 from ceph/devel

Sync rhs/ceph-csi:devel with ceph/ceph-csi:devel
This commit is contained in:
OpenShift Merge Robot 2021-12-02 13:36:46 +01:00 committed by GitHub
commit 2c3cf2dd93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 8 deletions

View File

@ -42,4 +42,6 @@ var (
ErrMissingImageNameInVolID = errors.New("rbd image name information can not be empty in volID") ErrMissingImageNameInVolID = errors.New("rbd image name information can not be empty in volID")
// ErrDecodeClusterIDFromMonsInVolID is returned when mons hash decoding on migration volID. // ErrDecodeClusterIDFromMonsInVolID is returned when mons hash decoding on migration volID.
ErrDecodeClusterIDFromMonsInVolID = errors.New("failed to get clusterID from monitors hash in volID") ErrDecodeClusterIDFromMonsInVolID = errors.New("failed to get clusterID from monitors hash in volID")
// ErrUnHealthyMirroredImage is returned when mirrored image is not healthy.
ErrUnHealthyMirroredImage = errors.New("mirrored image is not healthy")
) )

View File

@ -43,6 +43,9 @@ const (
// imageMirrorModeSnapshot uses snapshots to propagate RBD images between // imageMirrorModeSnapshot uses snapshots to propagate RBD images between
// ceph clusters. // ceph clusters.
imageMirrorModeSnapshot imageMirroringMode = "snapshot" imageMirrorModeSnapshot imageMirroringMode = "snapshot"
// imageMirrorModeJournal uses journaling to propagate RBD images between
// ceph clusters.
imageMirrorModeJournal imageMirroringMode = "journal"
) )
const ( const (
@ -123,6 +126,8 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd
switch imageMirroringMode(val) { switch imageMirroringMode(val) {
case imageMirrorModeSnapshot: case imageMirrorModeSnapshot:
mirroringMode = librbd.ImageMirrorModeSnapshot mirroringMode = librbd.ImageMirrorModeSnapshot
case imageMirrorModeJournal:
mirroringMode = librbd.ImageMirrorModeJournal
default: default:
return mirroringMode, status.Errorf(codes.InvalidArgument, "%s %s not supported", imageMirroringKey, val) return mirroringMode, status.Errorf(codes.InvalidArgument, "%s %s not supported", imageMirroringKey, val)
} }
@ -133,12 +138,24 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd
// validateSchedulingDetails gets the mirroring mode and scheduling details from the // validateSchedulingDetails gets the mirroring mode and scheduling details from the
// input GRPC request parameters and validates the scheduling is only supported // input GRPC request parameters and validates the scheduling is only supported
// for snapshot mirroring mode. // for snapshot mirroring mode.
func validateSchedulingDetails(parameters map[string]string) error { func validateSchedulingDetails(ctx context.Context, parameters map[string]string) error {
var err error var err error
val := parameters[imageMirroringKey] val := parameters[imageMirroringKey]
switch imageMirroringMode(val) { switch imageMirroringMode(val) {
case imageMirrorModeJournal:
// journal mirror mode does not require scheduling parameters
if _, ok := parameters[schedulingIntervalKey]; ok {
log.WarningLog(ctx, "%s parameter cannot be used with %s mirror mode, ignoring it",
schedulingIntervalKey, string(imageMirrorModeJournal))
}
if _, ok := parameters[schedulingStartTimeKey]; ok {
log.WarningLog(ctx, "%s parameter cannot be used with %s mirror mode, ignoring it",
schedulingStartTimeKey, string(imageMirrorModeJournal))
}
return nil
case imageMirrorModeSnapshot: case imageMirrorModeSnapshot:
// If mirroring mode is not set in parameters, we are defaulting mirroring // If mirroring mode is not set in parameters, we are defaulting mirroring
// mode to snapshot. Discard empty mirroring mode from validation as it is // mode to snapshot. Discard empty mirroring mode from validation as it is
@ -206,7 +223,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
err = validateSchedulingDetails(req.GetParameters()) err = validateSchedulingDetails(ctx, req.GetParameters())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -325,9 +342,11 @@ func tickleMirroringOnDummyImage(rbdVol *rbdVolume, mirroringMode librbd.ImageMi
return err return err
} }
err = dummyVol.addSnapshotScheduling(admin.Interval("1m"), admin.NoStartTime) if mirroringMode == librbd.ImageMirrorModeSnapshot {
if err != nil { err = dummyVol.addSnapshotScheduling(admin.Interval("1m"), admin.NoStartTime)
return err if err != nil {
return err
}
} }
return nil return nil
@ -517,6 +536,11 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
} }
} }
err = checkHealthyPrimary(ctx, rbdVol)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
var mode librbd.ImageMirrorMode var mode librbd.ImageMirrorMode
mode, err = getMirroringMode(ctx, req.GetParameters()) mode, err = getMirroringMode(ctx, req.GetParameters())
if err != nil { if err != nil {
@ -546,6 +570,35 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
return &replication.PromoteVolumeResponse{}, nil return &replication.PromoteVolumeResponse{}, nil
} }
// checkHealthyPrimary checks if the image is a healhty primary or not.
// healthy primary image will be in up+stopped state, for states other
// than this it returns an error message.
func checkHealthyPrimary(ctx context.Context, rbdVol *rbdVolume) error {
mirrorStatus, err := rbdVol.getImageMirroringStatus()
if err != nil {
return err
}
localStatus, err := mirrorStatus.LocalStatus()
if err != nil {
// LocalStatus can fail if the local site status is not found in
// mirroring status. Log complete sites status to debug why getting
// local status failed
log.ErrorLog(ctx, "mirroring status is %+v", mirrorStatus)
return fmt.Errorf("failed to get local status: %w", err)
}
if !localStatus.Up && localStatus.State != librbd.MirrorImageStatusStateStopped {
return fmt.Errorf("%s %w. State is up=%t, state=%q",
rbdVol,
ErrUnHealthyMirroredImage,
localStatus.Up,
localStatus.State)
}
return nil
}
// DemoteVolume extracts the RBD volume information from the // DemoteVolume extracts the RBD volume information from the
// volumeID, If the image is present, mirroring is enabled and the // volumeID, If the image is present, mirroring is enabled and the
// image is in promoted state it will demote the volume as secondary. // image is in promoted state it will demote the volume as secondary.

View File

@ -17,6 +17,7 @@ limitations under the License.
package rbd package rbd
import ( import (
"context"
"reflect" "reflect"
"testing" "testing"
@ -73,6 +74,7 @@ func TestValidateSchedulingInterval(t *testing.T) {
func TestValidateSchedulingDetails(t *testing.T) { func TestValidateSchedulingDetails(t *testing.T) {
t.Parallel() t.Parallel()
ctx := context.TODO()
tests := []struct { tests := []struct {
name string name string
parameters map[string]string parameters map[string]string
@ -98,10 +100,10 @@ func TestValidateSchedulingDetails(t *testing.T) {
{ {
"when mirroring mode is journal", "when mirroring mode is journal",
map[string]string{ map[string]string{
imageMirroringKey: "journal", imageMirroringKey: string(imageMirrorModeJournal),
schedulingIntervalKey: "1h", schedulingIntervalKey: "1h",
}, },
true, false,
}, },
{ {
"when startTime is specified without interval", "when startTime is specified without interval",
@ -136,7 +138,7 @@ func TestValidateSchedulingDetails(t *testing.T) {
tt := tt tt := tt
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
t.Parallel() t.Parallel()
err := validateSchedulingDetails(tt.parameters) err := validateSchedulingDetails(ctx, tt.parameters)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("getSchedulingDetails() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("getSchedulingDetails() error = %v, wantErr %v", err, tt.wantErr)