diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 8dc7d6ede..ae9db8094 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -256,9 +256,9 @@ func validateSchedulingInterval(interval string) error { func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, req *replication.EnableVolumeReplicationRequest, ) (*replication.EnableVolumeReplicationResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -271,25 +271,22 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, err } - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - + defer mirror.Destroy(ctx) + defer destoryVolumes(ctx, volumes) // extract the mirroring mode mirroringMode, err := getMirroringMode(ctx, req.GetParameters()) if err != nil { @@ -308,11 +305,13 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } if info.GetState() != librbd.MirrorImageEnabled.String() { - err = rbdVol.HandleParentImageExistence(ctx, flattenMode) - if err != nil { - log.ErrorLog(ctx, err.Error()) + for _, rbdVol := range volumes { + err = rbdVol.HandleParentImageExistence(ctx, flattenMode) + if err != nil { + err = fmt.Errorf("failed to handle parent image for volume group %q: %w", mirror, err) - return nil, getGRPCError(err) + return nil, getGRPCError(err) + } } err = mirror.EnableMirroring(ctx, mirroringMode) if err != nil { @@ -331,9 +330,9 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, req *replication.DisableVolumeReplicationRequest, ) (*replication.DisableVolumeReplicationResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -341,24 +340,22 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer mirror.Destroy(ctx) + defer destoryVolumes(ctx, volumes) // extract the force option force, err := getForceOption(ctx, req.GetParameters()) @@ -377,7 +374,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, case librbd.MirrorImageDisabled.String(): // image mirroring is still disabling case librbd.MirrorImageDisabling.String(): - return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID) + return nil, status.Errorf(codes.Aborted, "%s is in disabling state", reqID) case librbd.MirrorImageEnabled.String(): err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force) if err != nil { @@ -399,9 +396,9 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) PromoteVolume(ctx context.Context, req *replication.PromoteVolumeRequest, ) (*replication.PromoteVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -409,24 +406,22 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer mirror.Destroy(ctx) + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -439,7 +434,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, return nil, status.Errorf( codes.InvalidArgument, "mirroring is not enabled on %s, image is in %s Mode", - volumeID, + reqID, info.GetState()) } @@ -476,10 +471,10 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, } log.DebugLog( ctx, - "Added scheduling at interval %s, start time %s for volume %s", + "Added scheduling at interval %s, start time %s for Id %s", interval, startTime, - rbdVol) + reqID) } return &replication.PromoteVolumeResponse{}, nil @@ -492,9 +487,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context, func (rs *ReplicationServer) DemoteVolume(ctx context.Context, req *replication.DemoteVolumeRequest, ) (*replication.DemoteVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -502,31 +497,22 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - creationTime, err := rbdVol.GetCreationTime() - if err != nil { - log.ErrorLog(ctx, err.Error()) - - return nil, status.Error(codes.Internal, err.Error()) - } + defer mirror.Destroy(ctx) + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -539,24 +525,33 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context, return nil, status.Errorf( codes.InvalidArgument, "mirroring is not enabled on %s, image is in %s Mode", - volumeID, + reqID, info.GetState()) } // demote image to secondary if info.IsPrimary() { - // store the image creation time for resync - _, err = rbdVol.GetMetadata(imageCreationTimeKey) - if err != nil && errors.Is(err, librbd.ErrNotFound) { - log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, rbdVol) - err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime)) - } - if err != nil { - log.ErrorLog(ctx, err.Error()) + for _, vol := range volumes { + // store the image creation time for resync + creationTime, cErr := vol.GetCreationTime() + if cErr != nil { + log.ErrorLog(ctx, cErr.Error()) - return nil, status.Error(codes.Internal, err.Error()) - } + return nil, status.Error(codes.Internal, cErr.Error()) + } + // store the image creation time for resync + _, err = vol.GetMetadata(imageCreationTimeKey) + if err != nil && errors.Is(err, librbd.ErrNotFound) { + log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, vol) + err = vol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime)) + } + if err != nil { + log.ErrorLog(ctx, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) + } + } err = mirror.Demote(ctx) if err != nil { log.ErrorLog(ctx, err.Error()) @@ -603,9 +598,9 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus) func (rs *ReplicationServer) ResyncVolume(ctx context.Context, req *replication.ResyncVolumeRequest, ) (*replication.ResyncVolumeResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -613,23 +608,21 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { return nil, getGRPCError(err) } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } + defer mirror.Destroy(ctx) + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -694,39 +687,43 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus()) } - creationTime, err := rbdVol.GetCreationTime() - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error()) - } - - // image creation time is stored in the image metadata. it looks like - // `"seconds:1692879841 nanos:631526669"` - // If the image gets resynced the local image creation time will be - // lost, if the keys is not present in the image metadata then we can - // assume that the image is already resynced. - savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey) - if err != nil && !errors.Is(err, librbd.ErrNotFound) { - return nil, status.Errorf(codes.Internal, - "failed to get %s key from image metadata for %s: %s", - imageCreationTimeKey, - rbdVol, - err.Error()) - } - - if savedImageTime != "" { - st, sErr := timestampFromString(savedImageTime) - if sErr != nil { - return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error()) + for _, vol := range volumes { + creationTime, tErr := vol.GetCreationTime() + if tErr != nil { + return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", vol, tErr.Error()) } - log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime()) - if req.GetForce() && st.Equal(creationTime.AsTime()) { - err = mirror.Resync(ctx) - if err != nil { - return nil, getGRPCError(err) + + // image creation time is stored in the image metadata. it looks like + // `"seconds:1692879841 nanos:631526669"` + // If the image gets resynced the local image creation time will be + // lost, if the keys is not present in the image metadata then we can + // assume that the image is already resynced. + var savedImageTime string + savedImageTime, err = vol.GetMetadata(imageCreationTimeKey) + if err != nil && !errors.Is(err, librbd.ErrNotFound) { + return nil, status.Errorf(codes.Internal, + "failed to get %s key from image metadata for %s: %s", + imageCreationTimeKey, + vol, + err.Error()) + } + + if savedImageTime != "" { + st, sErr := timestampFromString(savedImageTime) + if sErr != nil { + return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error()) + } + log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", vol, st, creationTime.AsTime()) + if req.GetForce() && st.Equal(creationTime.AsTime()) { + err = mirror.Resync(ctx) + if err != nil { + return nil, getGRPCError(err) + } + // Break the loop as we need to issue resync only once for the image or for the group. + break } } } - if !ready { err = checkVolumeResyncStatus(ctx, localStatus) if err != nil { @@ -734,9 +731,12 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context, } } - err = rbdVol.RepairResyncedImageID(ctx, ready) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) + // update imageID for all the volumes + for _, vol := range volumes { + err = vol.RepairResyncedImageID(ctx, ready) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error()) + } } resp := &replication.ResyncVolumeResponse{ @@ -811,9 +811,9 @@ func getGRPCError(err error) error { func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, req *replication.GetVolumeReplicationInfoRequest, ) (*replication.GetVolumeReplicationInfoResponse, error) { - volumeID := csicommon.GetIDFromReplication(req) - if volumeID == "" { - return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") + reqID := csicommon.GetIDFromReplication(req) + if reqID == "" { + return nil, status.Error(codes.InvalidArgument, "empty ID in request") } cr, err := util.NewUserCredentials(req.GetSecrets()) if err != nil { @@ -821,32 +821,21 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, } defer cr.DeleteCredentials() - if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { - log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID) - return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID) } - defer rs.VolumeLocks.Release(volumeID) + defer rs.VolumeLocks.Release(reqID) mgr := rbd.NewManager(rs.csiID, nil, req.GetSecrets()) defer mgr.Destroy(ctx) - rbdVol, err := mgr.GetVolumeByID(ctx, volumeID) + volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource()) if err != nil { - switch { - case errors.Is(err, corerbd.ErrImageNotFound): - err = status.Errorf(codes.NotFound, err.Error()) - case errors.Is(err, util.ErrPoolNotFound): - err = status.Errorf(codes.NotFound, err.Error()) - default: - err = status.Errorf(codes.Internal, err.Error()) - } - - return nil, err - } - mirror, err := rbdVol.ToMirror() - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + return nil, getGRPCError(err) } + defer mirror.Destroy(ctx) + defer destoryVolumes(ctx, volumes) info, err := mirror.GetMirroringInfo(ctx) if err != nil { @@ -977,3 +966,10 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) return nil } + +// destoryVolumes destroys the volume connections. +func destoryVolumes(ctx context.Context, volumes []types.Volume) { + for _, vol := range volumes { + vol.Destroy(ctx) + } +} diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index 1b418a082..0b66504d1 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -88,11 +88,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( // resolve all volumes volumes := make([]types.Volume, len(req.GetVolumeIds())) - defer func() { - for _, vol := range volumes { - vol.Destroy(ctx) - } - }() + defer destoryVolumes(ctx, volumes) for i, id := range req.GetVolumeIds() { vol, err := mgr.GetVolumeByID(ctx, id) if err != nil { @@ -348,11 +344,7 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership( // resolve all volumes volumes := make([]types.Volume, len(toAdd)) - defer func() { - for _, vol := range volumes { - vol.Destroy(ctx) - } - }() + defer destoryVolumes(ctx, volumes) for i, id := range toAdd { var vol types.Volume vol, err = mgr.GetVolumeByID(ctx, id) diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index 6e6e59019..39c0e3171 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -465,3 +465,7 @@ func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error) { return vg.volumes, nil } + +func (vg *volumeGroup) ToMirror() (types.Mirror, error) { + return volumeGroupMirror{vg}, nil +} diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index 3c41afdc6..b79352766 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/ceph/go-ceph/rados" + "github.com/csi-addons/spec/lib/go/replication" "github.com/ceph/ceph-csi/internal/journal" rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" @@ -294,3 +295,78 @@ func (mgr *rbdManager) DeleteVolumeGroup(ctx context.Context, vg types.VolumeGro return nil } + +func (mgr *rbdManager) GetMirrorSource(ctx context.Context, reqID string, + rep *replication.ReplicationSource, +) ([]types.Volume, types.Mirror, error) { + switch { + // Backward compatibility: if rep is nil, we assume that the sidecar is still old and + // setting only volumeId not the replication source. + case rep == nil || rep.GetVolume() != nil: + rbdVol, err := mgr.GetVolumeByID(ctx, reqID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume by id %q: %w", reqID, err) + } + defer func() { + if err != nil { + rbdVol.Destroy(ctx) + } + }() + var mir types.Mirror + mir, err = rbdVol.ToMirror() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert volume %s to mirror: %w", rbdVol, err) + } + + return []types.Volume{rbdVol}, mir, nil + case rep.GetVolumegroup() != nil: + rbdGroup, err := mgr.GetVolumeGroupByID(ctx, reqID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume group by id %q: %w", reqID, err) + } + defer func() { + if err != nil { + rbdGroup.Destroy(ctx) + } + }() + var mir types.Mirror + mir, err = rbdGroup.ToMirror() + if err != nil { + return nil, nil, fmt.Errorf("failed to convert volume group %s to mirror: %w", rbdGroup, err) + } + var vols []types.Volume + vols, err = rbdGroup.ListVolumes(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to list volumes in volume group %q: %w", rbdGroup, err) + } + // Get all the volume with connection and return it + volumes := make([]types.Volume, len(vols)) + // Destroy connections if there is any error + defer func() { + if err != nil { + for _, vol := range vols { + vol.Destroy(ctx) + } + } + }() + + for i, vol := range vols { + var id string + id, err = vol.GetID(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to get id for volume %q in group %q: %w", vol, rbdGroup, err) + } + var v types.Volume + v, err = mgr.GetVolumeByID(ctx, id) + if err != nil { + return nil, nil, fmt.Errorf("failed to get volume by id %q in group %q: %w", id, rbdGroup, err) + } + volumes[i] = v + } + + return volumes, mir, nil + + default: + return nil, nil, errors.New("replication source is not set") + } +} diff --git a/internal/rbd/types/group.go b/internal/rbd/types/group.go index 36f89e807..018455f15 100644 --- a/internal/rbd/types/group.go +++ b/internal/rbd/types/group.go @@ -66,4 +66,7 @@ type VolumeGroup interface { // ListVolumes returns a slice with all Volumes in the VolumeGroup. ListVolumes(ctx context.Context) ([]Volume, error) + + // ToMirror converts the VolumeGroup to a Mirror. + ToMirror() (Mirror, error) } diff --git a/internal/rbd/types/manager.go b/internal/rbd/types/manager.go index 334974932..633846ea2 100644 --- a/internal/rbd/types/manager.go +++ b/internal/rbd/types/manager.go @@ -18,6 +18,8 @@ package types import ( "context" + + "github.com/csi-addons/spec/lib/go/replication" ) // VolumeResolver can be used to construct a Volume from a CSI VolumeId. @@ -47,4 +49,8 @@ type Manager interface { // DeleteVolumeGroup removes VolumeGroup from the backend storage and // any details from the journal. DeleteVolumeGroup(ctx context.Context, vg VolumeGroup) error + + // GetMirrorSource returns the source of the mirror for the given volume or group. + GetMirrorSource(ctx context.Context, volumeID string, + rep *replication.ReplicationSource) ([]Volume, Mirror, error) } diff --git a/internal/rbd/types/mirror.go b/internal/rbd/types/mirror.go index 12c0bffdf..92a5f958a 100644 --- a/internal/rbd/types/mirror.go +++ b/internal/rbd/types/mirror.go @@ -38,6 +38,8 @@ const ( // Mirror is the interface for managing mirroring on an RBD image or a group. type Mirror interface { + // Destroy frees the resources used by the Mirror. + Destroy(ctx context.Context) // EnableMirroring enables mirroring on the resource with the specified mode. EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error // DisableMirroring disables mirroring on the resource with the option to force the operation