mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-10 00:10:20 +00:00
rbd: implement GetMirrorSource in manager
implementing GetMirrorSource in manager to return volume or the volumegroup based on the replication source, if replication source is nil return the volume details for backward compatibility. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
parent
5ddcbfb334
commit
b4545b7a9b
@ -256,9 +256,9 @@ func validateSchedulingInterval(interval string) error {
|
|||||||
func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
|
func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
|
||||||
req *replication.EnableVolumeReplicationRequest,
|
req *replication.EnableVolumeReplicationRequest,
|
||||||
) (*replication.EnableVolumeReplicationResponse, error) {
|
) (*replication.EnableVolumeReplicationResponse, error) {
|
||||||
volumeID := csicommon.GetIDFromReplication(req)
|
reqID := csicommon.GetIDFromReplication(req)
|
||||||
if volumeID == "" {
|
if reqID == "" {
|
||||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
|
||||||
}
|
}
|
||||||
cr, err := util.NewUserCredentials(req.GetSecrets())
|
cr, err := util.NewUserCredentials(req.GetSecrets())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -271,25 +271,22 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
|
||||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
|
|
||||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
}
|
}
|
||||||
defer rs.VolumeLocks.Release(volumeID)
|
defer rs.VolumeLocks.Release(reqID)
|
||||||
|
|
||||||
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
||||||
defer mgr.Destroy(ctx)
|
defer mgr.Destroy(ctx)
|
||||||
|
|
||||||
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
|
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, getGRPCError(err)
|
return nil, getGRPCError(err)
|
||||||
}
|
}
|
||||||
mirror, err := rbdVol.ToMirror()
|
defer mirror.Destroy(ctx)
|
||||||
if err != nil {
|
defer destoryVolumes(ctx, volumes)
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// extract the mirroring mode
|
// extract the mirroring mode
|
||||||
mirroringMode, err := getMirroringMode(ctx, req.GetParameters())
|
mirroringMode, err := getMirroringMode(ctx, req.GetParameters())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -308,11 +305,13 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
|
|||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
if info.GetState() != librbd.MirrorImageEnabled.String() {
|
if info.GetState() != librbd.MirrorImageEnabled.String() {
|
||||||
err = rbdVol.HandleParentImageExistence(ctx, flattenMode)
|
for _, rbdVol := range volumes {
|
||||||
if err != nil {
|
err = rbdVol.HandleParentImageExistence(ctx, flattenMode)
|
||||||
log.ErrorLog(ctx, err.Error())
|
if err != nil {
|
||||||
|
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", mirror, err)
|
||||||
|
|
||||||
return nil, getGRPCError(err)
|
return nil, getGRPCError(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
err = mirror.EnableMirroring(ctx, mirroringMode)
|
err = mirror.EnableMirroring(ctx, mirroringMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -331,9 +330,9 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
|
|||||||
func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
|
func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
|
||||||
req *replication.DisableVolumeReplicationRequest,
|
req *replication.DisableVolumeReplicationRequest,
|
||||||
) (*replication.DisableVolumeReplicationResponse, error) {
|
) (*replication.DisableVolumeReplicationResponse, error) {
|
||||||
volumeID := csicommon.GetIDFromReplication(req)
|
reqID := csicommon.GetIDFromReplication(req)
|
||||||
if volumeID == "" {
|
if reqID == "" {
|
||||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
|
||||||
}
|
}
|
||||||
cr, err := util.NewUserCredentials(req.GetSecrets())
|
cr, err := util.NewUserCredentials(req.GetSecrets())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -341,24 +340,22 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
|
|||||||
}
|
}
|
||||||
defer cr.DeleteCredentials()
|
defer cr.DeleteCredentials()
|
||||||
|
|
||||||
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
|
||||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
|
|
||||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
}
|
}
|
||||||
defer rs.VolumeLocks.Release(volumeID)
|
defer rs.VolumeLocks.Release(reqID)
|
||||||
|
|
||||||
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
||||||
defer mgr.Destroy(ctx)
|
defer mgr.Destroy(ctx)
|
||||||
|
|
||||||
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
|
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, getGRPCError(err)
|
return nil, getGRPCError(err)
|
||||||
}
|
}
|
||||||
mirror, err := rbdVol.ToMirror()
|
defer mirror.Destroy(ctx)
|
||||||
if err != nil {
|
defer destoryVolumes(ctx, volumes)
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// extract the force option
|
// extract the force option
|
||||||
force, err := getForceOption(ctx, req.GetParameters())
|
force, err := getForceOption(ctx, req.GetParameters())
|
||||||
@ -377,7 +374,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
|
|||||||
case librbd.MirrorImageDisabled.String():
|
case librbd.MirrorImageDisabled.String():
|
||||||
// image mirroring is still disabling
|
// image mirroring is still disabling
|
||||||
case librbd.MirrorImageDisabling.String():
|
case librbd.MirrorImageDisabling.String():
|
||||||
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
|
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", reqID)
|
||||||
case librbd.MirrorImageEnabled.String():
|
case librbd.MirrorImageEnabled.String():
|
||||||
err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force)
|
err = corerbd.DisableVolumeReplication(mirror, ctx, info.IsPrimary(), force)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -399,9 +396,9 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
|
|||||||
func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
|
func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
|
||||||
req *replication.PromoteVolumeRequest,
|
req *replication.PromoteVolumeRequest,
|
||||||
) (*replication.PromoteVolumeResponse, error) {
|
) (*replication.PromoteVolumeResponse, error) {
|
||||||
volumeID := csicommon.GetIDFromReplication(req)
|
reqID := csicommon.GetIDFromReplication(req)
|
||||||
if volumeID == "" {
|
if reqID == "" {
|
||||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
|
||||||
}
|
}
|
||||||
cr, err := util.NewUserCredentials(req.GetSecrets())
|
cr, err := util.NewUserCredentials(req.GetSecrets())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -409,24 +406,22 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
|
|||||||
}
|
}
|
||||||
defer cr.DeleteCredentials()
|
defer cr.DeleteCredentials()
|
||||||
|
|
||||||
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
|
||||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
|
|
||||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
}
|
}
|
||||||
defer rs.VolumeLocks.Release(volumeID)
|
defer rs.VolumeLocks.Release(reqID)
|
||||||
|
|
||||||
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
||||||
defer mgr.Destroy(ctx)
|
defer mgr.Destroy(ctx)
|
||||||
|
|
||||||
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
|
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, getGRPCError(err)
|
return nil, getGRPCError(err)
|
||||||
}
|
}
|
||||||
mirror, err := rbdVol.ToMirror()
|
defer mirror.Destroy(ctx)
|
||||||
if err != nil {
|
defer destoryVolumes(ctx, volumes)
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
info, err := mirror.GetMirroringInfo(ctx)
|
info, err := mirror.GetMirroringInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -439,7 +434,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
|
|||||||
return nil, status.Errorf(
|
return nil, status.Errorf(
|
||||||
codes.InvalidArgument,
|
codes.InvalidArgument,
|
||||||
"mirroring is not enabled on %s, image is in %s Mode",
|
"mirroring is not enabled on %s, image is in %s Mode",
|
||||||
volumeID,
|
reqID,
|
||||||
info.GetState())
|
info.GetState())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -476,10 +471,10 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
|
|||||||
}
|
}
|
||||||
log.DebugLog(
|
log.DebugLog(
|
||||||
ctx,
|
ctx,
|
||||||
"Added scheduling at interval %s, start time %s for volume %s",
|
"Added scheduling at interval %s, start time %s for Id %s",
|
||||||
interval,
|
interval,
|
||||||
startTime,
|
startTime,
|
||||||
rbdVol)
|
reqID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &replication.PromoteVolumeResponse{}, nil
|
return &replication.PromoteVolumeResponse{}, nil
|
||||||
@ -492,9 +487,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
|
|||||||
func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
|
func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
|
||||||
req *replication.DemoteVolumeRequest,
|
req *replication.DemoteVolumeRequest,
|
||||||
) (*replication.DemoteVolumeResponse, error) {
|
) (*replication.DemoteVolumeResponse, error) {
|
||||||
volumeID := csicommon.GetIDFromReplication(req)
|
reqID := csicommon.GetIDFromReplication(req)
|
||||||
if volumeID == "" {
|
if reqID == "" {
|
||||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
|
||||||
}
|
}
|
||||||
cr, err := util.NewUserCredentials(req.GetSecrets())
|
cr, err := util.NewUserCredentials(req.GetSecrets())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -502,31 +497,22 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
|
|||||||
}
|
}
|
||||||
defer cr.DeleteCredentials()
|
defer cr.DeleteCredentials()
|
||||||
|
|
||||||
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
|
||||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
|
|
||||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
}
|
}
|
||||||
defer rs.VolumeLocks.Release(volumeID)
|
defer rs.VolumeLocks.Release(reqID)
|
||||||
|
|
||||||
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
||||||
defer mgr.Destroy(ctx)
|
defer mgr.Destroy(ctx)
|
||||||
|
|
||||||
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
|
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, getGRPCError(err)
|
return nil, getGRPCError(err)
|
||||||
}
|
}
|
||||||
mirror, err := rbdVol.ToMirror()
|
defer mirror.Destroy(ctx)
|
||||||
if err != nil {
|
defer destoryVolumes(ctx, volumes)
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
creationTime, err := rbdVol.GetCreationTime()
|
|
||||||
if err != nil {
|
|
||||||
log.ErrorLog(ctx, err.Error())
|
|
||||||
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
info, err := mirror.GetMirroringInfo(ctx)
|
info, err := mirror.GetMirroringInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -539,24 +525,33 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
|
|||||||
return nil, status.Errorf(
|
return nil, status.Errorf(
|
||||||
codes.InvalidArgument,
|
codes.InvalidArgument,
|
||||||
"mirroring is not enabled on %s, image is in %s Mode",
|
"mirroring is not enabled on %s, image is in %s Mode",
|
||||||
volumeID,
|
reqID,
|
||||||
info.GetState())
|
info.GetState())
|
||||||
}
|
}
|
||||||
|
|
||||||
// demote image to secondary
|
// demote image to secondary
|
||||||
if info.IsPrimary() {
|
if info.IsPrimary() {
|
||||||
// store the image creation time for resync
|
for _, vol := range volumes {
|
||||||
_, err = rbdVol.GetMetadata(imageCreationTimeKey)
|
// store the image creation time for resync
|
||||||
if err != nil && errors.Is(err, librbd.ErrNotFound) {
|
creationTime, cErr := vol.GetCreationTime()
|
||||||
log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, rbdVol)
|
if cErr != nil {
|
||||||
err = rbdVol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime))
|
log.ErrorLog(ctx, cErr.Error())
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.ErrorLog(ctx, err.Error())
|
|
||||||
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, cErr.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// store the image creation time for resync
|
||||||
|
_, err = vol.GetMetadata(imageCreationTimeKey)
|
||||||
|
if err != nil && errors.Is(err, librbd.ErrNotFound) {
|
||||||
|
log.DebugLog(ctx, "setting image creation time %s for %s", creationTime, vol)
|
||||||
|
err = vol.SetMetadata(imageCreationTimeKey, timestampToString(creationTime))
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.ErrorLog(ctx, err.Error())
|
||||||
|
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
err = mirror.Demote(ctx)
|
err = mirror.Demote(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ErrorLog(ctx, err.Error())
|
log.ErrorLog(ctx, err.Error())
|
||||||
@ -603,9 +598,9 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus []types.SiteStatus)
|
|||||||
func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
||||||
req *replication.ResyncVolumeRequest,
|
req *replication.ResyncVolumeRequest,
|
||||||
) (*replication.ResyncVolumeResponse, error) {
|
) (*replication.ResyncVolumeResponse, error) {
|
||||||
volumeID := csicommon.GetIDFromReplication(req)
|
reqID := csicommon.GetIDFromReplication(req)
|
||||||
if volumeID == "" {
|
if reqID == "" {
|
||||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
|
||||||
}
|
}
|
||||||
cr, err := util.NewUserCredentials(req.GetSecrets())
|
cr, err := util.NewUserCredentials(req.GetSecrets())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -613,23 +608,21 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
|||||||
}
|
}
|
||||||
defer cr.DeleteCredentials()
|
defer cr.DeleteCredentials()
|
||||||
|
|
||||||
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
|
||||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
|
|
||||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
}
|
}
|
||||||
defer rs.VolumeLocks.Release(volumeID)
|
defer rs.VolumeLocks.Release(reqID)
|
||||||
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
|
||||||
defer mgr.Destroy(ctx)
|
defer mgr.Destroy(ctx)
|
||||||
|
|
||||||
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
|
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, getGRPCError(err)
|
return nil, getGRPCError(err)
|
||||||
}
|
}
|
||||||
mirror, err := rbdVol.ToMirror()
|
defer mirror.Destroy(ctx)
|
||||||
if err != nil {
|
defer destoryVolumes(ctx, volumes)
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
info, err := mirror.GetMirroringInfo(ctx)
|
info, err := mirror.GetMirroringInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -694,39 +687,43 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
|||||||
ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus())
|
ready = checkRemoteSiteStatus(ctx, sts.GetAllSitesStatus())
|
||||||
}
|
}
|
||||||
|
|
||||||
creationTime, err := rbdVol.GetCreationTime()
|
for _, vol := range volumes {
|
||||||
if err != nil {
|
creationTime, tErr := vol.GetCreationTime()
|
||||||
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", rbdVol, err.Error())
|
if tErr != nil {
|
||||||
}
|
return nil, status.Errorf(codes.Internal, "failed to get image info for %s: %s", vol, tErr.Error())
|
||||||
|
|
||||||
// image creation time is stored in the image metadata. it looks like
|
|
||||||
// `"seconds:1692879841 nanos:631526669"`
|
|
||||||
// If the image gets resynced the local image creation time will be
|
|
||||||
// lost, if the keys is not present in the image metadata then we can
|
|
||||||
// assume that the image is already resynced.
|
|
||||||
savedImageTime, err := rbdVol.GetMetadata(imageCreationTimeKey)
|
|
||||||
if err != nil && !errors.Is(err, librbd.ErrNotFound) {
|
|
||||||
return nil, status.Errorf(codes.Internal,
|
|
||||||
"failed to get %s key from image metadata for %s: %s",
|
|
||||||
imageCreationTimeKey,
|
|
||||||
rbdVol,
|
|
||||||
err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
if savedImageTime != "" {
|
|
||||||
st, sErr := timestampFromString(savedImageTime)
|
|
||||||
if sErr != nil {
|
|
||||||
return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error())
|
|
||||||
}
|
}
|
||||||
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", rbdVol, st, creationTime.AsTime())
|
|
||||||
if req.GetForce() && st.Equal(creationTime.AsTime()) {
|
// image creation time is stored in the image metadata. it looks like
|
||||||
err = mirror.Resync(ctx)
|
// `"seconds:1692879841 nanos:631526669"`
|
||||||
if err != nil {
|
// If the image gets resynced the local image creation time will be
|
||||||
return nil, getGRPCError(err)
|
// lost, if the keys is not present in the image metadata then we can
|
||||||
|
// assume that the image is already resynced.
|
||||||
|
var savedImageTime string
|
||||||
|
savedImageTime, err = vol.GetMetadata(imageCreationTimeKey)
|
||||||
|
if err != nil && !errors.Is(err, librbd.ErrNotFound) {
|
||||||
|
return nil, status.Errorf(codes.Internal,
|
||||||
|
"failed to get %s key from image metadata for %s: %s",
|
||||||
|
imageCreationTimeKey,
|
||||||
|
vol,
|
||||||
|
err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if savedImageTime != "" {
|
||||||
|
st, sErr := timestampFromString(savedImageTime)
|
||||||
|
if sErr != nil {
|
||||||
|
return nil, status.Errorf(codes.Internal, "failed to parse image creation time: %s", sErr.Error())
|
||||||
|
}
|
||||||
|
log.DebugLog(ctx, "image %s, savedImageTime=%v, currentImageTime=%v", vol, st, creationTime.AsTime())
|
||||||
|
if req.GetForce() && st.Equal(creationTime.AsTime()) {
|
||||||
|
err = mirror.Resync(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, getGRPCError(err)
|
||||||
|
}
|
||||||
|
// Break the loop as we need to issue resync only once for the image or for the group.
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ready {
|
if !ready {
|
||||||
err = checkVolumeResyncStatus(ctx, localStatus)
|
err = checkVolumeResyncStatus(ctx, localStatus)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -734,9 +731,12 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = rbdVol.RepairResyncedImageID(ctx, ready)
|
// update imageID for all the volumes
|
||||||
if err != nil {
|
for _, vol := range volumes {
|
||||||
return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error())
|
err = vol.RepairResyncedImageID(ctx, ready)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Errorf(codes.Internal, "failed to resync Image ID: %s", err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp := &replication.ResyncVolumeResponse{
|
resp := &replication.ResyncVolumeResponse{
|
||||||
@ -811,9 +811,9 @@ func getGRPCError(err error) error {
|
|||||||
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
|
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
|
||||||
req *replication.GetVolumeReplicationInfoRequest,
|
req *replication.GetVolumeReplicationInfoRequest,
|
||||||
) (*replication.GetVolumeReplicationInfoResponse, error) {
|
) (*replication.GetVolumeReplicationInfoResponse, error) {
|
||||||
volumeID := csicommon.GetIDFromReplication(req)
|
reqID := csicommon.GetIDFromReplication(req)
|
||||||
if volumeID == "" {
|
if reqID == "" {
|
||||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
return nil, status.Error(codes.InvalidArgument, "empty ID in request")
|
||||||
}
|
}
|
||||||
cr, err := util.NewUserCredentials(req.GetSecrets())
|
cr, err := util.NewUserCredentials(req.GetSecrets())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -821,32 +821,21 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
|
|||||||
}
|
}
|
||||||
defer cr.DeleteCredentials()
|
defer cr.DeleteCredentials()
|
||||||
|
|
||||||
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
if acquired := rs.VolumeLocks.TryAcquire(reqID); !acquired {
|
||||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
|
|
||||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, reqID)
|
||||||
}
|
}
|
||||||
defer rs.VolumeLocks.Release(volumeID)
|
defer rs.VolumeLocks.Release(reqID)
|
||||||
mgr := rbd.NewManager(rs.csiID, nil, req.GetSecrets())
|
mgr := rbd.NewManager(rs.csiID, nil, req.GetSecrets())
|
||||||
defer mgr.Destroy(ctx)
|
defer mgr.Destroy(ctx)
|
||||||
|
|
||||||
rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
|
volumes, mirror, err := mgr.GetMirrorSource(ctx, reqID, req.GetReplicationSource())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch {
|
return nil, getGRPCError(err)
|
||||||
case errors.Is(err, corerbd.ErrImageNotFound):
|
|
||||||
err = status.Errorf(codes.NotFound, err.Error())
|
|
||||||
case errors.Is(err, util.ErrPoolNotFound):
|
|
||||||
err = status.Errorf(codes.NotFound, err.Error())
|
|
||||||
default:
|
|
||||||
err = status.Errorf(codes.Internal, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
mirror, err := rbdVol.ToMirror()
|
|
||||||
if err != nil {
|
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
|
||||||
}
|
}
|
||||||
|
defer mirror.Destroy(ctx)
|
||||||
|
defer destoryVolumes(ctx, volumes)
|
||||||
|
|
||||||
info, err := mirror.GetMirroringInfo(ctx)
|
info, err := mirror.GetMirroringInfo(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -977,3 +966,10 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus)
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// destoryVolumes destroys the volume connections.
|
||||||
|
func destoryVolumes(ctx context.Context, volumes []types.Volume) {
|
||||||
|
for _, vol := range volumes {
|
||||||
|
vol.Destroy(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -88,11 +88,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
|
|||||||
|
|
||||||
// resolve all volumes
|
// resolve all volumes
|
||||||
volumes := make([]types.Volume, len(req.GetVolumeIds()))
|
volumes := make([]types.Volume, len(req.GetVolumeIds()))
|
||||||
defer func() {
|
defer destoryVolumes(ctx, volumes)
|
||||||
for _, vol := range volumes {
|
|
||||||
vol.Destroy(ctx)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
for i, id := range req.GetVolumeIds() {
|
for i, id := range req.GetVolumeIds() {
|
||||||
vol, err := mgr.GetVolumeByID(ctx, id)
|
vol, err := mgr.GetVolumeByID(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -348,11 +344,7 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership(
|
|||||||
|
|
||||||
// resolve all volumes
|
// resolve all volumes
|
||||||
volumes := make([]types.Volume, len(toAdd))
|
volumes := make([]types.Volume, len(toAdd))
|
||||||
defer func() {
|
defer destoryVolumes(ctx, volumes)
|
||||||
for _, vol := range volumes {
|
|
||||||
vol.Destroy(ctx)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
for i, id := range toAdd {
|
for i, id := range toAdd {
|
||||||
var vol types.Volume
|
var vol types.Volume
|
||||||
vol, err = mgr.GetVolumeByID(ctx, id)
|
vol, err = mgr.GetVolumeByID(ctx, id)
|
||||||
|
@ -465,3 +465,7 @@ func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error
|
|||||||
func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error) {
|
func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error) {
|
||||||
return vg.volumes, nil
|
return vg.volumes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (vg *volumeGroup) ToMirror() (types.Mirror, error) {
|
||||||
|
return volumeGroupMirror{vg}, nil
|
||||||
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/ceph/go-ceph/rados"
|
"github.com/ceph/go-ceph/rados"
|
||||||
|
"github.com/csi-addons/spec/lib/go/replication"
|
||||||
|
|
||||||
"github.com/ceph/ceph-csi/internal/journal"
|
"github.com/ceph/ceph-csi/internal/journal"
|
||||||
rbd_group "github.com/ceph/ceph-csi/internal/rbd/group"
|
rbd_group "github.com/ceph/ceph-csi/internal/rbd/group"
|
||||||
@ -294,3 +295,78 @@ func (mgr *rbdManager) DeleteVolumeGroup(ctx context.Context, vg types.VolumeGro
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mgr *rbdManager) GetMirrorSource(ctx context.Context, reqID string,
|
||||||
|
rep *replication.ReplicationSource,
|
||||||
|
) ([]types.Volume, types.Mirror, error) {
|
||||||
|
switch {
|
||||||
|
// Backward compatibility: if rep is nil, we assume that the sidecar is still old and
|
||||||
|
// setting only volumeId not the replication source.
|
||||||
|
case rep == nil || rep.GetVolume() != nil:
|
||||||
|
rbdVol, err := mgr.GetVolumeByID(ctx, reqID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to get volume by id %q: %w", reqID, err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
rbdVol.Destroy(ctx)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
var mir types.Mirror
|
||||||
|
mir, err = rbdVol.ToMirror()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to convert volume %s to mirror: %w", rbdVol, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return []types.Volume{rbdVol}, mir, nil
|
||||||
|
case rep.GetVolumegroup() != nil:
|
||||||
|
rbdGroup, err := mgr.GetVolumeGroupByID(ctx, reqID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to get volume group by id %q: %w", reqID, err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
rbdGroup.Destroy(ctx)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
var mir types.Mirror
|
||||||
|
mir, err = rbdGroup.ToMirror()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to convert volume group %s to mirror: %w", rbdGroup, err)
|
||||||
|
}
|
||||||
|
var vols []types.Volume
|
||||||
|
vols, err = rbdGroup.ListVolumes(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to list volumes in volume group %q: %w", rbdGroup, err)
|
||||||
|
}
|
||||||
|
// Get all the volume with connection and return it
|
||||||
|
volumes := make([]types.Volume, len(vols))
|
||||||
|
// Destroy connections if there is any error
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
for _, vol := range vols {
|
||||||
|
vol.Destroy(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for i, vol := range vols {
|
||||||
|
var id string
|
||||||
|
id, err = vol.GetID(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to get id for volume %q in group %q: %w", vol, rbdGroup, err)
|
||||||
|
}
|
||||||
|
var v types.Volume
|
||||||
|
v, err = mgr.GetVolumeByID(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to get volume by id %q in group %q: %w", id, rbdGroup, err)
|
||||||
|
}
|
||||||
|
volumes[i] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return volumes, mir, nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, nil, errors.New("replication source is not set")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -66,4 +66,7 @@ type VolumeGroup interface {
|
|||||||
|
|
||||||
// ListVolumes returns a slice with all Volumes in the VolumeGroup.
|
// ListVolumes returns a slice with all Volumes in the VolumeGroup.
|
||||||
ListVolumes(ctx context.Context) ([]Volume, error)
|
ListVolumes(ctx context.Context) ([]Volume, error)
|
||||||
|
|
||||||
|
// ToMirror converts the VolumeGroup to a Mirror.
|
||||||
|
ToMirror() (Mirror, error)
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,8 @@ package types
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/csi-addons/spec/lib/go/replication"
|
||||||
)
|
)
|
||||||
|
|
||||||
// VolumeResolver can be used to construct a Volume from a CSI VolumeId.
|
// VolumeResolver can be used to construct a Volume from a CSI VolumeId.
|
||||||
@ -47,4 +49,8 @@ type Manager interface {
|
|||||||
// DeleteVolumeGroup removes VolumeGroup from the backend storage and
|
// DeleteVolumeGroup removes VolumeGroup from the backend storage and
|
||||||
// any details from the journal.
|
// any details from the journal.
|
||||||
DeleteVolumeGroup(ctx context.Context, vg VolumeGroup) error
|
DeleteVolumeGroup(ctx context.Context, vg VolumeGroup) error
|
||||||
|
|
||||||
|
// GetMirrorSource returns the source of the mirror for the given volume or group.
|
||||||
|
GetMirrorSource(ctx context.Context, volumeID string,
|
||||||
|
rep *replication.ReplicationSource) ([]Volume, Mirror, error)
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,8 @@ const (
|
|||||||
|
|
||||||
// Mirror is the interface for managing mirroring on an RBD image or a group.
|
// Mirror is the interface for managing mirroring on an RBD image or a group.
|
||||||
type Mirror interface {
|
type Mirror interface {
|
||||||
|
// Destroy frees the resources used by the Mirror.
|
||||||
|
Destroy(ctx context.Context)
|
||||||
// EnableMirroring enables mirroring on the resource with the specified mode.
|
// EnableMirroring enables mirroring on the resource with the specified mode.
|
||||||
EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error
|
EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error
|
||||||
// DisableMirroring disables mirroring on the resource with the option to force the operation
|
// DisableMirroring disables mirroring on the resource with the option to force the operation
|
||||||
|
Loading…
Reference in New Issue
Block a user