mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 02:33:34 +00:00
Implement NodeStage and NodeUnstage for rbd
in NodeStage RPC call we have to map the device to the node plugin and make sure the the device will be mounted to the global path in nodeUnstage request unmount the device from global path and unmap the device if the volume mode is block we will be creating a file inside a stageTargetPath and it will be considered as the global path Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
committed by
mergify[bot]
parent
7d8f465746
commit
f4c80dec9a
@ -43,7 +43,7 @@ func initVolumeMountCache(driverName, mountCacheDir string) {
|
||||
}
|
||||
|
||||
func remountCachedVolumes() error {
|
||||
if err := os.MkdirAll(volumeMountCache.nodeCacheStore.BasePath, 0755); err != nil {
|
||||
if err := util.CreateMountPoint(volumeMountCache.nodeCacheStore.BasePath); err != nil {
|
||||
klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err)
|
||||
return err
|
||||
}
|
||||
@ -124,7 +124,7 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
|
||||
return err
|
||||
}
|
||||
|
||||
isMnt, err := isMountPoint(me.StagingPath)
|
||||
isMnt, err := util.IsMountPoint(me.StagingPath)
|
||||
if err != nil {
|
||||
isMnt = false
|
||||
klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err)
|
||||
|
@ -84,8 +84,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
volOptions *volumeOptions
|
||||
vid *volumeIdentifier
|
||||
)
|
||||
if err := validateNodeStageVolumeRequest(req); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
if err := util.ValidateNodeStageVolumeRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Configuration
|
||||
@ -115,7 +115,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
}
|
||||
}
|
||||
|
||||
if err = createMountPoint(stagingTargetPath); err != nil {
|
||||
if err = util.CreateMountPoint(stagingTargetPath); err != nil {
|
||||
klog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
@ -125,7 +125,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
|
||||
// Check if the volume is already mounted
|
||||
|
||||
isMnt, err := isMountPoint(stagingTargetPath)
|
||||
isMnt, err := util.IsMountPoint(stagingTargetPath)
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("stat failed: %v", err)
|
||||
@ -180,8 +180,8 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ
|
||||
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
||||
|
||||
mountOptions := []string{"bind"}
|
||||
if err := validateNodePublishVolumeRequest(req); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
if err := util.ValidateNodePublishVolumeRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Configuration
|
||||
@ -189,7 +189,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
targetPath := req.GetTargetPath()
|
||||
volID := req.GetVolumeId()
|
||||
|
||||
if err := createMountPoint(targetPath); err != nil {
|
||||
if err := util.CreateMountPoint(targetPath); err != nil {
|
||||
klog.Errorf("failed to create mount point at %s: %v", targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
@ -218,7 +218,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
|
||||
// Check if the volume is already mounted
|
||||
|
||||
isMnt, err := isMountPoint(targetPath)
|
||||
isMnt, err := util.IsMountPoint(targetPath)
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("stat failed: %v", err)
|
||||
@ -255,8 +255,8 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
// NodeUnpublishVolume unmounts the volume from the target path
|
||||
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
||||
var err error
|
||||
if err = validateNodeUnpublishVolumeRequest(req); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
if err = util.ValidateNodeUnpublishVolumeRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
@ -283,8 +283,8 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
// NodeUnstageVolume unstages the volume from the staging path
|
||||
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
||||
var err error
|
||||
if err = validateNodeUnstageVolumeRequest(req); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
if err = util.ValidateNodeUnstageVolumeRequest(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
|
@ -19,7 +19,6 @@ package cephfs
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
@ -30,7 +29,6 @@ import (
|
||||
|
||||
"github.com/ceph/ceph-csi/pkg/util"
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
)
|
||||
|
||||
type volumeID string
|
||||
@ -79,18 +77,6 @@ func execCommandJSON(v interface{}, program string, args ...string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Used in isMountPoint()
|
||||
var dummyMount = mount.New("")
|
||||
|
||||
func isMountPoint(p string) (bool, error) {
|
||||
notMnt, err := dummyMount.IsLikelyNotMountPoint(p)
|
||||
if err != nil {
|
||||
return false, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return !notMnt, nil
|
||||
}
|
||||
|
||||
func pathExists(p string) bool {
|
||||
_, err := os.Stat(p)
|
||||
return err == nil
|
||||
@ -127,68 +113,3 @@ func (cs *ControllerServer) validateDeleteVolumeRequest() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Node service request validation
|
||||
func validateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error {
|
||||
if req.GetVolumeCapability() == nil {
|
||||
return errors.New("volume capability missing in request")
|
||||
}
|
||||
|
||||
if req.GetVolumeId() == "" {
|
||||
return errors.New("volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetStagingTargetPath() == "" {
|
||||
return errors.New("staging target path missing in request")
|
||||
}
|
||||
|
||||
if req.GetSecrets() == nil || len(req.GetSecrets()) == 0 {
|
||||
return errors.New("stage secrets cannot be nil or empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateNodeUnstageVolumeRequest(req *csi.NodeUnstageVolumeRequest) error {
|
||||
if req.GetVolumeId() == "" {
|
||||
return errors.New("volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetStagingTargetPath() == "" {
|
||||
return errors.New("staging target path missing in request")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
|
||||
if req.GetVolumeCapability() == nil {
|
||||
return errors.New("volume capability missing in request")
|
||||
}
|
||||
|
||||
if req.GetVolumeId() == "" {
|
||||
return errors.New("volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetTargetPath() == "" {
|
||||
return errors.New("target path missing in request")
|
||||
}
|
||||
|
||||
if req.GetStagingTargetPath() == "" {
|
||||
return errors.New("staging target path missing in request")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error {
|
||||
if req.GetVolumeId() == "" {
|
||||
return errors.New("volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetTargetPath() == "" {
|
||||
return errors.New("target path missing in request")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Cred
|
||||
// Access to cephfs's / is required
|
||||
volOptions.RootPath = "/"
|
||||
|
||||
if err := createMountPoint(cephRoot); err != nil {
|
||||
if err := util.CreateMountPoint(cephRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -155,7 +155,7 @@ func mountFuse(mountPoint string, cr *util.Credentials, volOptions *volumeOption
|
||||
}
|
||||
|
||||
func (m *fuseMounter) mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := createMountPoint(mountPoint); err != nil {
|
||||
if err := util.CreateMountPoint(mountPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -186,7 +186,7 @@ func mountKernel(mountPoint string, cr *util.Credentials, volOptions *volumeOpti
|
||||
}
|
||||
|
||||
func (m *kernelMounter) mount(mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
|
||||
if err := createMountPoint(mountPoint); err != nil {
|
||||
if err := util.CreateMountPoint(mountPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -236,7 +236,3 @@ func unmountVolume(mountPoint string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createMountPoint(root string) error {
|
||||
return os.MkdirAll(root, 0750)
|
||||
}
|
||||
|
Reference in New Issue
Block a user