mirror of
synced 2025-03-18 13:19:29 +00:00
addessed golangci-lint issues in e2e folder. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
571 lines
16 KiB
571 lines
16 KiB
Copyright 2021 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package e2e
import (
snapapi "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumesnapshot/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
const (
adminUser = "admin"
// validateSubvolumegroup validates whether subvolumegroup is present.
func validateSubvolumegroup(f *framework.Framework, subvolgrp string) error {
cmd := fmt.Sprintf("ceph fs subvolumegroup getpath %s %s", fileSystemName, subvolgrp)
stdOut, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace)
if err != nil {
return fmt.Errorf("failed to exec command in toolbox: %w", err)
if stdErr != "" {
return fmt.Errorf("failed to getpath for subvolumegroup %s : %v", subvolgrp, stdErr)
expectedGrpPath := "/volumes/" + subvolgrp
stdOut = strings.TrimSpace(stdOut)
if stdOut != expectedGrpPath {
return fmt.Errorf("error unexpected group path. Found: %s", stdOut)
return nil
func createCephfsStorageClass(
c kubernetes.Interface,
f *framework.Framework,
enablePool bool,
params map[string]string,
) error {
scPath := fmt.Sprintf("%s/%s", cephFSExamplePath, "storageclass.yaml")
sc, err := getStorageClass(scPath)
if err != nil {
return err
sc.Parameters["fsName"] = fileSystemName
sc.Parameters["csi.storage.k8s.io/provisioner-secret-namespace"] = cephCSINamespace
sc.Parameters["csi.storage.k8s.io/provisioner-secret-name"] = cephFSProvisionerSecretName
sc.Parameters["csi.storage.k8s.io/controller-expand-secret-namespace"] = cephCSINamespace
sc.Parameters["csi.storage.k8s.io/controller-expand-secret-name"] = cephFSProvisionerSecretName
sc.Parameters["csi.storage.k8s.io/node-stage-secret-namespace"] = cephCSINamespace
sc.Parameters["csi.storage.k8s.io/node-stage-secret-name"] = cephFSNodePluginSecretName
if enablePool {
sc.Parameters["pool"] = "myfs-replicated"
// overload any parameters that were passed
if params == nil {
// create an empty params, so that params["clusterID"] below
// does not panic
params = map[string]string{}
for param, value := range params {
sc.Parameters[param] = value
// fetch and set fsID from the cluster if not set in params
if _, found := params["clusterID"]; !found {
var fsID string
fsID, err = getClusterID(f)
if err != nil {
return fmt.Errorf("failed to get clusterID: %w", err)
sc.Parameters["clusterID"] = fsID
timeout := time.Duration(deployTimeout) * time.Minute
return wait.PollUntilContextTimeout(context.TODO(), poll, timeout, true, func(ctx context.Context) (bool, error) {
_, err = c.StorageV1().StorageClasses().Create(ctx, &sc, metav1.CreateOptions{})
if err != nil {
framework.Logf("error creating StorageClass %q: %v", sc.Name, err)
if isRetryableAPIError(err) {
return false, nil
return false, fmt.Errorf("failed to create StorageClass %q: %w", sc.Name, err)
return true, nil
func createCephfsSecret(f *framework.Framework, secretName, userName, userKey string) error {
scPath := fmt.Sprintf("%s/%s", cephFSExamplePath, "secret.yaml")
sc, err := getSecret(scPath)
if err != nil {
return err
if secretName != "" {
sc.Name = secretName
sc.StringData["adminID"] = userName
sc.StringData["adminKey"] = userKey
delete(sc.StringData, "userID")
delete(sc.StringData, "userKey")
sc.Namespace = cephCSINamespace
_, err = f.ClientSet.CoreV1().Secrets(cephCSINamespace).Create(context.TODO(), &sc, metav1.CreateOptions{})
return err
// unmountCephFSVolume unmounts a cephFS volume mounted on a pod.
func unmountCephFSVolume(f *framework.Framework, appName, pvcName string) error {
pod, err := f.ClientSet.CoreV1().Pods(f.UniqueName).Get(context.TODO(), appName, metav1.GetOptions{})
if err != nil {
framework.Logf("Error occurred getting pod %s in namespace %s", appName, f.UniqueName)
return fmt.Errorf("failed to get pod: %w", err)
pvc, err := getPersistentVolumeClaim(f.ClientSet, f.UniqueName, pvcName)
if err != nil {
framework.Logf("Error occurred getting PVC %s in namespace %s", pvcName, f.UniqueName)
return fmt.Errorf("failed to get pvc: %w", err)
cmd := fmt.Sprintf(
"umount /var/lib/kubelet/pods/%s/volumes/kubernetes.io~csi/%s/mount",
stdErr, err := execCommandInDaemonsetPod(
if stdErr != "" {
framework.Logf("StdErr occurred: %s", stdErr)
return err
func deleteBackingCephFSVolume(f *framework.Framework, pvc *v1.PersistentVolumeClaim) error {
imageData, err := getImageInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil {
return err
cmd := fmt.Sprintf("ceph fs subvolume rm %s %s %s", fileSystemName, imageData.imageName, subvolumegroup)
_, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace)
if err != nil {
return err
if stdErr != "" {
return fmt.Errorf("error deleting backing volume %s %v", imageData.imageName, stdErr)
return nil
type cephfsSubVolume struct {
Name string `json:"name"`
func listCephFSSubVolumes(f *framework.Framework, filesystem, groupname string) ([]cephfsSubVolume, error) {
var subVols []cephfsSubVolume
stdout, stdErr, err := execCommandInToolBoxPod(
fmt.Sprintf("ceph fs subvolume ls %s --group_name=%s --format=json", filesystem, groupname),
if err != nil {
return subVols, err
if stdErr != "" {
return subVols, fmt.Errorf("error listing subvolumes %v", stdErr)
err = json.Unmarshal([]byte(stdout), &subVols)
if err != nil {
return subVols, err
return subVols, nil
type cephfsSubvolumeMetadata struct {
PVCNameKey string `json:"csi.storage.k8s.io/pvc/name"`
PVCNamespaceKey string `json:"csi.storage.k8s.io/pvc/namespace"`
PVNameKey string `json:"csi.storage.k8s.io/pv/name"`
ClusterNameKey string `json:"csi.ceph.com/cluster/name"`
func listCephFSSubvolumeMetadata(
f *framework.Framework,
groupname string,
) (*cephfsSubvolumeMetadata, error) {
stdout, stdErr, err := execCommandInToolBoxPod(
fmt.Sprintf("ceph fs subvolume metadata ls %s %s --group_name=%s --format=json", filesystem, subvolume, groupname),
if err != nil {
return nil, err
if stdErr != "" {
return nil, fmt.Errorf("error listing subvolume metadata %v", stdErr)
metadata := &cephfsSubvolumeMetadata{}
err = json.Unmarshal([]byte(stdout), metadata)
if err != nil {
return metadata, err
return metadata, nil
type cephfsSnapshotMetadata struct {
VolSnapNameKey string `json:"csi.storage.k8s.io/volumesnapshot/name"`
VolSnapNamespaceKey string `json:"csi.storage.k8s.io/volumesnapshot/namespace"`
VolSnapContentNameKey string `json:"csi.storage.k8s.io/volumesnapshotcontent/name"`
ClusterNameKey string `json:"csi.ceph.com/cluster/name"`
func listCephFSSnapshotMetadata(
f *framework.Framework,
groupname string,
) (*cephfsSnapshotMetadata, error) {
stdout, stdErr, err := execCommandInToolBoxPod(
fmt.Sprintf("ceph fs subvolume snapshot metadata ls %s %s %s --group_name=%s --format=json",
filesystem, subvolume, snapname, groupname),
if err != nil {
return nil, err
if stdErr != "" {
return nil, fmt.Errorf("error listing subvolume snapshots metadata %v", stdErr)
metadata := &cephfsSnapshotMetadata{}
err = json.Unmarshal([]byte(stdout), metadata)
if err != nil {
return metadata, err
return metadata, nil
type cephfsSnapshot struct {
Name string `json:"name"`
func listCephFSSnapshots(f *framework.Framework, filesystem, subvolume, groupname string) ([]cephfsSnapshot, error) {
var snaps []cephfsSnapshot
stdout, stdErr, err := execCommandInToolBoxPod(
fmt.Sprintf("ceph fs subvolume snapshot ls %s %s --group_name=%s --format=json", filesystem, subvolume, groupname),
if err != nil {
return snaps, err
if stdErr != "" {
return snaps, fmt.Errorf("error listing subolume snapshots %v", stdErr)
err = json.Unmarshal([]byte(stdout), &snaps)
if err != nil {
return snaps, err
return snaps, nil
// getSubvolumepath validates whether subvolumegroup is present.
func getSubvolumePath(f *framework.Framework, filesystem, subvolgrp, subvolume string) (string, error) {
cmd := fmt.Sprintf("ceph fs subvolume getpath %s %s --group_name=%s", filesystem, subvolume, subvolgrp)
stdOut, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace)
if err != nil {
return "", err
if stdErr != "" {
return "", fmt.Errorf("failed to getpath for subvolume %s : %s", subvolume, stdErr)
return strings.TrimSpace(stdOut), nil
func getSnapName(snapNamespace, snapName string) (string, error) {
sclient, err := newSnapshotClient()
if err != nil {
return "", err
snap, err := sclient.
Get(context.TODO(), snapName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("failed to get volumesnapshot: %w", err)
sc, err := sclient.
Get(context.TODO(), *snap.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("failed to get volumesnapshotcontent: %w", err)
snapIDRegex := regexp.MustCompile(`(\w+\-?){5}$`)
snapID := snapIDRegex.FindString(*sc.Status.SnapshotHandle)
snapshotName := "csi-snap-" + snapID
framework.Logf("snapshotName= %s", snapshotName)
return snapshotName, nil
func deleteBackingCephFSSubvolumeSnapshot(
f *framework.Framework,
pvc *v1.PersistentVolumeClaim,
snap *snapapi.VolumeSnapshot,
) error {
snapshotName, err := getSnapName(snap.Namespace, snap.Name)
if err != nil {
return err
imageData, err := getImageInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil {
return err
cmd := fmt.Sprintf(
"ceph fs subvolume snapshot rm %s %s %s %s",
_, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace)
if err != nil {
return err
if stdErr != "" {
return fmt.Errorf("error deleting backing snapshot %s %v", snapshotName, stdErr)
return nil
func validateEncryptedCephfs(f *framework.Framework, pvName, appName string) error {
pod, err := f.ClientSet.CoreV1().Pods(f.UniqueName).Get(context.TODO(), appName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get pod %q in namespace %q: %w", appName, f.UniqueName, err)
volumeMountPath := fmt.Sprintf(
selector, err := getDaemonSetLabelSelector(f, cephCSINamespace, cephFSDeamonSetName)
if err != nil {
return fmt.Errorf("failed to get labels: %w", err)
opt := metav1.ListOptions{
LabelSelector: selector,
cmd := "getfattr --name=ceph.fscrypt.auth --only-values " + volumeMountPath
_, _, err = execCommandInContainer(f, cmd, cephCSINamespace, "csi-cephfsplugin", &opt)
if err != nil {
cmd = "getfattr --recursive --dump " + volumeMountPath
stdOut, stdErr, listErr := execCommandInContainer(f, cmd, cephCSINamespace, "csi-cephfsplugin", &opt)
if listErr == nil {
return fmt.Errorf("error checking for cephfs fscrypt xattr on %q. listing: %s %s",
volumeMountPath, stdOut, stdErr)
return fmt.Errorf("error checking file xattr: %w", err)
return nil
func getInfoFromPVC(pvcNamespace, pvcName string, f *framework.Framework) (string, string, error) {
c := f.ClientSet.CoreV1()
pvc, err := c.PersistentVolumeClaims(pvcNamespace).Get(context.TODO(), pvcName, metav1.GetOptions{})
if err != nil {
return "", "", fmt.Errorf("failed to get pvc: %w", err)
pv, err := c.PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return "", "", fmt.Errorf("failed to get pv: %w", err)
return pv.Name, pv.Spec.CSI.VolumeHandle, nil
func validateFscryptAndAppBinding(pvcPath, appPath string, kms kmsConfig, f *framework.Framework) error {
pvc, app, err := createPVCAndAppBinding(pvcPath, appPath, f, deployTimeout)
if err != nil {
return err
pvName, csiVolumeHandle, err := getInfoFromPVC(pvc.Namespace, pvc.Name, f)
if err != nil {
return err
err = validateEncryptedCephfs(f, pvName, app.Name)
if err != nil {
return err
if kms != noKMS && kms.canGetPassphrase() {
// check new passphrase created
_, stdErr := kms.getPassphrase(f, csiVolumeHandle)
if stdErr != "" {
return fmt.Errorf("failed to read passphrase from vault: %s", stdErr)
err = deletePVCAndApp("", f, pvc, app)
if err != nil {
return err
if kms != noKMS && kms.canGetPassphrase() {
// check new passphrase created
stdOut, _ := kms.getPassphrase(f, csiVolumeHandle)
if stdOut != "" {
return fmt.Errorf("passphrase found in vault while should be deleted: %s", stdOut)
if kms != noKMS && kms.canVerifyKeyDestroyed() {
destroyed, msg := kms.verifyKeyDestroyed(f, csiVolumeHandle)
if !destroyed {
return fmt.Errorf("passphrased was not destroyed: %s", msg)
} else if msg != "" {
framework.Logf("passphrase destroyed, but message returned: %s", msg)
return nil
//nolint:gocyclo,cyclop // test function
func validateFscryptClone(
pvcPath, appPath, pvcSmartClonePath, appSmartClonePath string,
kms kmsConfig,
f *framework.Framework,
) {
pvc, err := loadPVC(pvcPath)
if err != nil {
framework.Failf("failed to load PVC: %v", err)
pvc.Namespace = f.UniqueName
err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout)
if err != nil {
framework.Failf("failed to create PVC: %v", err)
app, err := loadApp(appPath)
if err != nil {
framework.Failf("failed to load application: %v", err)
label := make(map[string]string)
label[appKey] = appLabel
app.Namespace = f.UniqueName
app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = pvc.Name
app.Labels = label
opt := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", appKey, label[appKey]),
wErr := writeDataInPod(app, &opt, f)
if wErr != nil {
framework.Failf("failed to write data from application %v", wErr)
pvcClone, err := loadPVC(pvcSmartClonePath)
if err != nil {
framework.Failf("failed to load PVC: %v", err)
pvcClone.Spec.DataSource.Name = pvc.Name
pvcClone.Namespace = f.UniqueName
appClone, err := loadApp(appSmartClonePath)
if err != nil {
framework.Failf("failed to load application: %v", err)
appClone.Namespace = f.UniqueName
appClone.Labels = map[string]string{
appKey: f.UniqueName,
err = createPVCAndApp(f.UniqueName, f, pvcClone, appClone, deployTimeout)
if err != nil {
framework.Failf("failed to create PVC or application (%s): %v", f.UniqueName, err)
_, csiVolumeHandle, err := getInfoFromPVC(pvcClone.Namespace, pvcClone.Name, f)
if err != nil {
framework.Failf("failed to get pvc info: %s", err)
if kms != noKMS && kms.canGetPassphrase() {
// check new passphrase created
stdOut, stdErr := kms.getPassphrase(f, csiVolumeHandle)
if stdOut != "" {
framework.Logf("successfully read the passphrase from vault: %s", stdOut)
if stdErr != "" {
framework.Failf("failed to read passphrase from vault: %s", stdErr)
// delete parent pvc
err = deletePVCAndApp("", f, pvc, app)
if err != nil {
framework.Failf("failed to delete PVC or application: %v", err)
err = deletePVCAndApp(f.UniqueName, f, pvcClone, appClone)
if err != nil {
framework.Failf("failed to delete PVC or application (%s): %v", f.UniqueName, err)
if kms != noKMS && kms.canGetPassphrase() {
// check passphrase deleted
stdOut, _ := kms.getPassphrase(f, csiVolumeHandle)
if stdOut != "" {
framework.Failf("passphrase found in vault while should be deleted: %s", stdOut)
if kms != noKMS && kms.canVerifyKeyDestroyed() {
destroyed, msg := kms.verifyKeyDestroyed(f, csiVolumeHandle)
if !destroyed {
framework.Failf("passphrased was not destroyed: %s", msg)
} else if msg != "" {
framework.Logf("passphrase destroyed, but message returned: %s", msg)