package e2e

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	apierrs "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2elog "k8s.io/kubernetes/test/e2e/framework/log"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
)

// loadPVC populates a PersistentVolumeClaim from path using the unmarshal
// helper. When unmarshal reports an error, a nil claim is returned together
// with that error.
func loadPVC(path string) (*v1.PersistentVolumeClaim, error) {
	claim := &v1.PersistentVolumeClaim{}
	if err := unmarshal(path, &claim); err != nil {
		return nil, err
	}

	return claim, nil
}

// createPVCAndvalidatePV creates pvc and, unless t is 0, polls for up to t
// minutes until the claim has a volume name set, the referenced PV can be
// fetched, and e2epv.WaitOnPVandPVC succeeds for the pair.
//
// Retryable API errors and "not found" responses while fetching the PVC or
// the PV are treated as transient and polled again; any other error aborts
// the wait and is returned wrapped.
func createPVCAndvalidatePV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, t int) error {
	timeout := time.Duration(t) * time.Minute
	name := pvc.Name
	namespace := pvc.Namespace

	_, err := c.CoreV1().PersistentVolumeClaims(namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("failed to create pvc: %w", err)
	}
	// A zero timeout means the caller only wants the claim created.
	if timeout == 0 {
		return nil
	}

	start := time.Now()
	e2elog.Logf("Waiting up to %v for PVC %s/%s to be in Bound state", timeout, namespace, name)

	return wait.PollImmediate(poll, timeout, func() (bool, error) {
		e2elog.Logf("waiting for PVC %s (%d seconds elapsed)", name, int(time.Since(start).Seconds()))

		claim, err := c.CoreV1().PersistentVolumeClaims(namespace).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			e2elog.Logf("Error getting pvc %q in namespace %q: %v", name, namespace, err)
			if isRetryableAPIError(err) || apierrs.IsNotFound(err) {
				return false, nil
			}

			return false, fmt.Errorf("failed to get pvc: %w", err)
		}

		// No volume has been assigned to the claim yet.
		if claim.Spec.VolumeName == "" {
			return false, nil
		}

		volume, err := c.CoreV1().PersistentVolumes().Get(context.TODO(), claim.Spec.VolumeName, metav1.GetOptions{})
		if err != nil {
			if isRetryableAPIError(err) || apierrs.IsNotFound(err) {
				return false, nil
			}

			return false, fmt.Errorf("failed to get pv: %w", err)
		}

		err = e2epv.WaitOnPVandPVC(
			c,
			&framework.TimeoutContext{ClaimBound: timeout, PVBound: timeout},
			namespace,
			volume,
			claim)
		if err != nil {
			return false, fmt.Errorf("failed to wait for the pv and pvc to bind: %w", err)
		}

		return true, nil
	})
}

// createPVCAndPV creates the given claim first and then the given volume.
// The first failing create is returned wrapped; the PV is not created when
// creating the PVC fails.
func createPVCAndPV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) error {
	ctx := context.TODO()

	if _, err := c.CoreV1().
		PersistentVolumeClaims(pvc.Namespace).
		Create(ctx, pvc, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("failed to create pvc: %w", err)
	}

	if _, err := c.CoreV1().
		PersistentVolumes().
		Create(ctx, pv, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("failed to create pv: %w", err)
	}

	return nil
}

