cleanup: refactor helper functions in nodeserver.go

Refactored a couple of helper functions for easier resue.

* Code for building store.VolumeOptions is factored out into a separate function.

* Changed args of getCredentailsForVolume() and NodeServer.mount() so that
  instead of passing in whole csi.NodeStageVolumeRequest, only necessary
  properties are passed explicitly. This is to allow these functions to be
  called outside of NodeStageVolume() where NodeStageVolumeRequest is not
  available.

Signed-off-by: Robert Vasek <robert.vasek@cern.ch>
This commit is contained in:
Robert Vasek 2022-02-02 14:45:39 +01:00 committed by mergify[bot]
parent dec74a534f
commit aa6297e164

View File

@ -47,11 +47,10 @@ type NodeServer struct {
func getCredentialsForVolume( func getCredentialsForVolume(
volOptions *store.VolumeOptions, volOptions *store.VolumeOptions,
req *csi.NodeStageVolumeRequest) (*util.Credentials, error) { secrets map[string]string) (*util.Credentials, error) {
var ( var (
err error err error
cr *util.Credentials cr *util.Credentials
secrets = req.GetSecrets()
) )
if volOptions.ProvisionVolume { if volOptions.ProvisionVolume {
@ -64,7 +63,7 @@ func getCredentialsForVolume(
} else { } else {
// The volume is pre-made, credentials are in node stage secrets // The volume is pre-made, credentials are in node stage secrets
cr, err = util.NewUserCredentials(req.GetSecrets()) cr, err = util.NewUserCredentials(secrets)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get user credentials from node stage secrets: %w", err) return nil, fmt.Errorf("failed to get user credentials from node stage secrets: %w", err)
} }
@ -73,11 +72,38 @@ func getCredentialsForVolume(
return cr, nil return cr, nil
} }
func (ns *NodeServer) getVolumeOptions(
ctx context.Context,
volID fsutil.VolumeID,
volContext,
volSecrets map[string]string,
) (*store.VolumeOptions, error) {
volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), volContext, volSecrets)
if err != nil {
if !errors.Is(err, cerrors.ErrInvalidVolID) {
return nil, status.Error(codes.Internal, err.Error())
}
volOptions, _, err = store.NewVolumeOptionsFromStaticVolume(string(volID), volContext)
if err != nil {
if !errors.Is(err, cerrors.ErrNonStaticVolume) {
return nil, status.Error(codes.Internal, err.Error())
}
volOptions, _, err = store.NewVolumeOptionsFromMonitorList(string(volID), volContext, volSecrets)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
}
return volOptions, nil
}
// NodeStageVolume mounts the volume to a staging path on the node. // NodeStageVolume mounts the volume to a staging path on the node.
func (ns *NodeServer) NodeStageVolume( func (ns *NodeServer) NodeStageVolume(
ctx context.Context, ctx context.Context,
req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
var volOptions *store.VolumeOptions
if err := util.ValidateNodeStageVolumeRequest(req); err != nil { if err := util.ValidateNodeStageVolumeRequest(req); err != nil {
return nil, err return nil, err
} }
@ -94,29 +120,19 @@ func (ns *NodeServer) NodeStageVolume(
} }
defer ns.VolumeLocks.Release(req.GetVolumeId()) defer ns.VolumeLocks.Release(req.GetVolumeId())
volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets()) volOptions, err := ns.getVolumeOptions(ctx, volID, req.GetVolumeContext(), req.GetSecrets())
if err != nil {
if !errors.Is(err, cerrors.ErrInvalidVolID) {
return nil, status.Error(codes.Internal, err.Error())
}
// gets mon IPs from the supplied cluster info
volOptions, _, err = store.NewVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext())
if err != nil {
if !errors.Is(err, cerrors.ErrNonStaticVolume) {
return nil, status.Error(codes.Internal, err.Error())
}
// get mon IPs from the volume context
volOptions, _, err = store.NewVolumeOptionsFromMonitorList(string(volID), req.GetVolumeContext(),
req.GetSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
}
}
defer volOptions.Destroy() defer volOptions.Destroy()
mnt, err := mounter.New(volOptions)
if err != nil {
log.ErrorLog(ctx, "failed to create mounter for volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
// Check if the volume is already mounted // Check if the volume is already mounted
isMnt, err := util.IsMountPoint(stagingTargetPath) isMnt, err := util.IsMountPoint(stagingTargetPath)
@ -133,7 +149,16 @@ func (ns *NodeServer) NodeStageVolume(
} }
// It's not, mount now // It's not, mount now
if err = ns.mount(ctx, volOptions, req); err != nil {
if err = ns.mount(
ctx,
mnt,
volOptions,
fsutil.VolumeID(req.GetVolumeId()),
req.GetStagingTargetPath(),
req.GetSecrets(),
req.GetVolumeCapability(),
); err != nil {
return nil, err return nil, err
} }
@ -142,11 +167,16 @@ func (ns *NodeServer) NodeStageVolume(
return &csi.NodeStageVolumeResponse{}, nil return &csi.NodeStageVolumeResponse{}, nil
} }
func (*NodeServer) mount(ctx context.Context, volOptions *store.VolumeOptions, req *csi.NodeStageVolumeRequest) error { func (*NodeServer) mount(
stagingTargetPath := req.GetStagingTargetPath() ctx context.Context,
volID := fsutil.VolumeID(req.GetVolumeId()) mnt mounter.VolumeMounter,
volOptions *store.VolumeOptions,
cr, err := getCredentialsForVolume(volOptions, req) volID fsutil.VolumeID,
stagingTargetPath string,
secrets map[string]string,
volCap *csi.VolumeCapability,
) error {
cr, err := getCredentialsForVolume(volOptions, secrets)
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed to get ceph credentials for volume %s: %v", volID, err) log.ErrorLog(ctx, "failed to get ceph credentials for volume %s: %v", volID, err)
@ -154,20 +184,13 @@ func (*NodeServer) mount(ctx context.Context, volOptions *store.VolumeOptions, r
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
m, err := mounter.New(volOptions) log.DebugLog(ctx, "cephfs: mounting volume %s with %s", volID, mnt.Name())
if err != nil {
log.ErrorLog(ctx, "failed to create mounter for volume %s: %v", volID, err)
return status.Error(codes.Internal, err.Error())
}
log.DebugLog(ctx, "cephfs: mounting volume %s with %s", volID, m.Name())
readOnly := "ro" readOnly := "ro"
if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY || if volCap.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY { volCap.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY {
switch m.(type) { switch mnt.(type) {
case *mounter.FuseMounter: case *mounter.FuseMounter:
if !csicommon.MountOptionContains(strings.Split(volOptions.FuseMountOptions, ","), readOnly) { if !csicommon.MountOptionContains(strings.Split(volOptions.FuseMountOptions, ","), readOnly) {
volOptions.FuseMountOptions = util.MountOptionsAdd(volOptions.FuseMountOptions, readOnly) volOptions.FuseMountOptions = util.MountOptionsAdd(volOptions.FuseMountOptions, readOnly)
@ -179,7 +202,7 @@ func (*NodeServer) mount(ctx context.Context, volOptions *store.VolumeOptions, r
} }
} }
if err = m.Mount(ctx, stagingTargetPath, cr, volOptions); err != nil { if err = mnt.Mount(ctx, stagingTargetPath, cr, volOptions); err != nil {
log.ErrorLog(ctx, log.ErrorLog(ctx,
"failed to mount volume %s: %v Check dmesg logs if required.", "failed to mount volume %s: %v Check dmesg logs if required.",
volID, volID,
@ -201,8 +224,9 @@ func (ns *NodeServer) NodePublishVolume(
return nil, err return nil, err
} }
stagingTargetPath := req.GetStagingTargetPath()
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
volID := req.GetVolumeId() volID := fsutil.VolumeID(req.GetVolumeId())
// Considering kubelet make sure the stage and publish operations // Considering kubelet make sure the stage and publish operations
// are serialized, we dont need any extra locking in nodePublish // are serialized, we dont need any extra locking in nodePublish
@ -238,8 +262,8 @@ func (ns *NodeServer) NodePublishVolume(
if err = mounter.BindMount( if err = mounter.BindMount(
ctx, ctx,
req.GetStagingTargetPath(), stagingTargetPath,
req.GetTargetPath(), targetPath,
req.GetReadonly(), req.GetReadonly(),
mountOptions); err != nil { mountOptions); err != nil {
log.ErrorLog(ctx, "failed to bind-mount volume %s: %v", volID, err) log.ErrorLog(ctx, "failed to bind-mount volume %s: %v", volID, err)