ceph-csi/e2e/pvc.go

334 lines
10 KiB
Go
Raw Normal View History

package e2e
import (
"context"
"errors"
"fmt"
"strings"
"time"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
)
func loadPVC(path string) (*v1.PersistentVolumeClaim, error) {
pvc := &v1.PersistentVolumeClaim{}
err := unmarshal(path, &pvc)
if err != nil {
return nil, err
}
return pvc, err
}
func createPVCAndvalidatePV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, t int) error {
timeout := time.Duration(t) * time.Minute
pv := &v1.PersistentVolume{}
var err error
_, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create pvc: %w", err)
}
if timeout == 0 {
return nil
}
name := pvc.Name
start := time.Now()
e2elog.Logf("Waiting up to %v to be in Bound state", pvc)
return wait.PollImmediate(poll, timeout, func() (bool, error) {
e2elog.Logf("waiting for PVC %s (%d seconds elapsed)", pvc.Name, int(time.Since(start).Seconds()))
pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
e2elog.Logf("Error getting pvc in namespace: '%s': %v", pvc.Namespace, err)
if isRetryableAPIError(err) {
return false, nil
}
if apierrs.IsNotFound(err) {
return false, nil
}
return false, fmt.Errorf("failed to get pvc: %w", err)
}
if pvc.Spec.VolumeName == "" {
return false, nil
}
pv, err = c.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("failed to get pv: %w", err)
}
if isRetryableAPIError(err) {
return false, nil
}
if apierrs.IsNotFound(err) {
return false, nil
}
err = e2epv.WaitOnPVandPVC(
c,
&framework.TimeoutContext{ClaimBound: timeout, PVBound: timeout},
pvc.Namespace,
pv,
pvc)
if err != nil {
return false, fmt.Errorf("failed to wait for the pv and pvc to bind: %w", err)
}
return true, nil
})
}
func createPVCAndPV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
_, err := c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create pvc: %w", err)
}
_, err = c.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create pv: %w", err)
}
return err
}
func deletePVCAndPV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, t int) error {
err := c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete pvc: %w", err)
}
err = c.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete pv: %w", err)
}
timeout := time.Duration(t) * time.Minute
start := time.Now()
pvcToDelete := pvc
err = wait.PollImmediate(poll, timeout, func() (bool, error) {
// Check that the PVC is deleted.
e2elog.Logf(
"waiting for PVC %s in state %s to be deleted (%d seconds elapsed)",
pvcToDelete.Name,
pvcToDelete.Status.String(),
int(time.Since(start).Seconds()))
pvcToDelete, err = c.CoreV1().
PersistentVolumeClaims(pvcToDelete.Namespace).
Get(context.TODO(), pvcToDelete.Name, metav1.GetOptions{})
if err == nil {
if pvcToDelete.Status.Phase == "" {
// this is unexpected, an empty Phase is not defined
// FIXME: see https://github.com/ceph/ceph-csi/issues/1874
e2elog.Logf("PVC %s is in a weird state: %s", pvcToDelete.Name, pvcToDelete.String())
}
return false, nil
}
if isRetryableAPIError(err) {
return false, nil
}
if !apierrs.IsNotFound(err) {
return false, fmt.Errorf(
"get on deleted PVC %v failed with error other than \"not found\": %w",
pvc.Name,
err)
}
return true, nil
})
if err != nil {
return fmt.Errorf("failed to poll: %w", err)
}
start = time.Now()
pvToDelete := pv
return wait.PollImmediate(poll, timeout, func() (bool, error) {
// Check that the PV is deleted.
e2elog.Logf(
"waiting for PV %s in state %s to be deleted (%d seconds elapsed)",
pvToDelete.Name,
pvToDelete.Status.String(),
int(time.Since(start).Seconds()))
pvToDelete, err = c.CoreV1().PersistentVolumes().Get(context.TODO(), pvToDelete.Name, metav1.GetOptions{})
if err == nil {
return false, nil
}
if isRetryableAPIError(err) {
return false, nil
}
if !apierrs.IsNotFound(err) {
return false, fmt.Errorf("delete PV %v failed with error other than \"not found\": %w", pv.Name, err)
}
return true, nil
})
}
func getPVCAndPV(
c kubernetes.Interface,
pvcName, pvcNamespace string) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error) {
pvc, err := c.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(context.TODO(), pvcName, metav1.GetOptions{})
if err != nil {
return nil, nil, fmt.Errorf("failed to get PVC: %w", err)
}
pv, err := c.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return pvc, nil, fmt.Errorf("failed to get PV: %w", err)
}
return pvc, pv, nil
}
func deletePVCAndValidatePV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, t int) error {
timeout := time.Duration(t) * time.Minute
nameSpace := pvc.Namespace
name := pvc.Name
var err error
e2elog.Logf("Deleting PersistentVolumeClaim %v on namespace %v", name, nameSpace)
pvc, err = c.CoreV1().PersistentVolumeClaims(nameSpace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get pvc: %w", err)
}
pv, err := c.CoreV1().PersistentVolumes().Get(context.TODO(), pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get pv: %w", err)
}
err = c.CoreV1().PersistentVolumeClaims(nameSpace).Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("delete of PVC %v failed: %w", name, err)
}
start := time.Now()
return wait.PollImmediate(poll, timeout, func() (bool, error) {
// Check that the PVC is really deleted.
e2elog.Logf(
"waiting for PVC %s in state %s to be deleted (%d seconds elapsed)",
name,
pvc.Status.String(),
int(time.Since(start).Seconds()))
pvc, err = c.CoreV1().PersistentVolumeClaims(nameSpace).Get(context.TODO(), name, metav1.GetOptions{})
if err == nil {
return false, nil
}
if isRetryableAPIError(err) {
return false, nil
}
if !apierrs.IsNotFound(err) {
return false, fmt.Errorf("get on deleted PVC %v failed with error other than \"not found\": %w", name, err)
}
// Examine the pv.ClaimRef and UID. Expect nil values.
_, err = c.CoreV1().PersistentVolumes().Get(context.TODO(), pv.Name, metav1.GetOptions{})
if err == nil {
return false, nil
}
if isRetryableAPIError(err) {
return false, nil
}
if !apierrs.IsNotFound(err) {
return false, fmt.Errorf("delete PV %v failed with error other than \"not found\": %w", pv.Name, err)
}
return true, nil
})
}
// getBoundPV returns a PV details.
func getBoundPV(client kubernetes.Interface, pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
// Get new copy of the claim
claim, err := client.CoreV1().
PersistentVolumeClaims(pvc.Namespace).
Get(context.TODO(), pvc.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pvc: %w", err)
}
// Get the bound PV
pv, err := client.CoreV1().PersistentVolumes().Get(context.TODO(), claim.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pv: %w", err)
}
return pv, nil
}
func checkPVSelectorValuesForPVC(f *framework.Framework, pvc *v1.PersistentVolumeClaim) error {
pv, err := getBoundPV(f.ClientSet, pvc)
if err != nil {
return err
}
if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
return errors.New("found empty NodeSelectorTerms in PV")
}
rFound := false
zFound := false
for _, expression := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions {
switch expression.Key {
case nodeCSIRegionLabel:
if rFound {
return errors.New("found multiple occurrences of topology key for region")
}
rFound = true
if expression.Values[0] != regionValue {
return errors.New("topology value for region label mismatch")
}
case nodeCSIZoneLabel:
if zFound {
return errors.New("found multiple occurrences of topology key for zone")
}
zFound = true
if expression.Values[0] != zoneValue {
return errors.New("topology value for zone label mismatch")
}
default:
return errors.New("unexpected key in node selector terms found in PV")
}
}
return nil
}
func getMetricsForPVC(f *framework.Framework, pvc *v1.PersistentVolumeClaim, t int) error {
kubelet, err := getKubeletIP(f.ClientSet)
if err != nil {
return err
}
// kubelet needs to be started with --read-only-port=10255
cmd := fmt.Sprintf("curl --silent 'http://%s:10255/metrics'", kubelet)
// retry as kubelet does not immediately have the metrics available
timeout := time.Duration(t) * time.Minute
return wait.PollImmediate(poll, timeout, func() (bool, error) {
stdOut, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace)
if err != nil {
e2elog.Logf("failed to get metrics for pvc %q (%v): %v", pvc.Name, err, stdErr)
return false, nil
}
if stdOut == "" {
e2elog.Logf("no metrics received from kublet on IP %s", kubelet)
return false, nil
}
namespace := fmt.Sprintf("namespace=%q", pvc.Namespace)
name := fmt.Sprintf("persistentvolumeclaim=%q", pvc.Name)
for _, line := range strings.Split(stdOut, "\n") {
if !strings.HasPrefix(line, "kubelet_volume_stats_") {
continue
}
if strings.Contains(line, namespace) && strings.Contains(line, name) {
// TODO: validate metrics if possible
e2elog.Logf("found metrics for pvc %s/%s: %s", pvc.Namespace, pvc.Name, line)
return true, nil
}
}
e2elog.Logf("no metrics found for pvc %s/%s", pvc.Namespace, pvc.Name)
return false, nil
})
}