rbd: Add ReplicationServer struct for replication operations

added ReplicationServer struct for the replication related
operation it also embed the ControllerServer which
already implements the helper functions like locking/unlocking etc.

removed getVolumeFromID and cleanup functions for better
code readability and easy maintaince.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2021-03-17 12:57:04 +05:30 committed by mergify[bot]
parent ce7f936551
commit aaf6b571b8
3 changed files with 153 additions and 72 deletions

View File

@ -28,7 +28,6 @@ import (
librbd "github.com/ceph/go-ceph/rbd" librbd "github.com/ceph/go-ceph/rbd"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kube-storage/spec/lib/go/replication"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -42,11 +41,6 @@ const (
// controller server spec. // controller server spec.
type ControllerServer struct { type ControllerServer struct {
*csicommon.DefaultControllerServer *csicommon.DefaultControllerServer
// added UnimplementedControllerServer as a member of
// ControllerServer. if replication spec add more RPC services in the proto
// file, then we don't need to add all RPC methods leading to forward
// compatibility.
*replication.UnimplementedControllerServer
// A map storing all volumes with ongoing operations so that additional operations // A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID/volume name) return an Aborted error // for that same volume (as defined by VolumeID/volume name) return an Aborted error
VolumeLocks *util.VolumeLocks VolumeLocks *util.VolumeLocks

View File

@ -37,6 +37,7 @@ type Driver struct {
ids *IdentityServer ids *IdentityServer
ns *NodeServer ns *NodeServer
cs *ControllerServer cs *ControllerServer
rs *ReplicationServer
} }
var ( var (
@ -81,6 +82,10 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
} }
} }
func NewReplicationServer(c *ControllerServer) *ReplicationServer {
return &ReplicationServer{ControllerServer: c}
}
// NewNodeServer initialize a node server for rbd CSI driver. // NewNodeServer initialize a node server for rbd CSI driver.
func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*NodeServer, error) { func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*NodeServer, error) {
mounter := mount.New("") mounter := mount.New("")
@ -154,6 +159,7 @@ func (r *Driver) Run(conf *util.Config) {
if conf.IsControllerServer { if conf.IsControllerServer {
r.cs = NewControllerServer(r.cd) r.cs = NewControllerServer(r.cd)
r.rs = NewReplicationServer(r.cs)
} }
if !conf.IsControllerServer && !conf.IsNodeServer { if !conf.IsControllerServer && !conf.IsNodeServer {
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
@ -172,8 +178,9 @@ func (r *Driver) Run(conf *util.Config) {
IS: r.ids, IS: r.ids,
CS: r.cs, CS: r.cs,
NS: r.ns, NS: r.ns,
// Register the replication controller to expose replication operations. // Register the replication controller to expose replication
RS: r.cs, // operations.
RS: r.rs,
} }
s.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics) s.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics)
if conf.EnableGRPCMetrics { if conf.EnableGRPCMetrics {

View File

@ -59,37 +59,16 @@ const (
forceKey = "force" forceKey = "force"
) )
// getVolumeFromID gets the rbd image details from the volumeID. // ReplicationServer struct of rbd CSI driver with supported methods of Replication
// TODO: move this to controllerserver.go and reuse it wherever its applicable. // controller server spec.
func (cs *ControllerServer) getVolumeFromID(ctx context.Context, volumeID string, secrets map[string]string) (*rbdVolume, *util.Credentials, error) { type ReplicationServer struct {
// validate the volume ID // added UnimplementedControllerServer as a member of
cr, err := util.NewUserCredentials(secrets) // ControllerServer. if replication spec add more RPC services in the proto
if err != nil { // file, then we don't need to add all RPC methods leading to forward
return nil, nil, status.Error(codes.Internal, err.Error()) // compatibility.
} *replication.UnimplementedControllerServer
// Embed ControllerServer as it implements helper functions
if volumeID == "" { *ControllerServer
return nil, cr, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired {
util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, cr, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
var rbdVol = &rbdVolume{}
rbdVol, err = genVolFromVolID(ctx, volumeID, cr, secrets)
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
}
return rbdVol, cr, err
} }
// getForceOption extracts the force option from the GRPC request parameters. // getForceOption extracts the force option from the GRPC request parameters.
@ -126,30 +105,39 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd
return mirroringMode, nil return mirroringMode, nil
} }
// cleanup performs below resource cleanup operations.
func (cs *ControllerServer) cleanup(rbdVol *rbdVolume, cr *util.Credentials) {
if cr != nil {
// destroy the credential file
cr.DeleteCredentials()
}
if rbdVol != nil {
// release the volume lock
cs.VolumeLocks.Release(rbdVol.VolID)
// destroy the cluster connection
rbdVol.Destroy()
}
}
// EnableVolumeReplication extracts the RBD volume information from the // EnableVolumeReplication extracts the RBD volume information from the
// volumeID, If the image is present it will enable the mirroring based on the // volumeID, If the image is present it will enable the mirroring based on the
// user provided information. // user provided information.
// TODO: create new Replication controller struct for the replication operations. func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
func (cs *ControllerServer) EnableVolumeReplication(ctx context.Context,
req *replication.EnableVolumeReplicationRequest, req *replication.EnableVolumeReplicationRequest,
) (*replication.EnableVolumeReplicationResponse, error) { ) (*replication.EnableVolumeReplicationResponse, error) {
rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) volumeID := req.GetVolumeId()
defer cs.cleanup(rbdVol, cr) if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err return nil, err
} }
// extract the mirroring mode // extract the mirroring mode
@ -177,15 +165,38 @@ func (cs *ControllerServer) EnableVolumeReplication(ctx context.Context,
// DisableVolumeReplication extracts the RBD volume information from the // DisableVolumeReplication extracts the RBD volume information from the
// volumeID, If the image is present and the mirroring is enabled on the RBD // volumeID, If the image is present and the mirroring is enabled on the RBD
// image it will disable the mirroring. // image it will disable the mirroring.
func (cs *ControllerServer) DisableVolumeReplication(ctx context.Context, func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
req *replication.DisableVolumeReplicationRequest, req *replication.DisableVolumeReplicationRequest,
) (*replication.DisableVolumeReplicationResponse, error) { ) (*replication.DisableVolumeReplicationResponse, error) {
rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) volumeID := req.GetVolumeId()
defer cs.cleanup(rbdVol, cr) if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err return nil, err
} }
// extract the force option // extract the force option
force, err := getForceOption(ctx, req.GetParameters()) force, err := getForceOption(ctx, req.GetParameters())
if err != nil { if err != nil {
@ -224,15 +235,38 @@ func (cs *ControllerServer) DisableVolumeReplication(ctx context.Context,
// image is present, mirroring is enabled and the image is in demoted state it // image is present, mirroring is enabled and the image is in demoted state it
// will promote the volume as primary. // will promote the volume as primary.
// If the image is already primary it will return success. // If the image is already primary it will return success.
func (cs *ControllerServer) PromoteVolume(ctx context.Context, func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
req *replication.PromoteVolumeRequest, req *replication.PromoteVolumeRequest,
) (*replication.PromoteVolumeResponse, error) { ) (*replication.PromoteVolumeResponse, error) {
rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) volumeID := req.GetVolumeId()
defer cs.cleanup(rbdVol, cr) if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err return nil, err
} }
// extract the force option // extract the force option
force, err := getForceOption(ctx, req.GetParameters()) force, err := getForceOption(ctx, req.GetParameters())
if err != nil { if err != nil {
@ -265,15 +299,38 @@ func (cs *ControllerServer) PromoteVolume(ctx context.Context,
// volumeID, If the image is present, mirroring is enabled and the // volumeID, If the image is present, mirroring is enabled and the
// image is in promoted state it will demote the volume as secondary. // image is in promoted state it will demote the volume as secondary.
// If the image is already secondary it will return success. // If the image is already secondary it will return success.
func (cs *ControllerServer) DemoteVolume(ctx context.Context, func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
req *replication.DemoteVolumeRequest, req *replication.DemoteVolumeRequest,
) (*replication.DemoteVolumeResponse, error) { ) (*replication.DemoteVolumeResponse, error) {
rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) volumeID := req.GetVolumeId()
defer cs.cleanup(rbdVol, cr) if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err return nil, err
} }
mirroringInfo, err := rbdVol.getImageMirroringInfo() mirroringInfo, err := rbdVol.getImageMirroringInfo()
if err != nil { if err != nil {
util.ErrorLog(ctx, err.Error()) util.ErrorLog(ctx, err.Error())
@ -298,12 +355,35 @@ func (cs *ControllerServer) DemoteVolume(ctx context.Context,
// ResyncVolume extracts the RBD volume information from the volumeID, If the // ResyncVolume extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in demoted state. // image is present, mirroring is enabled and the image is in demoted state.
// If yes it will resync the image to correct the split-brain. // If yes it will resync the image to correct the split-brain.
func (cs *ControllerServer) ResyncVolume(ctx context.Context, func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
req *replication.ResyncVolumeRequest, req *replication.ResyncVolumeRequest,
) (*replication.ResyncVolumeResponse, error) { ) (*replication.ResyncVolumeResponse, error) {
rbdVol, cr, err := cs.getVolumeFromID(ctx, req.GetVolumeId(), req.GetSecrets()) volumeID := req.GetVolumeId()
defer cs.cleanup(rbdVol, cr) if volumeID == "" {
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired {
util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
case errors.Is(err, ErrImageNotFound):
err = status.Errorf(codes.NotFound, "volume %s not found", volumeID)
case errors.Is(err, util.ErrPoolNotFound):
err = status.Errorf(codes.NotFound, "pool %s not found for %s", rbdVol.Pool, volumeID)
default:
err = status.Errorf(codes.Internal, err.Error())
}
return nil, err return nil, err
} }