// deletePVCAndPV deletes pvc and pv and then waits, for up to t minutes each,
// until a Get on the PVC and subsequently on the PV reports "not found".
//
// Retryable API errors during the wait are polled again; any other error that
// is not "not found" aborts the wait and is returned wrapped.
func deletePVCAndPV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, t int) error {
	// Capture the identifiers once. The objects returned by a failed Get must
	// not be used as the source of the name/namespace for the next poll.
	pvcName := pvc.Name
	pvcNamespace := pvc.Namespace

	err := c.CoreV1().PersistentVolumeClaims(pvcNamespace).Delete(context.TODO(), pvcName, metav1.DeleteOptions{})
	if err != nil {
		return fmt.Errorf("failed to delete pvc: %w", err)
	}

	pvName := pv.Name
	err = c.CoreV1().PersistentVolumes().Delete(context.TODO(), pvName, metav1.DeleteOptions{})
	if err != nil {
		return fmt.Errorf("failed to delete pv: %w", err)
	}

	timeout := time.Duration(t) * time.Minute
	start := time.Now()

	// Last status seen for the PVC; only refreshed by a successful Get.
	pvcStatus := pvc.Status.String()
	err = wait.PollImmediate(poll, timeout, func() (bool, error) {
		// Check that the PVC is deleted.
		e2elog.Logf(
			"waiting for PVC %s in state %s to be deleted (%d seconds elapsed)",
			pvcName,
			pvcStatus,
			int(time.Since(start).Seconds()))

		current, getErr := c.CoreV1().
			PersistentVolumeClaims(pvcNamespace).
			Get(context.TODO(), pvcName, metav1.GetOptions{})
		if getErr == nil {
			pvcStatus = current.Status.String()
			if current.Status.Phase == "" {
				// this is unexpected, an empty Phase is not defined
				// FIXME: see https://github.com/ceph/ceph-csi/issues/1874
				e2elog.Logf("PVC %s is in a weird state: %s", pvcName, current.String())
			}

			return false, nil
		}
		if isRetryableAPIError(getErr) {
			return false, nil
		}
		if !apierrs.IsNotFound(getErr) {
			return false, fmt.Errorf(
				"get on deleted PVC %v failed with error other than \"not found\": %w",
				pvcName,
				getErr)
		}

		return true, nil
	})
	if err != nil {
		return fmt.Errorf("failed to poll: %w", err)
	}

	// Restart the clock: the PV gets its own full timeout.
	start = time.Now()

	// Last status seen for the PV; only refreshed by a successful Get.
	pvStatus := pv.Status.String()

	return wait.PollImmediate(poll, timeout, func() (bool, error) {
		// Check that the PV is deleted.
		e2elog.Logf(
			"waiting for PV %s in state %s to be deleted (%d seconds elapsed)",
			pvName,
			pvStatus,
			int(time.Since(start).Seconds()))

		current, getErr := c.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
		if getErr == nil {
			pvStatus = current.Status.String()

			return false, nil
		}
		if isRetryableAPIError(getErr) {
			return false, nil
		}
		if !apierrs.IsNotFound(getErr) {
			return false, fmt.Errorf("delete PV %v failed with error other than \"not found\": %w", pvName, getErr)
		}

		return true, nil
	})
}

// getPVCAndPV fetches the claim pvcName in pvcNamespace and the volume named
// in its Spec.VolumeName.
//
// If the claim cannot be fetched both results are nil. If only the volume
// lookup fails, the claim is still returned alongside the error.
func getPVCAndPV(
	c kubernetes.Interface,
	pvcName, pvcNamespace string) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, error) {
	ctx := context.TODO()

	claim, err := c.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcName, metav1.GetOptions{})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to get PVC: %w", err)
	}

	volume, err := c.CoreV1().PersistentVolumes().Get(ctx, claim.Spec.VolumeName, metav1.GetOptions{})
	if err != nil {
		return claim, nil, fmt.Errorf("failed to get PV: %w", err)
	}

	return claim, volume, nil
}

// deletePVCAndValidatePV looks up the PV named by the claim, deletes the
// claim, and then polls for up to t minutes until a Get on both the PVC and
// that PV reports "not found".
//
// Retryable API errors during the wait are polled again; any other error that
// is not "not found" aborts the wait and is returned wrapped.
func deletePVCAndValidatePV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, t int) error {
	timeout := time.Duration(t) * time.Minute
	nameSpace := pvc.Namespace
	name := pvc.Name
	e2elog.Logf("Deleting PersistentVolumeClaim %v on namespace %v", name, nameSpace)

	// Fetch a fresh copy of the claim to learn which PV it refers to.
	claim, err := c.CoreV1().PersistentVolumeClaims(nameSpace).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get pvc: %w", err)
	}
	pv, err := c.CoreV1().PersistentVolumes().Get(context.TODO(), claim.Spec.VolumeName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get pv: %w", err)
	}
	pvName := pv.Name

	err = c.CoreV1().PersistentVolumeClaims(nameSpace).Delete(context.TODO(), name, metav1.DeleteOptions{})
	if err != nil {
		return fmt.Errorf("delete of PVC %v failed: %w", name, err)
	}
	start := time.Now()

	// Last status seen for the PVC. It is only refreshed by a successful Get,
	// so the result of a failed Get is never dereferenced.
	pvcStatus := claim.Status.String()

	return wait.PollImmediate(poll, timeout, func() (bool, error) {
		// Check that the PVC is really deleted.
		e2elog.Logf(
			"waiting for PVC %s in state %s to be deleted (%d seconds elapsed)",
			name,
			pvcStatus,
			int(time.Since(start).Seconds()))

		current, getErr := c.CoreV1().PersistentVolumeClaims(nameSpace).Get(context.TODO(), name, metav1.GetOptions{})
		if getErr == nil {
			pvcStatus = current.Status.String()

			return false, nil
		}
		if isRetryableAPIError(getErr) {
			return false, nil
		}
		if !apierrs.IsNotFound(getErr) {
			return false, fmt.Errorf("get on deleted PVC %v failed with error other than \"not found\": %w", name, getErr)
		}

		// The PVC is gone; keep polling until the PV is gone as well.
		_, getErr = c.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
		if getErr == nil {
			return false, nil
		}
		if isRetryableAPIError(getErr) {
			return false, nil
		}
		if !apierrs.IsNotFound(getErr) {
			return false, fmt.Errorf("delete PV %v failed with error other than \"not found\": %w", pvName, getErr)
		}

		return true, nil
	})
}

