ceph-csi/internal/rbd/replicationcontrollerserver.go

1050 lines
34 KiB
Go
Raw Normal View History

/*
Copyright 2021 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"github.com/csi-addons/spec/lib/go/replication"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
// imageMirroringMode is used to indicate the mirroring mode for an RBD image.
type imageMirroringMode string
const (
// imageMirrorModeSnapshot uses snapshots to propagate RBD images between
// ceph clusters.
imageMirrorModeSnapshot imageMirroringMode = "snapshot"
// imageMirrorModeJournal uses journaling to propagate RBD images between
// ceph clusters.
imageMirrorModeJournal imageMirroringMode = "journal"
)
const (
// mirroringMode + key to get the imageMirroringMode from parameters.
imageMirroringKey = "mirroringMode"
// forceKey + key to get the force option from parameters.
forceKey = "force"
// schedulingIntervalKey to get the schedulingInterval from the
// parameters.
// Interval of time between scheduled snapshots. Typically in the form
// <num><m,h,d>.
schedulingIntervalKey = "schedulingInterval"
// schedulingStartTimeKey to get the schedulingStartTime from the
// parameters.
// (optional) StartTime is the time the snapshot schedule
// begins, can be specified using the ISO 8601 time format.
schedulingStartTimeKey = "schedulingStartTime"
)
type operation string
var (
// pool+"/"+key to check dummy image is created.
dummyImageCreated operation = "dummyImageCreated"
// Read write lock to ensure that only one operation is happening at a time.
operationLock = sync.Map{}
// Lock to serialize operations on the dummy image to tickle RBD snapshot schedule.
dummyImageOpsLock sync.Mutex
)
// ReplicationServer struct of rbd CSI driver with supported methods of Replication
// controller server spec.
type ReplicationServer struct {
// added UnimplementedControllerServer as a member of
// ControllerServer. if replication spec add more RPC services in the proto
// file, then we don't need to add all RPC methods leading to forward
// compatibility.
*replication.UnimplementedControllerServer
// Embed ControllerServer as it implements helper functions
*ControllerServer
}
func (rs *ReplicationServer) RegisterService(server grpc.ServiceRegistrar) {
replication.RegisterControllerServer(server, rs)
}
// getForceOption extracts the force option from the GRPC request parameters.
// If not set, the default will be set to false.
func getForceOption(ctx context.Context, parameters map[string]string) (bool, error) {
val, ok := parameters[forceKey]
if !ok {
log.WarningLog(ctx, "%s is not set in parameters, setting to default (%v)", forceKey, false)
return false, nil
}
force, err := strconv.ParseBool(val)
if err != nil {
return false, status.Errorf(codes.Internal, err.Error())
}
return force, nil
}
// getMirroringMode gets the mirroring mode from the input GRPC request parameters.
// mirroringMode is the key to check the mode in the parameters.
func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd.ImageMirrorMode, error) {
val, ok := parameters[imageMirroringKey]
if !ok {
log.WarningLog(
ctx,
"%s is not set in parameters, setting to mirroringMode to default (%s)",
imageMirroringKey,
imageMirrorModeSnapshot)
return librbd.ImageMirrorModeSnapshot, nil
}
var mirroringMode librbd.ImageMirrorMode
switch imageMirroringMode(val) {
case imageMirrorModeSnapshot:
mirroringMode = librbd.ImageMirrorModeSnapshot
case imageMirrorModeJournal:
mirroringMode = librbd.ImageMirrorModeJournal
default:
return mirroringMode, status.Errorf(codes.InvalidArgument, "%s %s not supported", imageMirroringKey, val)
}
return mirroringMode, nil
}
// validateSchedulingDetails gets the mirroring mode and scheduling details from the
// input GRPC request parameters and validates the scheduling is only supported
// for snapshot mirroring mode.
func validateSchedulingDetails(ctx context.Context, parameters map[string]string) error {
var err error
val := parameters[imageMirroringKey]
switch imageMirroringMode(val) {
case imageMirrorModeJournal:
// journal mirror mode does not require scheduling parameters
if _, ok := parameters[schedulingIntervalKey]; ok {
log.WarningLog(ctx, "%s parameter cannot be used with %s mirror mode, ignoring it",
schedulingIntervalKey, string(imageMirrorModeJournal))
}
if _, ok := parameters[schedulingStartTimeKey]; ok {
log.WarningLog(ctx, "%s parameter cannot be used with %s mirror mode, ignoring it",
schedulingStartTimeKey, string(imageMirrorModeJournal))
}
return nil
case imageMirrorModeSnapshot:
// If mirroring mode is not set in parameters, we are defaulting mirroring
// mode to snapshot. Discard empty mirroring mode from validation as it is
// an optional parameter.
case "":
default:
return status.Error(codes.InvalidArgument, "scheduling is only supported for snapshot mode")
}
// validate mandatory interval field
interval, ok := parameters[schedulingIntervalKey]
if ok && interval == "" {
return status.Error(codes.InvalidArgument, "scheduling interval cannot be empty")
}
adminStartTime := admin.StartTime(parameters[schedulingStartTimeKey])
if !ok {
// startTime is alone not supported it has to be present with interval
if adminStartTime != "" {
return status.Errorf(codes.InvalidArgument,
"%q parameter is supported only with %q",
schedulingStartTimeKey,
schedulingIntervalKey)
}
}
if interval != "" {
err = validateSchedulingInterval(interval)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
}
return nil
}
// getSchedulingDetails returns scheduling interval and scheduling startTime.
func getSchedulingDetails(parameters map[string]string) (admin.Interval, admin.StartTime) {
return admin.Interval(parameters[schedulingIntervalKey]),
admin.StartTime(parameters[schedulingStartTimeKey])
}
// validateSchedulingInterval return the interval as it is if its ending with
// `m|h|d` or else it will return error.
func validateSchedulingInterval(interval string) error {
re := regexp.MustCompile(`^\d+[mhd]$`)
if re.MatchString(interval) {
return nil
}
return errors.New("interval specified without d, h, m suffix")
}
// EnableVolumeReplication extracts the RBD volume information from the
// volumeID, If the image is present it will enable the mirroring based on the
// user provided information.
func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
req *replication.EnableVolumeReplicationRequest,
) (*replication.EnableVolumeReplicationResponse, error) {
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
err = validateSchedulingDetails(ctx, req.GetParameters())
if err != nil {
return nil, err
}
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
// extract the mirroring mode
mirroringMode, err := getMirroringMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}
mirroringInfo, err := rbdVol.getImageMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
err = createDummyImage(ctx, rbdVol)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to create dummy image %s", err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
err = rbdVol.enableImageMirroring(mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
return &replication.EnableVolumeReplicationResponse{}, nil
}
// getDummyImageName returns the csi-vol-dummy+cluster FSID as the image name.
// each cluster should have a unique dummy image created. choosing the cluster
// FSID for the same reason.
func getDummyImageName(conn *util.ClusterConnection) (string, error) {
id, err := conn.GetFSID()
if err != nil {
return "", err
}
return fmt.Sprintf("csi-vol-dummy-%s", id), nil
}
// getOperationName returns the operation name for the given operation type
// combined with the pool name.
func getOperationName(poolName string, optName operation) string {
return fmt.Sprintf("%s/%s", poolName, optName)
}
// createDummyImage creates a dummy image as a workaround for the rbd
// scheduling problem.
func createDummyImage(ctx context.Context, rbdVol *rbdVolume) error {
var err error
var imgName string
dummyImageOpsLock.Lock()
defer dummyImageOpsLock.Unlock()
optName := getOperationName(rbdVol.Pool, dummyImageCreated)
if _, ok := operationLock.Load(optName); !ok {
// create a dummy image
imgName, err = getDummyImageName(rbdVol.conn)
if err != nil {
return err
}
dummyVol := *rbdVol
dummyVol.RbdImageName = imgName
// dummyVol holds rbdVol details, reset ImageID or else dummy image cannot be
// deleted from trash during repair operation.
dummyVol.ImageID = ""
f := []string{
librbd.FeatureNameLayering,
librbd.FeatureNameObjectMap,
librbd.FeatureNameExclusiveLock,
librbd.FeatureNameFastDiff,
}
features := librbd.FeatureSetFromNames(f)
dummyVol.ImageFeatureSet = features
// create 1MiB dummy image. 1MiB=1048576 bytes
dummyVol.VolSize = 1048576
err = createImage(ctx, &dummyVol, dummyVol.conn.Creds)
if err != nil {
if strings.Contains(err.Error(), "File exists") {
err = repairDummyImage(ctx, &dummyVol)
}
}
if err == nil {
operationLock.Store(optName, true)
}
}
return err
}
// repairDummyImage deletes and recreates the dummy image.
func repairDummyImage(ctx context.Context, dummyVol *rbdVolume) error {
// instead of checking the images features and than adding missing image
// features, updating the image size to 1Mib. We will delete the image
// and recreate it.
// deleting and recreating the dummy image will not impact anything as its
// a workaround to fix the scheduling problem.
err := dummyVol.deleteImage(ctx)
if err != nil {
return err
}
return createImage(ctx, dummyVol, dummyVol.conn.Creds)
}
// tickleMirroringOnDummyImage disables and reenables mirroring on the dummy image, and sets a
// schedule of a minute for the dummy image, to force a schedule refresh for other mirrored images
// within a minute.
func tickleMirroringOnDummyImage(rbdVol *rbdVolume, mirroringMode librbd.ImageMirrorMode) error {
imgName, err := getDummyImageName(rbdVol.conn)
if err != nil {
return err
}
dummyVol := *rbdVol
dummyVol.RbdImageName = imgName
dummyImageOpsLock.Lock()
defer dummyImageOpsLock.Unlock()
err = dummyVol.disableImageMirroring(false)
if err != nil {
return err
}
err = dummyVol.enableImageMirroring(mirroringMode)
if err != nil {
return err
}
if mirroringMode == librbd.ImageMirrorModeSnapshot {
err = dummyVol.addSnapshotScheduling(admin.Interval("1m"), admin.NoStartTime)
if err != nil {
return err
}
}
return nil
}
// DisableVolumeReplication extracts the RBD volume information from the
// volumeID, If the image is present and the mirroring is enabled on the RBD
// image it will disable the mirroring.
func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
req *replication.DisableVolumeReplicationRequest,
) (*replication.DisableVolumeReplicationResponse, error) {
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
// extract the force option
force, err := getForceOption(ctx, req.GetParameters())
if err != nil {
return nil, err
}
mirroringInfo, err := rbdVol.getImageMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
switch mirroringInfo.State {
// image is already in disabled state
case librbd.MirrorImageDisabled:
// image mirroring is still disabling
case librbd.MirrorImageDisabling:
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled:
return disableVolumeReplication(rbdVol, mirroringInfo, force)
default:
return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State)
}
return &replication.DisableVolumeReplicationResponse{}, nil
}
func disableVolumeReplication(rbdVol *rbdVolume,
mirroringInfo *librbd.MirrorImageInfo,
force bool,
) (*replication.DisableVolumeReplicationResponse, error) {
if !mirroringInfo.Primary {
// Return success if the below condition is met
// Local image is secondary
// Local image is in up+replaying state
// If the image is in a secondary and its state is up+replaying means
// its a healthy secondary and the image is primary somewhere in the
// remote cluster and the local image is getting replayed. Return
// success for the Disabling mirroring as we cannot disable mirroring
// on the secondary image, when the image on the primary site gets
// disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation.
localStatus, rErr := rbdVol.getLocalState()
if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error())
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
return &replication.DisableVolumeReplicationResponse{}, nil
}
return nil, status.Errorf(codes.InvalidArgument,
"secondary image status is up=%t and state=%s",
localStatus.Up,
localStatus.State)
}
err := rbdVol.disableImageMirroring(force)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not
mirroringInfo, err = rbdVol.getImageMirroringInfo()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if mirroringInfo.State == librbd.MirrorImageDisabling {
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", rbdVol.VolID)
}
return &replication.DisableVolumeReplicationResponse{}, nil
}
// PromoteVolume extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in demoted state it
// will promote the volume as primary.
// If the image is already primary it will return success.
func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
req *replication.PromoteVolumeRequest,
) (*replication.PromoteVolumeResponse, error) {
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirroringInfo, err := rbdVol.getImageMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
return nil, status.Errorf(
codes.InvalidArgument,
"mirroring is not enabled on %s, image is in %d Mode",
rbdVol.VolID,
mirroringInfo.State)
}
// promote secondary to primary
if !mirroringInfo.Primary {
if req.GetForce() {
// workaround for https://github.com/ceph/ceph-csi/issues/2736
// TODO: remove this workaround when the issue is fixed
err = rbdVol.forcePromoteImage(cr)
} else {
err = rbdVol.promoteImage(req.GetForce())
}
if err != nil {
log.ErrorLog(ctx, err.Error())
// In case of the DR the image on the primary site cannot be
// demoted as the cluster is down, during failover the image need
// to be force promoted. RBD returns `Device or resource busy`
// error message if the image cannot be promoted for above reason.
// Return FailedPrecondition so that replication operator can send
// request to force promote the image.
if strings.Contains(err.Error(), "Device or resource busy") {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
}
var mode librbd.ImageMirrorMode
mode, err = getMirroringMode(ctx, req.GetParameters())
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get mirroring mode %s", err.Error())
}
interval, startTime := getSchedulingDetails(req.GetParameters())
if interval != admin.NoInterval {
err = rbdVol.addSnapshotScheduling(interval, startTime)
if err != nil {
return nil, err
}
log.DebugLog(
ctx,
"Added scheduling at interval %s, start time %s for volume %s",
interval,
startTime,
rbdVol)
}
log.DebugLog(ctx, "attempting to tickle dummy image for restarting RBD schedules")
err = tickleMirroringOnDummyImage(rbdVol, mode)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to enable mirroring on dummy image %s", err.Error())
}
return &replication.PromoteVolumeResponse{}, nil
}
// DemoteVolume extracts the RBD volume information from the
// volumeID, If the image is present, mirroring is enabled and the
// image is in promoted state it will demote the volume as secondary.
// If the image is already secondary it will return success.
func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
req *replication.DemoteVolumeRequest,
) (*replication.DemoteVolumeResponse, error) {
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirroringInfo, err := rbdVol.getImageMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
return nil, status.Errorf(
codes.InvalidArgument,
"mirroring is not enabled on %s, image is in %d Mode",
rbdVol.VolID,
mirroringInfo.State)
}
// demote image to secondary
if mirroringInfo.Primary {
err = rbdVol.demoteImage()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
return &replication.DemoteVolumeResponse{}, nil
}
// checkRemoteSiteStatus checks the state of the remote cluster.
// It returns true if the state of the remote cluster is up and unknown.
func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirrorImageStatus) bool {
ready := true
found := false
for _, s := range mirrorStatus.SiteStatuses {
log.UsefulLog(
ctx,
"peer site mirrorUUID=%q, daemon up=%t, mirroring state=%q, description=%q and lastUpdate=%d",
s.MirrorUUID,
s.Up,
s.State,
s.Description,
s.LastUpdate)
if s.MirrorUUID != "" {
found = true
// If ready is already "false" do not flip it based on another remote peer status
if ready && (s.State != librbd.MirrorImageStatusStateUnknown || !s.Up) {
ready = false
}
}
}
// Return readiness only if at least one remote peer status was processed
return found && ready
}
// ResyncVolume extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in demoted state.
// If yes it will resync the image to correct the split-brain.
func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
req *replication.ResyncVolumeRequest,
) (*replication.ResyncVolumeResponse, error) {
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirroringInfo, err := rbdVol.getImageMirroringInfo()
if err != nil {
// in case of Resync the image will get deleted and gets recreated and
// it takes time for this operation.
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Aborted, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled")
}
// return error if the image is still primary
if mirroringInfo.Primary {
return nil, status.Error(codes.InvalidArgument, "image is in primary state")
}
mirrorStatus, err := rbdVol.getImageMirroringStatus()
if err != nil {
// the image gets recreated after issuing resync
if errors.Is(err, ErrImageNotFound) {
// caller retries till RBD syncs an initial version of the image to
// report its status in the resync call. Ideally, this line will not
// be executed as the error would get returned due to getImageMirroringInfo
// failing to find an image above.
return nil, status.Error(codes.Aborted, err.Error())
}
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
ready := false
localStatus, err := mirrorStatus.LocalStatus()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, fmt.Errorf("failed to get local status: %w", err)
}
// convert the last update time to UTC
lastUpdateTime := time.Unix(localStatus.LastUpdate, 0).UTC()
log.UsefulLog(
ctx,
"local status: daemon up=%t, image mirroring state=%q, description=%q and lastUpdate=%s",
localStatus.Up,
localStatus.State,
localStatus.Description,
lastUpdateTime)
rbd: mark image ready when image state is up+unknown To recover from split brain (up+error) state the image need to be demoted and requested for resync on site-a and then the image on site-b should gets demoted.The volume should be marked to ready=true when the image state on both the clusters are up+unknown because during the last snapshot syncing the data gets copied first and then image state on the site-a changes to up+unknown. If the image state on both the sites are up+unknown consider that complete data is synced as the last snapshot gets exchanged between the clusters. * create 10 GB of file and validate the data after resync * Do Failover when the site-a goes down * Force promote the image and write data in GiB * Once the site-a comes back, Demote the image and issue resync * Demote the image on site-b * The status will get reflected on the other site when the last snapshot sync happens * The image will go to up+unknown state. and complete data will be copied to site a * Promote the image on site-a and use it ```bash csi-vol-5633715e-a7eb-11eb-bebb-0242ac110006: global_id: e7f9ec55-06ab-46cb-a1ae-784be75ed96d state: up+unknown description: remote image demoted service: a on minicluster1 last_update: 2021-04-28 07:11:56 peer_sites: name: e47e29f4-96e8-44ed-b6c6-edf15c5a91d6-rook-ceph state: up+unknown description: remote image demoted last_update: 2021-04-28 07:11:41 ``` * Do Failover when the site-a goes down * Force promote the image on site-b and write data in GiB * Demote the image on site-b * Once the site-a comes back, Demote the image on site-a * The images on the both site will go to split brain state ```bash csi-vol-37effcb5-a7f1-11eb-bebb-0242ac110006: global_id: 115c3df9-3d4f-4c04-93a7-531b82155ddf state: up+error description: split-brain service: a on minicluster2 last_update: 2021-04-28 07:25:41 peer_sites: name: abbda0f0-0117-4425-8cb2-deb4c853da47-rook-ceph state: up+error description: split-brain last_update: 2021-04-28 07:25:26 ``` * Issue resync * The images cannot be resynced because when we issue resync on site a the image on site-b was in demoted state * To recover from this state (promote and then demote the image on site-b after sometime) ```bash csi-vol-37effcb5-a7f1-11eb-bebb-0242ac110006: global_id: 115c3df9-3d4f-4c04-93a7-531b82155ddf state: up+unknown description: remote image demoted service: a on minicluster1 last_update: 2021-04-28 07:32:56 peer_sites: name: e47e29f4-96e8-44ed-b6c6-edf15c5a91d6-rook-ceph state: up+unknown description: remote image demoted last_update: 2021-04-28 07:32:41 ``` * Once the data is copied we can see that the image state is moved to up+unknown on both sites * Promote the image on site-a and use it Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
2021-04-28 07:46:00 +00:00
// To recover from split brain (up+error) state the image need to be
// demoted and requested for resync on site-a and then the image on site-b
// should be demoted. The volume should be marked to ready=true when the
// image state on both the clusters are up+unknown because during the last
// snapshot syncing the data gets copied first and then image state on the
// site-a changes to up+unknown.
// If the image state on both the sites are up+unknown consider that
// complete data is synced as the last snapshot
// gets exchanged between the clusters.
if localStatus.State == librbd.MirrorImageStatusStateUnknown && localStatus.Up {
ready = checkRemoteSiteStatus(ctx, mirrorStatus)
}
err = resyncVolume(localStatus, rbdVol, req.Force)
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, err
}
err = checkVolumeResyncStatus(localStatus)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
rbd: repair imageid after resync During resync operation the local image will get deleted and a new image is recreated by the rbd mirroring. The new image will have a new imageID. Once resync is completed update the imageID in the OMAP to get the image removed from the trash during DeleteVolume. Before resyncing ``` sh-4.4# rbd info replicapool/csi-vol-0c25bdd3-485f-11ec-bd30-0242ac110004 rbd image 'csi-vol-0c25bdd3-485f-11ec-bd30-0242ac110004': size 1 GiB in 256 objects order 22 (4 MiB objects) snapshot_count: 1 id: 1efcc6b7a769 block_name_prefix: rbd_data.1efcc6b7a769 format: 2 features: layering op_features: flags: create_timestamp: Thu Nov 18 11:02:40 2021 access_timestamp: Thu Nov 18 11:02:40 2021 modify_timestamp: Thu Nov 18 11:02:40 2021 mirroring state: enabled mirroring mode: snapshot mirroring global id: 9c4c236d-8a47-4779-b4f6-94e05da70dbd mirroring primary: true ``` ``` sh-4.4# rados listomapvals csi.volume.0c25bdd3-485f-11ec-bd30-0242ac110004 --pool=replicapool csi.imageid value (12 bytes) : 00000000 31 65 66 63 63 36 62 37 61 37 36 39 |1efcc6b7a769| 0000000c csi.imagename value (44 bytes) : 00000000 63 73 69 2d 76 6f 6c 2d 30 63 32 35 62 64 64 33 |csi-vol-0c25bdd3| 00000010 2d 34 38 35 66 2d 31 31 65 63 2d 62 64 33 30 2d |-485f-11ec-bd30-| 00000020 30 32 34 32 61 63 31 31 30 30 30 34 |0242ac110004| 0000002c csi.volname value (40 bytes) : 00000000 70 76 63 2d 32 36 38 39 33 66 30 38 2d 66 66 32 |pvc-26893f08-ff2| 00000010 62 2d 34 61 30 66 2d 61 35 63 33 2d 38 38 34 62 |b-4a0f-a5c3-884b| 00000020 37 32 30 66 66 62 32 63 |720ffb2c| 00000028 csi.volume.owner value (7 bytes) : 00000000 64 65 66 61 75 6c 74 |default| 00000007 ``` After Resyncing ``` sh-4.4# rbd info replicapool/csi-vol-0c25bdd3-485f-11ec-bd30-0242ac110004 rbd image 'csi-vol-0c25bdd3-485f-11ec-bd30-0242ac110004': size 1 GiB in 256 objects order 22 (4 MiB objects) snapshot_count: 1 id: 10b183a48a97 block_name_prefix: rbd_data.10b183a48a97 format: 2 features: layering, non-primary op_features: flags: create_timestamp: Thu Nov 18 11:09:39 2021 access_timestamp: Thu Nov 18 11:09:39 2021 modify_timestamp: Thu Nov 18 11:09:39 2021 mirroring state: enabled mirroring mode: snapshot mirroring global id: 9c4c236d-8a47-4779-b4f6-94e05da70dbd mirroring primary: false sh-4.4# rados listomapvals csi.volume.0c25bdd3-485f-11ec-bd30-0242ac110004 --pool=replicapool csi.imageid value (12 bytes) : 00000000 31 30 62 31 38 33 61 34 38 61 39 37 |10b183a48a97| 0000000c csi.imagename value (44 bytes) : 00000000 63 73 69 2d 76 6f 6c 2d 30 63 32 35 62 64 64 33 |csi-vol-0c25bdd3| 00000010 2d 34 38 35 66 2d 31 31 65 63 2d 62 64 33 30 2d |-485f-11ec-bd30-| 00000020 30 32 34 32 61 63 31 31 30 30 30 34 |0242ac110004| 0000002c csi.volname value (40 bytes) : 00000000 70 76 63 2d 32 36 38 39 33 66 30 38 2d 66 66 32 |pvc-26893f08-ff2| 00000010 62 2d 34 61 30 66 2d 61 35 63 33 2d 38 38 34 62 |b-4a0f-a5c3-884b| 00000020 37 32 30 66 66 62 32 63 |720ffb2c| 00000028 csi.volume.owner value (7 bytes) : 00000000 64 65 66 61 75 6c 74 |default| 00000007 ``` Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
2021-11-18 10:59:50 +00:00
err = repairResyncedImageID(ctx, rbdVol, ready)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error())
}
resp := &replication.ResyncVolumeResponse{
Ready: ready,
}
return resp, nil
}
// GetVolumeReplicationInfo extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in primary state.
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
req *replication.GetVolumeReplicationInfoRequest,
) (*replication.GetVolumeReplicationInfoResponse, error) {
volumeID := req.GetVolumeId()
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err
}
mirroringInfo, err := rbdVol.getImageMirroringInfo()
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Aborted, err.Error())
}
if mirroringInfo.State != librbd.MirrorImageEnabled {
return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled")
}
// return error if the image is not in primary state
if !mirroringInfo.Primary {
return nil, status.Error(codes.InvalidArgument, "image is not in primary state")
}
mirrorStatus, err := rbdVol.getImageMirroringStatus()
if err != nil {
if errors.Is(err, ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error())
}
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
remoteStatus, err := RemoteStatus(mirrorStatus)
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, fmt.Errorf("failed to get remote status: %w", err)
}
description := remoteStatus.Description
lastSyncTime, err := getLastSyncTime(description)
if err != nil {
return nil, fmt.Errorf("failed to get last sync time: %w", err)
}
resp := &replication.GetVolumeReplicationInfoResponse{
LastSyncTime: lastSyncTime,
}
return resp, nil
}
// RemoteStatus returns one SiteMirrorImageStatus item from the SiteStatuses
// slice that corresponds to the remote site's status. If the remote status
// is not found than the error ErrNotExist will be returned.
func RemoteStatus(gmis *librbd.GlobalMirrorImageStatus) (librbd.SiteMirrorImageStatus, error) {
var (
ss librbd.SiteMirrorImageStatus
err error = librbd.ErrNotExist
)
for i := range gmis.SiteStatuses {
if gmis.SiteStatuses[i].MirrorUUID != "" {
ss = gmis.SiteStatuses[i]
err = nil
break
}
}
return ss, err
}
// This function gets the local snapshot time from the description
// of localStatus and converts it into required type.
func getLastSyncTime(description string) (*timestamppb.Timestamp, error) {
// Format of the description will be as followed:
// description = "replaying,{"bytes_per_second":0.0,
// "bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501
// ,"remote_snapshot_timestamp":1662655501}"
// In case there is no local snapshot timestamp we can pass the default value
if description == "" {
return nil, nil
}
splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 {
return nil, nil
}
type localStatus struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
}
var localSnapTime localStatus
err := json.Unmarshal([]byte(splittedString[1]), &localSnapTime)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal description: %w", err)
}
lastUpdateTime := time.Unix(localSnapTime.LocalSnapshotTime, 0)
lastSyncTime := timestamppb.New(lastUpdateTime)
return lastSyncTime, nil
}
func resyncVolume(localStatus librbd.SiteMirrorImageStatus, rbdVol *rbdVolume, force bool) error {
if resyncRequired(localStatus) {
// If the force option is not set return the error message to retry
// with Force option.
if !force {
return status.Errorf(codes.FailedPrecondition,
"image is in %q state, description (%s). Force resync to recover volume",
localStatus.State, localStatus.Description)
}
err := rbdVol.resyncImage()
if err != nil {
return status.Error(codes.Internal, err.Error())
}
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return status.Error(codes.Unavailable, "awaiting initial resync due to split brain")
}
return nil
}
func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error {
// we are considering 2 states to check resync started and resync completed
// as below. all other states will be considered as an error state so that
// cephCSI can return error message and volume replication operator can
// mark the VolumeReplication status as not resyncing for the volume.
// If the state is Replaying means the resync is going on.
// Once the volume on remote cluster is demoted and resync
// is completed the image state will be moved to UNKNOWN.
// RBD mirror daemon should be always running on the primary cluster.
if !localStatus.Up || (localStatus.State != librbd.MirrorImageStatusStateReplaying &&
localStatus.State != librbd.MirrorImageStatusStateUnknown) {
return fmt.Errorf(
"not resyncing. Local status: daemon up=%t image is in %q state",
localStatus.Up, localStatus.State)
}
return nil
}
// resyncRequired returns true if local image is in split-brain state and image
// needs resync.
func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool {
// resync is required if the image is in error state or the description
// contains split-brain message.
// In some corner cases like `re-player shutdown` the local image will not
// be in an error state. It would be also worth considering the `description`
// field to make sure about split-brain.
if localStatus.State == librbd.MirrorImageStatusStateError ||
strings.Contains(localStatus.Description, "split-brain") {
return true
}
return false
}
rbd: repair imageid after resync During resync operation the local image will get deleted and a new image is recreated by the rbd mirroring. The new image will have a new imageID. Once resync is completed update the imageID in the OMAP to get the image removed from the trash during DeleteVolume. Before resyncing ``` sh-4.4# rbd info replicapool/csi-vol-0c25bdd3-485f-11ec-bd30-0242ac110004 rbd image 'csi-vol-0c25bdd3-485f-11ec-bd30-0242ac110004': size 1 GiB in 256 objects order 22 (4 MiB objects) snapshot_count: 1 id: 1efcc6b7a769 block_name_prefix: rbd_data.1efcc6b7a769 format: 2 features: layering op_features: flags: create_timestamp: Thu Nov 18 11:02:40 2021 access_timestamp: Thu Nov 18 11:02:40 2021 modify_timestamp: Thu Nov 18 11:02:40 2021 mirroring state: enabled mirroring mode: snapshot mirroring global id: 9c4c236d-8a47-4779-b4f6-94e05da70dbd mirroring primary: true ``` ``` sh-4.4# rados listomapvals csi.volume.0c25bdd3-485f-11ec-bd30-0242ac110004 --pool=replicapool csi.imageid value (12 bytes) : 00000000 31 65 66 63 63 36 62 37 61 37 36 39 |1efcc6b7a769| 0000000c csi.imagename value (44 bytes) : 00000000 63 73 69 2d 76 6f 6c 2d 30 63 32 35 62 64 64 33 |csi-vol-0c25bdd3| 00000010 2d 34 38 35 66 2d 31 31 65 63 2d 62 64 33 30 2d |-485f-11ec-bd30-| 00000020 30 32 34 32 61 63 31 31 30 30 30 34 |0242ac110004| 0000002c csi.volname value (40 bytes) : 00000000 70 76 63 2d 32 36 38 39 33 66 30 38 2d 66 66 32 |pvc-26893f08-ff2| 00000010 62 2d 34 61 30 66 2d 61 35 63 33 2d 38 38 34 62 |b-4a0f-a5c3-884b| 00000020 37 32 30 66 66 62 32 63 |720ffb2c| 00000028 csi.volume.owner value (7 bytes) : 00000000 64 65 66 61 75 6c 74 |default| 00000007 ``` After Resyncing ``` sh-4.4# rbd info replicapool/csi-vol-0c25bdd3-485f-11ec-bd30-0242ac110004 rbd image 'csi-vol-0c25bdd3-485f-11ec-bd30-0242ac110004': size 1 GiB in 256 objects order 22 (4 MiB objects) snapshot_count: 1 id: 10b183a48a97 block_name_prefix: rbd_data.10b183a48a97 format: 2 features: layering, non-primary op_features: flags: create_timestamp: Thu Nov 18 11:09:39 2021 access_timestamp: Thu Nov 18 11:09:39 2021 modify_timestamp: Thu Nov 18 11:09:39 2021 mirroring state: enabled mirroring mode: snapshot mirroring global id: 9c4c236d-8a47-4779-b4f6-94e05da70dbd mirroring primary: false sh-4.4# rados listomapvals csi.volume.0c25bdd3-485f-11ec-bd30-0242ac110004 --pool=replicapool csi.imageid value (12 bytes) : 00000000 31 30 62 31 38 33 61 34 38 61 39 37 |10b183a48a97| 0000000c csi.imagename value (44 bytes) : 00000000 63 73 69 2d 76 6f 6c 2d 30 63 32 35 62 64 64 33 |csi-vol-0c25bdd3| 00000010 2d 34 38 35 66 2d 31 31 65 63 2d 62 64 33 30 2d |-485f-11ec-bd30-| 00000020 30 32 34 32 61 63 31 31 30 30 30 34 |0242ac110004| 0000002c csi.volname value (40 bytes) : 00000000 70 76 63 2d 32 36 38 39 33 66 30 38 2d 66 66 32 |pvc-26893f08-ff2| 00000010 62 2d 34 61 30 66 2d 61 35 63 33 2d 38 38 34 62 |b-4a0f-a5c3-884b| 00000020 37 32 30 66 66 62 32 63 |720ffb2c| 00000028 csi.volume.owner value (7 bytes) : 00000000 64 65 66 61 75 6c 74 |default| 00000007 ``` Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
2021-11-18 10:59:50 +00:00
// repairResyncedImageID updates the existing image ID with new one.
func repairResyncedImageID(ctx context.Context, rv *rbdVolume, ready bool) error {
// During resync operation the local image will get deleted and a new
// image is recreated by the rbd mirroring. The new image will have a
// new image ID. Once resync is completed update the image ID in the OMAP
// to get the image removed from the trash during DeleteVolume.
// if the image is not completely resynced skip repairing image ID.
if !ready {
return nil
}
j, err := volJournal.Connect(rv.Monitors, rv.RadosNamespace, rv.conn.Creds)
if err != nil {
return err
}
defer j.Destroy()
// reset the image ID which is stored in the existing OMAP
return rv.repairImageID(ctx, j, true)
}