Modify CephFs provisioner to use the ceph mgr commands

Currently CephFs provisioner mounts the ceph filesystem
and creates a subdirectory as a part of provisioning the
volume. Ceph now supports commands to provision fs subvolumes,
hance modify the provisioner to use ceph mgr commands to
(de)provision fs subvolumes.

Signed-off-by: Poornima G <pgurusid@redhat.com>
This commit is contained in:
Poornima G 2019-06-08 05:06:03 +00:00 committed by mergify[bot]
parent fa68c35f3b
commit 32ea550e3a
8 changed files with 114 additions and 129 deletions

View File

@ -69,8 +69,6 @@ func deployOperator(c kubernetes.Interface) {
_, err := framework.RunKubectl("create", "-f", opPath) _, err := framework.RunKubectl("create", "-f", opPath)
Expect(err).Should(BeNil()) Expect(err).Should(BeNil())
err = waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout)
Expect(err).Should(BeNil())
err = waitForDaemonSets("rook-discover", rookNS, c, deployTimeout) err = waitForDaemonSets("rook-discover", rookNS, c, deployTimeout)
Expect(err).Should(BeNil()) Expect(err).Should(BeNil())
err = waitForDeploymentComplete("rook-ceph-operator", rookNS, c, deployTimeout) err = waitForDeploymentComplete("rook-ceph-operator", rookNS, c, deployTimeout)
@ -80,10 +78,12 @@ func deployOperator(c kubernetes.Interface) {
func deployCluster(c kubernetes.Interface) { func deployCluster(c kubernetes.Interface) {
opPath := fmt.Sprintf("%s/%s", rookURL, "cluster-test.yaml") opPath := fmt.Sprintf("%s/%s", rookURL, "cluster-test.yaml")
framework.RunKubectlOrDie("create", "-f", opPath) framework.RunKubectlOrDie("create", "-f", opPath)
err := waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout)
Expect(err).Should(BeNil())
opt := &metav1.ListOptions{ opt := &metav1.ListOptions{
LabelSelector: "app=rook-ceph-mon", LabelSelector: "app=rook-ceph-mon",
} }
err := checkCephPods(rookNS, c, 1, deployTimeout, opt) err = checkCephPods(rookNS, c, 1, deployTimeout, opt)
Expect(err).Should(BeNil()) Expect(err).Should(BeNil())
} }

View File

@ -82,6 +82,10 @@ func getCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID vol
func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) (*cephEntity, error) { func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) (*cephEntity, error) {
adminID, userID := genUserIDs(adminCr, volID) adminID, userID := genUserIDs(adminCr, volID)
volRootPath, err := getVolumeRootPathCeph(volOptions, adminCr, volID)
if err != nil {
return nil, err
}
return getSingleCephEntity( return getSingleCephEntity(
"-m", volOptions.Monitors, "-m", volOptions.Monitors,
@ -91,7 +95,7 @@ func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID
"-f", "json", "-f", "json",
"auth", "get-or-create", userID, "auth", "get-or-create", userID,
// User capabilities // User capabilities
"mds", fmt.Sprintf("allow rw path=%s", getVolumeRootPathCeph(volID)), "mds", fmt.Sprintf("allow rw path=%s", volRootPath),
"mon", "allow r", "mon", "allow r",
"osd", fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volID)), "osd", fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volID)),
) )

View File

