cleanup: migration of volrep to csi-addons

This commit moves the volrep logic from internal/rbd to
internal/csi-addons/rbd.

Signed-off-by: riya-singhal31 <rsinghal@redhat.com>
This commit is contained in:
riya-singhal31 2023-01-12 15:22:51 +05:30 committed by mergify[bot]
parent fb930243a8
commit 304194a0c0
7 changed files with 196 additions and 166 deletions

View File

@ -26,6 +26,7 @@ import (
"strings" "strings"
"time" "time"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
@ -78,7 +79,7 @@ type ReplicationServer struct {
// compatibility. // compatibility.
*replication.UnimplementedControllerServer *replication.UnimplementedControllerServer
// Embed ControllerServer as it implements helper functions // Embed ControllerServer as it implements helper functions
*ControllerServer *corerbd.ControllerServer
} }
func (rs *ReplicationServer) RegisterService(server grpc.ServiceRegistrar) { func (rs *ReplicationServer) RegisterService(server grpc.ServiceRegistrar) {
@ -229,11 +230,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
} }
defer rs.VolumeLocks.Release(volumeID) defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy() defer rbdVol.Destroy()
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, ErrImageNotFound): case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
@ -249,7 +250,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, err return nil, err
} }
mirroringInfo, err := rbdVol.getImageMirroringInfo() mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -257,7 +258,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
} }
if mirroringInfo.State != librbd.MirrorImageEnabled { if mirroringInfo.State != librbd.MirrorImageEnabled {
err = rbdVol.enableImageMirroring(mirroringMode) err = rbdVol.EnableImageMirroring(mirroringMode)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -291,11 +292,11 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
} }
defer rs.VolumeLocks.Release(volumeID) defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy() defer rbdVol.Destroy()
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, ErrImageNotFound): case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
@ -311,7 +312,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
return nil, err return nil, err
} }
mirroringInfo, err := rbdVol.getImageMirroringInfo() mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -325,7 +326,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
case librbd.MirrorImageDisabling: case librbd.MirrorImageDisabling:
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled: case librbd.MirrorImageEnabled:
return disableVolumeReplication(rbdVol, mirroringInfo, force) return corerbd.DisableVolumeReplication(rbdVol, mirroringInfo, force)
default: default:
return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State) return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State)
} }
@ -333,53 +334,6 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
return &replication.DisableVolumeReplicationResponse{}, nil return &replication.DisableVolumeReplicationResponse{}, nil
} }
func disableVolumeReplication(rbdVol *rbdVolume,
mirroringInfo *librbd.MirrorImageInfo,
force bool,
) (*replication.DisableVolumeReplicationResponse, error) {
if !mirroringInfo.Primary {
// Return success if the below condition is met
// Local image is secondary
// Local image is in up+replaying state
// If the image is in a secondary and its state is up+replaying means
// its a healthy secondary and the image is primary somewhere in the
// remote cluster and the local image is getting replayed. Return
// success for the Disabling mirroring as we cannot disable mirroring
// on the secondary image, when the image on the primary site gets
// disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation.
localStatus, rErr := rbdVol.getLocalState()
if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error())
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
return &replication.DisableVolumeReplicationResponse{}, nil
}
return nil, status.Errorf(codes.InvalidArgument,
"secondary image status is up=%t and state=%s",
localStatus.Up,
localStatus.State)
}
err := rbdVol.disableImageMirroring(force)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not
mirroringInfo, err = rbdVol.getImageMirroringInfo()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if mirroringInfo.State == librbd.MirrorImageDisabling {
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", rbdVol.VolID)
}
return &replication.DisableVolumeReplicationResponse{}, nil
}
// PromoteVolume extracts the RBD volume information from the volumeID, If the // PromoteVolume extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in demoted state it // image is present, mirroring is enabled and the image is in demoted state it
// will promote the volume as primary. // will promote the volume as primary.
@ -404,11 +358,11 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
} }
defer rs.VolumeLocks.Release(volumeID) defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy() defer rbdVol.Destroy()
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, ErrImageNotFound): case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
@ -419,7 +373,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
return nil, err return nil, err
} }
mirroringInfo, err := rbdVol.getImageMirroringInfo() mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -439,9 +393,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
if req.GetForce() { if req.GetForce() {
// workaround for https://github.com/ceph/ceph-csi/issues/2736 // workaround for https://github.com/ceph/ceph-csi/issues/2736
// TODO: remove this workaround when the issue is fixed // TODO: remove this workaround when the issue is fixed
err = rbdVol.forcePromoteImage(cr) err = rbdVol.ForcePromoteImage(cr)
} else { } else {
err = rbdVol.promoteImage(req.GetForce()) err = rbdVol.PromoteImage(req.GetForce())
} }
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -461,7 +415,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
interval, startTime := getSchedulingDetails(req.GetParameters()) interval, startTime := getSchedulingDetails(req.GetParameters())
if interval != admin.NoInterval { if interval != admin.NoInterval {
err = rbdVol.addSnapshotScheduling(interval, startTime) err = rbdVol.AddSnapshotScheduling(interval, startTime)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -500,11 +454,11 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
} }
defer rs.VolumeLocks.Release(volumeID) defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy() defer rbdVol.Destroy()
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, ErrImageNotFound): case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
@ -514,7 +468,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, err return nil, err
} }
mirroringInfo, err := rbdVol.getImageMirroringInfo() mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -531,7 +485,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
// demote image to secondary // demote image to secondary
if mirroringInfo.Primary { if mirroringInfo.Primary {
err = rbdVol.demoteImage() err = rbdVol.DemoteImage()
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -591,11 +545,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
} }
defer rs.VolumeLocks.Release(volumeID) defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy() defer rbdVol.Destroy()
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, ErrImageNotFound): case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
@ -606,7 +560,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, err return nil, err
} }
mirroringInfo, err := rbdVol.getImageMirroringInfo() mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil { if err != nil {
// in case of Resync the image will get deleted and gets recreated and // in case of Resync the image will get deleted and gets recreated and
// it takes time for this operation. // it takes time for this operation.
@ -624,10 +578,10 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is in primary state") return nil, status.Error(codes.InvalidArgument, "image is in primary state")
} }
mirrorStatus, err := rbdVol.getImageMirroringStatus() mirrorStatus, err := rbdVol.GetImageMirroringStatus()
if err != nil { if err != nil {
// the image gets recreated after issuing resync // the image gets recreated after issuing resync
if errors.Is(err, ErrImageNotFound) { if errors.Is(err, corerbd.ErrImageNotFound) {
// caller retries till RBD syncs an initial version of the image to // caller retries till RBD syncs an initial version of the image to
// report its status in the resync call. Ideally, this line will not // report its status in the resync call. Ideally, this line will not
// be executed as the error would get returned due to getImageMirroringInfo // be executed as the error would get returned due to getImageMirroringInfo
@ -671,7 +625,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
ready = checkRemoteSiteStatus(ctx, mirrorStatus) ready = checkRemoteSiteStatus(ctx, mirrorStatus)
} }
err = resyncVolume(localStatus, rbdVol, req.Force) err = rbdVol.ResyncVol(localStatus, req.Force)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -683,7 +637,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
err = repairResyncedImageID(ctx, rbdVol, ready) err = rbdVol.RepairResyncedImageID(ctx, ready)
if err != nil { if err != nil {
return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error())
} }
@ -716,11 +670,11 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
} }
defer rs.VolumeLocks.Release(volumeID) defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy() defer rbdVol.Destroy()
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, ErrImageNotFound): case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
@ -731,7 +685,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, err return nil, err
} }
mirroringInfo, err := rbdVol.getImageMirroringInfo() mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -747,9 +701,9 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is not in primary state") return nil, status.Error(codes.InvalidArgument, "image is not in primary state")
} }
mirrorStatus, err := rbdVol.getImageMirroringStatus() mirrorStatus, err := rbdVol.GetImageMirroringStatus()
if err != nil { if err != nil {
if errors.Is(err, ErrImageNotFound) { if errors.Is(err, corerbd.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())
} }
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -767,7 +721,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
description := remoteStatus.Description description := remoteStatus.Description
lastSyncTime, err := getLastSyncTime(description) lastSyncTime, err := getLastSyncTime(description)
if err != nil { if err != nil {
if errors.Is(err, ErrLastSyncTimeNotFound) { if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) {
return nil, status.Errorf(codes.NotFound, "failed to get last sync time: %v", err) return nil, status.Errorf(codes.NotFound, "failed to get last sync time: %v", err)
} }
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -812,11 +766,11 @@ func getLastSyncTime(description string) (*timestamppb.Timestamp, error) {
// In case there is no local snapshot timestamp return an error as the // In case there is no local snapshot timestamp return an error as the
// LastSyncTime is required. // LastSyncTime is required.
if description == "" { if description == "" {
return nil, fmt.Errorf("empty description: %w", ErrLastSyncTimeNotFound) return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
} }
splittedString := strings.SplitN(description, ",", 2) splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 { if len(splittedString) == 1 {
return nil, fmt.Errorf("no local snapshot timestamp: %w", ErrLastSyncTimeNotFound) return nil, fmt.Errorf("no local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
} }
type localStatus struct { type localStatus struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"` LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
@ -831,7 +785,7 @@ func getLastSyncTime(description string) (*timestamppb.Timestamp, error) {
// If the json unmarsal is successful but the local snapshot time is 0, we // If the json unmarsal is successful but the local snapshot time is 0, we
// need to consider it as an error as the LastSyncTime is required. // need to consider it as an error as the LastSyncTime is required.
if localSnapTime.LocalSnapshotTime == 0 { if localSnapTime.LocalSnapshotTime == 0 {
return nil, fmt.Errorf("empty local snapshot timestamp: %w", ErrLastSyncTimeNotFound) return nil, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
} }
lastUpdateTime := time.Unix(localSnapTime.LocalSnapshotTime, 0) lastUpdateTime := time.Unix(localSnapTime.LocalSnapshotTime, 0)
@ -840,29 +794,6 @@ func getLastSyncTime(description string) (*timestamppb.Timestamp, error) {
return lastSyncTime, nil return lastSyncTime, nil
} }
func resyncVolume(localStatus librbd.SiteMirrorImageStatus, rbdVol *rbdVolume, force bool) error {
if resyncRequired(localStatus) {
// If the force option is not set return the error message to retry
// with Force option.
if !force {
return status.Errorf(codes.FailedPrecondition,
"image is in %q state, description (%s). Force resync to recover volume",
localStatus.State, localStatus.Description)
}
err := rbdVol.resyncImage()
if err != nil {
return status.Error(codes.Internal, err.Error())
}
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return status.Error(codes.Unavailable, "awaiting initial resync due to split brain")
}
return nil
}
func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error { func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error {
// we are considering 2 states to check resync started and resync completed // we are considering 2 states to check resync started and resync completed
// as below. all other states will be considered as an error state so that // as below. all other states will be considered as an error state so that
@ -882,39 +813,3 @@ func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error {
return nil return nil
} }
// resyncRequired returns true if local image is in split-brain state and image
// needs resync.
func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool {
// resync is required if the image is in error state or the description
// contains split-brain message.
// In some corner cases like `re-player shutdown` the local image will not
// be in an error state. It would be also worth considering the `description`
// field to make sure about split-brain.
if localStatus.State == librbd.MirrorImageStatusStateError ||
strings.Contains(localStatus.Description, "split-brain") {
return true
}
return false
}
// repairResyncedImageID updates the existing image ID with new one.
func repairResyncedImageID(ctx context.Context, rv *rbdVolume, ready bool) error {
// During resync operation the local image will get deleted and a new
// image is recreated by the rbd mirroring. The new image will have a
// new image ID. Once resync is completed update the image ID in the OMAP
// to get the image removed from the trash during DeleteVolume.
// if the image is not completely resynced skip repairing image ID.
if !ready {
return nil
}
j, err := volJournal.Connect(rv.Monitors, rv.RadosNamespace, rv.conn.Creds)
if err != nil {
return err
}
defer j.Destroy()
// reset the image ID which is stored in the existing OMAP
return rv.repairImageID(ctx, j, true)
}

View File

@ -23,6 +23,8 @@ import (
"testing" "testing"
"time" "time"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
librbd "github.com/ceph/go-ceph/rbd" librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin" "github.com/ceph/go-ceph/rbd/admin"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
@ -455,7 +457,7 @@ func TestValidateLastSyncTime(t *testing.T) {
"empty description", "empty description",
"", "",
nil, nil,
ErrLastSyncTimeNotFound.Error(), corerbd.ErrLastSyncTimeNotFound.Error(),
}, },
{ {
"description without local_snapshot_timestamp", "description without local_snapshot_timestamp",
@ -473,7 +475,7 @@ func TestValidateLastSyncTime(t *testing.T) {
"description with no JSON", "description with no JSON",
`replaying`, `replaying`,
nil, nil,
ErrLastSyncTimeNotFound.Error(), corerbd.ErrLastSyncTimeNotFound.Error(),
}, },
} }
for _, tt := range tests { for _, tt := range tests {

View File

@ -943,7 +943,7 @@ func (cs *ControllerServer) DeleteVolume(
func cleanupRBDImage(ctx context.Context, func cleanupRBDImage(ctx context.Context,
rbdVol *rbdVolume, cr *util.Credentials, rbdVol *rbdVolume, cr *util.Credentials,
) (*csi.DeleteVolumeResponse, error) { ) (*csi.DeleteVolumeResponse, error) {
mirroringInfo, err := rbdVol.getImageMirroringInfo() mirroringInfo, err := rbdVol.GetImageMirroringInfo()
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -962,7 +962,7 @@ func cleanupRBDImage(ctx context.Context,
// the image on all the remote (secondary) clusters will get // the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the OMAP, PVC and PV // auto-deleted. This helps in garbage collecting the OMAP, PVC and PV
// objects after failback operation. // objects after failback operation.
localStatus, rErr := rbdVol.getLocalState() localStatus, rErr := rbdVol.GetLocalState()
if rErr != nil { if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error()) return nil, status.Error(codes.Internal, rErr.Error())
} }

View File

@ -38,7 +38,7 @@ type Driver struct {
ids *rbd.IdentityServer ids *rbd.IdentityServer
ns *rbd.NodeServer ns *rbd.NodeServer
cs *rbd.ControllerServer cs *rbd.ControllerServer
rs *rbd.ReplicationServer rs *casrbd.ReplicationServer
// cas is the CSIAddonsServer where CSI-Addons services are handled // cas is the CSIAddonsServer where CSI-Addons services are handled
cas *csiaddons.CSIAddonsServer cas *csiaddons.CSIAddonsServer
@ -66,8 +66,8 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer {
} }
} }
func NewReplicationServer(c *rbd.ControllerServer) *rbd.ReplicationServer { func NewReplicationServer(c *rbd.ControllerServer) *casrbd.ReplicationServer {
return &rbd.ReplicationServer{ControllerServer: c} return &casrbd.ReplicationServer{ControllerServer: c}
} }
// NewNodeServer initialize a node server for rbd CSI driver. // NewNodeServer initialize a node server for rbd CSI driver.

View File

@ -25,8 +25,8 @@ import (
librbd "github.com/ceph/go-ceph/rbd" librbd "github.com/ceph/go-ceph/rbd"
) )
// enableImageMirroring enables mirroring on an image. // EnableImageMirroring enables mirroring on an image.
func (ri *rbdImage) enableImageMirroring(mode librbd.ImageMirrorMode) error { func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -41,8 +41,8 @@ func (ri *rbdImage) enableImageMirroring(mode librbd.ImageMirrorMode) error {
return nil return nil
} }
// disableImageMirroring disables mirroring on an image. // DisableImageMirroring disables mirroring on an image.
func (ri *rbdImage) disableImageMirroring(force bool) error { func (ri *rbdImage) DisableImageMirroring(force bool) error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -57,8 +57,8 @@ func (ri *rbdImage) disableImageMirroring(force bool) error {
return nil return nil
} }
// getImageMirroringInfo gets mirroring information of an image. // GetImageMirroringInfo gets mirroring information of an image.
func (ri *rbdImage) getImageMirroringInfo() (*librbd.MirrorImageInfo, error) { func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -73,8 +73,8 @@ func (ri *rbdImage) getImageMirroringInfo() (*librbd.MirrorImageInfo, error) {
return info, nil return info, nil
} }
// promoteImage promotes image to primary. // PromoteImage promotes image to primary.
func (ri *rbdImage) promoteImage(force bool) error { func (ri *rbdImage) PromoteImage(force bool) error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -88,10 +88,10 @@ func (ri *rbdImage) promoteImage(force bool) error {
return nil return nil
} }
// forcePromoteImage promotes image to primary with force option with 2 minutes // ForcePromoteImage promotes image to primary with force option with 2 minutes
// timeout. If there is no response within 2 minutes,the rbd CLI process will be // timeout. If there is no response within 2 minutes,the rbd CLI process will be
// killed and an error is returned. // killed and an error is returned.
func (rv *rbdVolume) forcePromoteImage(cr *util.Credentials) error { func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error {
promoteArgs := []string{ promoteArgs := []string{
"mirror", "image", "promote", "mirror", "image", "promote",
rv.String(), rv.String(),
@ -118,8 +118,8 @@ func (rv *rbdVolume) forcePromoteImage(cr *util.Credentials) error {
return nil return nil
} }
// demoteImage demotes image to secondary. // DemoteImage demotes image to secondary.
func (ri *rbdImage) demoteImage() error { func (ri *rbdImage) DemoteImage() error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -148,8 +148,8 @@ func (ri *rbdImage) resyncImage() error {
return nil return nil
} }
// getImageMirroringStatus get the mirroring status of an image. // GetImageMirroringStatus get the mirroring status of an image.
func (ri *rbdImage) getImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) { func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -163,8 +163,8 @@ func (ri *rbdImage) getImageMirroringStatus() (*librbd.GlobalMirrorImageStatus,
return &statusInfo, nil return &statusInfo, nil
} }
// getLocalState returns the local state of the image. // GetLocalState returns the local state of the image.
func (ri *rbdImage) getLocalState() (librbd.SiteMirrorImageStatus, error) { func (ri *rbdImage) GetLocalState() (librbd.SiteMirrorImageStatus, error) {
localStatus := librbd.SiteMirrorImageStatus{} localStatus := librbd.SiteMirrorImageStatus{}
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {

View File

@ -2018,7 +2018,7 @@ func (ri *rbdImage) isCompabitableClone(dst *rbdImage) error {
return nil return nil
} }
func (ri *rbdImage) addSnapshotScheduling( func (ri *rbdImage) AddSnapshotScheduling(
interval admin.Interval, interval admin.Interval,
startTime admin.StartTime, startTime admin.StartTime,
) error { ) error {

133
internal/rbd/replication.go Normal file
View File

@ -0,0 +1,133 @@
/*
Copyright 2023 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"context"
"strings"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/csi-addons/spec/lib/go/replication"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus, force bool) error {
if resyncRequired(localStatus) {
// If the force option is not set return the error message to retry
// with Force option.
if !force {
return status.Errorf(codes.FailedPrecondition,
"image is in %q state, description (%s). Force resync to recover volume",
localStatus.State, localStatus.Description)
}
err := rv.resyncImage()
if err != nil {
return status.Error(codes.Internal, err.Error())
}
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return status.Error(codes.Unavailable, "awaiting initial resync due to split brain")
}
return nil
}
// repairResyncedImageID updates the existing image ID with new one.
func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) error {
// During resync operation the local image will get deleted and a new
// image is recreated by the rbd mirroring. The new image will have a
// new image ID. Once resync is completed update the image ID in the OMAP
// to get the image removed from the trash during DeleteVolume.
// if the image is not completely resynced skip repairing image ID.
if !ready {
return nil
}
j, err := volJournal.Connect(rv.Monitors, rv.RadosNamespace, rv.conn.Creds)
if err != nil {
return err
}
defer j.Destroy()
// reset the image ID which is stored in the existing OMAP
return rv.repairImageID(ctx, j, true)
}
// resyncRequired returns true if local image is in split-brain state and image
// needs resync.
func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool {
// resync is required if the image is in error state or the description
// contains split-brain message.
// In some corner cases like `re-player shutdown` the local image will not
// be in an error state. It would be also worth considering the `description`
// field to make sure about split-brain.
if localStatus.State == librbd.MirrorImageStatusStateError ||
strings.Contains(localStatus.Description, "split-brain") {
return true
}
return false
}
func DisableVolumeReplication(rbdVol *rbdVolume,
mirroringInfo *librbd.MirrorImageInfo,
force bool,
) (*replication.DisableVolumeReplicationResponse, error) {
if !mirroringInfo.Primary {
// Return success if the below condition is met
// Local image is secondary
// Local image is in up+replaying state
// If the image is in a secondary and its state is up+replaying means
// its a healthy secondary and the image is primary somewhere in the
// remote cluster and the local image is getting replayed. Return
// success for the Disabling mirroring as we cannot disable mirroring
// on the secondary image, when the image on the primary site gets
// disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation.
localStatus, rErr := rbdVol.GetLocalState()
if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error())
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
return &replication.DisableVolumeReplicationResponse{}, nil
}
return nil, status.Errorf(codes.InvalidArgument,
"secondary image status is up=%t and state=%s",
localStatus.Up,
localStatus.State)
}
err := rbdVol.DisableImageMirroring(force)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not
mirroringInfo, err = rbdVol.GetImageMirroringInfo()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if mirroringInfo.State == librbd.MirrorImageDisabling {
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", rbdVol.VolID)
}
return &replication.DisableVolumeReplicationResponse{}, nil
}