make CephFS SubvolumeGroup configurable

The name of the CephFS SubvolumeGroup for the CSI volumes was hardcoded to "csi". To make permission management in multi tenancy environments easier, this commit makes it possible to configure the CSI SubvolumeGroup.

related to #798 and #931
This commit is contained in:
Mathias Merscher 2020-04-29 12:03:08 +02:00 committed by mergify[bot]
parent 47226ccdf7
commit 0991cdf498
6 changed files with 110 additions and 53 deletions

View File

@ -25,6 +25,8 @@ serviceAccounts:
# monitors:
# - "<MONValue1>"
# - "<MONValue2>"
# cephFS:
# subvolumeGroup: "csi"
csiConfig: []
nodeplugin:

View File

@ -365,6 +365,15 @@ func deleteConfigMap(pluginPath string) {
}
}
// matches the definition in internal/util/csiconfig.go
type clusterInfo struct {
ClusterID string `json:"clusterID"`
Monitors []string `json:"monitors"`
CephFS struct {
SubvolumeGroup string `json:"subvolumeGroup"`
} `json:"cephFS"`
}
func createConfigMap(pluginPath string, c kubernetes.Interface, f *framework.Framework) {
path := pluginPath + configMap
cm := v1.ConfigMap{}
@ -379,15 +388,11 @@ func createConfigMap(pluginPath string, c kubernetes.Interface, f *framework.Fra
fsID = strings.Trim(fsID, "\n")
// get mon list
mons := getMons(rookNamespace, c)
conmap := []struct {
Clusterid string `json:"clusterID"`
Monitors []string `json:"monitors"`
}{
{
fsID,
mons,
},
}
conmap := []clusterInfo{{
ClusterID: fsID,
Monitors: mons,
}}
conmap[0].CephFS.SubvolumeGroup = "e2e"
data, err := json.Marshal(conmap)
Expect(err).Should(BeNil())
cm.Data["config.json"] = string(data)
@ -1012,7 +1017,7 @@ func deleteBackingCephFSVolume(f *framework.Framework, pvc *v1.PersistentVolumeC
opt := metav1.ListOptions{
LabelSelector: "app=rook-ceph-tools",
}
_, stdErr := execCommandInPod(f, "ceph fs subvolume rm myfs "+imageData.imageName+" csi", rookNamespace, &opt)
_, stdErr := execCommandInPod(f, "ceph fs subvolume rm myfs "+imageData.imageName+" e2e", rookNamespace, &opt)
Expect(stdErr).Should(BeEmpty())
if stdErr != "" {

View File

@ -12,6 +12,7 @@ kind: ConfigMap
# each such cluster in use.
# To add more clusters or edit MON addresses in an existing config map, use
# the `kubectl replace` command.
# The field "cephFS.subvolumeGroup" is optional and defaults to "csi".
# NOTE: Changes to the config map is automatically updated in the running pods,
# thus restarting existing pods using the config map is NOT required on edits
# to the config map.
@ -25,7 +26,10 @@ data:
"<MONValue2>",
...
"<MONValueN>"
]
],
"cephFS": {
"subvolumeGroup": "<subvolumegroup for cephfs volumes>"
}
}
]
metadata:

View File