@ -95,11 +95,16 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
volID := vid.VolumeID volID := vid.VolumeID
if volOptions.ProvisionVolume { if volOptions.ProvisionVolume {
volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
cr, err = util.GetAdminCredentials(decodeCredentials(me.Secrets)) cr, err = util.GetAdminCredentials(decodeCredentials(me.Secrets))
if err != nil { if err != nil {
return err return err
} }
volOptions.RootPath, err = getVolumeRootPathCeph(volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
return err
}
var entity *cephEntity var entity *cephEntity
entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName)) entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil { if err != nil {

View File

@ -21,7 +21,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"os"
"os/exec" "os/exec"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -91,11 +90,6 @@ func isMountPoint(p string) (bool, error) {
return !notMnt, nil return !notMnt, nil
} }
func pathExists(p string) bool {
_, err := os.Stat(p)
return err == nil
}
// Controller service request validation // Controller service request validation
func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { func (cs *ControllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {

View File

@ -17,9 +17,8 @@ limitations under the License.
package cephfs package cephfs
import ( import (
"fmt" "strconv"
"os" "strings"
"path"
"github.com/ceph/ceph-csi/pkg/util" "github.com/ceph/ceph-csi/pkg/util"
@ -27,132 +26,109 @@ import (
) )
const ( const (
cephVolumesRoot = "csi-volumes" namespacePrefix = "fsvolumens_"
csiSubvolumeGroup = "csi"
namespacePrefix = "ns-"
) )
func getCephRootPathLocal(volID volumeID) string { var (
return fmt.Sprintf("%s/controller/volumes/root-%s", PluginFolder, string(volID)) // cephfsInit is used to create "csi" subvolume group for the first time the csi plugin loads.
} // Subvolume group create gets called every time the plugin loads, though it doesn't result in error
// its unnecessary
cephfsInit = false
)
func getCephRootVolumePathLocal(volID volumeID) string { func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volID volumeID) (string, error) {
return path.Join(getCephRootPathLocal(volID), cephVolumesRoot, string(volID)) stdout, _, err := util.ExecCommand(
} "ceph",
"fs",
"subvolume",
"getpath",
volOptions.FsName,
string(volID),
"--group_name",
csiSubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)
func getVolumeRootPathCeph(volID volumeID) string { if err != nil {
return path.Join("/", cephVolumesRoot, string(volID)) klog.Errorf("failed to get the rootpath for the vol %s(%s)", string(volID), err)
return "", err
}
return strings.TrimSuffix(string(stdout), "\n"), nil
} }
func getVolumeNamespace(volID volumeID) string { func getVolumeNamespace(volID volumeID) string {
return namespacePrefix + string(volID) return namespacePrefix + string(volID)
} }
func setVolumeAttribute(root, attrName, attrValue string) error { func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
return execCommandErr("setfattr", "-n", attrName, "-v", attrValue, root) //TODO: When we support multiple fs, need to hande subvolume group create for all fs's
} if !cephfsInit {
err := execCommandErr(
func createVolume(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID, bytesQuota int64) error { "ceph",
if err := mountCephRoot(volID, volOptions, adminCr); err != nil { "fs",
return err "subvolumegroup",
} "create",
defer unmountCephRoot(volID) volOptions.FsName,
csiSubvolumeGroup,
var ( "--mode",
volRoot = getCephRootVolumePathLocal(volID) "777",
volRootCreating = volRoot + "-creating" "--pool_layout",
) volOptions.Pool,
"-m", volOptions.Monitors,
if pathExists(volRoot) { "-c", util.CephConfigPath,
klog.V(4).Infof("cephfs: volume %s already exists, skipping creation", volID) "-n", cephEntityClientPrefix+cr.ID,
return nil "--key="+cr.Key)
} if err != nil {
klog.Errorf("failed to create subvolume group csi, for the vol %s(%s)", string(volID), err)
if err := createMountPoint(volRootCreating); err != nil {
return err
}
if bytesQuota > 0 {
if err := setVolumeAttribute(volRootCreating, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
return err return err
} }
klog.V(4).Infof("cephfs: created subvolume group csi")
cephfsInit = true
} }
err := execCommandErr(
if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool", volOptions.Pool); err != nil { "ceph",
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool) "fs",
} "subvolume",
"create",
if err := setVolumeAttribute(volRootCreating, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volID)); err != nil { volOptions.FsName,
return err string(volID),
} strconv.FormatInt(bytesQuota, 10),
"--group_name",
if err := os.Rename(volRootCreating, volRoot); err != nil { csiSubvolumeGroup,
return fmt.Errorf("couldn't mark volume %s as created: %v", volID, err) "-m", volOptions.Monitors,
} "-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
return nil "--key="+cr.Key)
}
func purgeVolume(volID volumeID, adminCr *util.Credentials, volOptions *volumeOptions) error {
if err := mountCephRoot(volID, volOptions, adminCr); err != nil {
return err
}
defer unmountCephRoot(volID)
var (
volRoot = getCephRootVolumePathLocal(volID)
volRootDeleting = volRoot + "-deleting"
)
if pathExists(volRoot) {
if err := os.Rename(volRoot, volRootDeleting); err != nil {
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volID, err)
}
} else {
if !pathExists(volRootDeleting) {
klog.V(4).Infof("cephfs: volume %s not found, assuming it to be already deleted", volID)
return nil
}
}
if err := os.RemoveAll(volRootDeleting); err != nil {
return fmt.Errorf("failed to delete volume %s: %v", volID, err)
}
return nil
}
func mountCephRoot(volID volumeID, volOptions *volumeOptions, adminCr *util.Credentials) error {
cephRoot := getCephRootPathLocal(volID)
// Root path is not set for dynamically provisioned volumes
// Access to cephfs's / is required
volOptions.RootPath = "/"
if err := createMountPoint(cephRoot); err != nil {
return err
}
m, err := newMounter(volOptions)
if err != nil { if err != nil {
return fmt.Errorf("failed to create mounter: %v", err) klog.Errorf("failed to create subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
} return err
if err = m.mount(cephRoot, adminCr, volOptions); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
} }
return nil return nil
} }
func unmountCephRoot(volID volumeID) { func purgeVolume(volID volumeID, cr *util.Credentials, volOptions *volumeOptions) error {
cephRoot := getCephRootPathLocal(volID) err := execCommandErr(
"ceph",
if err := unmountVolume(cephRoot); err != nil { "fs",
klog.Errorf("failed to unmount %s with error %s", cephRoot, err) "subvolume",
} else { "rm",
if err := os.Remove(cephRoot); err != nil { volOptions.FsName,
klog.Errorf("failed to remove %s with error %s", cephRoot, err) string(volID),
} "--group_name",
csiSubvolumeGroup,
"--force",
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--key="+cr.Key)
if err != nil {
klog.Errorf("failed to purge subvolume %s(%s) in fs %s", string(volID), err, volOptions.FsName)
return err
} }
return nil
} }

View File

@ -18,6 +18,7 @@ package cephfs
import ( import (
"fmt" "fmt"
"path"
"strconv" "strconv"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -225,7 +226,11 @@ func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string)
} }
} }
volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName)) volOptions.RootPath, err = getVolumeRootPathCeph(&volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
return nil, nil, err
}
volOptions.ProvisionVolume = true volOptions.ProvisionVolume = true
return &volOptions, &vid, nil return &volOptions, &vid, nil
@ -267,7 +272,7 @@ func newVolumeOptionsFromVersion1Context(volID string, options, secrets map[stri
return nil, nil, err return nil, nil, err
} }
opts.RootPath = getVolumeRootPathCeph(volumeID(volID)) opts.RootPath = path.Join("/csi-volumes", string(volumeID(volID)))
} else { } else {
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil { if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err return nil, nil, err

View File

@ -31,9 +31,10 @@ import (
// ExecCommand executes passed in program with args and returns separate stdout and stderr streams // ExecCommand executes passed in program with args and returns separate stdout and stderr streams
func ExecCommand(program string, args ...string) (stdout, stderr []byte, err error) { func ExecCommand(program string, args ...string) (stdout, stderr []byte, err error) {
var ( var (
cmd = exec.Command(program, args...) // nolint: gosec cmd = exec.Command(program, args...) // nolint: gosec
stdoutBuf bytes.Buffer sanitizedArgs = StripSecretInArgs(args)
stderrBuf bytes.Buffer stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
) )
cmd.Stdout = &stdoutBuf cmd.Stdout = &stdoutBuf
@ -41,7 +42,7 @@ func ExecCommand(program string, args ...string) (stdout, stderr []byte, err err
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
return stdoutBuf.Bytes(), stderrBuf.Bytes(), fmt.Errorf("an error (%v)"+ return stdoutBuf.Bytes(), stderrBuf.Bytes(), fmt.Errorf("an error (%v)"+
" occurred while running %s", err, program) " occurred while running %s args: %v", err, program, sanitizedArgs)
} }
return stdoutBuf.Bytes(), nil, nil return stdoutBuf.Bytes(), nil, nil

View File

@ -11,6 +11,6 @@ sudo scripts/minikube.sh k8s-sidecar
sudo chown -R travis: "$HOME"/.minikube /usr/local/bin/kubectl sudo chown -R travis: "$HOME"/.minikube /usr/local/bin/kubectl
# functional tests # functional tests
go test github.com/ceph/ceph-csi/e2e --rook-version=v1.0.1 --deploy-rook=true --deploy-timeout=10 -timeout=30m -v go test github.com/ceph/ceph-csi/e2e --rook-version=master --deploy-rook=true --deploy-timeout=10 -timeout=30m -v
sudo scripts/minikube.sh clean sudo scripts/minikube.sh clean