From 37970ae21215531027f544d08ec5e4f6a37bbf9c Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 30 Jul 2024 18:42:35 +0200 Subject: [PATCH] rbd: add context to mirror interface adding required ctx to the mirror interface as ctx is required for the volumegroup operations. Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 28 +++++++++++++------------- internal/rbd/controllerserver.go | 4 ++-- internal/rbd/mirror.go | 20 +++++++++--------- internal/rbd/replication.go | 7 ++++--- internal/rbd/types/mirror.go | 16 +++++++-------- 5 files changed, 38 insertions(+), 37 deletions(-) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 09b0ca3fe..f58750892 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -300,7 +300,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -313,7 +313,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, getGRPCError(err) } - err = mirror.EnableMirroring(mirroringMode) + err = mirror.EnableMirroring(ctx, mirroringMode) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -365,7 +365,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, return nil, err } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -378,7 +378,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, case librbd.MirrorImageDisabling.String(): return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) case librbd.MirrorImageEnabled.String(): - err = corerbd.DisableVolumeReplication(mirror, info.IsPrimary(), force) + err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force) if err != nil { return nil, getGRPCError(err) } @@ -427,7 +427,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -447,9 +447,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, if req.GetForce() { // workaround for https://github.com/ceph/ceph-csi/issues/2736 // TODO: remove this workaround when the issue is fixed - err = mirror.ForcePromote(cr) + err = mirror.ForcePromote(ctx, cr) } else { - err = mirror.Promote(req.GetForce()) + err = mirror.Promote(ctx, req.GetForce()) } if err != nil { log.ErrorLog(ctx, err.Error()) @@ -527,7 +527,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -556,7 +556,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - err = mirror.Demote() + err = mirror.Demote(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -630,7 +630,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { // in case of Resync the image will get deleted and gets recreated and // it takes time for this operation. @@ -648,7 +648,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, return nil, status.Error(codes.InvalidArgument, "image is in primary state") } - sts, err := mirror.GetGlobalMirroringStatus() + sts, err := mirror.GetGlobalMirroringStatus(ctx) if err != nil { // the image gets recreated after issuing resync if errors.Is(err, corerbd.ErrImageNotFound) { @@ -719,7 +719,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime()) if req.GetForce() && st.Equal(creationTime.AsTime()) { - err = mirror.Resync() + err = mirror.Resync(ctx) if err != nil { return nil, getGRPCError(err) } @@ -846,7 +846,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -862,7 +862,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Error(codes.InvalidArgument, "image is not in primary state") } - mirrorStatus, err := mirror.GetGlobalMirroringStatus() + mirrorStatus, err := mirror.GetGlobalMirroringStatus(ctx) if err != nil { if errors.Is(err, corerbd.ErrImageNotFound) { return nil, status.Error(codes.Aborted, err.Error()) diff --git a/internal/rbd/controllerserver.go b/internal/rbd/controllerserver.go index cdf1443a5..56de60c58 100644 --- a/internal/rbd/controllerserver.go +++ b/internal/rbd/controllerserver.go @@ -988,7 +988,7 @@ func (cs *ControllerServer) DeleteVolume( func cleanupRBDImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials, ) (*csi.DeleteVolumeResponse, error) { - info, err := rbdVol.GetMirroringInfo() + info, err := rbdVol.GetMirroringInfo(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -1007,7 +1007,7 @@ func cleanupRBDImage(ctx context.Context, // the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the OMAP, PVC and PV // objects after failback operation. - sts, rErr := rbdVol.GetGlobalMirroringStatus() + sts, rErr := rbdVol.GetGlobalMirroringStatus(ctx) if rErr != nil { return nil, status.Error(codes.Internal, rErr.Error()) } diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 08064dfa9..e20927d01 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -63,7 +63,7 @@ func (rv *rbdVolume) HandleParentImageExistence( if err != nil { return fmt.Errorf("failed to get parent of image %s: %w", rv, err) } - parentMirroringInfo, err := parent.GetMirroringInfo() + parentMirroringInfo, err := parent.GetMirroringInfo(ctx) if err != nil { return fmt.Errorf( "failed to get mirroring info of parent %q of image %q: %w", @@ -82,7 +82,7 @@ func (rv *rbdVolume) HandleParentImageExistence( var _ types.Mirror = &rbdVolume{} // EnableMirroring enables mirroring on an image. -func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error { +func (ri *rbdImage) EnableMirroring(_ context.Context, mode librbd.ImageMirrorMode) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -98,7 +98,7 @@ func (ri *rbdImage) EnableMirroring(mode librbd.ImageMirrorMode) error { } // DisableMirroring disables mirroring on an image. -func (ri *rbdImage) DisableMirroring(force bool) error { +func (ri *rbdImage) DisableMirroring(_ context.Context, force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -114,7 +114,7 @@ func (ri *rbdImage) DisableMirroring(force bool) error { } // GetMirroringInfo gets mirroring information of an image. -func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) { +func (ri *rbdImage) GetMirroringInfo(_ context.Context) (types.MirrorInfo, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -130,7 +130,7 @@ func (ri *rbdImage) GetMirroringInfo() (types.MirrorInfo, error) { } // Promote promotes image to primary. -func (ri *rbdImage) Promote(force bool) error { +func (ri *rbdImage) Promote(_ context.Context, force bool) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -147,7 +147,7 @@ func (ri *rbdImage) Promote(force bool) error { // ForcePromote promotes image to primary with force option with 2 minutes // timeout. If there is no response within 2 minutes,the rbd CLI process will be // killed and an error is returned. -func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { +func (rv *rbdVolume) ForcePromote(ctx context.Context, cr *util.Credentials) error { promoteArgs := []string{ "mirror", "image", "promote", rv.String(), @@ -157,7 +157,7 @@ func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { "--keyfile=" + cr.KeyFile, } _, stderr, err := util.ExecCommandWithTimeout( - context.TODO(), + ctx, // 2 minutes timeout as the Replication RPC timeout is 2.5 minutes. 2*time.Minute, "rbd", @@ -175,7 +175,7 @@ func (rv *rbdVolume) ForcePromote(cr *util.Credentials) error { } // Demote demotes image to secondary. -func (ri *rbdImage) Demote() error { +func (ri *rbdImage) Demote(_ context.Context) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -190,7 +190,7 @@ func (ri *rbdImage) Demote() error { } // Resync resync image to correct the split-brain. -func (ri *rbdImage) Resync() error { +func (ri *rbdImage) Resync(_ context.Context) error { image, err := ri.open() if err != nil { return fmt.Errorf("failed to open image %q with error: %w", ri, err) @@ -208,7 +208,7 @@ func (ri *rbdImage) Resync() error { } // GetGlobalMirroringStatus get the mirroring status of an image. -func (ri *rbdImage) GetGlobalMirroringStatus() (types.GlobalStatus, error) { +func (ri *rbdImage) GetGlobalMirroringStatus(_ context.Context) (types.GlobalStatus, error) { image, err := ri.open() if err != nil { return nil, fmt.Errorf("failed to open image %q with error: %w", ri, err) diff --git a/internal/rbd/replication.go b/internal/rbd/replication.go index 86c31cd85..badcb12ce 100644 --- a/internal/rbd/replication.go +++ b/internal/rbd/replication.go @@ -46,6 +46,7 @@ func (rv *rbdVolume) RepairResyncedImageID(ctx context.Context, ready bool) erro } func DisableVolumeReplication(mirror types.Mirror, + ctx context.Context, primary, force bool, ) error { @@ -62,7 +63,7 @@ func DisableVolumeReplication(mirror types.Mirror, // disabled the image on all the remote (secondary) clusters will get // auto-deleted. This helps in garbage collecting the volume // replication Kubernetes artifacts after failback operation. - sts, rErr := mirror.GetGlobalMirroringStatus() + sts, rErr := mirror.GetGlobalMirroringStatus(ctx) if rErr != nil { return fmt.Errorf("failed to get global state: %w", rErr) } @@ -78,13 +79,13 @@ func DisableVolumeReplication(mirror types.Mirror, return fmt.Errorf("%w: secondary image status is up=%t and state=%s", ErrInvalidArgument, localStatus.IsUP(), localStatus.GetState()) } - err := mirror.DisableMirroring(force) + err := mirror.DisableMirroring(ctx, force) if err != nil { return fmt.Errorf("failed to disable image mirroring: %w", err) } // the image state can be still disabling once we disable the mirroring // check the mirroring is disabled or not - info, err := mirror.GetMirroringInfo() + info, err := mirror.GetMirroringInfo(ctx) if err != nil { return fmt.Errorf("failed to get mirroring info of image: %w", err) } diff --git a/internal/rbd/types/mirror.go b/internal/rbd/types/mirror.go index 131fad844..12c0bffdf 100644 --- a/internal/rbd/types/mirror.go +++ b/internal/rbd/types/mirror.go @@ -39,21 +39,21 @@ const ( // Mirror is the interface for managing mirroring on an RBD image or a group. type Mirror interface { // EnableMirroring enables mirroring on the resource with the specified mode. - EnableMirroring(mode librbd.ImageMirrorMode) error + EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error // DisableMirroring disables mirroring on the resource with the option to force the operation - DisableMirroring(force bool) error + DisableMirroring(ctx context.Context, force bool) error // Promote promotes the resource to primary status with the option to force the operation - Promote(force bool) error + Promote(ctx context.Context, force bool) error // ForcePromote promotes the resource to primary status with a timeout - ForcePromote(cr *util.Credentials) error + ForcePromote(ctx context.Context, cr *util.Credentials) error // Demote demotes the resource to secondary status - Demote() error + Demote(ctx context.Context) error // Resync resynchronizes the resource - Resync() error + Resync(ctx context.Context) error // GetMirroringInfo returns the mirroring information of the resource - GetMirroringInfo() (MirrorInfo, error) + GetMirroringInfo(ctx context.Context) (MirrorInfo, error) // GetMirroringInfo returns the mirroring information of the resource - GetGlobalMirroringStatus() (GlobalStatus, error) + GetGlobalMirroringStatus(ctx context.Context) (GlobalStatus, error) // AddSnapshotScheduling adds a snapshot scheduling to the resource AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error }