mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-17 10:19:30 +00:00
rbd: migration of replication service to csi-addon
this commit removes grpc import from replication.go and replaced it with usual errors and passed gRPC responses in csi-addons Signed-off-by: riya-singhal31 <rsinghal@redhat.com>
This commit is contained in:
parent
66785c3bba
commit
cdaa9264eb
@ -326,7 +326,12 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
|
||||
case librbd.MirrorImageDisabling:
|
||||
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
|
||||
case librbd.MirrorImageEnabled:
|
||||
return corerbd.DisableVolumeReplication(rbdVol, mirroringInfo, force)
|
||||
err = rbdVol.DisableVolumeReplication(mirroringInfo, force)
|
||||
if err != nil {
|
||||
return nil, getGRPCError(err)
|
||||
}
|
||||
|
||||
return &replication.DisableVolumeReplicationResponse{}, nil
|
||||
default:
|
||||
return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State)
|
||||
}
|
||||
@ -627,9 +632,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
||||
|
||||
err = rbdVol.ResyncVol(localStatus, req.Force)
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, err.Error())
|
||||
|
||||
return nil, err
|
||||
return nil, getGRPCError(err)
|
||||
}
|
||||
|
||||
err = checkVolumeResyncStatus(localStatus)
|
||||
@ -649,6 +652,32 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func getGRPCError(err error) error {
|
||||
if err == nil {
|
||||
return status.Error(codes.OK, codes.OK.String())
|
||||
}
|
||||
|
||||
errorStatusMap := map[error]codes.Code{
|
||||
corerbd.ErrFetchingLocalState: codes.Internal,
|
||||
corerbd.ErrResyncImageFailed: codes.Internal,
|
||||
corerbd.ErrDisableImageMirroringFailed: codes.Internal,
|
||||
corerbd.ErrFetchingMirroringInfo: codes.Internal,
|
||||
corerbd.ErrInvalidArgument: codes.InvalidArgument,
|
||||
corerbd.ErrAborted: codes.Aborted,
|
||||
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
|
||||
corerbd.ErrUnavailable: codes.Unavailable,
|
||||
}
|
||||
|
||||
for e, code := range errorStatusMap {
|
||||
if errors.Is(err, e) {
|
||||
return status.Error(code, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Handle any other non nil error not listed in the map
|
||||
return status.Error(codes.Unknown, err.Error())
|
||||
}
|
||||
|
||||
// GetVolumeReplicationInfo extracts the RBD volume information from the volumeID, If the
|
||||
// image is present, mirroring is enabled and the image is in primary state.
|
||||
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
|
||||
|
@ -18,6 +18,7 @@ package rbd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -27,6 +28,9 @@ import (
|
||||
|
||||
librbd "github.com/ceph/go-ceph/rbd"
|
||||
"github.com/ceph/go-ceph/rbd/admin"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
@ -494,3 +498,72 @@ func TestValidateLastSyncTime(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetGRPCError(t *testing.T) {
|
||||
t.Parallel()
|
||||
tests := []struct {
|
||||
name string
|
||||
err error
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
name: "FetchingLocalStateFailed",
|
||||
err: corerbd.ErrFetchingLocalState,
|
||||
expectedErr: status.Error(codes.Internal, corerbd.ErrFetchingLocalState.Error()),
|
||||
},
|
||||
{
|
||||
name: "ResyncImageFailed",
|
||||
err: corerbd.ErrResyncImageFailed,
|
||||
expectedErr: status.Error(codes.Internal, corerbd.ErrResyncImageFailed.Error()),
|
||||
},
|
||||
{
|
||||
name: "DisableImageMirroringFailed",
|
||||
err: corerbd.ErrDisableImageMirroringFailed,
|
||||
expectedErr: status.Error(codes.Internal, corerbd.ErrDisableImageMirroringFailed.Error()),
|
||||
},
|
||||
{
|
||||
name: "FetchingMirroringInfoFailed",
|
||||
err: corerbd.ErrFetchingMirroringInfo,
|
||||
expectedErr: status.Error(codes.Internal, corerbd.ErrFetchingMirroringInfo.Error()),
|
||||
},
|
||||
{
|
||||
name: "InvalidArgument",
|
||||
err: corerbd.ErrInvalidArgument,
|
||||
expectedErr: status.Error(codes.InvalidArgument, corerbd.ErrInvalidArgument.Error()),
|
||||
},
|
||||
{
|
||||
name: "Aborted",
|
||||
err: corerbd.ErrAborted,
|
||||
expectedErr: status.Error(codes.Aborted, corerbd.ErrAborted.Error()),
|
||||
},
|
||||
{
|
||||
name: "FailedPrecondition",
|
||||
err: corerbd.ErrFailedPrecondition,
|
||||
expectedErr: status.Error(codes.FailedPrecondition, corerbd.ErrFailedPrecondition.Error()),
|
||||
},
|
||||
{
|
||||
name: "Unavailable",
|
||||
err: corerbd.ErrUnavailable,
|
||||
expectedErr: status.Error(codes.Unavailable, corerbd.ErrUnavailable.Error()),
|
||||
},
|
||||
{
|
||||
name: "InvalidError",
|
||||
err: errors.New("some error"),
|
||||
expectedErr: status.Error(codes.Unknown, "some error"),
|
||||
},
|
||||
{
|
||||
name: "NilError",
|
||||
err: nil,
|
||||
expectedErr: status.Error(codes.OK, "ok string"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
result := getGRPCError(tt.err)
|
||||
assert.Equal(t, tt.expectedErr, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -45,4 +45,22 @@ var (
|
||||
// ErrLastSyncTimeNotFound is returned when last sync time is not found for
|
||||
// the image.
|
||||
ErrLastSyncTimeNotFound = errors.New("last sync time not found")
|
||||
// ErrFailedPrecondition is returned when operation is rejected because the system is not in a state
|
||||
// required for the operation's execution.
|
||||
ErrFailedPrecondition = errors.New("system is not in a state required for the operation's execution")
|
||||
// ErrUnavailable is returned when the image needs to be recreated
|
||||
// locally and may be corrected by retrying with a backoff.
|
||||
ErrUnavailable = errors.New("image needs to be recreated")
|
||||
// ErrAborted is returned when the operation is aborted.
|
||||
ErrAborted = errors.New("operation got aborted")
|
||||
// ErrInvalidArgument is returned when the client specified an invalid argument.
|
||||
ErrInvalidArgument = errors.New("invalid arguments provided")
|
||||
// ErrFetchingLocalState is returned when the operation to fetch local state fails.
|
||||
ErrFetchingLocalState = errors.New("failed to get local state")
|
||||
// ErrDisableImageMirroringFailed is returned when the operation to disable image mirroring fails.
|
||||
ErrDisableImageMirroringFailed = errors.New("failed to disable image mirroring")
|
||||
// ErrFetchingMirroringInfo is returned when the operation to fetch mirroring info of image fails.
|
||||
ErrFetchingMirroringInfo = errors.New("failed to get mirroring info of image")
|
||||
// ErrResyncImageFailed is returned when the operation to resync the image fails.
|
||||
ErrResyncImageFailed = errors.New("failed to resync image")
|
||||
)
|
||||
|
@ -18,12 +18,10 @@ package rbd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
librbd "github.com/ceph/go-ceph/rbd"
|
||||
"github.com/csi-addons/spec/lib/go/replication"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus, force bool) error {
|
||||
@ -31,19 +29,18 @@ func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus, force b
|
||||
// If the force option is not set return the error message to retry
|
||||
// with Force option.
|
||||
if !force {
|
||||
return status.Errorf(codes.FailedPrecondition,
|
||||
"image is in %q state, description (%s). Force resync to recover volume",
|
||||
localStatus.State, localStatus.Description)
|
||||
return fmt.Errorf("%w: image is in %q state, description (%s). Force resync to recover volume",
|
||||
ErrFailedPrecondition, localStatus.State, localStatus.Description)
|
||||
}
|
||||
err := rv.resyncImage()
|
||||
if err != nil {
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
return fmt.Errorf("%w: failed to resync image: %w", ErrResyncImageFailed, err)
|
||||
}
|
||||
|
||||
// If we issued a resync, return a non-final error as image needs to be recreated
|
||||
// locally. Caller retries till RBD syncs an initial version of the image to
|
||||
// report its status in the resync request.
|
||||
return status.Error(codes.Unavailable, "awaiting initial resync due to split brain")
|
||||
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -85,10 +82,10 @@ func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func DisableVolumeReplication(rbdVol *rbdVolume,
|
||||
func (rv *rbdVolume) DisableVolumeReplication(
|
||||
mirroringInfo *librbd.MirrorImageInfo,
|
||||
force bool,
|
||||
) (*replication.DisableVolumeReplicationResponse, error) {
|
||||
) error {
|
||||
if !mirroringInfo.Primary {
|
||||
// Return success if the below condition is met
|
||||
// Local image is secondary
|
||||
@ -102,32 +99,30 @@ func DisableVolumeReplication(rbdVol *rbdVolume,
|
||||
// disabled the image on all the remote (secondary) clusters will get
|
||||
// auto-deleted. This helps in garbage collecting the volume
|
||||
// replication Kubernetes artifacts after failback operation.
|
||||
localStatus, rErr := rbdVol.GetLocalState()
|
||||
localStatus, rErr := rv.GetLocalState()
|
||||
if rErr != nil {
|
||||
return nil, status.Error(codes.Internal, rErr.Error())
|
||||
return fmt.Errorf("%w: %w", ErrFetchingLocalState, rErr)
|
||||
}
|
||||
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
|
||||
return &replication.DisableVolumeReplicationResponse{}, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil, status.Errorf(codes.InvalidArgument,
|
||||
"secondary image status is up=%t and state=%s",
|
||||
localStatus.Up,
|
||||
localStatus.State)
|
||||
return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
|
||||
ErrInvalidArgument, localStatus.Up, localStatus.State)
|
||||
}
|
||||
err := rbdVol.DisableImageMirroring(force)
|
||||
err := rv.DisableImageMirroring(force)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return fmt.Errorf("%w: %w", ErrDisableImageMirroringFailed, err)
|
||||
}
|
||||
// the image state can be still disabling once we disable the mirroring
|
||||
// check the mirroring is disabled or not
|
||||
mirroringInfo, err = rbdVol.GetImageMirroringInfo()
|
||||
mirroringInfo, err = rv.GetImageMirroringInfo()
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
return fmt.Errorf("%w: %w", ErrFetchingMirroringInfo, err)
|
||||
}
|
||||
if mirroringInfo.State == librbd.MirrorImageDisabling {
|
||||
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", rbdVol.VolID)
|
||||
return fmt.Errorf("%w: %q is in disabling state", ErrAborted, rv.VolID)
|
||||
}
|
||||
|
||||
return &replication.DisableVolumeReplicationResponse{}, nil
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user