/* Copyright 2021 The Ceph-CSI Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package e2e import ( "context" "crypto/md5" //nolint:gosec // hash generation "encoding/base64" "encoding/json" "errors" "fmt" "os" "regexp" "strconv" "strings" "sync" "time" snapapi "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" appsv1 "k8s.io/api/apps/v1" batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" scv1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" utilyaml "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" ) /* #nosec:G101, values not credentials, just a reference to the location.*/ const ( defaultNs = "default" defaultSCName = "" rbdType = "rbd" cephfsType = "cephfs" volumesType = "volumes" snapsType = "snaps" groupSnapsType = "groupsnaps" rookToolBoxPodLabel = "app=rook-ceph-tools" rbdMountOptions = "mountOptions" retainPolicy = v1.PersistentVolumeReclaimRetain // deletePolicy is the default policy in E2E. deletePolicy = v1.PersistentVolumeReclaimDelete // Default key and label for Listoptions. appKey = "app" appLabel = "write-data-in-pod" appCloneLabel = "app-clone" noError = "" // labels/selector used to list/delete rbd pods. 
rbdPodLabels = "app in (ceph-csi-rbd, csi-rbdplugin, csi-rbdplugin-provisioner)" exitOneErr = "command terminated with exit code 1" // cluster Name, set by user. clusterNameKey = "csi.ceph.com/cluster/name" defaultClusterName = "k8s-cluster-1" ) var ( // cli flags. deployTimeout int deployCephFS bool deployRBD bool deployNFS bool testCephFS bool testCephFSFscrypt bool testRBD bool testRBDFSCrypt bool testNBD bool testNFS bool helmTest bool upgradeTesting bool upgradeVersion string cephCSINamespace string rookNamespace string radosNamespace string poll = 2 * time.Second isOpenShift bool clusterID string nfsDriverName string ) type cephfsFilesystem struct { Name string `json:"name"` MetadataPool string `json:"metadata_pool"` } // listCephFSFileSystems list CephFS filesystems in json format. func listCephFSFileSystems(f *framework.Framework) ([]cephfsFilesystem, error) { var fsList []cephfsFilesystem stdout, stdErr, err := execCommandInToolBoxPod( f, "ceph fs ls --format=json", rookNamespace) if err != nil { return fsList, err } if stdErr != "" { return fsList, fmt.Errorf("error listing fs %v", stdErr) } err = json.Unmarshal([]byte(stdout), &fsList) if err != nil { return fsList, err } return fsList, nil } // getCephFSMetadataPoolName get CephFS pool name from filesystem name. 
func getCephFSMetadataPoolName(f *framework.Framework, filesystem string) (string, error) { fsList, err := listCephFSFileSystems(f) if err != nil { return "", fmt.Errorf("list CephFS filesystem failed: %w", err) } for _, v := range fsList { if v.Name != filesystem { continue } return v.MetadataPool, nil } return "", fmt.Errorf("metadata pool name not found for filesystem: %s", filesystem) } func compareStdoutWithCount(stdOut string, count int) error { stdOut = strings.TrimSuffix(stdOut, "\n") res, err := strconv.Atoi(stdOut) if err != nil { return fmt.Errorf("failed to convert string to int: %v", stdOut) } if res != count { return fmt.Errorf("expected omap object count %d, got %d", count, res) } return nil } // validateOmapCount validates no of OMAP entries on the given pool. // Works with Cephfs and RBD drivers and mode can be snapsType or volumesType. func validateOmapCount(f *framework.Framework, count int, driver, pool, mode string) { type radosListCommand struct { volumeMode string driverType string radosLsCmd, radosLsCmdFilter string radosLsKeysCmd, radosLsKeysCmdFilter string } radosListCommands := []radosListCommand{ { volumeMode: volumesType, driverType: cephfsType, radosLsCmd: "rados ls " + cephfsOptions(pool), radosLsCmdFilter: fmt.Sprintf("rados ls %s | grep -v default | grep -v csi.volume.group. | grep -c ^csi.volume.", cephfsOptions(pool)), radosLsKeysCmd: "rados listomapkeys csi.volumes.default " + cephfsOptions(pool), radosLsKeysCmdFilter: fmt.Sprintf("rados listomapkeys csi.volumes.default %s | wc -l", cephfsOptions(pool)), }, { volumeMode: volumesType, driverType: rbdType, radosLsCmd: "rados ls " + rbdOptions(pool), radosLsCmdFilter: fmt.Sprintf( "rados ls %s | grep -v default | grep -v csi.volume.group. 
| grep -c ^csi.volume.", rbdOptions(pool)), radosLsKeysCmd: "rados listomapkeys csi.volumes.default " + rbdOptions(pool), radosLsKeysCmdFilter: fmt.Sprintf("rados listomapkeys csi.volumes.default %s | wc -l", rbdOptions(pool)), }, { volumeMode: snapsType, driverType: cephfsType, radosLsCmd: "rados ls " + cephfsOptions(pool), radosLsCmdFilter: fmt.Sprintf("rados ls %s | grep -v default | grep -c ^csi.snap.", cephfsOptions(pool)), radosLsKeysCmd: "rados listomapkeys csi.snaps.default " + cephfsOptions(pool), radosLsKeysCmdFilter: fmt.Sprintf("rados listomapkeys csi.snaps.default %s | wc -l", cephfsOptions(pool)), }, { volumeMode: snapsType, driverType: rbdType, radosLsCmd: "rados ls " + rbdOptions(pool), radosLsCmdFilter: fmt.Sprintf("rados ls %s | grep -v default | grep -c ^csi.snap.", rbdOptions(pool)), radosLsKeysCmd: "rados listomapkeys csi.snaps.default " + rbdOptions(pool), radosLsKeysCmdFilter: fmt.Sprintf("rados listomapkeys csi.snaps.default %s | wc -l", rbdOptions(pool)), }, { volumeMode: groupSnapsType, driverType: cephfsType, radosLsCmd: "rados ls" + cephfsOptions(pool), radosLsCmdFilter: fmt.Sprintf("rados ls %s | grep -v default | grep -c ^csi.volume.group.", cephfsOptions(pool)), radosLsKeysCmd: "rados listomapkeys csi.groups.default " + cephfsOptions(pool), radosLsKeysCmdFilter: fmt.Sprintf("rados listomapkeys csi.groups.default %s | wc -l", cephfsOptions(pool)), }, } for _, cmds := range radosListCommands { if !strings.EqualFold(cmds.volumeMode, mode) || !strings.EqualFold(cmds.driverType, driver) { continue } filterCmds := []string{cmds.radosLsCmdFilter, cmds.radosLsKeysCmdFilter} filterLessCmds := []string{cmds.radosLsCmd, cmds.radosLsKeysCmd} for i, cmd := range filterCmds { stdOut, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace) if err != nil { if !strings.Contains(err.Error(), exitOneErr) { framework.Failf("failed to execute rados command '%s' : err=%v stdErr=%s", cmd, err, stdErr) } } if stdErr != "" { framework.Failf("failed to 
execute rados command '%s' : stdErr=%s", cmd, stdErr) } err = compareStdoutWithCount(stdOut, count) if err == nil { continue } saveErr := fmt.Errorf("failed to validate omap count for %s: %w", cmd, err) if strings.Contains(err.Error(), "expected omap object count") { stdOut, stdErr, err = execCommandInToolBoxPod(f, filterLessCmds[i], rookNamespace) if err == nil { framework.Logf("additional debug info: rados ls command output: %s, stdErr: %s", stdOut, stdErr) } } framework.Failf("%v", saveErr) } } } func getMons(ns string, c kubernetes.Interface) ([]string, error) { opt := metav1.ListOptions{ LabelSelector: "app=rook-ceph-mon", } services := make([]string, 0) var svcList *v1.ServiceList t := time.Duration(deployTimeout) * time.Minute err := wait.PollUntilContextTimeout(context.TODO(), poll, t, true, func(ctx context.Context) (bool, error) { var svcErr error svcList, svcErr = c.CoreV1().Services(ns).List(ctx, opt) if svcErr != nil { if isRetryableAPIError(svcErr) { return false, nil } return false, fmt.Errorf("failed to list Services in namespace %q: %w", ns, svcErr) } return true, nil }) if err != nil { return services, fmt.Errorf("could not get Services: %w", err) } for i := range svcList.Items { s := fmt.Sprintf( "%s.%s.svc.cluster.local:%d", svcList.Items[i].Name, svcList.Items[i].Namespace, svcList.Items[i].Spec.Ports[0].Port) services = append(services, s) } return services, nil } func getMonsHash(mons string) string { return fmt.Sprintf("%x", md5.Sum([]byte(mons))) //nolint:gosec // hash generation } func getClusterID(f *framework.Framework) (string, error) { if clusterID != "" { return clusterID, nil } fsID, stdErr, err := execCommandInToolBoxPod(f, "ceph fsid", rookNamespace) if err != nil { return "", fmt.Errorf("failed getting clusterID through toolbox: %w", err) } if stdErr != "" { return "", fmt.Errorf("error getting fsid: %s", stdErr) } // remove new line present in fsID return strings.Trim(fsID, "\n"), nil } func getStorageClass(path string) 
(scv1.StorageClass, error) { sc := scv1.StorageClass{} err := unmarshal(path, &sc) return sc, err } func getSecret(path string) (v1.Secret, error) { sc := v1.Secret{} err := unmarshal(path, &sc) // discard corruptInputError if err != nil { var b64cie base64.CorruptInputError if !errors.As(err, &b64cie) { return sc, err } } return sc, nil } func deleteResource(scPath string) error { data, err := replaceNamespaceInTemplate(scPath) if err != nil { framework.Logf("failed to read content from %s %v", scPath, err) } err = retryKubectlInput(cephCSINamespace, kubectlDelete, data, deployTimeout, "--ignore-not-found=true") if err != nil { framework.Logf("failed to delete %s %v", scPath, err) } return err } func unmarshal(fileName string, obj interface{}) error { f, err := os.ReadFile(fileName) if err != nil { return err } data, err := utilyaml.ToJSON(f) if err != nil { return err } err = json.Unmarshal(data, obj) return err } // createPVCAndApp creates pvc and pod // if name is not empty same will be set as pvc and app name. func createPVCAndApp( name string, f *framework.Framework, pvc *v1.PersistentVolumeClaim, app *v1.Pod, pvcTimeout int, ) error { if name != "" { pvc.Name = name app.Name = name app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = name } err := createPVCAndvalidatePV(f.ClientSet, pvc, pvcTimeout) if err != nil { return err } err = createApp(f.ClientSet, app, deployTimeout) return err } // createPVCAndDeploymentApp creates pvc and deployment. func createPVCAndDeploymentApp( f *framework.Framework, pvc *v1.PersistentVolumeClaim, app *appsv1.Deployment, pvcTimeout int, ) error { err := createPVCAndvalidatePV(f.ClientSet, pvc, pvcTimeout) if err != nil { return err } err = createDeploymentApp(f.ClientSet, app, deployTimeout) return err } // validatePVCAndDeploymentAppBinding creates PVC and Deployment, and waits until // all its replicas are Running. Use `replicas` to override default number of replicas // defined in `deploymentPath` Deployment manifest. 
func validatePVCAndDeploymentAppBinding( f *framework.Framework, pvcPath string, deploymentPath string, namespace string, replicas *int32, pvcTimeout int, ) (*v1.PersistentVolumeClaim, *appsv1.Deployment, error) { pvc, err := loadPVC(pvcPath) if err != nil { return nil, nil, fmt.Errorf("failed to load PVC: %w", err) } pvc.Namespace = namespace depl, err := loadAppDeployment(deploymentPath) if err != nil { return nil, nil, fmt.Errorf("failed to load Deployment: %w", err) } depl.Namespace = f.UniqueName if replicas != nil { depl.Spec.Replicas = replicas } err = createPVCAndDeploymentApp(f, pvc, depl, pvcTimeout) if err != nil { return nil, nil, err } err = waitForDeploymentComplete(f.ClientSet, depl.Name, depl.Namespace, deployTimeout) if err != nil { return nil, nil, err } return pvc, depl, nil } // DeletePVCAndDeploymentApp deletes pvc and deployment. func deletePVCAndDeploymentApp( f *framework.Framework, pvc *v1.PersistentVolumeClaim, app *appsv1.Deployment, ) error { err := deleteDeploymentApp(f.ClientSet, app.Name, app.Namespace, deployTimeout) if err != nil { return err } err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) return err } // deletePVCAndApp delete pvc and pod // if name is not empty same will be set as pvc and app name. 
func deletePVCAndApp(name string, f *framework.Framework, pvc *v1.PersistentVolumeClaim, app *v1.Pod) error { if name != "" { pvc.Name = name app.Name = name app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = name } err := deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout) if err != nil { return err } err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) return err } func createPVCAndAppBinding( pvcPath, appPath string, f *framework.Framework, pvcTimeout int, ) (*v1.PersistentVolumeClaim, *v1.Pod, error) { pvc, err := loadPVC(pvcPath) if err != nil { return nil, nil, err } pvc.Namespace = f.UniqueName app, err := loadApp(appPath) if err != nil { return nil, nil, err } app.Namespace = f.UniqueName err = createPVCAndApp("", f, pvc, app, pvcTimeout) if err != nil { return nil, nil, err } return pvc, app, nil } func validatePVCAndAppBinding(pvcPath, appPath string, f *framework.Framework) error { pvc, app, err := createPVCAndAppBinding(pvcPath, appPath, f, deployTimeout) if err != nil { return err } err = deletePVCAndApp("", f, pvc, app) return err } func getMountType(selector, mountPath string, f *framework.Framework) (string, error) { opt := metav1.ListOptions{ LabelSelector: selector, } cmd := fmt.Sprintf("lsblk -o TYPE,MOUNTPOINT | grep '%s' | awk '{print $1}'", mountPath) stdOut, stdErr, err := execCommandInContainer(f, cmd, cephCSINamespace, "csi-rbdplugin", &opt) if err != nil { return "", err } if stdErr != "" { return strings.TrimSpace(stdOut), fmt.Errorf("%s", stdErr) } return strings.TrimSpace(stdOut), nil } func validateNormalUserPVCAccess(pvcPath string, f *framework.Framework) error { writeTest := func(ns string, opts *metav1.ListOptions) error { _, stdErr, err := execCommandInPod(f, "echo testing > /target/testing", ns, opts) if err != nil { return fmt.Errorf("failed to exec command in pod: %w", err) } if stdErr != "" { return fmt.Errorf("failed to touch a file as non-root user %v", stdErr) } return nil } return 
validateNormalUserPVCAccessFunc(pvcPath, f, writeTest) } func validateInodeCount(pvcPath string, f *framework.Framework, inodes int) error { countInodes := func(ns string, opts *metav1.ListOptions) error { stdOut, stdErr, err := execCommandInPod(f, "df --output=itotal /target | tail -n1", ns, opts) if err != nil { return fmt.Errorf("failed to exec command in pod: %w", err) } if stdErr != "" { return fmt.Errorf("failed to list inodes in pod: %v", stdErr) } itotal, err := strconv.Atoi(strings.TrimSpace(stdOut)) if err != nil { return fmt.Errorf("failed to parse itotal %q to int: %w", strings.TrimSpace(stdOut), err) } if inodes != itotal { return fmt.Errorf("expected inodes (%d) do not match itotal on volume (%d)", inodes, itotal) } return nil } return validateNormalUserPVCAccessFunc(pvcPath, f, countInodes) } func validateNormalUserPVCAccessFunc( pvcPath string, f *framework.Framework, validate func(ns string, opts *metav1.ListOptions) error, ) error { pvc, err := loadPVC(pvcPath) if err != nil { return err } pvc.Namespace = f.UniqueName pvc.Name = f.UniqueName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { return err } var user int64 = 2000 app := &v1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ Name: "pod-run-as-non-root", Namespace: f.UniqueName, Labels: map[string]string{ "app": "pod-run-as-non-root", }, }, Spec: v1.PodSpec{ SecurityContext: &v1.PodSecurityContext{FSGroup: &user}, Containers: []v1.Container{ { Name: "write-pod", Image: "quay.io/centos/centos:latest", Command: []string{"/bin/sleep", "999999"}, SecurityContext: &v1.SecurityContext{ RunAsUser: &user, }, VolumeMounts: []v1.VolumeMount{ { MountPath: "/target", Name: "target", }, }, }, }, Volumes: []v1.Volume{ { Name: "target", VolumeSource: v1.VolumeSource{ PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ ClaimName: pvc.Name, ReadOnly: false, }, }, }, }, }, } err = createApp(f.ClientSet, app, deployTimeout) 
if err != nil { return err } opt := metav1.ListOptions{ LabelSelector: "app=pod-run-as-non-root", } err = validate(app.Namespace, &opt) if err != nil { return fmt.Errorf("failed to run validation function: %w", err) } // metrics for BlockMode was added in Kubernetes 1.22 isBlockMode := false if pvc.Spec.VolumeMode != nil { isBlockMode = (*pvc.Spec.VolumeMode == v1.PersistentVolumeBlock) } if !isBlockMode && !isOpenShift { err = getMetricsForPVC(f, pvc, deployTimeout) if err != nil { return err } } err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout) if err != nil { return err } err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) return err } // writeDataInPod fill zero content to a file in the provided POD volume. func writeDataInPod(app *v1.Pod, opt *metav1.ListOptions, f *framework.Framework) error { app.Namespace = f.UniqueName err := createApp(f.ClientSet, app, deployTimeout) if err != nil { return err } // write data to PVC. The idea here is to fill some content in the file // instead of filling and reverifying the md5sum/data integrity filePath := app.Spec.Containers[0].VolumeMounts[0].MountPath + "/test" // While writing more data we are encountering issues in E2E timeout, so keeping it low for now _, writeErr, err := execCommandInPod( f, fmt.Sprintf("dd if=/dev/zero of=%s bs=1M count=10 status=none", filePath), app.Namespace, opt) if err != nil { return err } if writeErr != "" { err = fmt.Errorf("failed to write data %v", writeErr) } return err } func checkDataPersist(pvcPath, appPath string, f *framework.Framework) error { data := "checking data persist" pvc, err := loadPVC(pvcPath) if err != nil { return err } pvc.Namespace = f.UniqueName app, err := loadApp(appPath) if err != nil { return err } app.Labels = map[string]string{"app": "validate-data"} app.Namespace = f.UniqueName err = createPVCAndApp("", f, pvc, app, deployTimeout) if err != nil { return err } opt := metav1.ListOptions{ LabelSelector: "app=validate-data", } // 
write data to PVC filePath := app.Spec.Containers[0].VolumeMounts[0].MountPath + "/test" _, stdErr, err := execCommandInPod(f, fmt.Sprintf("echo %s > %s", data, filePath), app.Namespace, &opt) if err != nil { return fmt.Errorf("failed to exec command in pod: %w", err) } if stdErr != "" { return fmt.Errorf("failed to write data to a file %v", stdErr) } // delete app err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout) if err != nil { return err } // recreate app and check data persist err = createApp(f.ClientSet, app, deployTimeout) if err != nil { return err } persistData, stdErr, err := execCommandInPod(f, "cat "+filePath, app.Namespace, &opt) if err != nil { return err } if stdErr != "" { return fmt.Errorf("failed to get file content %v", stdErr) } if !strings.Contains(persistData, data) { return fmt.Errorf("data not persistent expected data %s received data %s ", data, persistData) } err = deletePVCAndApp("", f, pvc, app) return err } func pvcDeleteWhenPoolNotFound(pvcPath string, cephFS bool, f *framework.Framework) error { pvc, err := loadPVC(pvcPath) if err != nil { return err } pvc.Namespace = f.UniqueName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { return err } if cephFS { err = deleteBackingCephFSVolume(f, pvc) if err != nil { return err } // delete cephFS filesystem err = deletePool(fileSystemName, cephFS, f) if err != nil { return err } } else { err = deleteBackingRBDImage(f, pvc) if err != nil { return err } // delete rbd pool err = deletePool(defaultRBDPool, cephFS, f) if err != nil { return err } } err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) return err } func checkMountOptions(pvcPath, appPath string, f *framework.Framework, mountFlags []string) error { pvc, err := loadPVC(pvcPath) if err != nil { return err } pvc.Namespace = f.UniqueName app, err := loadApp(appPath) if err != nil { return err } app.Labels = map[string]string{"app": "validate-mount-opt"} app.Namespace = f.UniqueName 
err = createPVCAndApp("", f, pvc, app, deployTimeout) if err != nil { return err } opt := metav1.ListOptions{ LabelSelector: "app=validate-mount-opt", } cmd := "mount |grep " + app.Spec.Containers[0].VolumeMounts[0].MountPath data, stdErr, err := execCommandInPod(f, cmd, app.Namespace, &opt) if err != nil { return err } if stdErr != "" { return fmt.Errorf("failed to get mount point %v", stdErr) } for _, f := range mountFlags { if !strings.Contains(data, f) { return fmt.Errorf("mount option %s not found in %s", f, data) } } err = deletePVCAndApp("", f, pvc, app) return err } func addTopologyDomainsToDSYaml(template, labels string) string { return strings.ReplaceAll(template, "# - \"--domainlabels=failure-domain/region,failure-domain/zone\"", "- \"--domainlabels="+labels+"\"") } func oneReplicaDeployYaml(template string) string { re := regexp.MustCompile(`(\s+replicas:) \d+`) return re.ReplaceAllString(template, `$1 1`) } // replaceLogLevelInTemplate replaces the log level in the template file to 5. 
func replaceLogLevelInTemplate(template string) string {
	// Regular expression to find --v= arguments
	re := regexp.MustCompile(`--v=\d+`)

	// template can contain different log levels, replace it with --v=5
	return re.ReplaceAllString(template, "--v=5")
}

