cephfs/volume: added createVolume and purgeVolume

This commit is contained in:
gman 2018-04-13 14:49:49 +02:00
parent c21d05a536
commit b7d856e562

View File

@ -19,52 +19,156 @@ package cephfs
import ( import (
"fmt" "fmt"
"os" "os"
"path"
) )
// Volumes are mounted in .../controller/volumes/vol-{UUID}
// The actual user data resides in .../vol-{UUID}/volume-data
// purgeVolume moves the user data to .../vol-{UUID}/volume-deleting and only then calls os.RemoveAll
const ( const (
volumeMounter_fuse = "fuse" cephRootPrefix = PluginFolder + "/controller/volumes/root-"
volumeMounter_kernel = "kernel" cephVolumePrefix = PluginFolder + "/controller/volumes/vol-"
cephVolumesRoot = "csi-volumes"
volumeDataSuffix = "volume-data"
volumeDeletingSuffix = "volume-deleting"
namespacePrefix = "csi-ns-"
) )
type volumeMounter interface { func getCephRootPath_local(volUuid string) string {
mount(mountPoint string, volOptions *volumeOptions) error return cephRootPrefix + volUuid
} }
type fuseMounter struct{} func getCephRootVolumePath_local(volUuid string) string {
return path.Join(getCephRootPath_local(volUuid), cephVolumesRoot, volUuid)
}
func execCommandAndValidate(program string, args ...string) error { func getCephRootVolumeDataPath_local(volUuid string) string {
out, err := execCommand(program, args...) return path.Join(getCephRootVolumePath_local(volUuid), volumeDataSuffix)
if err != nil { }
return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out)
func getCephRootVolumeDeletingPath_local(volUuid string) string {
return path.Join(getCephRootVolumePath_local(volUuid), volumeDeletingSuffix)
}
func getVolumeRootPath_local(volUuid string) string {
return cephVolumePrefix + volUuid
}
func getVolumeRootPath_ceph(volUuid string) string {
return path.Join("/", cephVolumesRoot, volUuid)
}
func getVolumeDataPath_local(volUuid string) string {
return path.Join(getVolumeRootPath_local(volUuid), volumeDataSuffix)
}
func getVolumeDeletingPath_local(volUuid string) string {
return path.Join(getVolumeRootPath_local(volUuid), volumeDeletingSuffix)
}
func getVolumeNamespace(volUuid string) string {
return namespacePrefix + volUuid
}
func setVolumeAttribute(root, attrName, attrValue string) error {
return execCommandAndValidate("setfattr", "-n", attrName, "-v", attrValue, root)
}
func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid string, bytesQuota int64) error {
cephRoot := getCephRootPath_local(volUuid)
if err := createMountPoint(cephRoot); err != nil {
return err
}
// RootPath is not set for a dynamically provisioned volume
// Access to cephfs's / is required
volOptions.RootPath = "/"
if err := mountKernel(cephRoot, adminCr, volOptions); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
}
defer func() {
unmountVolume(cephRoot)
os.Remove(cephRoot)
}()
volOptions.RootPath = getVolumeRootPath_ceph(volUuid)
localVolRoot := getCephRootVolumePath_local(volUuid)
if err := createMountPoint(localVolRoot); err != nil {
return err
}
if err := setVolumeAttribute(localVolRoot, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
return err
}
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
}
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volUuid)); err != nil {
return err
} }
return nil return nil
} }
func (m *fuseMounter) mount(mountPoint string, volOptions *volumeOptions) error { func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error {
return execCommandAndValidate("ceph-fuse", mountPoint, "-n", "client."+volOptions.User, "-r", volOptions.RootPath) var (
volUuid = uuidFromVolumeId(volId)
volRoot string
dataPath string
delPath string
)
if volOptions.ProvisionVolume {
// RootPath is not set for a dynamically provisioned volume
volOptions.RootPath = "/"
volRoot = getCephRootPath_local(volUuid)
dataPath = getCephRootVolumeDataPath_local(volUuid)
delPath = getCephRootVolumeDeletingPath_local(volUuid)
} else {
volRoot = getVolumeRootPath_local(volUuid)
dataPath = getVolumeDataPath_local(volUuid)
delPath = getVolumeDeletingPath_local(volUuid)
} }
type kernelMounter struct{} if err := createMountPoint(volRoot); err != nil {
func (m *kernelMounter) mount(mountPoint string, volOptions *volumeOptions) error {
if err := execCommandAndValidate("modprobe", "ceph"); err != nil {
return err return err
} }
return execCommandAndValidate("mount", if err := mountKernel(volRoot, cr, volOptions); err != nil {
"-t", "ceph", return err
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
mountPoint,
"-o",
fmt.Sprintf("name=%s,secretfile=%s", volOptions.User, getCephSecretPath(volOptions.User)),
)
} }
func unmountVolume(mountPoint string) error { defer func() {
return execCommandAndValidate("umount", mountPoint) if volOptions.ProvisionVolume {
os.Remove(getCephRootVolumePath_local(volUuid))
} }
func createMountPoint(root string) error { unmountVolume(volRoot)
return os.MkdirAll(root, 0750) os.Remove(volRoot)
}()
if err := os.Rename(dataPath, delPath); err != nil {
if os.IsNotExist(err) {
// dataPath doesn't exist if NodePublishVolume wasn't called
return nil
} else {
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volId, err)
}
}
if err := os.RemoveAll(delPath); err != nil {
return fmt.Errorf("couldn't delete volume %s: %v", volId, err)
}
return nil
} }