rbd: cleanup mirror last-sync-info parsing

By introducing a SyncInfo interface, the `getLastSyncInfo` function can
be removed from the csi-addons/rbd package. Getting details from an RBD
mirror status, should be part of the main internal/rbd package, the
CSI-Addons components should only use the internal API, and not add any
parsing logic.

This makes it easier to enhance the SyncInfo interface in the future.

Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
Niels de Vos 2025-04-15 11:40:55 +02:00 committed by mergify[bot]
parent 31da09863e
commit 04257464bb
5 changed files with 273 additions and 219 deletions

View File

@ -18,7 +18,6 @@ package rbd
import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
@ -891,11 +890,8 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Errorf(codes.Internal, "failed to get remote status: %v", err)
}
description := remoteStatus.GetDescription()
resp, err := getLastSyncInfo(ctx, description)
lastSyncInfo, err := remoteStatus.GetLastSyncInfo(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to parse last sync info from %q: %v", description, err)
if errors.Is(err, rbderrors.ErrLastSyncTimeNotFound) {
return nil, status.Errorf(codes.NotFound, "failed to get last sync info: %v", err)
}
@ -903,73 +899,18 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Errorf(codes.Internal, "failed to get last sync info: %v", err)
}
return resp, nil
resp := replication.GetVolumeReplicationInfoResponse{
LastSyncTime: timestamppb.New(lastSyncInfo.GetLastSyncTime()),
LastSyncBytes: lastSyncInfo.GetLastSyncBytes(),
}
// This function gets the local snapshot time, last sync snapshot seconds
// and last sync bytes from the description of localStatus and convert
// it into required types.
func getLastSyncInfo(ctx context.Context, description string) (*replication.GetVolumeReplicationInfoResponse, error) {
// Format of the description will be as followed:
// description = `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,
// "last_snapshot_bytes":81920,"last_snapshot_sync_seconds":0,
// "local_snapshot_timestamp":1684675261,
// "remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`
// In case there is no last snapshot bytes returns 0 as the
// LastSyncBytes is optional.
// In case there is no last snapshot sync seconds, it returns nil as the
// LastSyncDuration is optional.
// In case there is no local snapshot timestamp return an error as the
// LastSyncTime is required.
var response replication.GetVolumeReplicationInfoResponse
if description == "" {
return nil, fmt.Errorf("empty description: %w", rbderrors.ErrLastSyncTimeNotFound)
}
log.DebugLog(ctx, "description: %s", description)
splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 {
return nil, fmt.Errorf("no snapshot details: %w", rbderrors.ErrLastSyncTimeNotFound)
}
type localStatus struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
LastSnapshotBytes int64 `json:"last_snapshot_bytes"`
LastSnapshotDuration *int64 `json:"last_snapshot_sync_seconds"`
// lastDuration is optional, can be nil
lastDuration := lastSyncInfo.GetLastSyncDuration()
if lastDuration != nil {
resp.LastSyncDuration = durationpb.New(*lastDuration)
}
var localSnapInfo localStatus
err := json.Unmarshal([]byte(splittedString[1]), &localSnapInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal local snapshot info: %w", err)
}
// If the json unmarsal is successful but the local snapshot time is 0, we
// need to consider it as an error as the LastSyncTime is required.
if localSnapInfo.LocalSnapshotTime == 0 {
return nil, fmt.Errorf("empty local snapshot timestamp: %w", rbderrors.ErrLastSyncTimeNotFound)
}
if localSnapInfo.LastSnapshotDuration != nil {
// converts localSnapshotDuration of type int64 to string format with
// appended `s` seconds required for time.ParseDuration
lastDurationTime := fmt.Sprintf("%ds", *localSnapInfo.LastSnapshotDuration)
// parse Duration from the lastDurationTime string
lastDuration, err := time.ParseDuration(lastDurationTime)
if err != nil {
return nil, fmt.Errorf("failed to parse last snapshot duration: %w", err)
}
// converts time.Duration to *durationpb.Duration
response.LastSyncDuration = durationpb.New(lastDuration)
}
// converts localSnapshotTime of type int64 to time.Time
lastUpdateTime := time.Unix(localSnapInfo.LocalSnapshotTime, 0)
lastSyncTime := timestamppb.New(lastUpdateTime)
response.LastSyncTime = lastSyncTime
response.LastSyncBytes = localSnapInfo.LastSnapshotBytes
return &response, nil
return &resp, nil
}
func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) error {
@ -977,14 +918,12 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus)
// started or not, if we dont see local_snapshot_timestamp in the
// description of localStatus, we are returning error. if we see the local
// snapshot timestamp in the description we return resyncing started.
description := localStatus.GetDescription()
resp, err := getLastSyncInfo(ctx, description)
//
// Note: without a local_snapshot_timestamp, GetLastSyncInfo() returns an error.
_, err := localStatus.GetLastSyncInfo(ctx)
if err != nil {
return fmt.Errorf("failed to get last sync info: %w", err)
}
if resp.GetLastSyncTime() == nil {
return errors.New("last sync time is nil")
}
return nil
}

