package e2e import ( "context" "encoding/base64" "encoding/json" "errors" "fmt" "io/ioutil" "regexp" "strings" "sync" "time" v1 "k8s.io/api/core/v1" scv1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" utilyaml "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2elog "k8s.io/kubernetes/test/e2e/framework/log" ) /* #nosec:G101, values not credententials, just a reference to the location.*/ const ( defaultNs = "default" vaultSecretNs = "/secret/ceph-csi/" // rook created cephfs user cephfsNodePluginSecretName = "rook-csi-cephfs-node" cephfsProvisionerSecretName = "rook-csi-cephfs-provisioner" // Secret created inside the cephCSINamespace, can be modified. The // Rook secrets get reconciled and changes are undone (needed for // encryption). rbdNodePluginSecretName = "csi-rbd-secret" rbdProvisionerSecretName = "csi-rbd-secret" rookTolBoxPodLabel = "app=rook-ceph-tools" rbdmountOptions = "mountOptions" retainPolicy = v1.PersistentVolumeReclaimRetain // deletePolicy is the default policy in E2E. deletePolicy = v1.PersistentVolumeReclaimDelete ) var ( // cli flags deployTimeout int deployCephFS bool deployRBD bool testCephFS bool testRBD bool upgradeTesting bool upgradeVersion string cephCSINamespace string rookNamespace string radosNamespace string ns string vaultAddr string poll = 2 * time.Second ) func initResouces() { ns = fmt.Sprintf("--namespace=%v", cephCSINamespace) vaultAddr = fmt.Sprintf("http://vault.%s.svc.cluster.local:8200", cephCSINamespace) } func getMons(ns string, c kubernetes.Interface) ([]string, error) { opt := metav1.ListOptions{ LabelSelector: "app=rook-ceph-mon", } services := make([]string, 0) svcList, err := c.CoreV1().Services(ns).List(context.TODO(), opt) if err != nil { return services, err } for i := range svcList.Items { s := fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcList.Items[i].Name, svcList.Items[i].Namespace, svcList.Items[i].Spec.Ports[0].Port) services = append(services, s) } return services, nil } func getStorageClass(path string) (scv1.StorageClass, error) { sc := scv1.StorageClass{} err := unmarshal(path, &sc) return sc, err } func getSecret(path string) (v1.Secret, error) { sc := v1.Secret{} err := unmarshal(path, &sc) // discard corruptInputError if err != nil { var b64cie base64.CorruptInputError if !errors.As(err, &b64cie) { return sc, err } } return sc, nil } func deleteResource(scPath string) error { data, err := replaceNamespaceInTemplate(scPath) if err != nil { e2elog.Logf("failed to read content from %s %v", scPath, err) } _, err = framework.RunKubectlInput(cephCSINamespace, data, ns, "delete", "-f", "-") if err != nil { e2elog.Logf("failed to delete %s %v", scPath, err) } return err } func unmarshal(fileName string, obj interface{}) error { f, err := ioutil.ReadFile(fileName) if err != nil { return err } data, err := utilyaml.ToJSON(f) if err != nil { return err } err = json.Unmarshal(data, obj) return err } // createPVCAndApp creates pvc and pod // if name is not empty same will be set as pvc and app name. func createPVCAndApp(name string, f *framework.Framework, pvc *v1.PersistentVolumeClaim, app *v1.Pod, pvcTimeout int) error { if name != "" { pvc.Name = name app.Name = name app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = name } err := createPVCAndvalidatePV(f.ClientSet, pvc, pvcTimeout) if err != nil { return err } err = createApp(f.ClientSet, app, deployTimeout) return err } // deletePVCAndApp delete pvc and pod // if name is not empty same will be set as pvc and app name. func deletePVCAndApp(name string, f *framework.Framework, pvc *v1.PersistentVolumeClaim, app *v1.Pod) error { if name != "" { pvc.Name = name app.Name = name app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = name } err := deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout) if err != nil { return err } err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) return err } func createPVCAndAppBinding(pvcPath, appPath string, f *framework.Framework, pvcTimeout int) (*v1.PersistentVolumeClaim, *v1.Pod, error) { pvc, err := loadPVC(pvcPath) if err != nil { return nil, nil, err } pvc.Namespace = f.UniqueName app, err := loadApp(appPath) if err != nil { return nil, nil, err } app.Namespace = f.UniqueName err = createPVCAndApp("", f, pvc, app, pvcTimeout) if err != nil { return nil, nil, err } return pvc, app, nil } func validatePVCAndAppBinding(pvcPath, appPath string, f *framework.Framework) error { pvc, app, err := createPVCAndAppBinding(pvcPath, appPath, f, deployTimeout) if err != nil { return err } err = deletePVCAndApp("", f, pvc, app) return err } func getMountType(appName, appNamespace, mountPath string, f *framework.Framework) (string, error) { opt := metav1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", appName).String(), } cmd := fmt.Sprintf("lsblk -o TYPE,MOUNTPOINT | grep '%s' | awk '{print $1}'", mountPath) stdOut, stdErr, err := execCommandInPod(f, cmd, appNamespace, &opt) if err != nil { return "", err } if stdErr != "" { return strings.TrimSpace(stdOut), fmt.Errorf(stdErr) } return strings.TrimSpace(stdOut), nil } // readVaultSecret method will execute few commands to try read the secret for // specified key from inside the vault container: // * authenticate with vault and ignore any stdout (we do not need output) // * issue get request for particular key // resulting in stdOut (first entry in tuple) - output that contains the key // or stdErr (second entry in tuple) - error getting the key. func readVaultSecret(key string, f *framework.Framework) (string, string) { loginCmd := fmt.Sprintf("vault login -address=%s sample_root_token_id > /dev/null", vaultAddr) readSecret := fmt.Sprintf("vault kv get -address=%s -field=data %s%s", vaultAddr, vaultSecretNs, key) cmd := fmt.Sprintf("%s && %s", loginCmd, readSecret) opt := metav1.ListOptions{ LabelSelector: "app=vault", } stdOut, stdErr := execCommandInPodAndAllowFail(f, cmd, cephCSINamespace, &opt) return strings.TrimSpace(stdOut), strings.TrimSpace(stdErr) } func validateNormalUserPVCAccess(pvcPath string, f *framework.Framework) error { pvc, err := loadPVC(pvcPath) if err != nil { return err } pvc.Namespace = f.UniqueName pvc.Name = f.UniqueName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { return err } var user int64 = 2000 app := &v1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ Name: "pod-run-as-non-root", Namespace: f.UniqueName, Labels: map[string]string{ "app": "pod-run-as-non-root", }, }, Spec: v1.PodSpec{ Containers: []v1.Container{ { Name: "write-pod", Image: "registry.centos.org/centos:latest", Command: []string{"/bin/sleep", "999999"}, SecurityContext: &v1.SecurityContext{ RunAsUser: &user, }, VolumeMounts: []v1.VolumeMount{ { MountPath: "/target", Name: "target", }, }, }, }, Volumes: []v1.Volume{ { Name: "target", VolumeSource: v1.VolumeSource{ PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ ClaimName: pvc.Name, ReadOnly: false}, }, }, }, }, } err = createApp(f.ClientSet, app, deployTimeout) if err != nil { return err } opt := metav1.ListOptions{ LabelSelector: "app=pod-run-as-non-root", } _, stdErr, err := execCommandInPod(f, "echo testing > /target/testing", app.Namespace, &opt) if err != nil { return nil } if stdErr != "" { return fmt.Errorf("failed to touch a file as non-root user %v", stdErr) } err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout) if err != nil { return err } err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) return err } // writeDataInPod fill zero content to a file in the provided POD volume. func writeDataInPod(app *v1.Pod, f *framework.Framework) error { app.Labels = map[string]string{"app": "write-data-in-pod"} app.Namespace = f.UniqueName err := createApp(f.ClientSet, app, deployTimeout) if err != nil { return err } opt := metav1.ListOptions{ LabelSelector: "app=write-data-in-pod", } // write data to PVC. The idea here is to fill some content in the file // instead of filling and reverifying the md5sum/data integrity filePath := app.Spec.Containers[0].VolumeMounts[0].MountPath + "/test" // While writing more data we are encountering issues in E2E timeout, so keeping it low for now _, writeErr, err := execCommandInPod(f, fmt.Sprintf("dd if=/dev/zero of=%s bs=1M count=10 status=none", filePath), app.Namespace, &opt) if err != nil { return err } if writeErr != "" { err = fmt.Errorf("failed to write data %v", writeErr) } return err } func checkDataPersist(pvcPath, appPath string, f *framework.Framework) error { data := "checking data persist" pvc, err := loadPVC(pvcPath) if err != nil { return err } pvc.Namespace = f.UniqueName app, err := loadApp(appPath) if err != nil { return err } app.Labels = map[string]string{"app": "validate-data"} app.Namespace = f.UniqueName err = createPVCAndApp("", f, pvc, app, deployTimeout) if err != nil { return err } opt := metav1.ListOptions{ LabelSelector: "app=validate-data", } // write data to PVC filePath := app.Spec.Containers[0].VolumeMounts[0].MountPath + "/test" _, stdErr, err := execCommandInPod(f, fmt.Sprintf("echo %s > %s", data, filePath), app.Namespace, &opt) if err != nil { return nil } if stdErr != "" { return fmt.Errorf("failed to write data to a file %v", stdErr) } // delete app err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout) if err != nil { return err } // recreate app and check data persist err = createApp(f.ClientSet, app, deployTimeout) if err != nil { return err } persistData, stdErr, err := execCommandInPod(f, fmt.Sprintf("cat %s", filePath), app.Namespace, &opt) if err != nil { return err } if stdErr != "" { return fmt.Errorf("failed to get file content %v", stdErr) } if !strings.Contains(persistData, data) { return fmt.Errorf("data not persistent expected data %s received data %s ", data, persistData) } err = deletePVCAndApp("", f, pvc, app) return err } func pvcDeleteWhenPoolNotFound(pvcPath string, cephfs bool, f *framework.Framework) error { pvc, err := loadPVC(pvcPath) if err != nil { return err } pvc.Namespace = f.UniqueName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { return err } if cephfs { err = deleteBackingCephFSVolume(f, pvc) if err != nil { return err } // delete cephfs filesystem err = deletePool("myfs", cephfs, f) if err != nil { return err } } else { err = deleteBackingRBDImage(f, pvc) if err != nil { return err } // delete rbd pool err = deletePool(defaultRBDPool, cephfs, f) if err != nil { return err } } err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) return err } func checkMountOptions(pvcPath, appPath string, f *framework.Framework, mountFlags []string) error { pvc, err := loadPVC(pvcPath) if err != nil { return err } pvc.Namespace = f.UniqueName app, err := loadApp(appPath) if err != nil { return err } app.Labels = map[string]string{"app": "validate-mount-opt"} app.Namespace = f.UniqueName err = createPVCAndApp("", f, pvc, app, deployTimeout) if err != nil { return err } opt := metav1.ListOptions{ LabelSelector: "app=validate-mount-opt", } cmd := fmt.Sprintf("mount |grep %s", app.Spec.Containers[0].VolumeMounts[0].MountPath) data, stdErr, err := execCommandInPod(f, cmd, app.Namespace, &opt) if err != nil { return err } if stdErr != "" { return fmt.Errorf("failed to get mount point %v", stdErr) } for _, f := range mountFlags { if !strings.Contains(data, f) { return fmt.Errorf("mount option %s not found in %s", f, data) } } err = deletePVCAndApp("", f, pvc, app) return err } func addTopologyDomainsToDSYaml(template, labels string) string { return strings.ReplaceAll(template, "# - \"--domainlabels=failure-domain/region,failure-domain/zone\"", "- \"--domainlabels="+labels+"\"") } func oneReplicaDeployYaml(template string) string { var re = regexp.MustCompile(`(\s+replicas:) \d+`) return re.ReplaceAllString(template, `$1 1`) } func enableTopologyInTemplate(data string) string { return strings.ReplaceAll(data, "--feature-gates=Topology=false", "--feature-gates=Topology=true") } func validatePVCClone(sourcePvcPath, clonePvcPath, clonePvcAppPath string, f *framework.Framework) { var wg sync.WaitGroup totalCount := 10 wgErrs := make([]error, totalCount) pvc, err := loadPVC(sourcePvcPath) if err != nil { e2elog.Failf("failed to load PVC with error %v", err) } pvc.Namespace = f.UniqueName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { e2elog.Failf("failed to create PVC with error %v", err) } // validate created backend rbd images validateRBDImageCount(f, 1) pvcClone, err := loadPVC(clonePvcPath) if err != nil { e2elog.Failf("failed to load PVC with error %v", err) } pvcClone.Spec.DataSource.Name = pvc.Name pvcClone.Namespace = f.UniqueName appClone, err := loadApp(clonePvcAppPath) if err != nil { e2elog.Failf("failed to load application with error %v", err) } appClone.Namespace = f.UniqueName wg.Add(totalCount) // create clone and bind it to an app for i := 0; i < totalCount; i++ { go func(w *sync.WaitGroup, n int, p v1.PersistentVolumeClaim, a v1.Pod) { name := fmt.Sprintf("%s%d", f.UniqueName, n) wgErrs[n] = createPVCAndApp(name, f, &p, &a, deployTimeout) w.Done() }(&wg, i, *pvcClone, *appClone) } wg.Wait() failed := 0 for i, err := range wgErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors e2elog.Logf("failed to create PVC (%s%d): %v", f.UniqueName, i, err) failed++ } } if failed != 0 { e2elog.Failf("creating PVCs failed, %d errors were logged", failed) } // total images in cluster is 1 parent rbd image+ total // temporary clone+ total clones totalCloneCount := totalCount + totalCount + 1 validateRBDImageCount(f, totalCloneCount) // delete parent pvc err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { e2elog.Failf("failed to delete PVC with error %v", err) } totalCloneCount = totalCount + totalCount validateRBDImageCount(f, totalCloneCount) wg.Add(totalCount) // delete clone and app for i := 0; i < totalCount; i++ { go func(w *sync.WaitGroup, n int, p v1.PersistentVolumeClaim, a v1.Pod) { name := fmt.Sprintf("%s%d", f.UniqueName, n) p.Spec.DataSource.Name = name wgErrs[n] = deletePVCAndApp(name, f, &p, &a) w.Done() }(&wg, i, *pvcClone, *appClone) } wg.Wait() for i, err := range wgErrs { if err != nil { // not using Failf() as it aborts the test and does not log other errors e2elog.Logf("failed to delete PVC and application (%s%d): %v", f.UniqueName, i, err) failed++ } } if failed != 0 { e2elog.Failf("deleting PVCs and applications failed, %d errors were logged", failed) } validateRBDImageCount(f, 0) } // validateController simulates the required operations to validate the // controller. // Controller will generates the omap data when the PV is created. // for that we need to do below operations // Create PVC with Retain policy // Store the PVC and PV kubernetes objects so that we can create static // binding between PVC-PV // Delete the omap data created for PVC // Create the static PVC and PV and let controller regenerate the omap // Mount the PVC to application (NodeStage/NodePublish should work) // Resize the PVC // Delete the Application and PVC. func validateController(f *framework.Framework, pvcPath, appPath, scPath string) error { size := "1Gi" poolName := defaultRBDPool expandSize := "10Gi" var err error // create storageclass with retain err = createRBDStorageClass(f.ClientSet, f, nil, nil, retainPolicy) if err != nil { return fmt.Errorf("failed to create storageclass with error %v", err) } // create pvc pvc, err := loadPVC(pvcPath) if err != nil { return fmt.Errorf("failed to load PVC with error %v", err) } resizePvc, err := loadPVC(pvcPath) if err != nil { return fmt.Errorf("failed to load PVC with error %v", err) } resizePvc.Namespace = f.UniqueName pvc.Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse(size) pvc.Namespace = f.UniqueName err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) if err != nil { return fmt.Errorf("failed to create PVC with error %v", err) } // get pvc and pv object pvc, pv, err := getPVCAndPV(f.ClientSet, pvc.Name, pvc.Namespace) if err != nil { return fmt.Errorf("failed to get PVC with error %v", err) } // Recreate storageclass with delete policy err = deleteResource(scPath) if err != nil { return fmt.Errorf("failed to delete storageclass with error %v", err) } err = createRBDStorageClass(f.ClientSet, f, nil, nil, deletePolicy) if err != nil { return fmt.Errorf("failed to create storageclass with error %v", err) } // delete omap data err = deletePVCImageJournalInPool(f, pvc, poolName) if err != nil { return err } err = deletePVCCSIJournalInPool(f, pvc, poolName) if err != nil { return err } // delete pvc and pv err = deletePVCAndPV(f.ClientSet, pvc, pv, deployTimeout) if err != nil { return fmt.Errorf("failed to delete PVC or PV with error %v", err) } // create pvc and pv with application pv.Spec.ClaimRef = nil pv.Spec.PersistentVolumeReclaimPolicy = deletePolicy // unset the resource version as should not be set on objects to be created pvc.ResourceVersion = "" pv.ResourceVersion = "" err = createPVCAndPV(f.ClientSet, pvc, pv) if err != nil { e2elog.Failf("failed to create PVC or PV with error %v", err) } // bind PVC to application app, err := loadApp(appPath) if err != nil { return err } app.Labels = map[string]string{"app": "resize-pvc"} app.Namespace = f.UniqueName opt := metav1.ListOptions{ LabelSelector: "app=resize-pvc", } err = createApp(f.ClientSet, app, deployTimeout) if err != nil { return err } // resize PVC err = expandPVCSize(f.ClientSet, resizePvc, expandSize, deployTimeout) if err != nil { return err } if *pvc.Spec.VolumeMode == v1.PersistentVolumeFilesystem { err = checkDirSize(app, f, &opt, expandSize) if err != nil { return err } } if *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock { err = checkDeviceSize(app, f, &opt, expandSize) if err != nil { return err } } // delete pvc and storageclass err = deletePVCAndApp("", f, resizePvc, app) if err != nil { return err } return deleteResource(rbdExamplePath + "storageclass.yaml") } // k8sVersionGreaterEquals checks the ServerVersion of the Kubernetes cluster // and compares it to the major.minor version passed. In case the version of // the cluster is equal or higher to major.minor, `true` is returned, `false` // otherwise. // // If fetching the ServerVersion of the Kubernetes cluster fails, the calling // test case is marked as `FAILED` and gets aborted. // // nolint:unparam // currently major is always 1, this can change in the future func k8sVersionGreaterEquals(c kubernetes.Interface, major, minor int) bool { v, err := c.Discovery().ServerVersion() if err != nil { e2elog.Failf("failed to get server version with error %v", err) // Failf() marks the case as failure, and returns from the // Go-routine that runs the case. This function will not have a // return value. } maj := fmt.Sprintf("%d", major) min := fmt.Sprintf("%d", minor) return (v.Major > maj) || (v.Major == maj && v.Minor >= min) }