csiaddons: read volumeId from source

read the volumeID from replication
source if the ID is missing read
it from req VolumeId as a fallback.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2024-06-24 13:00:34 +02:00 committed by mergify[bot]
parent a160bf1359
commit c03152bcaf
3 changed files with 106 additions and 12 deletions

View File

@ -26,6 +26,7 @@ import (
"strings" "strings"
"time" "time"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
corerbd "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
@ -247,7 +248,7 @@ func validateSchedulingInterval(interval string) error {
func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
req *replication.EnableVolumeReplicationRequest, req *replication.EnableVolumeReplicationRequest,
) (*replication.EnableVolumeReplicationResponse, error) { ) (*replication.EnableVolumeReplicationResponse, error) {
volumeID := req.GetVolumeId() volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" { if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
} }
@ -329,7 +330,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
req *replication.DisableVolumeReplicationRequest, req *replication.DisableVolumeReplicationRequest,
) (*replication.DisableVolumeReplicationResponse, error) { ) (*replication.DisableVolumeReplicationResponse, error) {
volumeID := req.GetVolumeId() volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" { if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
} }
@ -404,7 +405,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
func (rs *ReplicationServer) PromoteVolume(ctx context.Context, func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
req *replication.PromoteVolumeRequest, req *replication.PromoteVolumeRequest,
) (*replication.PromoteVolumeResponse, error) { ) (*replication.PromoteVolumeResponse, error) {
volumeID := req.GetVolumeId() volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" { if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
} }
@ -504,7 +505,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
func (rs *ReplicationServer) DemoteVolume(ctx context.Context, func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
req *replication.DemoteVolumeRequest, req *replication.DemoteVolumeRequest,
) (*replication.DemoteVolumeResponse, error) { ) (*replication.DemoteVolumeResponse, error) {
volumeID := req.GetVolumeId() volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" { if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
} }
@ -622,7 +623,7 @@ func checkRemoteSiteStatus(ctx context.Context, mirrorStatus *librbd.GlobalMirro
func (rs *ReplicationServer) ResyncVolume(ctx context.Context, func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
req *replication.ResyncVolumeRequest, req *replication.ResyncVolumeRequest,
) (*replication.ResyncVolumeResponse, error) { ) (*replication.ResyncVolumeResponse, error) {
volumeID := req.GetVolumeId() volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" { if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
} }
@ -836,7 +837,7 @@ func getGRPCError(err error) error {
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context, func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
req *replication.GetVolumeReplicationInfoRequest, req *replication.GetVolumeReplicationInfoRequest,
) (*replication.GetVolumeReplicationInfoResponse, error) { ) (*replication.GetVolumeReplicationInfoResponse, error) {
volumeID := req.GetVolumeId() volumeID := csicommon.GetIDFromReplication(req)
if volumeID == "" { if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
} }

View File

@ -116,6 +116,43 @@ func NewMiddlewareServerOption() grpc.ServerOption {
return grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(middleWare...)) return grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(middleWare...))
} }
// GetIDFromReplication returns the volumeID for Replication.
func GetIDFromReplication(req interface{}) string {
getID := func(r interface {
GetVolumeId() string
GetReplicationSource() *replication.ReplicationSource
},
) string {
reqID := ""
src := r.GetReplicationSource()
if src != nil && src.GetVolume() != nil {
reqID = src.GetVolume().GetVolumeId()
}
if reqID == "" {
reqID = r.GetVolumeId() //nolint:nolintlint,staticcheck // req.VolumeId is deprecated
}
return reqID
}
switch r := req.(type) {
case *replication.EnableVolumeReplicationRequest:
return getID(r)
case *replication.DisableVolumeReplicationRequest:
return getID(r)
case *replication.PromoteVolumeRequest:
return getID(r)
case *replication.DemoteVolumeRequest:
return getID(r)
case *replication.ResyncVolumeRequest:
return getID(r)
case *replication.GetVolumeReplicationInfoRequest:
return getID(r)
default:
return ""
}
}
func getReqID(req interface{}) string { func getReqID(req interface{}) string {
// if req is nil empty string will be returned // if req is nil empty string will be returned
reqID := "" reqID := ""
@ -156,17 +193,17 @@ func getReqID(req interface{}) string {
// Replication // Replication
case *replication.EnableVolumeReplicationRequest: case *replication.EnableVolumeReplicationRequest:
reqID = r.GetVolumeId() reqID = GetIDFromReplication(r)
case *replication.DisableVolumeReplicationRequest: case *replication.DisableVolumeReplicationRequest:
reqID = r.GetVolumeId() reqID = GetIDFromReplication(r)
case *replication.PromoteVolumeRequest: case *replication.PromoteVolumeRequest:
reqID = r.GetVolumeId() reqID = GetIDFromReplication(r)
case *replication.DemoteVolumeRequest: case *replication.DemoteVolumeRequest:
reqID = r.GetVolumeId() reqID = GetIDFromReplication(r)
case *replication.ResyncVolumeRequest: case *replication.ResyncVolumeRequest:
reqID = r.GetVolumeId() reqID = GetIDFromReplication(r)
case *replication.GetVolumeReplicationInfoRequest: case *replication.GetVolumeReplicationInfoRequest:
reqID = r.GetVolumeId() reqID = GetIDFromReplication(r)
} }
return reqID return reqID

View File

@ -94,6 +94,62 @@ func TestGetReqID(t *testing.T) {
&replication.GetVolumeReplicationInfoRequest{ &replication.GetVolumeReplicationInfoRequest{
VolumeId: fakeID, VolumeId: fakeID,
}, },
// volumeId is set in ReplicationSource
&replication.EnableVolumeReplicationRequest{
ReplicationSource: &replication.ReplicationSource{
Type: &replication.ReplicationSource_Volume{
Volume: &replication.ReplicationSource_VolumeSource{
VolumeId: fakeID,
},
},
},
},
&replication.DisableVolumeReplicationRequest{
ReplicationSource: &replication.ReplicationSource{
Type: &replication.ReplicationSource_Volume{
Volume: &replication.ReplicationSource_VolumeSource{
VolumeId: fakeID,
},
},
},
},
&replication.PromoteVolumeRequest{
ReplicationSource: &replication.ReplicationSource{
Type: &replication.ReplicationSource_Volume{
Volume: &replication.ReplicationSource_VolumeSource{
VolumeId: fakeID,
},
},
},
},
&replication.DemoteVolumeRequest{
ReplicationSource: &replication.ReplicationSource{
Type: &replication.ReplicationSource_Volume{
Volume: &replication.ReplicationSource_VolumeSource{
VolumeId: fakeID,
},
},
},
},
&replication.ResyncVolumeRequest{
ReplicationSource: &replication.ReplicationSource{
Type: &replication.ReplicationSource_Volume{
Volume: &replication.ReplicationSource_VolumeSource{
VolumeId: fakeID,
},
},
},
},
&replication.GetVolumeReplicationInfoRequest{
ReplicationSource: &replication.ReplicationSource{
Type: &replication.ReplicationSource_Volume{
Volume: &replication.ReplicationSource_VolumeSource{
VolumeId: fakeID,
},
},
},
},
} }
for _, r := range req { for _, r := range req {
if got := getReqID(r); got != fakeID { if got := getReqID(r); got != fakeID {