// getBoundPV re-reads pvc from the API server and returns the
// PersistentVolume named in the refreshed claim's Spec.VolumeName.
func getBoundPV(client kubernetes.Interface, pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
	ctx := context.TODO()

	// Work on a fresh copy of the claim rather than the caller's object.
	fresh, err := client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(ctx, pvc.Name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get pvc: %w", err)
	}

	// Look up the volume the claim points at.
	volume, err := client.CoreV1().PersistentVolumes().Get(ctx, fresh.Spec.VolumeName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get pv: %w", err)
	}

	return volume, nil
}

// checkPVSelectorValuesForPVC verifies the node affinity of the PV bound to
// pvc. Only the match expressions of the first required node selector term
// are inspected.
//
// An error is returned when the PV has no required node selector terms, when
// the region or zone key occurs more than once, when such a key carries no
// values or its first value differs from regionValue/zoneValue, or when any
// other key is present. A missing region or zone key is not reported.
func checkPVSelectorValuesForPVC(f *framework.Framework, pvc *v1.PersistentVolumeClaim) error {
	pv, err := getBoundPV(f.ClientSet, pvc)
	if err != nil {
		return err
	}

	// NodeAffinity and Required are optional pointers; guard against nil
	// before reaching into the selector terms.
	affinity := pv.Spec.NodeAffinity
	if affinity == nil || affinity.Required == nil || len(affinity.Required.NodeSelectorTerms) == 0 {
		return errors.New("found empty NodeSelectorTerms in PV")
	}

	rFound := false
	zFound := false
	for _, expression := range affinity.Required.NodeSelectorTerms[0].MatchExpressions {
		switch expression.Key {
		case nodeCSIRegionLabel:
			if rFound {
				return errors.New("found multiple occurrences of topology key for region")
			}
			rFound = true
			// Values may be empty; do not index blindly.
			if len(expression.Values) == 0 {
				return errors.New("found no values for topology key for region")
			}
			if expression.Values[0] != regionValue {
				return errors.New("topology value for region label mismatch")
			}
		case nodeCSIZoneLabel:
			if zFound {
				return errors.New("found multiple occurrences of topology key for zone")
			}
			zFound = true
			// Values may be empty; do not index blindly.
			if len(expression.Values) == 0 {
				return errors.New("found no values for topology key for zone")
			}
			if expression.Values[0] != zoneValue {
				return errors.New("topology value for zone label mismatch")
			}
		default:
			return errors.New("unexpected key in node selector terms found in PV")
		}
	}

	return nil
}

// getMetricsForPVC polls the kubelet metrics endpoint for up to t minutes
// until a kubelet_volume_stats_* line mentioning both the namespace and the
// name of pvc is found.
//
// The metrics are fetched by running curl through execCommandInToolBoxPod in
// rookNamespace. Command failures, empty output and missing metrics are
// logged and retried, so the only errors returned are the one from
// getKubeletIP and whatever wait.PollImmediate reports when the wait does not
// succeed.
func getMetricsForPVC(f *framework.Framework, pvc *v1.PersistentVolumeClaim, t int) error {
	kubelet, err := getKubeletIP(f.ClientSet)
	if err != nil {
		return err
	}

	// kubelet needs to be started with --read-only-port=10255
	cmd := fmt.Sprintf("curl --silent 'http://%s:10255/metrics'", kubelet)

	// Label fragments that identify this PVC in a metrics line; they do not
	// change between polls.
	namespace := fmt.Sprintf("namespace=%q", pvc.Namespace)
	name := fmt.Sprintf("persistentvolumeclaim=%q", pvc.Name)

	// retry as kubelet does not immediately have the metrics available
	timeout := time.Duration(t) * time.Minute

	return wait.PollImmediate(poll, timeout, func() (bool, error) {
		stdOut, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace)
		if err != nil {
			e2elog.Logf("failed to get metrics for pvc %q (%v): %v", pvc.Name, err, stdErr)

			return false, nil
		}
		if stdOut == "" {
			e2elog.Logf("no metrics received from kubelet on IP %s", kubelet)

			return false, nil
		}

		for _, line := range strings.Split(stdOut, "\n") {
			if !strings.HasPrefix(line, "kubelet_volume_stats_") {
				continue
			}
			if strings.Contains(line, namespace) && strings.Contains(line, name) {
				// TODO: validate metrics if possible
				e2elog.Logf("found metrics for pvc %s/%s: %s", pvc.Namespace, pvc.Name, line)

				return true, nil
			}
		}

		e2elog.Logf("no metrics found for pvc %s/%s", pvc.Namespace, pvc.Name)

		return false, nil
	})
}