diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index ad7d8e948..58c1d80cd 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -18,7 +18,6 @@ package rbd import ( "context" - "encoding/json" "errors" "fmt" "regexp" @@ -891,11 +890,8 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Errorf(codes.Internal, "failed to get remote status: %v", err) } - description := remoteStatus.GetDescription() - resp, err := getLastSyncInfo(ctx, description) + lastSyncInfo, err := remoteStatus.GetLastSyncInfo(ctx) if err != nil { - log.ErrorLog(ctx, "failed to parse last sync info from %q: %v", description, err) - if errors.Is(err, rbderrors.ErrLastSyncTimeNotFound) { return nil, status.Errorf(codes.NotFound, "failed to get last sync info: %v", err) } @@ -903,73 +899,18 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, return nil, status.Errorf(codes.Internal, "failed to get last sync info: %v", err) } - return resp, nil -} - -// This function gets the local snapshot time, last sync snapshot seconds -// and last sync bytes from the description of localStatus and convert -// it into required types. -func getLastSyncInfo(ctx context.Context, description string) (*replication.GetVolumeReplicationInfoResponse, error) { - // Format of the description will be as followed: - // description = `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0, - // "last_snapshot_bytes":81920,"last_snapshot_sync_seconds":0, - // "local_snapshot_timestamp":1684675261, - // "remote_snapshot_timestamp":1684675261,"replay_state":"idle"}` - // In case there is no last snapshot bytes returns 0 as the - // LastSyncBytes is optional. - // In case there is no last snapshot sync seconds, it returns nil as the - // LastSyncDuration is optional. - // In case there is no local snapshot timestamp return an error as the - // LastSyncTime is required. - - var response replication.GetVolumeReplicationInfoResponse - - if description == "" { - return nil, fmt.Errorf("empty description: %w", rbderrors.ErrLastSyncTimeNotFound) - } - log.DebugLog(ctx, "description: %s", description) - splittedString := strings.SplitN(description, ",", 2) - if len(splittedString) == 1 { - return nil, fmt.Errorf("no snapshot details: %w", rbderrors.ErrLastSyncTimeNotFound) - } - type localStatus struct { - LocalSnapshotTime int64 `json:"local_snapshot_timestamp"` - LastSnapshotBytes int64 `json:"last_snapshot_bytes"` - LastSnapshotDuration *int64 `json:"last_snapshot_sync_seconds"` + resp := replication.GetVolumeReplicationInfoResponse{ + LastSyncTime: timestamppb.New(lastSyncInfo.GetLastSyncTime()), + LastSyncBytes: lastSyncInfo.GetLastSyncBytes(), } - var localSnapInfo localStatus - err := json.Unmarshal([]byte(splittedString[1]), &localSnapInfo) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal local snapshot info: %w", err) + // lastDuration is optional, can be nil + lastDuration := lastSyncInfo.GetLastSyncDuration() + if lastDuration != nil { + resp.LastSyncDuration = durationpb.New(*lastDuration) } - // If the json unmarsal is successful but the local snapshot time is 0, we - // need to consider it as an error as the LastSyncTime is required. - if localSnapInfo.LocalSnapshotTime == 0 { - return nil, fmt.Errorf("empty local snapshot timestamp: %w", rbderrors.ErrLastSyncTimeNotFound) - } - if localSnapInfo.LastSnapshotDuration != nil { - // converts localSnapshotDuration of type int64 to string format with - // appended `s` seconds required for time.ParseDuration - lastDurationTime := fmt.Sprintf("%ds", *localSnapInfo.LastSnapshotDuration) - // parse Duration from the lastDurationTime string - lastDuration, err := time.ParseDuration(lastDurationTime) - if err != nil { - return nil, fmt.Errorf("failed to parse last snapshot duration: %w", err) - } - // converts time.Duration to *durationpb.Duration - response.LastSyncDuration = durationpb.New(lastDuration) - } - - // converts localSnapshotTime of type int64 to time.Time - lastUpdateTime := time.Unix(localSnapInfo.LocalSnapshotTime, 0) - lastSyncTime := timestamppb.New(lastUpdateTime) - - response.LastSyncTime = lastSyncTime - response.LastSyncBytes = localSnapInfo.LastSnapshotBytes - - return &response, nil + return &resp, nil } func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) error { @@ -977,14 +918,12 @@ func checkVolumeResyncStatus(ctx context.Context, localStatus types.SiteStatus) // started or not, if we dont see local_snapshot_timestamp in the // description of localStatus, we are returning error. if we see the local // snapshot timestamp in the description we return resyncing started. - description := localStatus.GetDescription() - resp, err := getLastSyncInfo(ctx, description) + // + // Note: without a local_snapshot_timestamp, GetLastSyncInfo() returns an error. + _, err := localStatus.GetLastSyncInfo(ctx) if err != nil { return fmt.Errorf("failed to get last sync info: %w", err) } - if resp.GetLastSyncTime() == nil { - return errors.New("last sync time is nil") - } return nil } diff --git a/internal/csi-addons/rbd/replication_test.go b/internal/csi-addons/rbd/replication_test.go index 3a105dbf4..ab378d2c5 100644 --- a/internal/csi-addons/rbd/replication_test.go +++ b/internal/csi-addons/rbd/replication_test.go @@ -20,8 +20,6 @@ import ( "context" "errors" "reflect" - "strconv" - "strings" "testing" "time" @@ -32,12 +30,9 @@ import ( librbd "github.com/ceph/go-ceph/rbd" "github.com/ceph/go-ceph/rbd/admin" - "github.com/csi-addons/spec/lib/go/replication" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" ) func TestValidateSchedulingInterval(t *testing.T) { @@ -418,147 +413,6 @@ func TestCheckRemoteSiteStatus(t *testing.T) { } } -func TestValidateLastSyncInfo(t *testing.T) { - t.Parallel() - ctx := context.TODO() - duration, err := time.ParseDuration(strconv.Itoa(int(56743)) + "s") - if err != nil { - t.Errorf("failed to parse duration)") - } - - tests := []struct { - name string - description string - info *replication.GetVolumeReplicationInfoResponse - expectedErr string - }{ - { - name: "valid description", - //nolint:lll // sample output cannot be split into multiple lines. - description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, - info: &replication.GetVolumeReplicationInfoResponse{ - LastSyncTime: timestamppb.New(time.Unix(1684675261, 0)), - LastSyncDuration: durationpb.New(duration), - LastSyncBytes: 81920, - }, - expectedErr: "", - }, - { - name: "empty description", - description: "", - info: &replication.GetVolumeReplicationInfoResponse{ - LastSyncTime: nil, - LastSyncDuration: nil, - LastSyncBytes: 0, - }, - expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), - }, - { - name: "description without last_snapshot_bytes", - //nolint:lll // sample output cannot be split into multiple lines. - description: `replaying, {"bytes_per_second":0.0,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, - info: &replication.GetVolumeReplicationInfoResponse{ - LastSyncDuration: durationpb.New(duration), - LastSyncTime: timestamppb.New(time.Unix(1684675261, 0)), - LastSyncBytes: 0, - }, - expectedErr: "", - }, - { - name: "description without local_snapshot_time", - //nolint:lll // sample output cannot be split into multiple lines. - description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, - info: &replication.GetVolumeReplicationInfoResponse{ - LastSyncDuration: nil, - LastSyncTime: nil, - LastSyncBytes: 0, - }, - expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), - }, - { - name: "description without last_snapshot_sync_seconds", - //nolint:lll // sample output cannot be split into multiple lines. - description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, - info: &replication.GetVolumeReplicationInfoResponse{ - LastSyncDuration: nil, - LastSyncTime: timestamppb.New(time.Unix(1684675261, 0)), - LastSyncBytes: 81920, - }, - expectedErr: "", - }, - { - name: "description with last_snapshot_sync_seconds = 0", - //nolint:lll // sample output cannot be split into multiple lines. - description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_sync_seconds":0, - "last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, - info: &replication.GetVolumeReplicationInfoResponse{ - LastSyncDuration: durationpb.New(time.Duration(0)), - LastSyncTime: timestamppb.New(time.Unix(1684675261, 0)), - LastSyncBytes: 81920, - }, - expectedErr: "", - }, - { - name: "description with invalid JSON", - //nolint:lll // sample output cannot be split into multiple lines. - description: `replaying,{"bytes_per_second":0.0,"last_snapshot_bytes":81920","bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`, - info: &replication.GetVolumeReplicationInfoResponse{ - LastSyncDuration: nil, - LastSyncTime: nil, - LastSyncBytes: 0, - }, - expectedErr: "failed to unmarshal", - }, - { - name: "description with no JSON", - description: `replaying`, - info: &replication.GetVolumeReplicationInfoResponse{ - LastSyncDuration: nil, - LastSyncTime: nil, - LastSyncBytes: 0, - }, - expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - teststruct, err := getLastSyncInfo(ctx, tt.description) - if err != nil && !strings.Contains(err.Error(), tt.expectedErr) { - // returned error - t.Errorf("getLastSyncInfo() returned error, expected: %v, got: %v", - tt.expectedErr, err) - } - if teststruct != nil { - if teststruct.GetLastSyncTime().GetSeconds() != tt.info.GetLastSyncTime().GetSeconds() { - t.Errorf("name: %v, getLastSyncInfo() %v, expected %v", - tt.name, - teststruct.GetLastSyncTime(), - tt.info.GetLastSyncTime()) - } - if tt.info.GetLastSyncDuration() == nil && teststruct.GetLastSyncDuration() != nil { - t.Errorf("name: %v, getLastSyncInfo() %v, expected %v", - tt.name, - teststruct.GetLastSyncDuration(), - tt.info.GetLastSyncDuration()) - } - if teststruct.GetLastSyncDuration().GetSeconds() != tt.info.GetLastSyncDuration().GetSeconds() { - t.Errorf("name: %v, getLastSyncInfo() %v, expected %v", - tt.name, - teststruct.GetLastSyncDuration(), - tt.info.GetLastSyncDuration()) - } - if teststruct.GetLastSyncBytes() != tt.info.GetLastSyncBytes() { - t.Errorf("name: %v, getLastSyncInfo() %v, expected %v", - tt.name, - teststruct.GetLastSyncBytes(), - tt.info.GetLastSyncBytes()) - } - } - }) - } -} - func TestGetGRPCError(t *testing.T) { t.Parallel() tests := []struct { diff --git a/internal/rbd/mirror.go b/internal/rbd/mirror.go index 67f79a1a5..16780c491 100644 --- a/internal/rbd/mirror.go +++ b/internal/rbd/mirror.go @@ -17,7 +17,9 @@ package rbd import ( "context" + "encoding/json" "fmt" + "strings" "time" rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" @@ -327,3 +329,75 @@ func (status SiteMirrorImageStatus) GetLastUpdate() time.Time { // convert the last update time to UTC return time.Unix(status.LastUpdate, 0).UTC() } + +func (status SiteMirrorImageStatus) GetLastSyncInfo(ctx context.Context) (types.SyncInfo, error) { + return newSyncInfo(ctx, status.Description) +} + +type syncInfo struct { + LocalSnapshotTime int64 `json:"local_snapshot_timestamp"` + LastSnapshotBytes int64 `json:"last_snapshot_bytes"` + LastSnapshotDuration *int64 `json:"last_snapshot_sync_seconds"` +} + +// Type assertion for ensuring an implementation of the full SyncInfo interface. +var _ types.SyncInfo = &syncInfo{} + +func newSyncInfo(ctx context.Context, description string) (types.SyncInfo, error) { + // Format of the description will be as followed: + // description = `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0, + // "last_snapshot_bytes":81920,"last_snapshot_sync_seconds":0, + // "local_snapshot_timestamp":1684675261, + // "remote_snapshot_timestamp":1684675261,"replay_state":"idle"}` + // In case there is no last snapshot bytes returns 0 as the + // LastSyncBytes is optional. + // In case there is no last snapshot sync seconds, it returns nil as the + // LastSyncDuration is optional. + // In case there is no local snapshot timestamp return an error as the + // LastSyncTime is required. + + if description == "" { + return nil, fmt.Errorf("empty description: %w", rbderrors.ErrLastSyncTimeNotFound) + } + log.DebugLog(ctx, "description: %s", description) + splittedString := strings.SplitN(description, ",", 2) + if len(splittedString) == 1 { + return nil, fmt.Errorf("no snapshot details: %w", rbderrors.ErrLastSyncTimeNotFound) + } + + var localSnapInfo syncInfo + err := json.Unmarshal([]byte(splittedString[1]), &localSnapInfo) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal description %q into syncInfo: %w", description, err) + } + + // If the json unmarsal is successful but the local snapshot time is 0, we + // need to consider it as an error as the LastSyncTime is required. + if localSnapInfo.LocalSnapshotTime == 0 { + return nil, fmt.Errorf("empty local snapshot timestamp: %w", rbderrors.ErrLastSyncTimeNotFound) + } + + return &localSnapInfo, nil +} + +func (si *syncInfo) GetLastSyncTime() time.Time { + // converts localSnapshotTime of type int64 to time.Time + return time.Unix(si.LocalSnapshotTime, 0) +} + +func (si *syncInfo) GetLastSyncBytes() int64 { + return si.LastSnapshotBytes +} + +func (si *syncInfo) GetLastSyncDuration() *time.Duration { + var duration time.Duration + + if si.LastSnapshotDuration == nil { + duration = time.Duration(0) + } else { + // time.Duration is in nanoseconds + duration = time.Duration(*si.LastSnapshotDuration) * time.Second + } + + return &duration +} diff --git a/internal/rbd/mirror_test.go b/internal/rbd/mirror_test.go new file mode 100644 index 000000000..7dc78e8f0 --- /dev/null +++ b/internal/rbd/mirror_test.go @@ -0,0 +1,169 @@ +/* +Copyright 2025 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rbd + +import ( + "context" + "strings" + "testing" + + rbderrors "github.com/ceph/ceph-csi/internal/rbd/errors" + "github.com/ceph/ceph-csi/internal/rbd/types" +) + +func TestValidateLastSyncInfo(t *testing.T) { + t.Parallel() + ctx := context.TODO() + duration := int64(56743) + zero := int64(0) + + tests := []struct { + name string + description string + info types.SyncInfo + expectedErr string + }{ + { + name: "valid description", + //nolint:lll // sample output cannot be split into multiple lines. + description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + info: &syncInfo{ + LocalSnapshotTime: 1684675261, + LastSnapshotDuration: &duration, + LastSnapshotBytes: 81920, + }, + expectedErr: "", + }, + { + name: "empty description", + description: "", + info: &syncInfo{ + LastSnapshotDuration: nil, + LastSnapshotBytes: 0, + }, + expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), + }, + { + name: "description without last_snapshot_bytes", + //nolint:lll // sample output cannot be split into multiple lines. + description: `replaying, {"bytes_per_second":0.0,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + info: &syncInfo{ + LastSnapshotDuration: &duration, + LocalSnapshotTime: 1684675261, + LastSnapshotBytes: 0, + }, + expectedErr: "", + }, + { + name: "description without local_snapshot_time", + //nolint:lll // sample output cannot be split into multiple lines. + description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + info: &syncInfo{ + LastSnapshotDuration: nil, + LastSnapshotBytes: 0, + }, + expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), + }, + { + name: "description without last_snapshot_sync_seconds", + //nolint:lll // sample output cannot be split into multiple lines. + description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + info: &syncInfo{ + LastSnapshotDuration: nil, + LocalSnapshotTime: 1684675261, + LastSnapshotBytes: 81920, + }, + expectedErr: "", + }, + { + name: "description with last_snapshot_sync_seconds = 0", + //nolint:lll // sample output cannot be split into multiple lines. + description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_sync_seconds":0, + "last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`, + info: &syncInfo{ + LastSnapshotDuration: &zero, + LocalSnapshotTime: 1684675261, + LastSnapshotBytes: 81920, + }, + expectedErr: "", + }, + { + name: "description with invalid JSON", + //nolint:lll // sample output cannot be split into multiple lines. + description: `replaying,{"bytes_per_second":0.0,"last_snapshot_bytes":81920","bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`, + info: &syncInfo{ + LastSnapshotDuration: nil, + LastSnapshotBytes: 0, + }, + expectedErr: "failed to unmarshal", + }, + { + name: "description with no JSON", + description: `replaying`, + info: &syncInfo{ + LastSnapshotDuration: nil, + LastSnapshotBytes: 0, + }, + expectedErr: rbderrors.ErrLastSyncTimeNotFound.Error(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + teststruct, err := newSyncInfo(ctx, tt.description) + if err != nil && !strings.Contains(err.Error(), tt.expectedErr) { + // returned error + t.Errorf("newSyncInfo() returned error, expected: %v, got: %v", + tt.expectedErr, err) + } + if teststruct != nil { + if teststruct.GetLastSyncTime().Unix() != tt.info.GetLastSyncTime().Unix() { + t.Errorf("name: %v, got %v, expected %v", + tt.name, + teststruct.GetLastSyncTime().Unix(), + tt.info.GetLastSyncTime().Unix()) + } + + ttLastSyncDuration := tt.info.GetLastSyncDuration() + tsLastSyncDuration := teststruct.GetLastSyncDuration() + if ttLastSyncDuration == nil && tsLastSyncDuration != nil { + t.Errorf("name: %v, got %v, expected %v", + tt.name, + ttLastSyncDuration, + tsLastSyncDuration) + } + if ttLastSyncDuration != nil && tsLastSyncDuration != nil { + ttLastSyncDurationSecs := ttLastSyncDuration.Seconds() + tsLastSyncDurationSecs := tsLastSyncDuration.Seconds() + if ttLastSyncDurationSecs != tsLastSyncDurationSecs { + t.Errorf("name: %v, got %v, expected %v", + tt.name, + ttLastSyncDuration, + tsLastSyncDuration) + } + } + + if teststruct.GetLastSyncBytes() != tt.info.GetLastSyncBytes() { + t.Errorf("name: %v, got %v, expected %v", + tt.name, + teststruct.GetLastSyncBytes(), + tt.info.GetLastSyncBytes()) + } + } + }) + } +} diff --git a/internal/rbd/types/mirror.go b/internal/rbd/types/mirror.go index 12c0bffdf..1d844ea0f 100644 --- a/internal/rbd/types/mirror.go +++ b/internal/rbd/types/mirror.go @@ -92,4 +92,22 @@ type SiteStatus interface { GetDescription() string // GetLastUpdate returns the last update time GetLastUpdate() time.Time + // GetLastSyncInfo returns the last SyncInfo details for the site + GetLastSyncInfo(ctx context.Context) (SyncInfo, error) +} + +// SyncInfo is the interface for fetching more details about a SiteStatus object. +type SyncInfo interface { + // GetLastSyncDuration returns the last snapshot sync duration. In case + // there is no last snapshot sync seconds, it returns nil as the + // LastSyncDuration is optional. + GetLastSyncDuration() *time.Duration + + // GetLastSyncTime returns the local snapshot timestamp. + GetLastSyncTime() time.Time + + // GetLastSyncBytes returns the last snapshot bytes of the last sync. In + // case there is no last snapshot bytes returns 0 as the LastSyncBytes is + // optional. + GetLastSyncBytes() int64 }