From 32ea550e3ac3aba39c02fb3df850552c31f019e3 Mon Sep 17 00:00:00 2001
From: Poornima G
Date: Sat, 8 Jun 2019 05:06:03 +0000
Subject: [PATCH] Modify CephFS provisioner to use ceph mgr commands

Currently the CephFS provisioner mounts the ceph filesystem and creates a
subdirectory as part of provisioning the volume. Ceph now supports commands
to provision fs subvolumes, hence modify the provisioner to use ceph mgr
commands to (de)provision fs subvolumes.

Signed-off-by: Poornima G
---
 e2e/deploy-rook.go          |   6 +-
 pkg/cephfs/cephuser.go      |   6 +-
 pkg/cephfs/mountcache.go    |   7 +-
 pkg/cephfs/util.go          |   6 --
 pkg/cephfs/volume.go        | 198 ++++++++++++++++--------------------
 pkg/cephfs/volumeoptions.go |   9 +-
 pkg/util/cephcmds.go        |   9 +-
 scripts/travis-functest.sh  |   2 +-
 8 files changed, 114 insertions(+), 129 deletions(-)

diff --git a/e2e/deploy-rook.go b/e2e/deploy-rook.go
index 02a69968f..297c0ebec 100644
--- a/e2e/deploy-rook.go
+++ b/e2e/deploy-rook.go
@@ -69,8 +69,6 @@ func deployOperator(c kubernetes.Interface) {
 	_, err := framework.RunKubectl("create", "-f", opPath)
 	Expect(err).Should(BeNil())
 
-	err = waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout)
-	Expect(err).Should(BeNil())
 	err = waitForDaemonSets("rook-discover", rookNS, c, deployTimeout)
 	Expect(err).Should(BeNil())
 	err = waitForDeploymentComplete("rook-ceph-operator", rookNS, c, deployTimeout)
@@ -80,10 +78,12 @@ func deployOperator(c kubernetes.Interface) {
 func deployCluster(c kubernetes.Interface) {
 	opPath := fmt.Sprintf("%s/%s", rookURL, "cluster-test.yaml")
 	framework.RunKubectlOrDie("create", "-f", opPath)
+	err := waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout)
+	Expect(err).Should(BeNil())
 	opt := &metav1.ListOptions{
 		LabelSelector: "app=rook-ceph-mon",
 	}
-	err := checkCephPods(rookNS, c, 1, deployTimeout, opt)
+	err = checkCephPods(rookNS, c, 1, deployTimeout, opt)
 	Expect(err).Should(BeNil())
 }
 
diff --git a/pkg/cephfs/cephuser.go b/pkg/cephfs/cephuser.go
index 914391bb4..3424aed82 100644
--- a/pkg/cephfs/cephuser.go
+++ b/pkg/cephfs/cephuser.go
@@ -82,6 +82,10 @@ func getCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID vol
 func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) (*cephEntity, error) {
 	adminID, userID := genUserIDs(adminCr, volID)
+	volRootPath, err := getVolumeRootPathCeph(volOptions, adminCr, volID)
+	if err != nil {
+		return nil, err
+	}
 
 	return getSingleCephEntity(
 		"-m", volOptions.Monitors,
@@ -91,7 +95,7 @@ func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID
 		"-f", "json",
 		"auth", "get-or-create", userID,
 		// User capabilities
-		"mds", fmt.Sprintf("allow rw path=%s", getVolumeRootPathCeph(volID)),
+		"mds", fmt.Sprintf("allow rw path=%s", volRootPath),
 		"mon", "allow r",
 		"osd", fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volID)),
 	)
diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go
index 23b7e3856..962ccd12e 100644
--- a/pkg/cephfs/mountcache.go
+++ b/pkg/cephfs/mountcache.go
@@ -95,11 +95,16 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
 	volID := vid.VolumeID
 
 	if volOptions.ProvisionVolume {
-		volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
 		cr, err = util.GetAdminCredentials(decodeCredentials(me.Secrets))
 		if err != nil {
 			return err
 		}
+
+		volOptions.RootPath, err = getVolumeRootPathCeph(volOptions, cr, volumeID(vid.FsSubvolName))
+		if err != nil {
+			return err
+		}
+
 		var entity *cephEntity
 		entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName))
 		if err != nil {
diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go
index 52d9c9bb5..7cd07db7d 100644
--- a/pkg/cephfs/util.go
+++ b/pkg/cephfs/util.go
@@ -21,7 +21,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"os"
 	"os/exec"
 
 	"google.golang.org/grpc/codes"
@@ -91,11 +90,6 @@ func isMountPoint(p string) (bool, error) {
 	return !notMnt, nil
 }
 
-func pathExists(p string) bool {
-	_, err := os.Stat(p)
-	return err == nil
-}
-
 // Controller service request validation
 func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
 	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
diff --git a/pkg/cephfs/volume.go b/pkg/cephfs/volume.go
index 6f73a037d..e93d8d7ec 100644
--- a/pkg/cephfs/volume.go
+++ b/pkg/cephfs/volume.go
@@ -17,9 +17,8 @@ limitations under the License.
 package cephfs
 
 import (
-	"fmt"
-	"os"
-	"path"
+	"strconv"
+	"strings"
 
 	"github.com/ceph/ceph-csi/pkg/util"
 
@@ -27,132 +26,109 @@ import (
 )
 
 const (
-	cephVolumesRoot = "csi-volumes"
-
-	namespacePrefix = "ns-"
+	namespacePrefix   = "fsvolumens_"
+	csiSubvolumeGroup = "csi"
 )
 
-func getCephRootPathLocal(volID volumeID) string {
-	return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID))
-}
+var (
+	// cephfsInit is used to create the "csi" subvolume group only once, the first time the
+	// csi plugin provisions a volume. Subvolume group create would otherwise get called on
+	// every provision; it does not result in an error, but it is unnecessary.
+	cephfsInit = false
+)
 
-func getCephRootVolumePathLocal(volID volumeID) string {
-	return path.Join(getCephRootPathLocal(volID), cephVolumesRoot, string(volID))
-}
+func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) {
+	stdout, _, err := util.ExecCommand(
+		"ceph",
+		"fs",
+		"subvolume",
+		"getpath",
+		volOptions.FsName,
+		string(volID),
+		"--group_name",
+		csiSubvolumeGroup,
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix+cr.ID,
+		"--key="+cr.Key)
 
-func getVolumeRootPathCeph(volID volumeID) string {
-	return path.Join("/", cephVolumesRoot, string(volID))
+	if err != nil {
+		klog.Errorf("failed to get the rootpath for the vol %s(%s)", string(volID), err)
+		return "", err
+	}
+	return strings.TrimSuffix(string(stdout), "\n"), nil
 }
 
 func getVolumeNamespace(volID volumeID) string {
 	return namespacePrefix + string(volID)
 }
 
-func setVolumeAttribute(root, attrName, attrValue string) error {
-	return execCommandErr("setfattr", "-n", attrName, "-v", attrValue, root)
-}
-
-func createVolume(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID, bytesQuota int64) error {
-	if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
-		return err
-	}
-	defer unmountCephRoot(volID)
-
-	var (
-		volRoot         = getCephRootVolumePathLocal(volID)
-		volRootCreating = volRoot + "-creating"
-	)
-
-	if pathExists(volRoot) {
-		klog.V(4).Infof("cephfs: volume %s already exists, skipping creation", volID)
-		return nil
-	}
-
-	if err := createMountPoint(volRootCreating); err != nil {
-		return err
-	}
-
-	if bytesQuota > 0 {
-		if err := setVolumeAttribute(volRootCreating, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
+func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
+	// TODO: When we support multiple fs, need to handle subvolume group create for all fs's
+	if !cephfsInit {
+		err := execCommandErr(
+			"ceph",
+			"fs",
+			"subvolumegroup",
+			"create",
+			volOptions.FsName,
+			csiSubvolumeGroup,
+			"--mode",
+			"777",
+			"--pool_layout",
+			volOptions.Pool,
+			"-m", volOptions.Monitors,
+			"-c", util.CephConfigPath,
+			"-n", cephEntityClientPrefix+cr.ID,
+			"--key="+cr.Key)
+		if err != nil {
+			klog.Errorf("failed to create subvolume group csi, for the vol %s(%s)", string(volID), err)
 			return err
 		}
+		klog.V(4).Infof("cephfs: created subvolume group csi")
+		cephfsInit = true
 	}
-
-	if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
-		return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
-	}
-
-	if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil {
-		return err
-	}
-
-	if err := os.Rename(volRootCreating, volRoot); err != nil {
-		return fmt.Errorf("couldn't mark volume %s as created: %v", volID, err)
-	}
-
-	return nil
-}
-
-func purgeVolume(volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error {
-	if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
-		return err
-	}
-	defer unmountCephRoot(volID)
-
-	var (
-		volRoot         = getCephRootVolumePathLocal(volID)
-		volRootDeleting = volRoot + "-deleting"
-	)
-
-	if pathExists(volRoot) {
-		if err := os.Rename(volRoot, volRootDeleting); err != nil {
-			return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
-		}
-	} else {
-		if !pathExists(volRootDeleting) {
-			klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
-			return nil
-		}
-	}
-
-	if err := os.RemoveAll(volRootDeleting); err != nil {
-		return fmt.Errorf("failed to delete volume %s: %v", volID, err)
-	}
-
-	return nil
-}
-
-func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error {
-	cephRoot := getCephRootPathLocal(volID)
-
-	// Root path is not set for dynamically provisioned volumes
-	// Access to cephfs's / is required
-	volOptions.RootPath = "/"
-
-	if err := createMountPoint(cephRoot); err != nil {
-		return err
-	}
-
-	m, err := newMounter(volOptions)
+	err := execCommandErr(
+		"ceph",
+		"fs",
+		"subvolume",
+		"create",
+		volOptions.FsName,
+		string(volID),
+		strconv.FormatInt(bytesQuota, 10),
+		"--group_name",
+		csiSubvolumeGroup,
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix+cr.ID,
+		"--key="+cr.Key)
 	if err != nil {
-		return fmt.Errorf("failed to create mounter: %v", err)
-	}
-
-	if err = m.mount(cephRoot, adminCr, volOptions); err != nil {
-		return fmt.Errorf("error mounting ceph root: %v", err)
+		klog.Errorf("failed to create subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
+		return err
 	}
 
 	return nil
 }
 
-func unmountCephRoot(volID volumeID) {
-	cephRoot := getCephRootPathLocal(volID)
-
-	if err := unmountVolume(cephRoot); err != nil {
-		klog.Errorf("failed to unmount %s with error %s", cephRoot, err)
-	} else {
-		if err := os.Remove(cephRoot); err != nil {
-			klog.Errorf("failed to remove %s with error %s", cephRoot, err)
-		}
+func purgeVolume(volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error {
+	err := execCommandErr(
+		"ceph",
+		"fs",
+		"subvolume",
+		"rm",
+		volOptions.FsName,
+		string(volID),
+		"--group_name",
+		csiSubvolumeGroup,
+		"--force",
+		"-m", volOptions.Monitors,
+		"-c", util.CephConfigPath,
+		"-n", cephEntityClientPrefix+cr.ID,
+		"--key="+cr.Key)
+	if err != nil {
+		klog.Errorf("failed to purge subvolume %s(%s) in fs %s",
+			string(volID), err, volOptions.FsName)
+		return err
 	}
+
+	return nil
 }
diff --git a/pkg/cephfs/volumeoptions.go b/pkg/cephfs/volumeoptions.go
index e42027854..11f9f3d6c 100644
--- a/pkg/cephfs/volumeoptions.go
+++ b/pkg/cephfs/volumeoptions.go
@@ -18,6 +18,7 @@ package cephfs
 
 import (
 	"fmt"
+	"path"
 	"strconv"
 
 	"github.com/pkg/errors"
@@ -225,7 +226,11 @@ func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string)
 		}
 	}
 
-	volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
+	volOptions.RootPath, err = getVolumeRootPathCeph(&volOptions, cr, volumeID(vid.FsSubvolName))
+	if err != nil {
+		return nil, nil, err
+	}
+
 	volOptions.ProvisionVolume = true
 
 	return &volOptions, &vid, nil
@@ -267,7 +272,7 @@ func newVolumeOptionsFromVersion1Context(volID string, options, secrets map[stri
 			return nil, nil, err
 		}
 
-		opts.RootPath = getVolumeRootPathCeph(volumeID(volID))
+		opts.RootPath = path.Join("/csi-volumes", string(volumeID(volID)))
 	} else {
 		if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
 			return nil, nil, err
diff --git a/pkg/util/cephcmds.go b/pkg/util/cephcmds.go
index 5cd7bac47..eb61f6f44 100644
--- a/pkg/util/cephcmds.go
+++ b/pkg/util/cephcmds.go
@@ -31,9 +31,10 @@ import (
 // ExecCommand executes passed in program with args and returns separate stdout and stderr streams
 func ExecCommand(program string, args ...string) (stdout, stderr []byte, err error) {
 	var (
-		cmd       = exec.Command(program, args...) // nolint: gosec
-		stdoutBuf bytes.Buffer
-		stderrBuf bytes.Buffer
+		cmd           = exec.Command(program, args...) // nolint: gosec
+		sanitizedArgs = StripSecretInArgs(args)
+		stdoutBuf     bytes.Buffer
+		stderrBuf     bytes.Buffer
 	)
 
 	cmd.Stdout = &stdoutBuf
@@ -41,7 +42,7 @@ func ExecCommand(program string, args ...string) (stdout, stderr []byte, err err
 
 	if err := cmd.Run(); err != nil {
 		return stdoutBuf.Bytes(), stderrBuf.Bytes(), fmt.Errorf("an error (%v)"+
-			" occurred while running %s", err, program)
+			" occurred while running %s args: %v", err, program, sanitizedArgs)
 	}
 
 	return stdoutBuf.Bytes(), nil, nil
diff --git a/scripts/travis-functest.sh b/scripts/travis-functest.sh
index 09369ddf2..55805730b 100755
--- a/scripts/travis-functest.sh
+++ b/scripts/travis-functest.sh
@@ -11,6 +11,6 @@ sudo scripts/minikube.sh k8s-sidecar
 sudo chown -R travis: "$HOME"/.minikube /usr/local/bin/kubectl
 
 # functional tests
-go test github.com/ceph/ceph-csi/e2e --rook-version=v1.0.1 --deploy-rook=true --deploy-timeout=10 -timeout=30m -v
+go test github.com/ceph/ceph-csi/e2e --rook-version=master --deploy-rook=true --deploy-timeout=10 -timeout=30m -v
 
 sudo scripts/minikube.sh clean
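
Illustrative sketch (not part of the patch): with this change the provisioner stops mounting the
CephFS root and instead shells out to the ceph mgr "fs subvolume" interface ("ceph fs
subvolumegroup create", "ceph fs subvolume create/getpath/rm") through util.ExecCommand and
execCommandErr. The standalone Go program below shows the same "getpath" call; the filesystem
name "myfs", subvolume "csi-vol-example" and group "csi" are made-up example values, and it
assumes a reachable cluster whose monitors and keyring are picked up from the default ceph.conf.

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// subvolumePath asks the ceph mgr for the mountable path of a CephFS subvolume,
// mirroring what getVolumeRootPathCeph in this patch does via util.ExecCommand.
func subvolumePath(fs, subvol, group string) (string, error) {
	out, err := exec.Command("ceph",
		"fs", "subvolume", "getpath", fs, subvol,
		"--group_name", group).Output()
	if err != nil {
		return "", fmt.Errorf("ceph fs subvolume getpath failed: %v", err)
	}
	// The CLI prints the path followed by a trailing newline.
	return strings.TrimSuffix(string(out), "\n"), nil
}

func main() {
	// Example values only; a real caller would pass its own fs, subvolume and group names.
	path, err := subvolumePath("myfs", "csi-vol-example", "csi")
	if err != nil {
		fmt.Println(err)
		return
	}
	// The provisioner stores this path in volOptions.RootPath and uses it to scope
	// the ceph user's "allow rw path=..." mds capability.
	fmt.Println(path)
}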