diff --git a/internal/rbd/replicationcontrollerserver.go b/internal/csi-addons/rbd/replication.go similarity index 80% rename from internal/rbd/replicationcontrollerserver.go rename to internal/csi-addons/rbd/replication.go index 83d516633..28aaed603 100644 --- a/internal/rbd/replicationcontrollerserver.go +++ b/internal/csi-addons/rbd/replication.go @@ -26,6 +26,7 @@ import ( "strings" "time" + corerbd "github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -38,7 +39,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -// imageMirroringMode is used to indicate the mirroring mode for an RBD image. +// imageMirroringMode is used to indicate the mirroring mode for an RBD image. type imageMirroringMode string const ( @@ -78,7 +79,7 @@ type ReplicationServer struct { // compatibility. *replication.UnimplementedControllerServer // Embed ControllerServer as it implements helper functions - *ControllerServer + *corerbd.ControllerServer } func (rs *ReplicationServer) RegisterService(server grpc.ServiceRegistrar) { @@ -229,11 +230,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) defer rbdVol.Destroy() if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) @@ -249,7 +250,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.getImageMirroringInfo() + mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -257,7 +258,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } if mirroringInfo.State != librbd.MirrorImageEnabled { - err = rbdVol.enableImageMirroring(mirroringMode) + err = rbdVol.EnableImageMirroring(mirroringMode) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -291,11 +292,11 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) defer rbdVol.Destroy() if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) @@ -311,7 +312,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.getImageMirroringInfo() + mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -325,7 +326,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, case librbd.MirrorImageDisabling: return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) case librbd.MirrorImageEnabled: - return disableVolumeReplication(rbdVol, mirroringInfo, force) + return corerbd.DisableVolumeReplication(rbdVol, mirroringInfo, force) default: return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State) } @@ -333,53 +334,6 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, return &replication.DisableVolumeReplicationResponse{}, nil } -func disableVolumeReplication(rbdVol *rbdVolume, - mirroringInfo *librbd.MirrorImageInfo, - force bool, -) (*replication.DisableVolumeReplicationResponse, error) { - if !mirroringInfo.Primary { - // Return success if the below condition is met - // Local image is secondary - // Local image is in up+replaying state - - // If the image is in a secondary and its state is up+replaying means - // its a healthy secondary and the image is primary somewhere in the - // remote cluster and the local image is getting replayed. Return - // success for the Disabling mirroring as we cannot disable mirroring - // on the secondary image, when the image on the primary site gets - // disabled the image on all the remote (secondary) clusters will get - // auto-deleted. This helps in garbage collecting the volume - // replication Kubernetes artifacts after failback operation. - localStatus, rErr := rbdVol.getLocalState() - if rErr != nil { - return nil, status.Error(codes.Internal, rErr.Error()) - } - if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying { - return &replication.DisableVolumeReplicationResponse{}, nil - } - - return nil, status.Errorf(codes.InvalidArgument, - "secondary image status is up=%t and state=%s", - localStatus.Up, - localStatus.State) - } - err := rbdVol.disableImageMirroring(force) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - // the image state can be still disabling once we disable the mirroring - // check the mirroring is disabled or not - mirroringInfo, err = rbdVol.getImageMirroringInfo() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - if mirroringInfo.State == librbd.MirrorImageDisabling { - return nil, status.Errorf(codes.Aborted, "%s is in disabling state", rbdVol.VolID) - } - - return &replication.DisableVolumeReplicationResponse{}, nil -} - // PromoteVolume extracts the RBD volume information from the volumeID, If the // image is present, mirroring is enabled and the image is in demoted state it // will promote the volume as primary. @@ -404,11 +358,11 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) defer rbdVol.Destroy() if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) @@ -419,7 +373,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.getImageMirroringInfo() + mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -439,9 +393,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, if req.GetForce() { // workaround for https://github.com/ceph/ceph-csi/issues/2736 // TODO: remove this workaround when the issue is fixed - err = rbdVol.forcePromoteImage(cr) + err = rbdVol.ForcePromoteImage(cr) } else { - err = rbdVol.promoteImage(req.GetForce()) + err = rbdVol.PromoteImage(req.GetForce()) } if err != nil { log.ErrorLog(ctx, err.Error()) @@ -461,7 +415,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, interval, startTime := getSchedulingDetails(req.GetParameters()) if interval != admin.NoInterval { - err = rbdVol.addSnapshotScheduling(interval, startTime) + err = rbdVol.AddSnapshotScheduling(interval, startTime) if err != nil { return nil, err } @@ -500,11 +454,11 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) defer rbdVol.Destroy() if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) @@ -514,7 +468,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.getImageMirroringInfo() + mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -531,7 +485,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, // demote image to secondary if mirroringInfo.Primary { - err = rbdVol.demoteImage() + err = rbdVol.DemoteImage() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -591,11 +545,11 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) defer rbdVol.Destroy() if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) @@ -606,7 +560,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.getImageMirroringInfo() + mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { // in case of Resync the image will get deleted and gets recreated and // it takes time for this operation. @@ -624,10 +578,10 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Error(codes.InvalidArgument, "image is in primary state") } - mirrorStatus, err := rbdVol.getImageMirroringStatus() + mirrorStatus, err := rbdVol.GetImageMirroringStatus() if err != nil { // the image gets recreated after issuing resync - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, corerbd.ErrImageNotFound) { // caller retries till RBD syncs an initial version of the image to // report its status in the resync call. Ideally, this line will not // be executed as the error would get returned due to getImageMirroringInfo @@ -671,7 +625,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, ready = checkRemoteSiteStatus(ctx, mirrorStatus) } - err = resyncVolume(localStatus, rbdVol, req.Force) + err = rbdVol.ResyncVol(localStatus, req.Force) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -683,7 +637,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - err = repairResyncedImageID(ctx, rbdVol, ready) + err = rbdVol.RepairResyncedImageID(ctx, ready) if err != nil { return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) } @@ -716,11 +670,11 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) } defer rs.VolumeLocks.Release(volumeID) - rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) + rbdVol, err := corerbd.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) defer rbdVol.Destroy() if err != nil { switch { - case errors.Is(err, ErrImageNotFound): + case errors.Is(err, corerbd.ErrImageNotFound): err = status.Errorf(codes.NotFound, "volume %s not found", volumeID) case errors.Is(err, util.ErrPoolNotFound): err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID) @@ -731,7 +685,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, err } - mirroringInfo, err := rbdVol.getImageMirroringInfo() + mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -747,9 +701,9 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.InvalidArgument, "image is not in primary state") } - mirrorStatus, err := rbdVol.getImageMirroringStatus() + mirrorStatus, err := rbdVol.GetImageMirroringStatus() if err != nil { - if errors.Is(err, ErrImageNotFound) { + if errors.Is(err, corerbd.ErrImageNotFound) { return nil, status.Error(codes.Aborted, err.Error()) } log.ErrorLog(ctx, err.Error()) @@ -767,7 +721,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, description := remoteStatus.Description lastSyncTime, err := getLastSyncTime(description) if err != nil { - if errors.Is(err, ErrLastSyncTimeNotFound) { + if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) { return nil, status.Errorf(codes.NotFound, "failed to get last sync time: %v", err) } log.ErrorLog(ctx, err.Error()) @@ -812,11 +766,11 @@ func getLastSyncTime(description string) (*timestamppb.Timestamp, error) { // In case there is no local snapshot timestamp return an error as the // LastSyncTime is required. if description == "" { - return nil, fmt.Errorf("empty description: %w", ErrLastSyncTimeNotFound) + return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound) } splittedString := strings.SplitN(description, ",", 2) if len(splittedString) == 1 { - return nil, fmt.Errorf("no local snapshot timestamp: %w", ErrLastSyncTimeNotFound) + return nil, fmt.Errorf("no local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound) } type localStatus struct { LocalSnapshotTime int64 `json:"local_snapshot_timestamp"` @@ -831,7 +785,7 @@ func getLastSyncTime(description string) (*timestamppb.Timestamp, error) { // If the json unmarsal is successful but the local snapshot time is 0, we // need to consider it as an error as the LastSyncTime is required. if localSnapTime.LocalSnapshotTime == 0 { - return nil, fmt.Errorf("empty local snapshot timestamp: %w", ErrLastSyncTimeNotFound) + return nil, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound) } lastUpdateTime := time.Unix(localSnapTime.LocalSnapshotTime, 0) @@ -840,29 +794,6 @@ func getLastSyncTime(description string) (*timestamppb.Timestamp, error) { return lastSyncTime, nil } -func resyncVolume(localStatus librbd.SiteMirrorImageStatus, rbdVol *rbdVolume, force bool) error { - if resyncRequired(localStatus) { - // If the force option is not set return the error message to retry - // with Force option. - if !force { - return status.Errorf(codes.FailedPrecondition, - "image is in %q state, description (%s). Force resync to recover volume", - localStatus.State, localStatus.Description) - } - err := rbdVol.resyncImage() - if err != nil { - return status.Error(codes.Internal, err.Error()) - } - - // If we issued a resync, return a non-final error as image needs to be recreated - // locally. Caller retries till RBD syncs an initial version of the image to - // report its status in the resync request. - return status.Error(codes.Unavailable, "awaiting initial resync due to split brain") - } - - return nil -} - func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error { // we are considering 2 states to check resync started and resync completed // as below. all other states will be considered as an error state so that @@ -882,39 +813,3 @@ func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error { return nil } - -// resyncRequired returns true if local image is in split-brain state and image -// needs resync. -func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool { - // resync is required if the image is in error state or the description - // contains split-brain message. - // In some corner cases like `re-player shutdown` the local image will not - // be in an error state. It would be also worth considering the `description` - // field to make sure about split-brain. - if localStatus.State == librbd.MirrorImageStatusStateError || - strings.Contains(localStatus.Description, "split-brain") { - return true - } - - return false -} - -// repairResyncedImageID updates the existing image ID with new one. -func repairResyncedImageID(ctx context.Context, rv *rbdVolume, ready bool) error { - // During resync operation the local image will get deleted and a new - // image is recreated by the rbd mirroring. The new image will have a - // new image ID. Once resync is completed update the image ID in the OMAP - // to get the image removed from the trash during DeleteVolume. - - // if the image is not completely resynced skip repairing image ID. - if !ready { - return nil - } - j, err := volJournal.Connect(rv.Monitors, rv.RadosNamespace, rv.conn.Creds) - if err != nil { - return err - } - defer j.Destroy() - // reset the image ID which is stored in the existing OMAP - return rv.repairImageID(ctx, j, true) -} diff --git a/internal/rbd/replicationcontrollerserver_test.go b/internal/csi-addons/rbd/replication_test.go similarity index 98% rename from internal/rbd/replicationcontrollerserver_test.go rename to internal/csi-addons/rbd/replication_test.go index fa7bae9e5..0884afcdf 100644 --- a/internal/rbd/replicationcontrollerserver_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + corerbd "github.com/ceph/ceph-csi/internal/rbd" + librbd "github.com/ceph/go-ceph/rbd" "github.com/ceph/go-ceph/rbd/admin" "google.golang.org/protobuf/types/known/timestamppb" @@ -455,7 +457,7 @@ func TestValidateLastSyncTime(t *testing.T) { "empty description", "", nil, - ErrLastSyncTimeNotFound.Error(), + corerbd.ErrLastSyncTimeNotFound.Error(), }, { "description without local_snapshot_timestamp", @@ -473,7 +475,7 @@ func TestValidateLastSyncTime(t *testing.T) { "description with no JSON", `replaying`, nil, - ErrLastSyncTimeNotFound.Error(), + corerbd.ErrLastSyncTimeNotFound.Error(), }, } for _, tt := range tests { diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index efa324407..452e0266f 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -943,7 +943,7 @@ func (cs *ControllerServer) DeleteVolume( func cleanupRBDImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials, ) (*csi.DeleteVolumeResponse, error) { - mirroringInfo, err := rbdVol.getImageMirroringInfo() + mirroringInfo, err := rbdVol.GetImageMirroringInfo() if err != nil { log.ErrorLog(ctx, err.Error()) @@ -962,7 +962,7 @@ func cleanupRBDImage(ctx context.Context, // the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the OMAP, PVC and PV // objects after failback operation. - localStatus, rErr := rbdVol.getLocalState() + localStatus, rErr := rbdVol.GetLocalState() if rErr != nil { return nil, status.Error(codes.Internal, rErr.Error()) } diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 5ae7c45ac..3deca01b6 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -38,7 +38,7 @@ type Driver struct { ids *rbd.IdentityServer ns *rbd.NodeServer cs *rbd.ControllerServer - rs *rbd.ReplicationServer + rs *casrbd.ReplicationServer // cas is the CSIAddonsServer where CSI-Addons services are handled cas *csiaddons.CSIAddonsServer @@ -66,8 +66,8 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer { } } -func NewReplicationServer(c *rbd.ControllerServer) *rbd.ReplicationServer { - return &rbd.ReplicationServer{ControllerServer: c} +func NewReplicationServer(c *rbd.ControllerServer) *casrbd.ReplicationServer { + return &casrbd.ReplicationServer{ControllerServer: c} } // NewNodeServer initialize a node server for rbd CSI driver. diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 239c81667..0b6e5dc3d 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -25,8 +25,8 @@ import ( librbd "github.com/ceph/go-ceph/rbd" ) -// enableImageMirroring enables mirroring on an image. -func (ri *rbdImage) enableImageMirroring(mode librbd.ImageMirrorMode) error { +// EnableImageMirroring enables mirroring on an image. +func (ri *rbdImage) EnableImageMirroring(mode librbd.ImageMirrorMode) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -41,8 +41,8 @@ func (ri *rbdImage) enableImageMirroring(mode librbd.ImageMirrorMode) error { return nil } -// disableImageMirroring disables mirroring on an image. -func (ri *rbdImage) disableImageMirroring(force bool) error { +// DisableImageMirroring disables mirroring on an image. +func (ri *rbdImage) DisableImageMirroring(force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -57,8 +57,8 @@ func (ri *rbdImage) disableImageMirroring(force bool) error { return nil } -// getImageMirroringInfo gets mirroring information of an image. -func (ri *rbdImage) getImageMirroringInfo() (*librbd.MirrorImageInfo, error) { +// GetImageMirroringInfo gets mirroring information of an image. +func (ri *rbdImage) GetImageMirroringInfo() (*librbd.MirrorImageInfo, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -73,8 +73,8 @@ func (ri *rbdImage) getImageMirroringInfo() (*librbd.MirrorImageInfo, error) { return info, nil } -// promoteImage promotes image to primary. -func (ri *rbdImage) promoteImage(force bool) error { +// PromoteImage promotes image to primary. +func (ri *rbdImage) PromoteImage(force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -88,10 +88,10 @@ func (ri *rbdImage) promoteImage(force bool) error { return nil } -// forcePromoteImage promotes image to primary with force option with 2 minutes +// ForcePromoteImage promotes image to primary with force option with 2 minutes // timeout. If there is no response within 2 minutes,the rbd CLI process will be // killed and an error is returned. -func (rv *rbdVolume) forcePromoteImage(cr *util.Credentials) error { +func (rv *rbdVolume) ForcePromoteImage(cr *util.Credentials) error { promoteArgs := []string{ "mirror", "image", "promote", rv.String(), @@ -118,8 +118,8 @@ func (rv *rbdVolume) forcePromoteImage(cr *util.Credentials) error { return nil } -// demoteImage demotes image to secondary. -func (ri *rbdImage) demoteImage() error { +// DemoteImage demotes image to secondary. +func (ri *rbdImage) DemoteImage() error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -148,8 +148,8 @@ func (ri *rbdImage) resyncImage() error { return nil } -// getImageMirroringStatus get the mirroring status of an image. -func (ri *rbdImage) getImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) { +// GetImageMirroringStatus get the mirroring status of an image. +func (ri *rbdImage) GetImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -163,8 +163,8 @@ func (ri *rbdImage) getImageMirroringStatus() (*librbd.GlobalMirrorImageStatus, return &statusInfo, nil } -// getLocalState returns the local state of the image. -func (ri *rbdImage) getLocalState() (librbd.SiteMirrorImageStatus, error) { +// GetLocalState returns the local state of the image. +func (ri *rbdImage) GetLocalState() (librbd.SiteMirrorImageStatus, error) { localStatus := librbd.SiteMirrorImageStatus{} image, err := ri.open() if err != nil { diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 1176b9dc1..b3df8ad77 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -2018,7 +2018,7 @@ func (ri *rbdImage) isCompabitableClone(dst *rbdImage) error { return nil } -func (ri *rbdImage) addSnapshotScheduling( +func (ri *rbdImage) AddSnapshotScheduling( interval admin.Interval, startTime admin.StartTime, ) error { diff --git a/internal/rbd/replication.go b/internal/rbd/replication.go new file mode 100644 index 000000000..0e70369be --- /dev/null +++ b/internal/rbd/replication.go @@ -0,0 +1,133 @@ +/* +Copyright 2023 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rbd + +import ( + "context" + "strings" + + librbd "github.com/ceph/go-ceph/rbd" + "github.com/csi-addons/spec/lib/go/replication" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus, force bool) error { + if resyncRequired(localStatus) { + // If the force option is not set return the error message to retry + // with Force option. + if !force { + return status.Errorf(codes.FailedPrecondition, + "image is in %q state, description (%s). Force resync to recover volume", + localStatus.State, localStatus.Description) + } + err := rv.resyncImage() + if err != nil { + return status.Error(codes.Internal, err.Error()) + } + + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. + return status.Error(codes.Unavailable, "awaiting initial resync due to split brain") + } + + return nil +} + +// repairResyncedImageID updates the existing image ID with new one. +func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) error { + // During resync operation the local image will get deleted and a new + // image is recreated by the rbd mirroring. The new image will have a + // new image ID. Once resync is completed update the image ID in the OMAP + // to get the image removed from the trash during DeleteVolume. + + // if the image is not completely resynced skip repairing image ID. + if !ready { + return nil + } + j, err := volJournal.Connect(rv.Monitors, rv.RadosNamespace, rv.conn.Creds) + if err != nil { + return err + } + defer j.Destroy() + // reset the image ID which is stored in the existing OMAP + return rv.repairImageID(ctx, j, true) +} + +// resyncRequired returns true if local image is in split-brain state and image +// needs resync. +func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool { + // resync is required if the image is in error state or the description + // contains split-brain message. + // In some corner cases like `re-player shutdown` the local image will not + // be in an error state. It would be also worth considering the `description` + // field to make sure about split-brain. + if localStatus.State == librbd.MirrorImageStatusStateError || + strings.Contains(localStatus.Description, "split-brain") { + return true + } + + return false +} + +func DisableVolumeReplication(rbdVol *rbdVolume, + mirroringInfo *librbd.MirrorImageInfo, + force bool, +) (*replication.DisableVolumeReplicationResponse, error) { + if !mirroringInfo.Primary { + // Return success if the below condition is met + // Local image is secondary + // Local image is in up+replaying state + + // If the image is in a secondary and its state is up+replaying means + // its a healthy secondary and the image is primary somewhere in the + // remote cluster and the local image is getting replayed. Return + // success for the Disabling mirroring as we cannot disable mirroring + // on the secondary image, when the image on the primary site gets + // disabled the image on all the remote (secondary) clusters will get + // auto-deleted. This helps in garbage collecting the volume + // replication Kubernetes artifacts after failback operation. + localStatus, rErr := rbdVol.GetLocalState() + if rErr != nil { + return nil, status.Error(codes.Internal, rErr.Error()) + } + if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying { + return &replication.DisableVolumeReplicationResponse{}, nil + } + + return nil, status.Errorf(codes.InvalidArgument, + "secondary image status is up=%t and state=%s", + localStatus.Up, + localStatus.State) + } + err := rbdVol.DisableImageMirroring(force) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + // the image state can be still disabling once we disable the mirroring + // check the mirroring is disabled or not + mirroringInfo, err = rbdVol.GetImageMirroringInfo() + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + if mirroringInfo.State == librbd.MirrorImageDisabling { + return nil, status.Errorf(codes.Aborted, "%s is in disabling state", rbdVol.VolID) + } + + return &replication.DisableVolumeReplicationResponse{}, nil +}