rbd: add context to mirror interface

adding required ctx to the mirror
interface as ctx is required for
the volumegroup operations.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2024-07-30 18:42:35 +02:00 committed by mergify[bot]
parent 8788e5ec08
commit 37970ae212
5 changed files with 38 additions and 37 deletions

View File

@ -300,7 +300,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, err return nil, err
} }
info, err := mirror.GetMirroringInfo() info, err := mirror.GetMirroringInfo(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -313,7 +313,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, getGRPCError(err) return nil, getGRPCError(err)
} }
err = mirror.EnableMirroring(mirroringMode) err = mirror.EnableMirroring(ctx, mirroringMode)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -365,7 +365,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
return nil, err return nil, err
} }
info, err := mirror.GetMirroringInfo() info, err := mirror.GetMirroringInfo(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -378,7 +378,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
case librbd.MirrorImageDisabling.String(): case librbd.MirrorImageDisabling.String():
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled.String(): case librbd.MirrorImageEnabled.String():
err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force) err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force)
if err != nil { if err != nil {
return nil, getGRPCError(err) return nil, getGRPCError(err)
} }
@ -427,7 +427,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
info, err := mirror.GetMirroringInfo() info, err := mirror.GetMirroringInfo(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -447,9 +447,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
if req.GetForce() { if req.GetForce() {
// workaround for https://github.com/ceph/ceph-csi/issues/2736 // workaround for https://github.com/ceph/ceph-csi/issues/2736
// TODO: remove this workaround when the issue is fixed // TODO: remove this workaround when the issue is fixed
err = mirror.ForcePromote(cr) err = mirror.ForcePromote(ctx, cr)
} else { } else {
err = mirror.Promote(req.GetForce()) err = mirror.Promote(ctx, req.GetForce())
} }
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -527,7 +527,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
info, err := mirror.GetMirroringInfo() info, err := mirror.GetMirroringInfo(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -556,7 +556,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
err = mirror.Demote() err = mirror.Demote(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -630,7 +630,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
info, err := mirror.GetMirroringInfo() info, err := mirror.GetMirroringInfo(ctx)
if err != nil { if err != nil {
// in case of Resync the image will get deleted and gets recreated and // in case of Resync the image will get deleted and gets recreated and
// it takes time for this operation. // it takes time for this operation.
@ -648,7 +648,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is in primary state") return nil, status.Error(codes.InvalidArgument, "image is in primary state")
} }
sts, err := mirror.GetGlobalMirroringStatus() sts, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil { if err != nil {
// the image gets recreated after issuing resync // the image gets recreated after issuing resync
if errors.Is(err, corerbd.ErrImageNotFound) { if errors.Is(err, corerbd.ErrImageNotFound) {
@ -719,7 +719,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
} }
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime()) log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())
if req.GetForce() && st.Equal(creationTime.AsTime()) { if req.GetForce() && st.Equal(creationTime.AsTime()) {
err = mirror.Resync() err = mirror.Resync(ctx)
if err != nil { if err != nil {
return nil, getGRPCError(err) return nil, getGRPCError(err)
} }
@ -846,7 +846,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
info, err := mirror.GetMirroringInfo() info, err := mirror.GetMirroringInfo(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -862,7 +862,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Error(codes.InvalidArgument, "image is not in primary state") return nil, status.Error(codes.InvalidArgument, "image is not in primary state")
} }
mirrorStatus, err := mirror.GetGlobalMirroringStatus() mirrorStatus, err := mirror.GetGlobalMirroringStatus(ctx)
if err != nil { if err != nil {
if errors.Is(err, corerbd.ErrImageNotFound) { if errors.Is(err, corerbd.ErrImageNotFound) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())

View File

@ -988,7 +988,7 @@ func (cs *ControllerServer) DeleteVolume(
func cleanupRBDImage(ctx context.Context, func cleanupRBDImage(ctx context.Context,
rbdVol *rbdVolume, cr *util.Credentials, rbdVol *rbdVolume, cr *util.Credentials,
) (*csi.DeleteVolumeResponse, error) { ) (*csi.DeleteVolumeResponse, error) {
info, err := rbdVol.GetMirroringInfo() info, err := rbdVol.GetMirroringInfo(ctx)
if err != nil { if err != nil {
log.ErrorLog(ctx, err.Error()) log.ErrorLog(ctx, err.Error())
@ -1007,7 +1007,7 @@ func cleanupRBDImage(ctx context.Context,
// the image on all the remote (secondary) clusters will get // the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the OMAP, PVC and PV // auto-deleted. This helps in garbage collecting the OMAP, PVC and PV
// objects after failback operation. // objects after failback operation.
sts, rErr := rbdVol.GetGlobalMirroringStatus() sts, rErr := rbdVol.GetGlobalMirroringStatus(ctx)
if rErr != nil { if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error()) return nil, status.Error(codes.Internal, rErr.Error())
} }

View File

@ -63,7 +63,7 @@ func (rv *rbdVolume) HandleParentImageExistence(
if err != nil { if err != nil {
return fmt.Errorf("failed to get parent of image %s: %w", rv, err) return fmt.Errorf("failed to get parent of image %s: %w", rv, err)
} }
parentMirroringInfo, err := parent.GetMirroringInfo() parentMirroringInfo, err := parent.GetMirroringInfo(ctx)
if err != nil { if err != nil {
return fmt.Errorf( return fmt.Errorf(
"failed to get mirroring info of parent %q of image %q: %w", "failed to get mirroring info of parent %q of image %q: %w",
@ -82,7 +82,7 @@ func (rv *rbdVolume) HandleParentImageExistence(
var _ types.Mirror = &rbdVolume{} var _ types.Mirror = &rbdVolume{}
// EnableMirroring enables mirroring on an image. // EnableMirroring enables mirroring on an image.
func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error { func (ri *rbdImage) EnableMirroring(_ context.Context, mode librbd.ImageMirrorMode) error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -98,7 +98,7 @@ func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error {
} }
// DisableMirroring disables mirroring on an image. // DisableMirroring disables mirroring on an image.
func (ri *rbdImage) DisableMirroring(force bool) error { func (ri *rbdImage) DisableMirroring(_ context.Context, force bool) error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -114,7 +114,7 @@ func (ri *rbdImage) DisableMirroring(force bool) error {
} }
// GetMirroringInfo gets mirroring information of an image. // GetMirroringInfo gets mirroring information of an image.
func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) { func (ri *rbdImage) GetMirroringInfo(_ context.Context) (types.MirrorInfo, error) {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -130,7 +130,7 @@ func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) {
} }
// Promote promotes image to primary. // Promote promotes image to primary.
func (ri *rbdImage) Promote(force bool) error { func (ri *rbdImage) Promote(_ context.Context, force bool) error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -147,7 +147,7 @@ func (ri *rbdImage) Promote(force bool) error {
// ForcePromote promotes image to primary with force option with 2 minutes // ForcePromote promotes image to primary with force option with 2 minutes
// timeout. If there is no response within 2 minutes,the rbd CLI process will be // timeout. If there is no response within 2 minutes,the rbd CLI process will be
// killed and an error is returned. // killed and an error is returned.
func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { func (rv *rbdVolume) ForcePromote(ctx context.Context, cr *util.Credentials) error {
promoteArgs := []string{ promoteArgs := []string{
"mirror", "image", "promote", "mirror", "image", "promote",
rv.String(), rv.String(),
@ -157,7 +157,7 @@ func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error {
"--keyfile=" + cr.KeyFile, "--keyfile=" + cr.KeyFile,
} }
_, stderr, err := util.ExecCommandWithTimeout( _, stderr, err := util.ExecCommandWithTimeout(
context.TODO(), ctx,
// 2 minutes timeout as the Replication RPC timeout is 2.5 minutes. // 2 minutes timeout as the Replication RPC timeout is 2.5 minutes.
2*time.Minute, 2*time.Minute,
"rbd", "rbd",
@ -175,7 +175,7 @@ func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error {
} }
// Demote demotes image to secondary. // Demote demotes image to secondary.
func (ri *rbdImage) Demote() error { func (ri *rbdImage) Demote(_ context.Context) error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -190,7 +190,7 @@ func (ri *rbdImage) Demote() error {
} }
// Resync resync image to correct the split-brain. // Resync resync image to correct the split-brain.
func (ri *rbdImage) Resync() error { func (ri *rbdImage) Resync(_ context.Context) error {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return fmt.Errorf("failed to open image %q with error: %w", ri, err) return fmt.Errorf("failed to open image %q with error: %w", ri, err)
@ -208,7 +208,7 @@ func (ri *rbdImage) Resync() error {
} }
// GetGlobalMirroringStatus get the mirroring status of an image. // GetGlobalMirroringStatus get the mirroring status of an image.
func (ri *rbdImage) GetGlobalMirroringStatus() (types.GlobalStatus, error) { func (ri *rbdImage) GetGlobalMirroringStatus(_ context.Context) (types.GlobalStatus, error) {
image, err := ri.open() image, err := ri.open()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err)

View File

@ -46,6 +46,7 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro
} }
func DisableVolumeReplication(mirror types.Mirror, func DisableVolumeReplication(mirror types.Mirror,
ctx context.Context,
primary, primary,
force bool, force bool,
) error { ) error {
@ -62,7 +63,7 @@ func DisableVolumeReplication(mirror types.Mirror,
// disabled the image on all the remote (secondary) clusters will get // disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume // auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation. // replication Kubernetes artifacts after failback operation.
sts, rErr := mirror.GetGlobalMirroringStatus() sts, rErr := mirror.GetGlobalMirroringStatus(ctx)
if rErr != nil { if rErr != nil {
return fmt.Errorf("failed to get global state: %w", rErr) return fmt.Errorf("failed to get global state: %w", rErr)
} }
@ -78,13 +79,13 @@ func DisableVolumeReplication(mirror types.Mirror,
return fmt.Errorf("%w: secondary image status is up=%t and state=%s", return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState()) ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState())
} }
err := mirror.DisableMirroring(force) err := mirror.DisableMirroring(ctx, force)
if err != nil { if err != nil {
return fmt.Errorf("failed to disable image mirroring: %w", err) return fmt.Errorf("failed to disable image mirroring: %w", err)
} }
// the image state can be still disabling once we disable the mirroring // the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not // check the mirroring is disabled or not
info, err := mirror.GetMirroringInfo() info, err := mirror.GetMirroringInfo(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to get mirroring info of image: %w", err) return fmt.Errorf("failed to get mirroring info of image: %w", err)
} }

View File

@ -39,21 +39,21 @@ const (
// Mirror is the interface for managing mirroring on an RBD image or a group. // Mirror is the interface for managing mirroring on an RBD image or a group.
type Mirror interface { type Mirror interface {
// EnableMirroring enables mirroring on the resource with the specified mode. // EnableMirroring enables mirroring on the resource with the specified mode.
EnableMirroring(mode librbd.ImageMirrorMode) error EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error
// DisableMirroring disables mirroring on the resource with the option to force the operation // DisableMirroring disables mirroring on the resource with the option to force the operation
DisableMirroring(force bool) error DisableMirroring(ctx context.Context, force bool) error
// Promote promotes the resource to primary status with the option to force the operation // Promote promotes the resource to primary status with the option to force the operation
Promote(force bool) error Promote(ctx context.Context, force bool) error
// ForcePromote promotes the resource to primary status with a timeout // ForcePromote promotes the resource to primary status with a timeout
ForcePromote(cr *util.Credentials) error ForcePromote(ctx context.Context, cr *util.Credentials) error
// Demote demotes the resource to secondary status // Demote demotes the resource to secondary status
Demote() error Demote(ctx context.Context) error
// Resync resynchronizes the resource // Resync resynchronizes the resource
Resync() error Resync(ctx context.Context) error
// GetMirroringInfo returns the mirroring information of the resource // GetMirroringInfo returns the mirroring information of the resource
GetMirroringInfo() (MirrorInfo, error) GetMirroringInfo(ctx context.Context) (MirrorInfo, error)
// GetMirroringInfo returns the mirroring information of the resource // GetMirroringInfo returns the mirroring information of the resource
GetGlobalMirroringStatus() (GlobalStatus, error) GetGlobalMirroringStatus(ctx context.Context) (GlobalStatus, error)
// AddSnapshotScheduling adds a snapshot scheduling to the resource // AddSnapshotScheduling adds a snapshot scheduling to the resource
AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error
} }