View File

@ -20,8 +20,6 @@ import (
"context"
"errors"
"reflect"
"strconv"
"strings"
"testing"
"time"
@ -32,12 +30,9 @@ import (
librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"github.com/csi-addons/spec/lib/go/replication"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
func TestValidateSchedulingInterval(t *testing.T) {
@ -418,147 +413,6 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
}
}
func TestValidateLastSyncInfo(t *testing.T) {
t.Parallel()
ctx := context.TODO()
duration, err := time.ParseDuration(strconv.Itoa(int(56743)) + "s")
if err != nil {
t.Errorf("failed to parse duration)")
}
tests := []struct {
name string
description string
info *replication.GetVolumeReplicationInfoResponse
expectedErr string
}{
{
name: "valid description",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &replication.GetVolumeReplicationInfoResponse{
LastSyncTime: timestamppb.New(time.Unix(1684675261, 0)),
LastSyncDuration: durationpb.New(duration),
LastSyncBytes: 81920,
},
expectedErr: "",
},
{
name: "empty description",
description: "",
info: &replication.GetVolumeReplicationInfoResponse{
LastSyncTime: nil,
LastSyncDuration: nil,
LastSyncBytes: 0,
},
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
{
name: "description without last_snapshot_bytes",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &replication.GetVolumeReplicationInfoResponse{
LastSyncDuration: durationpb.New(duration),
LastSyncTime: timestamppb.New(time.Unix(1684675261, 0)),
LastSyncBytes: 0,
},
expectedErr: "",
},
{
name: "description without local_snapshot_time",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &replication.GetVolumeReplicationInfoResponse{
LastSyncDuration: nil,
LastSyncTime: nil,
LastSyncBytes: 0,
},
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
{
name: "description without last_snapshot_sync_seconds",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &replication.GetVolumeReplicationInfoResponse{
LastSyncDuration: nil,
LastSyncTime: timestamppb.New(time.Unix(1684675261, 0)),
LastSyncBytes: 81920,
},
expectedErr: "",
},
{
name: "description with last_snapshot_sync_seconds = 0",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_sync_seconds":0,
"last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &replication.GetVolumeReplicationInfoResponse{
LastSyncDuration: durationpb.New(time.Duration(0)),
LastSyncTime: timestamppb.New(time.Unix(1684675261, 0)),
LastSyncBytes: 81920,
},
expectedErr: "",
},
{
name: "description with invalid JSON",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying,{"bytes_per_second":0.0,"last_snapshot_bytes":81920","bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
info: &replication.GetVolumeReplicationInfoResponse{
LastSyncDuration: nil,
LastSyncTime: nil,
LastSyncBytes: 0,
},
expectedErr: "failed to unmarshal",
},
{
name: "description with no JSON",
description: `replaying`,
info: &replication.GetVolumeReplicationInfoResponse{
LastSyncDuration: nil,
LastSyncTime: nil,
LastSyncBytes: 0,
},
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
teststruct, err := getLastSyncInfo(ctx, tt.description)
if err != nil && !strings.Contains(err.Error(), tt.expectedErr) {
// returned error
t.Errorf("getLastSyncInfo() returned error, expected: %v, got: %v",
tt.expectedErr, err)
}
if teststruct != nil {
if teststruct.GetLastSyncTime().GetSeconds() != tt.info.GetLastSyncTime().GetSeconds() {
t.Errorf("name: %v, getLastSyncInfo() %v, expected %v",
tt.name,
teststruct.GetLastSyncTime(),
tt.info.GetLastSyncTime())
}
if tt.info.GetLastSyncDuration() == nil && teststruct.GetLastSyncDuration() != nil {
t.Errorf("name: %v, getLastSyncInfo() %v, expected %v",
tt.name,
teststruct.GetLastSyncDuration(),
tt.info.GetLastSyncDuration())
}
if teststruct.GetLastSyncDuration().GetSeconds() != tt.info.GetLastSyncDuration().GetSeconds() {
t.Errorf("name: %v, getLastSyncInfo() %v, expected %v",
tt.name,
teststruct.GetLastSyncDuration(),
tt.info.GetLastSyncDuration())
}
if teststruct.GetLastSyncBytes() != tt.info.GetLastSyncBytes() {
t.Errorf("name: %v, getLastSyncInfo() %v, expected %v",
tt.name,
teststruct.GetLastSyncBytes(),
tt.info.GetLastSyncBytes())
}
}
})
}
}
func TestGetGRPCError(t *testing.T) {
t.Parallel()
tests := []struct {

View File

@ -17,7 +17,9 @@ package rbd
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
@ -327,3 +329,75 @@ func (status SiteMirrorImageStatus) GetLastUpdate() time.Time {
// convert the last update time to UTC
return time.Unix(status.LastUpdate, 0).UTC()
}
func (status SiteMirrorImageStatus) GetLastSyncInfo(ctx context.Context) (types.SyncInfo, error) {
return newSyncInfo(ctx, status.Description)
}
type syncInfo struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
LastSnapshotBytes int64 `json:"last_snapshot_bytes"`
LastSnapshotDuration *int64 `json:"last_snapshot_sync_seconds"`
}
// Type assertion for ensuring an implementation of the full SyncInfo interface.
var _ types.SyncInfo = &syncInfo{}
func newSyncInfo(ctx context.Context, description string) (types.SyncInfo, error) {
// Format of the description will be as followed:
// description = `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,
// "last_snapshot_bytes":81920,"last_snapshot_sync_seconds":0,
// "local_snapshot_timestamp":1684675261,
// "remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`
// In case there is no last snapshot bytes returns 0 as the
// LastSyncBytes is optional.
// In case there is no last snapshot sync seconds, it returns nil as the
// LastSyncDuration is optional.
// In case there is no local snapshot timestamp return an error as the
// LastSyncTime is required.
if description == "" {
return nil, fmt.Errorf("empty description: %w", rbderrors.ErrLastSyncTimeNotFound)
}
log.DebugLog(ctx, "description: %s", description)
splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 {
return nil, fmt.Errorf("no snapshot details: %w", rbderrors.ErrLastSyncTimeNotFound)
}
var localSnapInfo syncInfo
err := json.Unmarshal([]byte(splittedString[1]), &localSnapInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal description %q into syncInfo: %w", description, err)
}
// If the json unmarsal is successful but the local snapshot time is 0, we
// need to consider it as an error as the LastSyncTime is required.
if localSnapInfo.LocalSnapshotTime == 0 {
return nil, fmt.Errorf("empty local snapshot timestamp: %w", rbderrors.ErrLastSyncTimeNotFound)
}
return &localSnapInfo, nil
}
func (si *syncInfo) GetLastSyncTime() time.Time {
// converts localSnapshotTime of type int64 to time.Time
return time.Unix(si.LocalSnapshotTime, 0)
}
func (si *syncInfo) GetLastSyncBytes() int64 {
return si.LastSnapshotBytes
}
func (si *syncInfo) GetLastSyncDuration() *time.Duration {
var duration time.Duration
if si.LastSnapshotDuration == nil {
duration = time.Duration(0)
} else {
// time.Duration is in nanoseconds
duration = time.Duration(*si.LastSnapshotDuration) * time.Second
}
return &duration
}

169
internal/rbd/mirror_test.go Normal file
View File

@ -0,0 +1,169 @@
/*
Copyright 2025 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"context"
"strings"
"testing"
rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors"
"github.com/ceph/ceph-csi/internal/rbd/types"
)
func TestValidateLastSyncInfo(t *testing.T) {
t.Parallel()
ctx := context.TODO()
duration := int64(56743)
zero := int64(0)
tests := []struct {
name string
description string
info types.SyncInfo
expectedErr string
}{
{
name: "valid description",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &syncInfo{
LocalSnapshotTime: 1684675261,
LastSnapshotDuration: &duration,
LastSnapshotBytes: 81920,
},
expectedErr: "",
},
{
name: "empty description",
description: "",
info: &syncInfo{
LastSnapshotDuration: nil,
LastSnapshotBytes: 0,
},
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
{
name: "description without last_snapshot_bytes",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &syncInfo{
LastSnapshotDuration: &duration,
LocalSnapshotTime: 1684675261,
LastSnapshotBytes: 0,
},
expectedErr: "",
},
{
name: "description without local_snapshot_time",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &syncInfo{
LastSnapshotDuration: nil,
LastSnapshotBytes: 0,
},
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
{
name: "description without last_snapshot_sync_seconds",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &syncInfo{
LastSnapshotDuration: nil,
LocalSnapshotTime: 1684675261,
LastSnapshotBytes: 81920,
},
expectedErr: "",
},
{
name: "description with last_snapshot_sync_seconds = 0",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_sync_seconds":0,
"last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: &syncInfo{
LastSnapshotDuration: &zero,
LocalSnapshotTime: 1684675261,
LastSnapshotBytes: 81920,
},
expectedErr: "",
},
{
name: "description with invalid JSON",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying,{"bytes_per_second":0.0,"last_snapshot_bytes":81920","bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
info: &syncInfo{
LastSnapshotDuration: nil,
LastSnapshotBytes: 0,
},
expectedErr: "failed to unmarshal",
},
{
name: "description with no JSON",
description: `replaying`,
info: &syncInfo{
LastSnapshotDuration: nil,
LastSnapshotBytes: 0,
},
expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
teststruct, err := newSyncInfo(ctx, tt.description)
if err != nil && !strings.Contains(err.Error(), tt.expectedErr) {
// returned error
t.Errorf("newSyncInfo() returned error, expected: %v, got: %v",
tt.expectedErr, err)
}
if teststruct != nil {
if teststruct.GetLastSyncTime().Unix() != tt.info.GetLastSyncTime().Unix() {
t.Errorf("name: %v, got %v, expected %v",
tt.name,
teststruct.GetLastSyncTime().Unix(),
tt.info.GetLastSyncTime().Unix())
}
ttLastSyncDuration := tt.info.GetLastSyncDuration()
tsLastSyncDuration := teststruct.GetLastSyncDuration()
if ttLastSyncDuration == nil && tsLastSyncDuration != nil {
t.Errorf("name: %v, got %v, expected %v",
tt.name,
ttLastSyncDuration,
tsLastSyncDuration)
}
if ttLastSyncDuration != nil && tsLastSyncDuration != nil {
ttLastSyncDurationSecs := ttLastSyncDuration.Seconds()
tsLastSyncDurationSecs := tsLastSyncDuration.Seconds()
if ttLastSyncDurationSecs != tsLastSyncDurationSecs {
t.Errorf("name: %v, got %v, expected %v",
tt.name,
ttLastSyncDuration,
tsLastSyncDuration)
}
}
if teststruct.GetLastSyncBytes() != tt.info.GetLastSyncBytes() {
t.Errorf("name: %v, got %v, expected %v",
tt.name,
teststruct.GetLastSyncBytes(),
tt.info.GetLastSyncBytes())
}
}
})
}
}

View File

@ -92,4 +92,22 @@ type SiteStatus interface {
GetDescription() string
// GetLastUpdate returns the last update time
GetLastUpdate() time.Time
// GetLastSyncInfo returns the last SyncInfo details for the site
GetLastSyncInfo(ctx context.Context) (SyncInfo, error)
}
// SyncInfo is the interface for fetching more details about a SiteStatus object.
type SyncInfo interface {
// GetLastSyncDuration returns the last snapshot sync duration. In case
// there is no last snapshot sync seconds, it returns nil as the
// LastSyncDuration is optional.
GetLastSyncDuration() *time.Duration
// GetLastSyncTime returns the local snapshot timestamp.
GetLastSyncTime() time.Time
// GetLastSyncBytes returns the last snapshot bytes of the last sync. In
// case there is no last snapshot bytes returns 0 as the LastSyncBytes is
// optional.
GetLastSyncBytes() int64
}