@ -29,10 +29,6 @@ import (
"k8s.io/klog"
)
const (
csiSubvolumeGroup = "csi"
)
var (
// cephfsInit is used to create "csi" subvolume group for the first time the csi plugin loads.
// Subvolume group create gets called every time the plugin loads, though it doesn't result in error
@ -65,7 +61,7 @@ func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *u
volOptions.FsName,
string(volID),
"--group_name",
csiSubvolumeGroup,
volOptions.SubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
@ -93,16 +89,16 @@ func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede
"subvolumegroup",
"create",
volOptions.FsName,
csiSubvolumeGroup,
volOptions.SubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,
"--keyfile="+cr.KeyFile)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to create subvolume group csi, for the vol %s(%s)"), string(volID), err)
klog.Errorf(util.Log(ctx, "failed to create subvolume group %s, for the vol %s(%s)"), volOptions.SubvolumeGroup, string(volID), err)
return err
}
klog.V(4).Infof(util.Log(ctx, "cephfs: created subvolume group csi"))
klog.V(4).Infof(util.Log(ctx, "cephfs: created subvolume group %s"), volOptions.SubvolumeGroup)
cephfsInit = true
}
@ -114,7 +110,7 @@ func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede
string(volID),
strconv.FormatInt(bytesQuota, 10),
"--group_name",
csiSubvolumeGroup,
volOptions.SubvolumeGroup,
"--mode", "777",
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
@ -212,7 +208,7 @@ func purgeVolume(ctx context.Context, volID volumeID, cr *util.Credentials, volO
volOptions.FsName,
string(volID),
"--group_name",
csiSubvolumeGroup,
volOptions.SubvolumeGroup,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix+cr.ID,

View File

@ -45,6 +45,7 @@ type volumeOptions struct {
ProvisionVolume bool `json:"provisionVolume"`
KernelMountOptions string `json:"kernelMountOptions"`
FuseMountOptions string `json:"fuseMountOptions"`
SubvolumeGroup string
}
func validateNonEmptyField(field, fieldName string) error {
@ -109,24 +110,30 @@ func extractMounter(dest *string, options map[string]string) error {
return nil
}
func getMonsAndClusterID(options map[string]string) (string, string, error) {
func getClusterInformation(options map[string]string) (string, string, string, error) {
clusterID, ok := options["clusterID"]
if !ok {
err := fmt.Errorf("clusterID must be set")
return "", "", err
return "", "", "", err
}
if err := validateNonEmptyField(clusterID, "clusterID"); err != nil {
return "", "", err
return "", "", "", err
}
monitors, err := util.Mons(csiConfigFile, clusterID)
if err != nil {
err = errors.Wrapf(err, "failed to fetch monitor list using clusterID (%s)", clusterID)
return "", "", err
return "", "", "", err
}
return monitors, clusterID, err
subvolumeGroup, err := util.CephFSSubvolumeGroup(csiConfigFile, clusterID)
if err != nil {
err = errors.Wrapf(err, "failed to fetch subvolumegroup using clusterID (%s)", clusterID)
return "", "", "", err
}
return clusterID, monitors, subvolumeGroup, err
}
// newVolumeOptions generates a new instance of volumeOptions from the provided
@ -140,7 +147,7 @@ func newVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVo
volOptions := req.GetParameters()
opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(volOptions)
opts.ClusterID, opts.Monitors, opts.SubvolumeGroup, err = getClusterInformation(volOptions)
if err != nil {
return nil, err
}
@ -225,6 +232,10 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
return nil, nil, errors.Wrapf(err, "failed to fetch monitor list using clusterID (%s)", vi.ClusterID)
}
if volOptions.SubvolumeGroup, err = util.CephFSSubvolumeGroup(csiConfigFile, vi.ClusterID); err != nil {
return nil, nil, errors.Wrapf(err, "failed to fetch subvolumegroup list using clusterID (%s)", vi.ClusterID)
}
cr, err := util.NewAdminCredentials(secrets)
if err != nil {
return nil, nil, err
@ -262,6 +273,10 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
return nil, nil, err
}
if err = extractOptionalOption(&volOptions.SubvolumeGroup, "subvolumeGroup", volOpt); err != nil {
return nil, nil, err
}
if err = extractMounter(&volOptions.Mounter, volOpt); err != nil {
return nil, nil, err
}
@ -358,7 +373,7 @@ func newVolumeOptionsFromStaticVolume(volID string, options map[string]string) (
// store NOT of static boolean
opts.ProvisionVolume = !staticVol
opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(options)
opts.ClusterID, opts.Monitors, opts.SubvolumeGroup, err = getClusterInformation(options)
if err != nil {
return nil, nil, err
}

View File

@ -23,53 +23,88 @@ import (
"strings"
)
// clusterInfo strongly typed JSON spec for the below JSON structure
const (
// defaultCsiSubvolumeGroup defines the default name for the CephFS CSI subvolumegroup.
// This was hardcoded once and defaults to the old value to keep backward compatibility.
defaultCsiSubvolumeGroup = "csi"
)
// clusterInfo strongly typed JSON spec for the above JSON structure
type clusterInfo struct {
// ClusterID is used for unique identification
ClusterID string `json:"clusterID"`
// Monitors is monitor list for corresponding cluster ID
Monitors []string `json:"monitors"`
// CephFS contains CephFS specific options
CephFS struct {
// SubvolumeGroup contains the name of the SubvolumeGroup for CSI volumes
SubvolumeGroup string `json:"subvolumeGroup"`
} `json:"cephFS"`
}
/*
Mons returns a comma separated MON list from the csi config for the given clusterID
Expected JSON structure in the passed in config file is,
[
{
"clusterID": "<cluster-id>",
"monitors":
[
"<monitor-value>",
"<monitor-value>",
...
]
},
...
]
*/
func Mons(pathToConfig, clusterID string) (string, error) {
// Expected JSON structure in the passed in config file is,
// [
// {
// "clusterID": "<cluster-id>",
// "monitors":
// [
// "<monitor-value>",
// "<monitor-value>",
// ...
// ],
// "cephFS": {
// "subvolumeGroup": "<subvolumegroup for cephfs volumes>"
// }
// },
// ...
// ]
func readClusterInfo(pathToConfig, clusterID string) (clusterInfo, error) {
var config []clusterInfo
// #nosec
content, err := ioutil.ReadFile(pathToConfig)
if err != nil {
err = fmt.Errorf("error fetching configuration for cluster ID (%s). (%s)", clusterID, err)
return "", err
return clusterInfo{}, err
}
err = json.Unmarshal(content, &config)
if err != nil {
return "", fmt.Errorf("unmarshal failed: %v. raw buffer response: %s",
return clusterInfo{}, fmt.Errorf("unmarshal failed: %v. raw buffer response: %s",
err, string(content))
}
for _, cluster := range config {
if cluster.ClusterID == clusterID {
if len(cluster.Monitors) == 0 {
return "", fmt.Errorf("empty monitor list for cluster ID (%s) in config", clusterID)
}
return strings.Join(cluster.Monitors, ","), nil
return cluster, nil
}
}
return "", fmt.Errorf("missing configuration for cluster ID (%s)", clusterID)
return clusterInfo{}, fmt.Errorf("missing configuration for cluster ID (%s)", clusterID)
}
// Mons returns a comma separated MON list from the csi config for the given clusterID
func Mons(pathToConfig, clusterID string) (string, error) {
cluster, err := readClusterInfo(pathToConfig, clusterID)
if err != nil {
return "", err
}
if len(cluster.Monitors) == 0 {
return "", fmt.Errorf("empty monitor list for cluster ID (%s) in config", clusterID)
}
return strings.Join(cluster.Monitors, ","), nil
}
// CephFSSubvolumeGroup returns the subvolumeGroup for CephFS volumes. If not set, it returns the default value "csi"
func CephFSSubvolumeGroup(pathToConfig, clusterID string) (string, error) {
cluster, err := readClusterInfo(pathToConfig, clusterID)
if err != nil {
return "", err
}
if cluster.CephFS.SubvolumeGroup == "" {
return defaultCsiSubvolumeGroup, nil
}
return cluster.CephFS.SubvolumeGroup, nil
}