// enableReadAffinityInTemplate uncomments the "--enable-read-affinity=true"
// argument wherever it appears commented out in template.
func enableReadAffinityInTemplate(template string) string {
	return strings.ReplaceAll(template, "# - \"--enable-read-affinity=true\"", "- \"--enable-read-affinity=true\"")
}

// addCrsuhLocationLabels replaces the commented-out --crush-location-labels
// argument in template with an active one carrying labels.
// NOTE(review): "Crsuh" in the name looks like a typo for "Crush"; renaming
// would touch callers outside this section.
func addCrsuhLocationLabels(template, labels string) string {
	return strings.ReplaceAll(template,
		"# - \"--crush-location-labels=topology.io/zone,topology.io/rack\"",
		"- \"--crush-location-labels="+labels+"\"")
}

// writeDataAndCalChecksum creates the pod and writes data through
// writeDataInPod, calculates the SHA512 sum of "<first mount path>/test" and
// then deletes the pod. The checksum is returned; a failure to delete the pod
// fails the test through framework.Failf instead of being returned.
func writeDataAndCalChecksum(app *v1.Pod, opt *metav1.ListOptions, f *framework.Framework) (string, error) {
	filePath := app.Spec.Containers[0].VolumeMounts[0].MountPath + "/test"
	// write data in PVC
	err := writeDataInPod(app, opt, f)
	if err != nil {
		framework.Logf("failed to write data in the pod: %v", err)

		return "", err
	}

	checkSum, err := calculateSHA512sum(f, app, filePath, opt)
	if err != nil {
		framework.Logf("failed to calculate checksum: %v", err)

		return checkSum, err
	}

	err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout)
	if err != nil {
		framework.Failf("failed to delete pod: %v", err)
	}

	return checkSum, nil
}

