rbd: implement GetMirrorSource in manager

Implement GetMirrorSource in the manager to
return the volume or the volume group based
on the replication source. If the replication
source is nil, return the volume details
for backward compatibility.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2024-11-19 11:26:00 +01:00
parent 9575e5ace8
commit 3afb22c8ac
6 changed files with 241 additions and 157 deletions

View File

@ -257,9 +257,9 @@ func validateSchedulingInterval(interval string) error {
func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
req *replication.EnableVolumeReplicationRequest,
) (*replication.EnableVolumeReplicationResponse, error) {
volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
reqID := csicommon.GetIDFromReplication(req)
if reqID == "" {
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
@ -272,24 +272,23 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, err
}
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
}
defer rs.VolumeLocks.Release(volumeID)
defer rs.VolumeLocks.Release(reqID)
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
if err != nil {
log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err)
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer destoryVolumes(ctx, volumes)
// extract the mirroring mode
mirroringMode, err := getMirroringMode(ctx, req.GetParameters())
@ -309,17 +308,18 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
return nil, status.Error(codes.Internal, err.Error())
}
if info.GetState() != librbd.MirrorImageEnabled.String() {
err = rbdVol.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
for _, rbdVol := range volumes {
err = rbdVol.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", mirror, err)
return nil, getGRPCError(err)
}
err = mirror.EnableMirroring(ctx, mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, getGRPCError(err)
}
err = mirror.EnableMirroring(ctx, mirroringMode)
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
}
@ -332,9 +332,9 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
req *replication.DisableVolumeReplicationRequest,
) (*replication.DisableVolumeReplicationResponse, error) {
volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
reqID := csicommon.GetIDFromReplication(req)
if reqID == "" {
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
@ -342,24 +342,23 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
}
defer rs.VolumeLocks.Release(volumeID)
defer rs.VolumeLocks.Release(reqID)
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
if err != nil {
log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err)
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer destoryVolumes(ctx, volumes)
// extract the force option
force, err := getForceOption(ctx, req.GetParameters())
@ -378,7 +377,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
case librbd.MirrorImageDisabled.String():
// image mirroring is still disabling
case librbd.MirrorImageDisabling.String():
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", reqID)
case librbd.MirrorImageEnabled.String():
err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force)
if err != nil {
@ -400,9 +399,9 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
req *replication.PromoteVolumeRequest,
) (*replication.PromoteVolumeResponse, error) {
volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
reqID := csicommon.GetIDFromReplication(req)
if reqID == "" {
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
@ -410,24 +409,23 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
}
defer rs.VolumeLocks.Release(volumeID)
defer rs.VolumeLocks.Release(reqID)
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
if err != nil {
log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err)
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer destoryVolumes(ctx, volumes)
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
@ -440,7 +438,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
return nil, status.Errorf(
codes.InvalidArgument,
"mirroring is not enabled on %s, image is in %s Mode",
volumeID,
reqID,
info.GetState())
}
@ -477,10 +475,10 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
}
log.DebugLog(
ctx,
"Added scheduling at interval %s, start time %s for volume %s",
"Added scheduling at interval %s, start time %s for Id %s",
interval,
startTime,
rbdVol)
reqID)
}
return &replication.PromoteVolumeResponse{}, nil
@ -493,9 +491,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
req *replication.DemoteVolumeRequest,
) (*replication.DemoteVolumeResponse, error) {
volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
reqID := csicommon.GetIDFromReplication(req)
if reqID == "" {
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
@ -503,31 +501,23 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
}
defer rs.VolumeLocks.Release(volumeID)
defer rs.VolumeLocks.Release(reqID)
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
if err != nil {
log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err)
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
creationTime, err := rbdVol.GetCreationTime(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
defer destoryVolumes(ctx, volumes)
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
@ -540,24 +530,33 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
return nil, status.Errorf(
codes.InvalidArgument,
"mirroring is not enabled on %s, image is in %s Mode",
volumeID,
reqID,
info.GetState())
}
// demote image to secondary
if info.IsPrimary() {
// store the image creation time for resync
_, err = rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, rbdVol)
err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime))
}
if err != nil {
log.ErrorLog(ctx, err.Error())
for _, vol := range volumes {
// store the image creation time for resync
creationTime, cErr := vol.GetCreationTime(ctx)
if cErr != nil {
log.ErrorLog(ctx, cErr.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return nil, status.Error(codes.Internal, cErr.Error())
}
// store the image creation time for resync
_, err = vol.GetMetadata(imageCreationTimeKey)
if err != nil && errors.Is(err, librbd.ErrNotFound) {
log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, vol)
err = vol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime))
}
if err != nil {
log.ErrorLog(ctx, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
err = mirror.Demote(ctx)
if err != nil {
log.ErrorLog(ctx, err.Error())
@ -604,9 +603,9 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus)
func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
req *replication.ResyncVolumeRequest,
) (*replication.ResyncVolumeResponse, error) {
volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
reqID := csicommon.GetIDFromReplication(req)
if reqID == "" {
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
@ -614,23 +613,23 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
}
defer rs.VolumeLocks.Release(volumeID)
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
if err != nil {
log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err)
return nil, getGRPCError(err)
}
mirror, err := rbdVol.ToMirror()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer destoryVolumes(ctx, volumes)
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
@ -695,35 +694,40 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus())
}
creationTime, err := rbdVol.GetCreationTime(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error())
}
// image creation time is stored in the image metadata. it looks like
// `"seconds:1692879841 nanos:631526669"`
// If the image gets resynced the local image creation time will be
// lost, if the keys is not present in the image metadata then we can
// assume that the image is already resynced.
savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey)
if err != nil && !errors.Is(err, librbd.ErrNotFound) {
return nil, status.Errorf(codes.Internal,
"failed to get %s key from image metadata for %s: %s",
imageCreationTimeKey,
rbdVol,
err.Error())
}
if savedImageTime != "" {
st, sErr := timestampFromString(savedImageTime)
if sErr != nil {
return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error())
for _, vol := range volumes {
creationTime, tErr := vol.GetCreationTime(ctx)
if tErr != nil {
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", vol, tErr.Error())
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime)
if req.GetForce() && st.Equal(*creationTime) {
err = mirror.Resync(ctx)
if err != nil {
return nil, getGRPCError(err)
// image creation time is stored in the image metadata. it looks like
// `"seconds:1692879841 nanos:631526669"`
// If the image gets resynced the local image creation time will be
// lost, if the keys is not present in the image metadata then we can
// assume that the image is already resynced.
var savedImageTime string
savedImageTime, err = vol.GetMetadata(imageCreationTimeKey)
if err != nil && !errors.Is(err, librbd.ErrNotFound) {
return nil, status.Errorf(codes.Internal,
"failed to get %s key from image metadata for %s: %s",
imageCreationTimeKey,
vol,
err.Error())
}
if savedImageTime != "" {
st, sErr := timestampFromString(savedImageTime)
if sErr != nil {
return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error())
}
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", vol, st, creationTime)
if req.GetForce() && st.Equal(*creationTime) {
err = mirror.Resync(ctx)
if err != nil {
return nil, getGRPCError(err)
}
// Break the loop as we need to issue resync only once for the image or for the group.
break
}
}
}
@ -735,9 +739,12 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
}
}
err = rbdVol.RepairResyncedImageID(ctx, ready)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error())
// update imageID for all the volumes
for _, vol := range volumes {
err = vol.RepairResyncedImageID(ctx, ready)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error())
}
}
resp := &replication.ResyncVolumeResponse{
@ -812,9 +819,9 @@ func getGRPCError(err error) error {
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
req *replication.GetVolumeReplicationInfoRequest,
) (*replication.GetVolumeReplicationInfoResponse, error) {
volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
reqID := csicommon.GetIDFromReplication(req)
if reqID == "" {
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
@ -824,36 +831,23 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
}
defer rs.VolumeLocks.Release(volumeID)
defer rs.VolumeLocks.Release(reqID)
mgr := rbd.NewManager(rs.driverInstance, nil, req.GetSecrets())
defer mgr.Destroy(ctx)
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
if err != nil {
log.ErrorLog(ctx, "failed to get volume with id %q: %v", volumeID, err)
log.ErrorLog(ctx, "failed to get mirror source with id %q: %v", reqID, err)
switch {
case errors.Is(err, corerbd.ErrImageNotFound):
err = status.Error(codes.NotFound, err.Error())
case errors.Is(err, util.ErrPoolNotFound):
err = status.Error(codes.NotFound, err.Error())
default:
err = status.Error(codes.Internal, err.Error())
}
return nil, err
}
mirror, err := rbdVol.ToMirror()
if err != nil {
log.ErrorLog(ctx, "failed to convert volume %q to mirror type: %v", rbdVol, err)
return nil, status.Error(codes.Internal, err.Error())
return nil, getGRPCError(err)
}
defer destoryVolumes(ctx, volumes)
info, err := mirror.GetMirroringInfo(ctx)
if err != nil {
@ -990,3 +984,10 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus)
return nil
}
// destoryVolumes destroys the volume connections by calling Destroy on every
// non-nil entry of volumes.
//
// Nil entries are skipped on purpose: callers may pre-size the slice with
// make([]types.Volume, n) and defer this function before the slice has been
// fully populated. If resolving one of the volumes fails and the caller
// returns early, the remaining slots still hold nil interface values, and
// invoking Destroy on them would panic.
func destoryVolumes(ctx context.Context, volumes []types.Volume) {
	for _, vol := range volumes {
		if vol == nil {
			// slot was never filled (early return in the caller)
			continue
		}

		vol.Destroy(ctx)
	}
}

View File

@ -90,11 +90,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
// resolve all volumes
volumes := make([]types.Volume, len(req.GetVolumeIds()))
defer func() {
for _, vol := range volumes {
vol.Destroy(ctx)
}
}()
defer destoryVolumes(ctx, volumes)
for i, id := range req.GetVolumeIds() {
vol, err := mgr.GetVolumeByID(ctx, id)
if err != nil {
@ -356,11 +352,7 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership(
// resolve all volumes
volumes := make([]types.Volume, len(toAdd))
defer func() {
for _, vol := range volumes {
vol.Destroy(ctx)
}
}()
defer destoryVolumes(ctx, volumes)
for i, id := range toAdd {
var vol types.Volume
vol, err = mgr.GetVolumeByID(ctx, id)