cleanup: introduce populateVolOptions(), to fill rbdVol from stage req

At present the nodeStageVolume() handle many logic of filling rbdvol
struct based on the request received and this method is complex to
follow. with this patch, filling or populating volOptions has been
segregrated and handled hence make the stage functions' job easy.

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal 2021-09-02 13:50:26 +05:30 committed by mergify[bot]
parent f0b8a3f626
commit 3f31ca8a3a

View File

@ -149,28 +149,14 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r
return nil
}
// NodeStageVolume mounts the volume to a staging path on the node.
// Implementation notes:
// - stagingTargetPath is the directory passed in the request where the volume needs to be staged
// - We stage the volume into a directory, named after the VolumeID inside stagingTargetPath if
// it is a file system
// - We stage the volume into a file, named after the VolumeID inside stagingTargetPath if it is
// a block volume
// - Order of operation execution: (useful for defer stacking and when Unstaging to ensure steps
// are done in reverse, this is done in undoStagingTransaction)
// - Stash image metadata under staging path
// - Map the image (creates a device)
// - Create the staging file/directory under staging path
// - Stage the device (mount the device mapped for image)
// TODO: make this function less complex.
// nolint:gocyclo,cyclop // reduce complexity
func (ns *NodeServer) NodeStageVolume(
// populateRbdVol update the fields in rbdVolume struct based on the request it received.
func populateRbdVol(
ctx context.Context,
req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
if err := util.ValidateNodeStageVolumeRequest(req); err != nil {
return nil, err
}
req *csi.NodeStageVolumeRequest,
cr *util.Credentials) (*rbdVolume, error) {
var err error
var j *journal.Connection
volID := req.GetVolumeId()
isBlock := req.GetVolumeCapability().GetBlock() != nil
disableInUseChecks := false
// MULTI_NODE_MULTI_WRITER is supported by default for Block access type volumes
@ -192,6 +178,77 @@ func (ns *NodeServer) NodeStageVolume(
disableInUseChecks = true
}
rv, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
rv.ThickProvision = isThickProvisionRequest(req.GetVolumeContext())
isStaticVol := isStaticVolume(req.GetVolumeContext())
// get rbd image name from the volume journal
// for static volumes, the image name is actually the volume ID itself
if isStaticVol {
rv.RbdImageName = volID
} else {
var vi util.CSIIdentifier
var imageAttributes *journal.ImageAttributes
err = vi.DecomposeCSIID(volID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
j, err = volJournal.Connect(rv.Monitors, rv.RadosNamespace, cr)
if err != nil {
log.ErrorLog(ctx, "failed to establish cluster connection: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
defer j.Destroy()
imageAttributes, err = j.GetImageAttributes(
ctx, rv.Pool, vi.ObjectUUID, false)
if err != nil {
err = fmt.Errorf("error fetching image attributes for volume ID (%s): %w", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
rv.RbdImageName = imageAttributes.ImageName
}
rv.VolID = volID
rv.MapOptions = req.GetVolumeContext()["mapOptions"]
rv.UnmapOptions = req.GetVolumeContext()["unmapOptions"]
rv.Mounter = req.GetVolumeContext()["mounter"]
rv.LogDir = req.GetVolumeContext()["cephLogDir"]
if rv.LogDir == "" {
rv.LogDir = defaultLogDir
}
return rv, err
}
// NodeStageVolume mounts the volume to a staging path on the node.
// Implementation notes:
// - stagingTargetPath is the directory passed in the request where the volume needs to be staged
// - We stage the volume into a directory, named after the VolumeID inside stagingTargetPath if
// it is a file system
// - We stage the volume into a file, named after the VolumeID inside stagingTargetPath if it is
// a block volume
// - Order of operation execution: (useful for defer stacking and when Unstaging to ensure steps
// are done in reverse, this is done in undoStagingTransaction)
// - Stash image metadata under staging path
// - Map the image (creates a device)
// - Create the staging file/directory under staging path
// - Stage the device (mount the device mapped for image)
func (ns *NodeServer) NodeStageVolume(
ctx context.Context,
req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
if err := util.ValidateNodeStageVolumeRequest(req); err != nil {
return nil, err
}
volID := req.GetVolumeId()
cr, err := util.NewUserCredentials(req.GetSecrets())
@ -232,64 +289,21 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Error(codes.InvalidArgument, "missing required parameter imageFeatures")
}
volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks)
rv, err := populateRbdVol(ctx, req, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, err
}
volOptions.ThickProvision = isThickProvisionRequest(req.GetVolumeContext())
// get rbd image name from the volume journal
// for static volumes, the image name is actually the volume ID itself
if isStaticVol {
volOptions.RbdImageName = volID
} else {
var vi util.CSIIdentifier
var imageAttributes *journal.ImageAttributes
err = vi.DecomposeCSIID(volID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
j, connErr := volJournal.Connect(volOptions.Monitors, volOptions.RadosNamespace, cr)
if connErr != nil {
log.ErrorLog(ctx, "failed to establish cluster connection: %v", connErr)
return nil, status.Error(codes.Internal, connErr.Error())
}
defer j.Destroy()
imageAttributes, err = j.GetImageAttributes(
ctx, volOptions.Pool, vi.ObjectUUID, false)
if err != nil {
err = fmt.Errorf("error fetching image attributes for volume ID (%s): %w", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
volOptions.RbdImageName = imageAttributes.ImageName
}
volOptions.VolID = volID
volOptions.MapOptions = req.GetVolumeContext()["mapOptions"]
volOptions.UnmapOptions = req.GetVolumeContext()["unmapOptions"]
volOptions.Mounter = req.GetVolumeContext()["mounter"]
volOptions.LogDir = req.GetVolumeContext()["cephLogDir"]
if volOptions.LogDir == "" {
volOptions.LogDir = defaultLogDir
}
err = volOptions.Connect(cr)
err = rv.Connect(cr)
if err != nil {
log.ErrorLog(ctx, "failed to connect to volume %s: %v", volOptions, err)
log.ErrorLog(ctx, "failed to connect to volume %s: %v", rv, err)
return nil, status.Error(codes.Internal, err.Error())
}
defer volOptions.Destroy()
defer rv.Destroy()
if isHealer {
err = healerStageTransaction(ctx, cr, volOptions, stagingParentPath)
err = healerStageTransaction(ctx, cr, rv, stagingParentPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@ -300,19 +314,19 @@ func (ns *NodeServer) NodeStageVolume(
transaction := stageTransaction{}
// Stash image details prior to mapping the image (useful during Unstage as it has no
// voloptions passed to the RPC as per the CSI spec)
err = stashRBDImageMetadata(volOptions, stagingParentPath)
err = stashRBDImageMetadata(rv, stagingParentPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil {
ns.undoStagingTransaction(ctx, req, transaction, volOptions)
ns.undoStagingTransaction(ctx, req, transaction, rv)
}
}()
// perform the actual staging and if this fails, have undoStagingTransaction
// cleans up for us
transaction, err = ns.stageTransaction(ctx, req, volOptions, isStaticVol)
transaction, err = ns.stageTransaction(ctx, req, rv, isStaticVol)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}