// validatePVCClone creates a source PVC (writing data and recording its
// checksum when the volume mode is Filesystem), then concurrently creates
// totalCount clones with a pod each, validates them, and finally deletes the
// parent and all clones. The expected number of RBD images in defaultRBDPool
// is checked after every phase. Failures abort the test via framework.Failf.
//
//nolint:gocyclo,gocognit,nestif,cyclop // reduce complexity
func validatePVCClone(
	totalCount int,
	sourcePvcPath, sourceAppPath, clonePvcPath, clonePvcAppPath,
	restoreSCName,
	dataPool string,
	kms kmsConfig,
	validatePVC validateFunc,
	f *framework.Framework,
) {
	var wg sync.WaitGroup
	// One slot per goroutine: each worker writes only to its own index n, so
	// the slices are read back only after wg.Wait().
	wgErrs := make([]error, totalCount)
	chErrs := make([]error, totalCount)
	pvc, err := loadPVC(sourcePvcPath)
	if err != nil {
		framework.Failf("failed to load PVC: %v", err)
	}

	label := make(map[string]string)
	pvc.Namespace = f.UniqueName
	err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout)
	if err != nil {
		framework.Failf("failed to create PVC: %v", err)
	}
	app, err := loadApp(sourceAppPath)
	if err != nil {
		framework.Failf("failed to load app: %v", err)
	}
	label[appKey] = appLabel
	app.Namespace = f.UniqueName
	app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = pvc.Name
	app.Labels = label
	opt := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", appKey, label[appKey]),
	}

	checkSum := ""
	// Re-read the PVC from the API server; Spec.VolumeMode is dereferenced below.
	pvc, err = getPersistentVolumeClaim(f.ClientSet, pvc.Namespace, pvc.Name)
	if err != nil {
		framework.Failf("failed to get pvc %v", err)
	}
	// A reference checksum is only recorded for Filesystem mode volumes.
	if *pvc.Spec.VolumeMode == v1.PersistentVolumeFilesystem {
		checkSum, err = writeDataAndCalChecksum(app, &opt, f)
		if err != nil {
			framework.Failf("failed to calculate checksum: %v", err)
		}
	}
	// validate created backend rbd images
	validateRBDImageCount(f, 1, defaultRBDPool)
	pvcClone, err := loadPVC(clonePvcPath)
	if err != nil {
		framework.Failf("failed to load PVC: %v", err)
	}
	pvcClone.Spec.DataSource.Name = pvc.Name
	pvcClone.Namespace = f.UniqueName
	if restoreSCName != "" {
		pvcClone.Spec.StorageClassName = &restoreSCName
	}

	appClone, err := loadApp(clonePvcAppPath)
	if err != nil {
		framework.Failf("failed to load application: %v", err)
	}
	appClone.Namespace = f.UniqueName
	wg.Add(totalCount)
	// create clone and bind it to an app
	for i := range totalCount {
		// p and a are passed by value, so every goroutine works on its own
		// copy of the clone PVC and pod structs.
		go func(n int, p v1.PersistentVolumeClaim, a v1.Pod) {
			name := fmt.Sprintf("%s%d", f.UniqueName, n)
			label := make(map[string]string)
			label[appKey] = name
			a.Labels = label
			opt := metav1.ListOptions{
				LabelSelector: fmt.Sprintf("%s=%s", appKey, label[appKey]),
			}
			wgErrs[n] = createPVCAndApp(name, f, &p, &a, deployTimeout)
			if wgErrs[n] == nil && dataPool != noDataPool {
				wgErrs[n] = checkPVCDataPoolForImageInPool(f, &p, defaultRBDPool, dataPool)
			}
			if wgErrs[n] == nil && kms != noKMS {
				if kms.canGetPassphrase() {
					imageData, sErr := getImageInfoFromPVC(p.Namespace, name, f)
					if sErr != nil {
						wgErrs[n] = fmt.Errorf(
							"failed to get image info for %s namespace=%s volumehandle=%s error=%w",
							name,
							p.Namespace,
							imageData.csiVolumeHandle,
							sErr)
					} else {
						// check new passphrase created
						stdOut, stdErr := kms.getPassphrase(f, imageData.csiVolumeHandle)
						if stdOut != "" {
							framework.Logf("successfully read the passphrase from vault: %s", stdOut)
						}
						if stdErr != "" {
							wgErrs[n] = fmt.Errorf("failed to read passphrase from vault: %s", stdErr)
						}
					}
				}
			}
			if *pvc.Spec.VolumeMode == v1.PersistentVolumeFilesystem && wgErrs[n] == nil {
				filePath := a.Spec.Containers[0].VolumeMounts[0].MountPath + "/test"
				var checkSumClone string
				framework.Logf("Calculating checksum clone for filepath %s", filePath)
				checkSumClone, chErrs[n] = calculateSHA512sum(f, &a, filePath, &opt)
				framework.Logf("checksum for clone is %s", checkSumClone)
				if chErrs[n] != nil {
					framework.Logf("Failed calculating checksum clone %s", chErrs[n])
				}
				// NOTE(review): a checksum mismatch is only logged here; it is
				// not recorded in chErrs and so does not fail the test.
				if checkSumClone != checkSum {
					framework.Logf("checksum didn't match. checksum=%s and checksumclone=%s", checkSum, checkSumClone)
				}
			}
			if wgErrs[n] == nil && validatePVC != nil && kms != noKMS {
				wgErrs[n] = validatePVC(f, &p, &a)
			}
			wg.Done()
		}(i, *pvcClone, *appClone)
	}
	wg.Wait()

	failed := 0
	for i, err := range wgErrs {
		if err != nil {
			// not using Failf() as it aborts the test and does not log other errors
			framework.Logf("failed to create PVC (%s%d): %v", f.UniqueName, i, err)
			failed++
		}
	}
	if failed != 0 {
		framework.Failf("creating PVCs failed, %d errors were logged", failed)
	}

	for i, err := range chErrs {
		if err != nil {
			// not using Failf() as it aborts the test and does not log other errors
			framework.Logf("failed to calculate checksum (%s%d): %v", f.UniqueName, i, err)
			failed++
		}
	}
	if failed != 0 {
		framework.Failf("calculating checksum failed, %d errors were logged", failed)
	}

	// total images in cluster is 1 parent rbd image+ total
	// temporary clone+ total clones
	totalCloneCount := totalCount + totalCount + 1
	validateRBDImageCount(f, totalCloneCount, defaultRBDPool)
	// delete parent pvc
	err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout)
	if err != nil {
		framework.Failf("failed to delete PVC: %v", err)
	}

	// with the parent gone, the temporary clones and the clones remain
	totalCloneCount = totalCount + totalCount
	validateRBDImageCount(f, totalCloneCount, defaultRBDPool)
	wg.Add(totalCount)
	// delete clone and app
	for i := range totalCount {
		go func(n int, p v1.PersistentVolumeClaim, a v1.Pod) {
			name := fmt.Sprintf("%s%d", f.UniqueName, n)
			p.Spec.DataSource.Name = name
			var imageData imageInfoFromPVC
			var sErr error
			// The image info is fetched before deletion so the volume handle
			// is still available for the passphrase checks afterwards.
			if kms != noKMS {
				if kms.canGetPassphrase() {
					imageData, sErr = getImageInfoFromPVC(p.Namespace, name, f)
					if sErr != nil {
						wgErrs[n] = fmt.Errorf(
							"failed to get image info for %s namespace=%s volumehandle=%s error=%w",
							name,
							p.Namespace,
							imageData.csiVolumeHandle,
							sErr)
					}
				}
			}
			if wgErrs[n] == nil {
				wgErrs[n] = deletePVCAndApp(name, f, &p, &a)
				if wgErrs[n] == nil && kms != noKMS {
					if kms.canGetPassphrase() {
						// check passphrase deleted
						stdOut, _ := kms.getPassphrase(f, imageData.csiVolumeHandle)
						if stdOut != "" {
							wgErrs[n] = fmt.Errorf("passphrase found in vault while should be deleted: %s", stdOut)
						}
					}
					if wgErrs[n] == nil && kms.canVerifyKeyDestroyed() {
						destroyed, msg := kms.verifyKeyDestroyed(f, imageData.csiVolumeHandle)
						if !destroyed {
							wgErrs[n] = fmt.Errorf("passphrased was not destroyed: %s", msg)
						}
					}
				}
			}
			wg.Done()
		}(i, *pvcClone, *appClone)
	}
	wg.Wait()

	for i, err := range wgErrs {
		if err != nil {
			// not using Failf() as it aborts the test and does not log other errors
			framework.Logf("failed to delete PVC and application (%s%d): %v", f.UniqueName, i, err)
			failed++
		}
	}
	if failed != 0 {
		framework.Failf("deleting PVCs and applications failed, %d errors were logged", failed)
	}

	// every image, including the temporary clones, must be gone by now
	validateRBDImageCount(f, 0, defaultRBDPool)
}

