mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 10:33:35 +00:00
rbd: implements getVolumeReplicationInfo
This commit implements getVolumeReplicationInfo to get the last sync time and update it in volume replication CR. Signed-off-by: yati1998 <ypadia@redhat.com>
This commit is contained in:
@ -18,6 +18,7 @@ package rbd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
@ -35,6 +36,7 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
// imageMirroringMode is used to indicate the mirroring mode for an RBD image.
|
||||
@ -834,6 +836,116 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// GetVolumeReplicationInfo extracts the RBD volume information from the volumeID, If the
|
||||
// image is present, mirroring is enabled and the image is in primary state.
|
||||
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
|
||||
req *replication.GetVolumeReplicationInfoRequest,
|
||||
) (*replication.GetVolumeReplicationInfoResponse, error) {
|
||||
volumeID := req.GetVolumeId()
|
||||
if volumeID == "" {
|
||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
||||
}
|
||||
cr, err := util.NewUserCredentials(req.GetSecrets())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
||||
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
||||
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
||||
}
|
||||
defer rs.VolumeLocks.Release(volumeID)
|
||||
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
|
||||
defer rbdVol.Destroy()
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, ErrImageNotFound):
|
||||
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
|
||||
case errors.Is(err, util.ErrPoolNotFound):
|
||||
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
|
||||
default:
|
||||
err = status.Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mirroringInfo, err := rbdVol.getImageMirroringInfo()
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, err.Error())
|
||||
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
|
||||
if mirroringInfo.State != librbd.MirrorImageEnabled {
|
||||
return nil, status.Error(codes.InvalidArgument, "image mirroring is not enabled")
|
||||
}
|
||||
|
||||
// return error if the image is not in primary state
|
||||
if !mirroringInfo.Primary {
|
||||
return nil, status.Error(codes.InvalidArgument, "image is not in primary state")
|
||||
}
|
||||
|
||||
mirrorStatus, err := rbdVol.getImageMirroringStatus()
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrImageNotFound) {
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
log.ErrorLog(ctx, err.Error())
|
||||
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
localStatus, err := mirrorStatus.LocalStatus()
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, err.Error())
|
||||
|
||||
return nil, fmt.Errorf("failed to get local status: %w", err)
|
||||
}
|
||||
|
||||
description := localStatus.Description
|
||||
lastSyncTime, err := getLastSyncTime(description)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get last sync time: %w", err)
|
||||
}
|
||||
|
||||
resp := &replication.GetVolumeReplicationInfoResponse{
|
||||
LastSyncTime: lastSyncTime,
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// This function gets the local snapshot time from the description
|
||||
// of localStatus and converts it into required type.
|
||||
func getLastSyncTime(description string) (*timestamppb.Timestamp, error) {
|
||||
// Format of the description will be as followed:
|
||||
// description = "replaying,{"bytes_per_second":0.0,
|
||||
// "bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501
|
||||
// ,"remote_snapshot_timestamp":1662655501}"
|
||||
// In case there is no local snapshot timestamp we can pass the default value
|
||||
if description == "" {
|
||||
return nil, nil
|
||||
}
|
||||
splittedString := strings.SplitN(description, ",", 2)
|
||||
type localStatus struct {
|
||||
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
|
||||
}
|
||||
|
||||
var localSnapTime localStatus
|
||||
err := json.Unmarshal([]byte(splittedString[1]), &localSnapTime)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal description: %w", err)
|
||||
}
|
||||
|
||||
lastUpdateTime := time.Unix(localSnapTime.LocalSnapshotTime, 0)
|
||||
lastSyncTime := timestamppb.New(lastUpdateTime)
|
||||
|
||||
return lastSyncTime, nil
|
||||
}
|
||||
|
||||
func resyncVolume(localStatus librbd.SiteMirrorImageStatus, rbdVol *rbdVolume, force bool) error {
|
||||
if resyncRequired(localStatus) {
|
||||
// If the force option is not set return the error message to retry
|
||||
|
@ -19,10 +19,13 @@ package rbd
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
librbd "github.com/ceph/go-ceph/rbd"
|
||||
"github.com/ceph/go-ceph/rbd/admin"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func TestValidateSchedulingInterval(t *testing.T) {
|
||||
@ -432,3 +435,54 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLastSyncTime(t *testing.T) {
|
||||
t.Parallel()
|
||||
tests := []struct {
|
||||
name string
|
||||
description string
|
||||
timestamp *timestamppb.Timestamp
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
"valid description",
|
||||
//nolint:lll // sample output cannot be split into multiple lines.
|
||||
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501,"remote_snapshot_timestamp":1662655501}`,
|
||||
timestamppb.New(time.Unix(1662655501, 0)),
|
||||
"",
|
||||
},
|
||||
{
|
||||
"empty description",
|
||||
"",
|
||||
nil,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"description without local_snapshot_timestamp",
|
||||
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0,"remote_snapshot_timestamp":1662655501}`,
|
||||
nil,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"description with invalid JSON",
|
||||
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
|
||||
nil,
|
||||
"failed to unmarshal description",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ts, err := getLastSyncTime(tt.description)
|
||||
if err != nil && !strings.Contains(err.Error(), tt.expectedErr) {
|
||||
// returned error
|
||||
t.Errorf("getLastSyncTime() returned error, expected: %v, got: %v",
|
||||
tt.expectedErr, err)
|
||||
}
|
||||
if !ts.AsTime().Equal(tt.timestamp.AsTime()) {
|
||||
t.Errorf("getLastSyncTime() %v, expected %v", ts, tt.timestamp)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user