From 3afb22c8ac60a565725c209bf7cfbbd9f741dd89 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 19 Nov 2024 11:26:00 +0100 Subject: [PATCH] rbd: implement GetMirrorSource in manager implementing GetMirrorSource in manager to return volume or the volumegroup based on the replication source, if replication source is nil return the volume details for backward compatibility. Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 295 +++++++++++++------------ internal/csi-addons/rbd/volumegroup.go | 12 +- internal/rbd/group/volume_group.go | 4 + internal/rbd/manager.go | 78 +++++++ internal/rbd/types/group.go | 3 + internal/rbd/types/manager.go | 6 + 6 files changed, 241 insertions(+), 157 deletions(-) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 925139f7f..bd9151d8e 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -257,9 +257,9 @@ func validateSchedulingInterval(interval string) error { func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, req *replication.EnableVolumeReplicationRequest, ) (*replication.EnableVolumeReplicationResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -272,24 +272,23 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) // extract the mirroring mode mirroringMode, err := getMirroringMode(ctx, req.GetParameters()) @@ -309,17 +308,18 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } if info.GetState() != librbd.MirrorImageEnabled.String() { - err = rbdVol.HandleParentImageExistence(ctx, flattenMode) - if err != nil { - log.ErrorLog(ctx, err.Error()) + for _, rbdVol := range volumes { + err = rbdVol.HandleParentImageExistence(ctx, flattenMode) + if err != nil { + err = fmt.Errorf("failed to handle parent image for volume group %q: %w", mirror, err) + return nil, getGRPCError(err) + } + err = mirror.EnableMirroring(ctx, mirroringMode) + if err != nil { + log.ErrorLog(ctx, err.Error()) - return nil, getGRPCError(err) - } - err = mirror.EnableMirroring(ctx, mirroringMode) - if err != nil { - log.ErrorLog(ctx, err.Error()) - - return nil, status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } } } @@ -332,9 +332,9 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, req *replication.DisableVolumeReplicationRequest, ) (*replication.DisableVolumeReplicationResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -342,24 +342,23 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) // extract the force option force, err := getForceOption(ctx, req.GetParameters()) @@ -378,7 +377,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, case librbd.MirrorImageDisabled.String(): // image mirroring is still disabling case librbd.MirrorImageDisabling.String(): - return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) + return nil, status.Errorf(codes.Aborted, "%s is in disabling state", reqID) case librbd.MirrorImageEnabled.String(): err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force) if err != nil { @@ -400,9 +399,9 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) PromoteVolume(ctx context.Context, req *replication.PromoteVolumeRequest, ) (*replication.PromoteVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -410,24 +409,23 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -440,7 +438,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, return nil, status.Errorf( codes.InvalidArgument, "mirroring is not enabled on %s, image is in %s Mode", - volumeID, + reqID, info.GetState()) } @@ -477,10 +475,10 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } log.DebugLog( ctx, - "Added scheduling at interval %s, start time %s for volume %s", + "Added scheduling at interval %s, start time %s for Id %s", interval, startTime, - rbdVol) + reqID) } return &replication.PromoteVolumeResponse{}, nil @@ -493,9 +491,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, func (rs *ReplicationServer) DemoteVolume(ctx context.Context, req *replication.DemoteVolumeRequest, ) (*replication.DemoteVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -503,31 +501,23 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - creationTime, err := rbdVol.GetCreationTime(ctx) - if err != nil { - log.ErrorLog(ctx, err.Error()) - - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -540,24 +530,33 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Errorf( codes.InvalidArgument, "mirroring is not enabled on %s, image is in %s Mode", - volumeID, + reqID, info.GetState()) } // demote image to secondary if info.IsPrimary() { - // store the image creation time for resync - _, err = rbdVol.GetMetadata(imageCreationTimeKey) - if err != nil && errors.Is(err, librbd.ErrNotFound) { - log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, rbdVol) - err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime)) - } - if err != nil { - log.ErrorLog(ctx, err.Error()) + for _, vol := range volumes { + // store the image creation time for resync + creationTime, cErr := vol.GetCreationTime(ctx) + if cErr != nil { + log.ErrorLog(ctx, cErr.Error()) - return nil, status.Error(codes.Internal, err.Error()) - } + return nil, status.Error(codes.Internal, cErr.Error()) + } + // store the image creation time for resync + _, err = vol.GetMetadata(imageCreationTimeKey) + if err != nil && errors.Is(err, librbd.ErrNotFound) { + log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, vol) + err = vol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime)) + } + if err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) + } + } err = mirror.Demote(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -604,9 +603,9 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus) func (rs *ReplicationServer) ResyncVolume(ctx context.Context, req *replication.ResyncVolumeRequest, ) (*replication.ResyncVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -614,23 +613,23 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } defer rs.VolumeLocks.Release(volumeID) + mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) + return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -695,35 +694,40 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus()) } - creationTime, err := rbdVol.GetCreationTime(ctx) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error()) - } - - // image creation time is stored in the image metadata. it looks like - // `"seconds:1692879841 nanos:631526669"` - // If the image gets resynced the local image creation time will be - // lost, if the keys is not present in the image metadata then we can - // assume that the image is already resynced. - savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey) - if err != nil && !errors.Is(err, librbd.ErrNotFound) { - return nil, status.Errorf(codes.Internal, - "failed to get %s key from image metadata for %s: %s", - imageCreationTimeKey, - rbdVol, - err.Error()) - } - - if savedImageTime != "" { - st, sErr := timestampFromString(savedImageTime) - if sErr != nil { - return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error()) + for _, vol := range volumes { + creationTime, tErr := vol.GetCreationTime(ctx) + if tErr != nil { + return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", vol, tErr.Error()) } - log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime) - if req.GetForce() && st.Equal(*creationTime) { - err = mirror.Resync(ctx) - if err != nil { - return nil, getGRPCError(err) + + // image creation time is stored in the image metadata. it looks like + // `"seconds:1692879841 nanos:631526669"` + // If the image gets resynced the local image creation time will be + // lost, if the keys is not present in the image metadata then we can + // assume that the image is already resynced. + var savedImageTime string + savedImageTime, err = vol.GetMetadata(imageCreationTimeKey) + if err != nil && !errors.Is(err, librbd.ErrNotFound) { + return nil, status.Errorf(codes.Internal, + "failed to get %s key from image metadata for %s: %s", + imageCreationTimeKey, + vol, + err.Error()) + } + + if savedImageTime != "" { + st, sErr := timestampFromString(savedImageTime) + if sErr != nil { + return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error()) + } + log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", vol, st, creationTime) + if req.GetForce() && st.Equal(*creationTime) { + err = mirror.Resync(ctx) + if err != nil { + return nil, getGRPCError(err) + } + // Break the loop as we need to issue resync only once for the image or for the group. + break } } } @@ -735,9 +739,12 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } } - err = rbdVol.RepairResyncedImageID(ctx, ready) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) + // update imageID for all the volumes + for _, vol := range volumes { + err = vol.RepairResyncedImageID(ctx, ready) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) + } } resp := &replication.ResyncVolumeResponse{ @@ -812,9 +819,9 @@ func getGRPCError(err error) error { func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, req *replication.GetVolumeReplicationInfoRequest, ) (*replication.GetVolumeReplicationInfoResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -824,36 +831,23 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) + mgr := rbd.NewManager(rs.driverInstance, nil, req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { - log.ErrorLog(ctx, "failed to get volume with id %q: %v", volumeID, err) + log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err) - switch { - case errors.Is(err, corerbd.ErrImageNotFound): - err = status.Error(codes.NotFound, err.Error()) - case errors.Is(err, util.ErrPoolNotFound): - err = status.Error(codes.NotFound, err.Error()) - default: - err = status.Error(codes.Internal, err.Error()) - } - - return nil, err - } - mirror, err := rbdVol.ToMirror() - if err != nil { - log.ErrorLog(ctx, "failed to convert volume %q to mirror type: %v", rbdVol, err) - - return nil, status.Error(codes.Internal, err.Error()) + return nil, getGRPCError(err) } + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -990,3 +984,10 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) return nil } + +// destoryVolumes destroys the volume connections. +func destoryVolumes(ctx context.Context, volumes []types.Volume) { + for _, vol := range volumes { + vol.Destroy(ctx) + } +} diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index 4727e548b..dd349ac8a 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -90,11 +90,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( // resolve all volumes volumes := make([]types.Volume, len(req.GetVolumeIds())) - defer func() { - for _, vol := range volumes { - vol.Destroy(ctx) - } - }() + defer destoryVolumes(ctx, volumes) for i, id := range req.GetVolumeIds() { vol, err := mgr.GetVolumeByID(ctx, id) if err != nil { @@ -356,11 +352,7 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership( // resolve all volumes volumes := make([]types.Volume, len(toAdd)) - defer func() { - for _, vol := range volumes { - vol.Destroy(ctx) - } - }() + defer destoryVolumes(ctx, volumes) for i, id := range toAdd { var vol types.Volume vol, err = mgr.GetVolumeByID(ctx, id) diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index e8414f3dd..fa9159bdd 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -415,3 +415,7 @@ func (vg *volumeGroup) CreateSnapshots( return snapshots, nil } + +func (vg *volumeGroup) ToMirror() (types.Mirror, error) { + return volumeGroupMirror{vg}, nil +} diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index 61fcfdcaa..53385aee8 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" + "github.com/csi-addons/spec/lib/go/replication" + "github.com/ceph/ceph-csi/internal/journal" rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" @@ -504,6 +506,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot( return vgs, nil } +<<<<<<< HEAD // RegenerateVolumeGroupJournal regenerate the omap data for the volume group. // This performs the following operations: // - extracts clusterID and Mons from the cluster mapping @@ -633,4 +636,79 @@ func (mgr *rbdManager) RegenerateVolumeGroupJournal( groupHandle, vgName, requestName) return groupHandle, nil +======= +func (mgr *rbdManager) GetMirrorSource(ctx context.Context, reqID string, + rep *replication.ReplicationSource, +) ([]types.Volume, types.Mirror, error) { + switch { + // Backward compatibility: if rep is nil, we assume that the sidecar is still old and + // setting only volumeId not the replication source. + case rep == nil || rep.GetVolume() != nil: + rbdVol, err := mgr.GetVolumeByID(ctx, reqID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume by id %q: %w", reqID, err) + } + defer func() { + if err != nil { + rbdVol.Destroy(ctx) + } + }() + var mir types.Mirror + mir, err = rbdVol.ToMirror() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert volume %s to mirror: %w", rbdVol, err) + } + + return []types.Volume{rbdVol}, mir, nil + case rep.GetVolumegroup() != nil: + rbdGroup, err := mgr.GetVolumeGroupByID(ctx, reqID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume group by id %q: %w", reqID, err) + } + defer func() { + if err != nil { + rbdGroup.Destroy(ctx) + } + }() + var mir types.Mirror + mir, err = rbdGroup.ToMirror() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert volume group %s to mirror: %w", rbdGroup, err) + } + var vols []types.Volume + vols, err = rbdGroup.ListVolumes(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to list volumes in volume group %q: %w", rbdGroup, err) + } + // Get all the volume with connection and return it + volumes := make([]types.Volume, len(vols)) + // Destroy connections if there is any error + defer func() { + if err != nil { + for _, vol := range vols { + vol.Destroy(ctx) + } + } + }() + + for i, vol := range vols { + var id string + id, err = vol.GetID(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to get id for volume %q in group %q: %w", vol, rbdGroup, err) + } + var v types.Volume + v, err = mgr.GetVolumeByID(ctx, id) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume by id %q in group %q: %w", id, rbdGroup, err) + } + volumes[i] = v + } + + return volumes, mir, nil + + default: + return nil, nil, errors.New("replication source is not set") + } +>>>>>>> 71b235ff1 (rbd: implement GetMirrorSource in manager) } diff --git a/internal/rbd/types/group.go b/internal/rbd/types/group.go index 08183f9fb..f9b9b8f37 100644 --- a/internal/rbd/types/group.go +++ b/internal/rbd/types/group.go @@ -73,4 +73,7 @@ type VolumeGroup interface { // The Snapshots are crash consistent, and created as a consistency // group. CreateSnapshots(ctx context.Context, cr *util.Credentials, name string) ([]Snapshot, error) + + // ToMirror converts the VolumeGroup to a Mirror. + ToMirror() (Mirror, error) } diff --git a/internal/rbd/types/manager.go b/internal/rbd/types/manager.go index 458bc93dc..ca3bf63a4 100644 --- a/internal/rbd/types/manager.go +++ b/internal/rbd/types/manager.go @@ -18,6 +18,8 @@ package types import ( "context" + + "github.com/csi-addons/spec/lib/go/replication" ) // VolumeResolver can be used to construct a Volume from a CSI VolumeId. @@ -71,4 +73,8 @@ type Manager interface { // RegenerateVolumeGroupJournal regenerate the omap data for the volume group. // returns the volume group handle RegenerateVolumeGroupJournal(ctx context.Context, groupID, requestName string, volumeIds []string) (string, error) + + // GetMirrorSource returns the source of the mirror for the given volume or group. + GetMirrorSource(ctx context.Context, volumeID string, + rep *replication.ReplicationSource) ([]Volume, Mirror, error) }