//nolint:gocyclo,gocognit,nestif,cyclop // reduce complexity
func validatePVCSnapshot(
	totalCount int,
	pvcPath, appPath, snapshotPath, pvcClonePath, appClonePath string,
	kms, restoreKMS kmsConfig, restoreSCName,
	dataPool string, f *framework.Framework,
	isEncryptedPVC validateFunc,
) {
	var wg sync.WaitGroup
	wgErrs := make([]error, totalCount)
	chErrs := make([]error, totalCount)
	err := createRBDSnapshotClass(f)
	if err != nil {
framework.Failf("failed to create storageclass: %v", err) } defer func() { err = deleteRBDSnapshotClass() if err != nil { framework.Failf("failed to delete VolumeSnapshotClass: %v", err) } }() pvc, err := loadPVC(pvcPath) if err != nil { framework.Failf("failed to load PVC: %v", err) } label := make(map[string]string) pvc.Namespace = f.UniqueName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { framework.Failf("failed to create PVC: %v", err) } app, err := loadApp(appPath) if err != nil { framework.Failf("failed to load app: %v", err) } // write data in PVC label[appKey] = appLabel app.Namespace = f.UniqueName app.Labels = label opt := metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", appKey, label[appKey]), } app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = pvc.Name checkSum, err := writeDataAndCalChecksum(app, &opt, f) if err != nil { framework.Failf("failed to calculate checksum: %v", err) } validateRBDImageCount(f, 1, defaultRBDPool) snap := getSnapshot(snapshotPath) snap.Namespace = f.UniqueName snap.Spec.Source.PersistentVolumeClaimName = &pvc.Name wg.Add(totalCount) // create snapshot for i := range totalCount { go func(n int, s snapapi.VolumeSnapshot) { s.Name = fmt.Sprintf("%s%d", f.UniqueName, n) wgErrs[n] = createSnapshot(&s, deployTimeout) if wgErrs[n] == nil && kms != noKMS { if kms.canGetPassphrase() { content, sErr := getVolumeSnapshotContent(s.Namespace, s.Name) if sErr != nil { wgErrs[n] = fmt.Errorf( "failed to get snapshotcontent for %s in namespace %s: %w", s.Name, s.Namespace, sErr) } else { // check new passphrase created _, stdErr := kms.getPassphrase(f, *content.Status.SnapshotHandle) if stdErr != "" { wgErrs[n] = fmt.Errorf("failed to read passphrase from vault: %s", stdErr) } } } } wg.Done() }(i, snap) } wg.Wait() failed := 0 for i, err := range wgErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors framework.Logf("failed to create snapshot (%s%d): %v", 
f.UniqueName, i, err) failed++ } } if failed != 0 { framework.Failf("creating snapshots failed, %d errors were logged", failed) } // total images in cluster is 1 parent rbd image+ total snaps validateRBDImageCount(f, totalCount+1, defaultRBDPool) pvcClone, err := loadPVC(pvcClonePath) if err != nil { framework.Failf("failed to load PVC: %v", err) } appClone, err := loadApp(appClonePath) if err != nil { framework.Failf("failed to load application: %v", err) } pvcClone.Namespace = f.UniqueName appClone.Namespace = f.UniqueName pvcClone.Spec.DataSource.Name = fmt.Sprintf("%s%d", f.UniqueName, 0) if restoreSCName != "" { pvcClone.Spec.StorageClassName = &restoreSCName } // create multiple PVC from same snapshot wg.Add(totalCount) for i := range totalCount { go func(n int, p v1.PersistentVolumeClaim, a v1.Pod) { name := fmt.Sprintf("%s%d", f.UniqueName, n) label := make(map[string]string) label[appKey] = name a.Labels = label opt := metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", appKey, label[appKey]), } wgErrs[n] = createPVCAndApp(name, f, &p, &a, deployTimeout) if wgErrs[n] == nil && restoreKMS != noKMS { if restoreKMS.canGetPassphrase() { imageData, sErr := getImageInfoFromPVC(p.Namespace, name, f) if sErr != nil { wgErrs[n] = fmt.Errorf( "failed to get image info for %s namespace=%s volumehandle=%s error=%w", name, p.Namespace, imageData.csiVolumeHandle, sErr) } else { // check new passphrase created _, stdErr := restoreKMS.getPassphrase(f, imageData.csiVolumeHandle) if stdErr != "" { wgErrs[n] = fmt.Errorf("failed to read passphrase from vault: %s", stdErr) } } } wgErrs[n] = isEncryptedPVC(f, &p, &a) } if wgErrs[n] == nil { filePath := a.Spec.Containers[0].VolumeMounts[0].MountPath + "/test" var checkSumClone string framework.Logf("calculating checksum clone for filepath %s", filePath) checkSumClone, chErrs[n] = calculateSHA512sum(f, &a, filePath, &opt) framework.Logf("checksum value for the clone is %s with pod name %s", checkSumClone, name) if chErrs[n] 
!= nil { framework.Logf("failed to calculate checksum for clone: %s", chErrs[n]) } if checkSumClone != checkSum { framework.Logf( "checksum value didn't match. checksum=%s and checksumclone=%s", checkSum, checkSumClone) } } wg.Done() }(i, *pvcClone, *appClone) } wg.Wait() for i, err := range wgErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors framework.Logf("failed to create PVC and application (%s%d): %v", f.UniqueName, i, err) failed++ } } if failed != 0 { framework.Failf("creating PVCs and applications failed, %d errors were logged", failed) } for i, err := range chErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors framework.Logf("failed to calculate checksum (%s%d): %v", f.UniqueName, i, err) failed++ } } if failed != 0 { framework.Failf("calculating checksum failed, %d errors were logged", failed) } // total images in cluster is 1 parent rbd image+ total // snaps+ total clones totalCloneCount := totalCount + totalCount + 1 validateRBDImageCount(f, totalCloneCount, defaultRBDPool) wg.Add(totalCount) // delete clone and app for i := range totalCount { go func(n int, p v1.PersistentVolumeClaim, a v1.Pod) { name := fmt.Sprintf("%s%d", f.UniqueName, n) p.Spec.DataSource.Name = name wgErrs[n] = deletePVCAndApp(name, f, &p, &a) wg.Done() }(i, *pvcClone, *appClone) } wg.Wait() for i, err := range wgErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors framework.Logf("failed to delete PVC and application (%s%d): %v", f.UniqueName, i, err) failed++ } } if failed != 0 { framework.Failf("deleting PVCs and applications failed, %d errors were logged", failed) } // total images in cluster is 1 parent rbd image+ total // snaps validateRBDImageCount(f, totalCount+1, defaultRBDPool) // create clones from different snapshots and bind it to an // app wg.Add(totalCount) for i := range totalCount { go func(n int, p v1.PersistentVolumeClaim, a v1.Pod) 
{ name := fmt.Sprintf("%s%d", f.UniqueName, n) p.Spec.DataSource.Name = name wgErrs[n] = createPVCAndApp(name, f, &p, &a, deployTimeout) if wgErrs[n] == nil && dataPool != noDataPool { wgErrs[n] = checkPVCDataPoolForImageInPool(f, &p, defaultRBDPool, dataPool) } wg.Done() }(i, *pvcClone, *appClone) } wg.Wait() for i, err := range wgErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors framework.Logf("failed to create PVC and application (%s%d): %v", f.UniqueName, i, err) failed++ } } if failed != 0 { framework.Failf("creating PVCs and applications failed, %d errors were logged", failed) } // total images in cluster is 1 parent rbd image+ total // snaps+ total clones totalCloneCount = totalCount + totalCount + 1 validateRBDImageCount(f, totalCloneCount, defaultRBDPool) // delete parent pvc err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { framework.Failf("failed to delete PVC: %v", err) } // total images in cluster is total snaps+ total clones totalSnapCount := totalCount + totalCount validateRBDImageCount(f, totalSnapCount, defaultRBDPool) wg.Add(totalCount) // delete snapshot for i := range totalCount { go func(n int, s snapapi.VolumeSnapshot) { s.Name = fmt.Sprintf("%s%d", f.UniqueName, n) content := &snapapi.VolumeSnapshotContent{} var err error if kms != noKMS { if kms.canGetPassphrase() { content, err = getVolumeSnapshotContent(s.Namespace, s.Name) if err != nil { wgErrs[n] = fmt.Errorf( "failed to get snapshotcontent for %s in namespace %s: %w", s.Name, s.Namespace, err) } } } if wgErrs[n] == nil { wgErrs[n] = deleteSnapshot(&s, deployTimeout) if wgErrs[n] == nil && kms != noKMS { if kms.canGetPassphrase() { // check passphrase deleted stdOut, _ := kms.getPassphrase(f, *content.Status.SnapshotHandle) if stdOut != "" { wgErrs[n] = fmt.Errorf("passphrase found in vault while should be deleted: %s", stdOut) } } if wgErrs[n] == nil && kms.canVerifyKeyDestroyed() { destroyed, msg := 
kms.verifyKeyDestroyed(f, *content.Status.SnapshotHandle) if !destroyed { wgErrs[n] = fmt.Errorf("passphrased was not destroyed: %s", msg) } } } } wg.Done() }(i, snap) } wg.Wait() for i, err := range wgErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors framework.Logf("failed to delete snapshot (%s%d): %v", f.UniqueName, i, err) failed++ } } if failed != 0 { framework.Failf("deleting snapshots failed, %d errors were logged", failed) } validateRBDImageCount(f, totalCount, defaultRBDPool) wg.Add(totalCount) // delete clone and app for i := range totalCount { go func(n int, p v1.PersistentVolumeClaim, a v1.Pod) { name := fmt.Sprintf("%s%d", f.UniqueName, n) p.Spec.DataSource.Name = name wgErrs[n] = deletePVCAndApp(name, f, &p, &a) wg.Done() }(i, *pvcClone, *appClone) } wg.Wait() for i, err := range wgErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors framework.Logf("failed to delete PVC and application (%s%d): %v", f.UniqueName, i, err) failed++ } } if failed != 0 { framework.Failf("deleting PVCs and applications failed, %d errors were logged", failed) } // validate created backend rbd images validateRBDImageCount(f, 0, defaultRBDPool) } // validateController simulates the required operations to validate the // controller. // Controller will generates the omap data when the PV is created. // for that we need to do below operations // Create PVC with Retain policy // Store the PVC and PV kubernetes objects so that we can create static // binding between PVC-PV // Delete the omap data created for PVC // Create the static PVC and PV and let controller regenerate the omap // Mount the PVC to application (NodeStage/NodePublish should work) // Resize the PVC // Delete the Application and PVC. 
func validateController( f *framework.Framework, pvcPath, appPath, scPath string, scOptions, scParams map[string]string, ) error { size := "1Gi" poolName := defaultRBDPool expandSize := "10Gi" var err error // create storageclass with retain err = createRBDStorageClass(f.ClientSet, f, defaultSCName, scOptions, scParams, retainPolicy) if err != nil { return fmt.Errorf("failed to create storageclass: %w", err) } // create pvc pvc, err := loadPVC(pvcPath) if err != nil { return fmt.Errorf("failed to load PVC: %w", err) } resizePvc := pvc.DeepCopy() resizePvc.Namespace = f.UniqueName pvc.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse(size) pvc.Namespace = f.UniqueName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { return fmt.Errorf("failed to create PVC: %w", err) } // get pvc and pv object pvc, pv, err := getPVCAndPV(f.ClientSet, pvc.Name, pvc.Namespace) if err != nil { return fmt.Errorf("failed to get PVC: %w", err) } // Recreate storageclass with delete policy err = deleteResource(scPath) if err != nil { return fmt.Errorf("failed to delete storageclass: %w", err) } err = createRBDStorageClass(f.ClientSet, f, defaultSCName, scOptions, scParams, deletePolicy) if err != nil { return fmt.Errorf("failed to create storageclass: %w", err) } // delete omap data err = deleteJournalInfoInPool(f, pvc, poolName) if err != nil { return err } // delete pvc and pv err = deletePVCAndPV(f.ClientSet, pvc, pv, deployTimeout) if err != nil { return fmt.Errorf("failed to delete PVC or PV: %w", err) } // create pvc and pv with application pv.Spec.ClaimRef = nil pv.Spec.PersistentVolumeReclaimPolicy = deletePolicy // unset the resource version as should not be set on objects to be created pvc.ResourceVersion = "" pv.ResourceVersion = "" err = createPVCAndPV(f.ClientSet, pvc, pv) if err != nil { framework.Failf("failed to create PVC or PV: %v", err) } // bind PVC to application app, err := loadApp(appPath) if err != nil { return err } 
app.Labels = map[string]string{"app": "resize-pvc"} app.Namespace = f.UniqueName opt := metav1.ListOptions{ LabelSelector: "app=resize-pvc", } err = createApp(f.ClientSet, app, deployTimeout) if err != nil { return err } if scParams["encrypted"] == strconv.FormatBool(true) { // check encryption err = isBlockEncryptedPVC(f, resizePvc, app) if err != nil { return err } } else { // resize PVC err = expandPVCSize(f.ClientSet, resizePvc, expandSize, deployTimeout) if err != nil { return err } switch *pvc.Spec.VolumeMode { case v1.PersistentVolumeFilesystem: err = checkDirSize(app, f, &opt, expandSize) if err != nil { return err } case v1.PersistentVolumeBlock: err = checkDeviceSize(app, f, &opt, expandSize) if err != nil { return err } } } // delete pvc and storageclass err = deletePVCAndApp("", f, resizePvc, app) if err != nil { return err } return deleteResource(rbdExamplePath + "storageclass.yaml") } // k8sVersionGreaterEquals checks the ServerVersion of the Kubernetes cluster // and compares it to the major.minor version passed. In case the version of // the cluster is equal or higher to major.minor, `true` is returned, `false` // otherwise. // If fetching the ServerVersion of the Kubernetes cluster fails, the calling // test case is marked as `FAILED` and gets aborted. // //nolint:unused // Unused code will be used in future. func k8sVersionGreaterEquals(c kubernetes.Interface, major, minor int) bool { v, err := c.Discovery().ServerVersion() if err != nil { framework.Failf("failed to get server version: %v", err) // Failf() marks the case as failure, and returns from the // Go-routine that runs the case. This function will not have a // return value. } maj := strconv.Itoa(major) min := strconv.Itoa(minor) return (v.Major > maj) || (v.Major == maj && v.Minor >= min) } // waitForJobCompletion polls the status of the given job and waits until the // jobs has succeeded or until the timeout is hit. 
func waitForJobCompletion(c kubernetes.Interface, ns, job string, timeout int) error { t := time.Duration(timeout) * time.Minute start := time.Now() framework.Logf("waiting for Job %s/%s to be in state %q", ns, job, batch.JobComplete) return wait.PollUntilContextTimeout(context.TODO(), poll, t, true, func(ctx context.Context) (bool, error) { j, err := c.BatchV1().Jobs(ns).Get(ctx, job, metav1.GetOptions{}) if err != nil { if isRetryableAPIError(err) { return false, nil } return false, fmt.Errorf("failed to get Job: %w", err) } if j.Status.CompletionTime != nil { // Job has successfully completed return true, nil } framework.Logf( "Job %s/%s has not completed yet (%d seconds elapsed)", ns, job, int(time.Since(start).Seconds())) return false, nil }) } // kubectlAction is used to tell retryKubectlInput() what action needs to be // done. type kubectlAction string const ( // kubectlCreate tells retryKubectlInput() to run "create". kubectlCreate = kubectlAction("create") // kubectlDelete tells retryKubectlInput() to run "delete". kubectlDelete = kubectlAction("delete") ) // String returns the string format of the kubectlAction, this is automatically // used when formatting strings with %s or %q. func (ka kubectlAction) String() string { return string(ka) } // retryKubectlInput takes a namespace and action telling kubectl what to do, // it then feeds data through stdin to the process. This function retries until // no error occurred, or the timeout passed. func retryKubectlInput(namespace string, action kubectlAction, data string, t int, args ...string) error { timeout := time.Duration(t) * time.Minute framework.Logf("waiting for kubectl (%s -f args %s) to finish", action, args) start := time.Now() return wait.PollUntilContextTimeout(context.TODO(), poll, timeout, true, func(_ context.Context) (bool, error) { cmd := []string{} if len(args) != 0 { cmd = append(cmd, strings.Join(args, "")) } cmd = append(cmd, []string{string(action), "-f", "-"}...) 
_, err := e2ekubectl.RunKubectlInput(namespace, data, cmd...) if err != nil { if isRetryableAPIError(err) { return false, nil } if action == kubectlCreate && isAlreadyExistsCLIError(err) { return true, nil } if action == kubectlDelete && isNotFoundCLIError(err) { return true, nil } framework.Logf( "will run kubectl (%s) args (%s) again (%d seconds elapsed)", action, args, int(time.Since(start).Seconds())) return false, fmt.Errorf("failed to run kubectl: %w", err) } return true, nil }) } // retryKubectlFile takes a namespace and action telling kubectl what to do // with the passed filename and arguments. This function retries until no error // occurred, or the timeout passed. func retryKubectlFile(namespace string, action kubectlAction, filename string, t int, args ...string) error { timeout := time.Duration(t) * time.Minute framework.Logf("waiting for kubectl (%s -f %q args %s) to finish", action, filename, args) start := time.Now() return wait.PollUntilContextTimeout(context.TODO(), poll, timeout, true, func(_ context.Context) (bool, error) { cmd := []string{} if len(args) != 0 { cmd = append(cmd, strings.Join(args, "")) } cmd = append(cmd, []string{string(action), "-f", filename}...) _, err := e2ekubectl.RunKubectl(namespace, cmd...) if err != nil { if isRetryableAPIError(err) { return false, nil } if action == kubectlCreate && isAlreadyExistsCLIError(err) { return true, nil } if action == kubectlDelete && isNotFoundCLIError(err) { return true, nil } framework.Logf( "will run kubectl (%s -f %q args %s) again (%d seconds elapsed)", action, filename, args, int(time.Since(start).Seconds())) return false, fmt.Errorf("failed to run kubectl: %w", err) } return true, nil }) } // retryKubectlArgs takes a namespace and action telling kubectl what to do // with the passed arguments. This function retries until no error occurred, or // the timeout passed. // //nolint:unparam // retryKubectlArgs will be used with kubectlDelete arg later on. 
func retryKubectlArgs(namespace string, action kubectlAction, t int, args ...string) error { timeout := time.Duration(t) * time.Minute args = append([]string{string(action)}, args...) framework.Logf("waiting for kubectl (%s args) to finish", args) start := time.Now() return wait.PollUntilContextTimeout(context.TODO(), poll, timeout, true, func(_ context.Context) (bool, error) { _, err := e2ekubectl.RunKubectl(namespace, args...) if err != nil { if isRetryableAPIError(err) { return false, nil } if action == kubectlCreate && isAlreadyExistsCLIError(err) { return true, nil } if action == kubectlDelete && isNotFoundCLIError(err) { return true, nil } framework.Logf( "will run kubectl (%s) again (%d seconds elapsed)", args, int(time.Since(start).Seconds())) return false, fmt.Errorf("failed to run kubectl: %w", err) } return true, nil }) } // rwopSupported indicates that a test using RWOP is expected to succeed. If // the accessMode is reported as invalid, rwopSupported will be set to false. var rwopSupported = true // rwopMayFail returns true if the accessMode is not valid. k8s v1.22 requires // a feature gate, which might not be set. In case the accessMode is invalid, // the featuregate is not set, and testing RWOP is not possible. func rwopMayFail(err error) bool { if !rwopSupported { return true } if strings.Contains(err.Error(), `invalid: spec.accessModes: Unsupported value: "ReadWriteOncePod"`) { rwopSupported = false } return !rwopSupported } // getConfigFile returns the config file path at the preferred location if it // exists there. Returns the fallback location otherwise. 
func getConfigFile(filename, preferred, fallback string) string { configFile := preferred + filename if _, err := os.Stat(configFile); os.IsNotExist(err) { configFile = fallback + filename } return configFile } type nfsExportsFSAL struct { Name string `json:"name"` UserID string `json:"user_id"` FSName string `json:"fs_name"` } type nfsExportsClients struct { Addresses []string `json:"addresses"` AccessType string `json:"access_type"` Squash string `json:"squash"` } type cephNFSExport struct { ExportID int `json:"export_id"` Path string `json:"path"` ClusterID string `json:"cluster_id"` Pseudo string `json:"pseudo"` AccessType string `json:"access_type"` Squash string `json:"squash"` SecurityLabel bool `json:"security_label"` Protocols []int `json:"protocols"` Transports []string `json:"transports"` FSAL nfsExportsFSAL `json:"fsal"` Clients []nfsExportsClients `json:"clients"` SecTypes []string `json:"secTypes"` } // Get list of exports for a cluster_id. func listExports(f *framework.Framework, clusterID string) (*[]cephNFSExport, error) { var exportList []cephNFSExport stdout, stdErr, err := execCommandInToolBoxPod( f, "ceph nfs export ls "+clusterID+" --detailed", rookNamespace) if err != nil { return nil, err } if stdErr != "" { return nil, fmt.Errorf("error listing exports in clusterID %v", stdErr) } err = json.Unmarshal([]byte(stdout), &exportList) if err != nil { return nil, err } return &exportList, nil } // Check the export for a listed ip address and confirm that the export has // been setup correctly. 
func checkExports(f *framework.Framework, clusterID, clientString string) bool { exportList, err := listExports(f, clusterID) if err != nil { framework.Logf("failed to fetch list of exports: %v", err) return false } found := false for i := range len(*exportList) { export := (*exportList)[i] for _, client := range export.Clients { for _, address := range client.Addresses { if address == clientString { found = true break } } if found { if client.AccessType != "rw" { framework.Logf("Unexpected value for client AccessType: %s", client.AccessType) return false } break } } if found { if export.AccessType != "none" { framework.Logf("Unexpected value for default AccessType: %s", export.AccessType) return false } break } } if !found { framework.Logf("Could not find the configured clients in the list of exports") return false } return true } // createSubvolumegroup creates a subvolumegroup. func createSubvolumegroup(f *framework.Framework, fileSystemName, subvolumegroup string) error { cmd := fmt.Sprintf("ceph fs subvolumegroup create %s %s", fileSystemName, subvolumegroup) _, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace) if err != nil { return fmt.Errorf("failed to exec command in toolbox: %w", err) } if stdErr != "" { return fmt.Errorf("failed to create subvolumegroup %s : %v", subvolumegroup, stdErr) } return nil } // deleteSubvolumegroup creates a subvolumegroup. func deleteSubvolumegroup(f *framework.Framework, fileSystemName, subvolumegroup string) error { cmd := fmt.Sprintf("ceph fs subvolumegroup rm %s %s", fileSystemName, subvolumegroup) _, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace) if err != nil { return fmt.Errorf("failed to exec command in toolbox: %w", err) } if stdErr != "" { return fmt.Errorf("failed to remove subvolumegroup %s : %v", subvolumegroup, stdErr) } return nil }