diff --git a/e2e/cephfs.go b/e2e/cephfs.go index 1e143a716..77af53ade 100644 --- a/e2e/cephfs.go +++ b/e2e/cephfs.go @@ -176,12 +176,12 @@ var _ = Describe("cephfs", func() { }) By("create a storage class with pool and a PVC then Bind it to an app", func() { - createCephfsStorageClass(f.ClientSet, f, true) + createCephfsStorageClass(f.ClientSet, f, true, "") validatePVCAndAppBinding(pvcPath, appPath, f) deleteResource(cephfsExamplePath + "storageclass.yaml") }) - createCephfsStorageClass(f.ClientSet, f, false) + createCephfsStorageClass(f.ClientSet, f, false, "") By("create and delete a PVC", func() { By("create a PVC and Bind it to an app", func() { @@ -258,6 +258,38 @@ var _ = Describe("cephfs", func() { } }) + By("validate multiple subvolumegroup creation", func() { + deleteResource(cephfsExamplePath + "storageclass.yaml") + // re-define configmap with information of multiple clusters. + subvolgrpInfo := map[string]string{ + "clusterID-1": "subvolgrp1", + "clusterID-2": "subvolgrp2", + } + createCustomConfigMap(f.ClientSet, cephfsDirPath, subvolgrpInfo) + createCephfsStorageClass(f.ClientSet, f, false, "clusterID-1") + validatePVCAndAppBinding(pvcPath, appPath, f) + deleteResource(cephfsExamplePath + "storageclass.yaml") + // verify subvolumegroup creation. + err := validateSubvolumegroup(f, "subvolgrp1") + if err != nil { + Fail(err.Error()) + } + + // create resources and verify subvolume group creation + // for the second cluster. + createCephfsStorageClass(f.ClientSet, f, false, "clusterID-2") + validatePVCAndAppBinding(pvcPath, appPath, f) + deleteResource(cephfsExamplePath + "storageclass.yaml") + err = validateSubvolumegroup(f, "subvolgrp2") + if err != nil { + Fail(err.Error()) + } + deleteConfigMap(cephfsDirPath) + }) + + createConfigMap(cephfsDirPath, f.ClientSet, f) + createCephfsStorageClass(f.ClientSet, f, false, "") + By("Resize PVC and check application directory size", func() { v, err := f.ClientSet.Discovery().ServerVersion() if err != nil { diff --git a/e2e/utils.go b/e2e/utils.go index d04e3461b..f13b056ce 100644 --- a/e2e/utils.go +++ b/e2e/utils.go @@ -258,7 +258,7 @@ func getStorageClass(path string) scv1.StorageClass { return sc } -func createCephfsStorageClass(c kubernetes.Interface, f *framework.Framework, enablePool bool) { +func createCephfsStorageClass(c kubernetes.Interface, f *framework.Framework, enablePool bool, clusterID string) { scPath := fmt.Sprintf("%s/%s", cephfsExamplePath, "storageclass.yaml") sc := getStorageClass(scPath) sc.Parameters["fsName"] = "myfs" @@ -278,6 +278,10 @@ func createCephfsStorageClass(c kubernetes.Interface, f *framework.Framework, en Expect(stdErr).Should(BeEmpty()) // remove new line present in fsID fsID = strings.Trim(fsID, "\n") + + if clusterID != "" { + fsID = clusterID + } sc.Namespace = cephCSINamespace sc.Parameters["clusterID"] = fsID _, err := c.StorageV1().StorageClasses().Create(context.TODO(), &sc, metav1.CreateOptions{}) @@ -1207,3 +1211,54 @@ func addTopologyDomainsToDSYaml(template, labels string) string { return strings.ReplaceAll(template, "# - \"--domainlabels=failure-domain/region,failure-domain/zone\"", "- \"--domainlabels="+labels+"\"") } + +// createCustomConfigMap provides multiple clusters information. +func createCustomConfigMap(c kubernetes.Interface, pluginPath string, subvolgrpInfo map[string]string) { + path := pluginPath + configMap + cm := v1.ConfigMap{} + err := unmarshal(path, &cm) + Expect(err).Should(BeNil()) + + // get mon list + mons := getMons(rookNamespace, c) + // get clusterIDs + var clusterID []string + for key := range subvolgrpInfo { + clusterID = append(clusterID, key) + } + conmap := []util.ClusterInfo{ + { + ClusterID: clusterID[0], + Monitors: mons, + }, + { + ClusterID: clusterID[1], + Monitors: mons, + }} + for i := 0; i < len(subvolgrpInfo); i++ { + conmap[i].CephFS.SubvolumeGroup = subvolgrpInfo[clusterID[i]] + } + data, err := json.Marshal(conmap) + Expect(err).Should(BeNil()) + cm.Data["config.json"] = string(data) + cm.Namespace = cephCSINamespace + // since a configmap is already created, update the existing configmap + _, updateErr := c.CoreV1().ConfigMaps(cephCSINamespace).Update(context.TODO(), &cm, metav1.UpdateOptions{}) + Expect(updateErr).Should(BeNil()) +} + +// validateSubvolumegroup validates whether subvolumegroup is present. +func validateSubvolumegroup(f *framework.Framework, subvolgrp string) error { + cmd := fmt.Sprintf("ceph fs subvolumegroup getpath myfs %s", subvolgrp) + stdOut, err := execCommandInToolBoxPod(f, cmd, rookNamespace) + Expect(err).Should(BeEmpty()) + if err != "" { + return fmt.Errorf("error subvolumegroup %s doesn't exist", subvolgrp) + } + expectedGrpPath := "/volumes/" + subvolgrp + stdOut = strings.TrimSpace(stdOut) + if stdOut != expectedGrpPath { + return fmt.Errorf("error unexpected group path. Found: %s", stdOut) + } + return nil +} diff --git a/internal/cephfs/volume.go b/internal/cephfs/volume.go index d60b4de4b..dc2df49cc 100644 --- a/internal/cephfs/volume.go +++ b/internal/cephfs/volume.go @@ -30,14 +30,12 @@ import ( ) var ( - // cephfsInit is used to create "csi" subvolume group for the first time the csi plugin loads. - // Subvolume group create gets called every time the plugin loads, though it doesn't result in error - // its unnecessary - cephfsInit = false - - // resizeSupportedList stores the mapping for clusterID and resize is - // supported in the particular cluster - resizeSupportedList = make(map[string]bool) + // clusterAdditionalInfo contains information regarding if resize is + // supported in the particular cluster and subvolumegroup is + // created or not. + // Subvolumegroup creation and volume resize decisions are + // taken through this additional cluster information. + clusterAdditionalInfo = make(map[string]*localClusterState) inValidCommmand = "no valid command found" ) @@ -85,9 +83,23 @@ func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *u return strings.TrimSuffix(string(stdout), "\n"), nil } +type localClusterState struct { + // set true if cluster supports resize functionality. + resizeSupported bool + // set true once a subvolumegroup is created + // for corresponding cluster. + subVolumeGroupCreated bool +} + func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error { - //TODO: When we support multiple fs, need to hande subvolume group create for all fs's - if !cephfsInit { + // verify if corresponding ClusterID key is present in the map, + // and if not, initialize with default values(false). + if _, keyPresent := clusterAdditionalInfo[volOptions.ClusterID]; !keyPresent { + clusterAdditionalInfo[volOptions.ClusterID] = &localClusterState{} + } + + // create subvolumegroup if not already created for the cluster. + if !clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated { err := execCommandErr( ctx, "ceph", @@ -105,7 +117,7 @@ func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede return err } klog.V(4).Infof(util.Log(ctx, "cephfs: created subvolume group %s"), volOptions.SubvolumeGroup) - cephfsInit = true + clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated = true } args := []string{ @@ -144,7 +156,17 @@ func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede // subvolume. If the command is not available as a fallback it will use // CreateVolume to resize the subvolume. func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error { - if supported, ok := resizeSupportedList[volOptions.ClusterID]; supported || !ok { + // keyPresent checks whether corresponding clusterID key is present in clusterAdditionalInfo + var keyPresent bool + // verify if corresponding ClusterID key is present in the map, + // and if not, initialize with default values(false). + if _, keyPresent = clusterAdditionalInfo[volOptions.ClusterID]; !keyPresent { + clusterAdditionalInfo[volOptions.ClusterID] = &localClusterState{} + } + + // resize subvolume when either it's supported, or when corresponding + // clusterID key was not present. + if clusterAdditionalInfo[volOptions.ClusterID].resizeSupported || !keyPresent { args := []string{ "fs", "subvolume", @@ -166,7 +188,7 @@ func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede args[:]...) if err == nil { - resizeSupportedList[volOptions.ClusterID] = true + clusterAdditionalInfo[volOptions.ClusterID].resizeSupported = true return nil } // Incase the error is other than invalid command return error to the caller. @@ -175,7 +197,7 @@ func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede return err } } - resizeSupportedList[volOptions.ClusterID] = false + clusterAdditionalInfo[volOptions.ClusterID].resizeSupported = false return createVolume(ctx, volOptions, cr, volID, bytesQuota) }