diff --git a/.travis.yml b/.travis.yml index 1233a07e9..a9437d32d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,7 +50,6 @@ jobs: - name: cephcsi script: - scripts/skip-doc-change.sh || travis_terminate 0; - - make cephcsi || travis_terminate 1; - make image-cephcsi || travis_terminate 1; - sudo scripts/minikube.sh up || travis_terminate 1; # pull docker images to speed up e2e @@ -58,8 +57,8 @@ jobs: - scripts/minikube.sh k8s-sidecar - "sudo chown -R travis: $HOME/.minikube /usr/local/bin/kubectl" # functional tests - - make func-test TESTOPTIONS='--rook-version=v1.0.0 \ - --deploy-rook=true --deploy-timeout=10 -timeout=20m -v' + - make func-test TESTOPTIONS='--rook-version=v1.0.1 \ + --deploy-rook=true --deploy-timeout=10 -timeout=30m -v' deploy: - provider: script diff --git a/e2e/cephfs.go b/e2e/cephfs.go new file mode 100644 index 000000000..f1d73193b --- /dev/null +++ b/e2e/cephfs.go @@ -0,0 +1,81 @@ +package e2e + +import ( + "time" + + . "github.com/onsi/ginkgo" // nolint + + "k8s.io/kubernetes/test/e2e/framework" +) + +var ( + cephfsProvisioner = "csi-cephfsplugin-provisioner.yaml" + cephfsProvisionerRBAC = "csi-provisioner-rbac.yaml" + cephfsNodePlugin = "csi-cephfsplugin.yaml" + cephfsNodePluginRBAC = "csi-nodeplugin-rbac.yaml" +) + +var ( + cephfsDirPath = "../deploy/cephfs/kubernetes/" + + cephfsExamplePath = "../examples/cephfs/" +) + +func deployCephfsPlugin() { + //deploy provisioner + framework.RunKubectlOrDie("create", "-f", cephfsDirPath+cephfsProvisioner) + framework.RunKubectlOrDie("apply", "-f", cephfsDirPath+cephfsProvisionerRBAC) + //deploy nodeplugin + framework.RunKubectlOrDie("create", "-f", cephfsDirPath+cephfsNodePlugin) + framework.RunKubectlOrDie("apply", "-f", cephfsDirPath+cephfsNodePluginRBAC) +} + +var _ = Describe("cephfs", func() { + f := framework.NewDefaultFramework("cephfs") + //deploy cephfs CSI + BeforeEach(func() { + createFileSystem(f.ClientSet) + deployCephfsPlugin() + createCephfsStorageClass(f.ClientSet) + createCephfsSecret(f.ClientSet, f) + }) + + AfterEach(func() { + cephfsFiles := getFilesinDirectory(cephfsDirPath) + for _, file := range cephfsFiles { + res, err := framework.RunKubectl("delete", "-f", cephfsDirPath+file.Name()) + framework.Logf("failed to delete resource in %s with err %v", res, err) + } + deleteSecret(cephfsExamplePath + "secret.yaml") + deleteStorageClass(cephfsExamplePath + "storageclass.yaml") + deleteFileSystem() + }) + + Context("Test cephfs CSI", func() { + It("Test cephfs CSI", func() { + By("checking provisioner statefulset is running") + timeout := time.Duration(deployTimeout) * time.Minute + err := framework.WaitForStatefulSetReplicasReady("csi-cephfsplugin-provisioner", "default", f.ClientSet, 1*time.Second, timeout) + if err != nil { + Fail(err.Error()) + } + + By("checking nodeplugin deamonsets is running") + err = waitForDaemonSets("csi-cephfsplugin", "default", f.ClientSet, deployTimeout) + if err != nil { + Fail(err.Error()) + } + + By("create and delete a PVC", func() { + By("create a PVC and Bind it to an app", func() { + pvcPath := cephfsExamplePath + "pvc.yaml" + appPath := cephfsExamplePath + "pod.yaml" + validatePVCAndAppBinding(pvcPath, appPath, f) + + }) + + }) + }) + }) + +}) diff --git a/e2e/deploy-rook.go b/e2e/deploy-rook.go new file mode 100644 index 000000000..0f1379746 --- /dev/null +++ b/e2e/deploy-rook.go @@ -0,0 +1,113 @@ +package e2e + +import ( + "fmt" + "strings" + + . "github.com/onsi/gomega" // nolint + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/e2e/framework" +) + +var ( + rookURL = "https://raw.githubusercontent.com/rook/rook/$version/cluster/examples/kubernetes/ceph" +) + +var rookNS = "rook-ceph" + +func formRookURL(version string) { + rookURL = strings.Replace(rookURL, "$version", version, 1) +} + +func getK8sClient() kubernetes.Interface { + framework.Logf("Creating a kubernetes client") + client, err := framework.LoadClientset() + Expect(err).Should(BeNil()) + return client + +} + +func deployCommon() { + commonPath := fmt.Sprintf("%s/%s", rookURL, "common.yaml") + framework.RunKubectlOrDie("create", "-f", commonPath) +} + +func createFileSystem(c kubernetes.Interface) { + commonPath := fmt.Sprintf("%s/%s", rookURL, "filesystem-test.yaml") + framework.RunKubectlOrDie("create", "-f", commonPath) + opt := metav1.ListOptions{ + LabelSelector: "app=rook-ceph-mds", + } + err := checkCephPods(rookNS, c, 1, deployTimeout, opt) + Expect(err).Should(BeNil()) +} + +func createRBDPool() { + commonPath := fmt.Sprintf("%s/%s", rookURL, "pool-test.yaml") + framework.RunKubectlOrDie("create", "-f", commonPath) +} +func deleteFileSystem() { + commonPath := fmt.Sprintf("%s/%s", rookURL, "filesystem.yaml") + framework.RunKubectlOrDie("delete", "-f", commonPath) +} + +func deleteRBDPool() { + commonPath := fmt.Sprintf("%s/%s", rookURL, "pool-test.yaml") + framework.RunKubectlOrDie("delete", "-f", commonPath) +} + +func deployOperator(c kubernetes.Interface) { + opPath := fmt.Sprintf("%s/%s", rookURL, "operator.yaml") + + framework.RunKubectlOrDie("create", "-f", opPath) + err := waitForDaemonSets("rook-ceph-agent", rookNS, c, deployTimeout) + Expect(err).Should(BeNil()) + err = waitForDaemonSets("rook-discover", rookNS, c, deployTimeout) + Expect(err).Should(BeNil()) + err = waitForDeploymentComplete("rook-ceph-operator", rookNS, c, deployTimeout) + Expect(err).Should(BeNil()) +} + +func deployCluster(c kubernetes.Interface) { + opPath := fmt.Sprintf("%s/%s", rookURL, "cluster-test.yaml") + framework.RunKubectlOrDie("create", "-f", opPath) + opt := metav1.ListOptions{ + LabelSelector: "app=rook-ceph-mon", + } + err := checkCephPods(rookNS, c, 1, deployTimeout, opt) + Expect(err).Should(BeNil()) +} + +func deployToolBox(c kubernetes.Interface) { + opPath := fmt.Sprintf("%s/%s", rookURL, "toolbox.yaml") + framework.RunKubectlOrDie("create", "-f", opPath) + opt := metav1.ListOptions{ + LabelSelector: "app=rook-ceph-tools", + } + + name := getPodName(rookNS, c, opt) + err := waitForPodInRunningState(name, rookNS, c, deployTimeout) + Expect(err).Should(BeNil()) +} + +func deployRook() { + c := getK8sClient() + deployCommon() + deployOperator(c) + deployCluster(c) + deployToolBox(c) +} + +func tearDownRook() { + opPath := fmt.Sprintf("%s/%s", rookURL, "cluster-test.yaml") + framework.Cleanup(opPath, rookNS, "app=rook-ceph-mon") + opPath = fmt.Sprintf("%s/%s", rookURL, "toolbox.yaml") + framework.Cleanup(opPath, rookNS, "app=rook-ceph-tools") + + opPath = fmt.Sprintf("%s/%s", rookURL, "operator.yaml") + //TODO need to add selector for cleanup validation + framework.Cleanup(opPath, rookNS) + commonPath := fmt.Sprintf("%s/%s", rookURL, "common.yaml") + framework.RunKubectlOrDie("delete", "-f", commonPath) +} diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go new file mode 100644 index 000000000..fa18227ee --- /dev/null +++ b/e2e/e2e_test.go @@ -0,0 +1,52 @@ +package e2e + +import ( + "flag" + "fmt" + "log" + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/kubernetes/test/e2e/framework" +) + +var ( + RookVersion string + rookRequired bool + deployTimeout int +) + +func init() { + log.SetOutput(GinkgoWriter) + flag.StringVar(&RookVersion, "rook-version", "master", "rook version to pull yaml files") + + flag.BoolVar(&rookRequired, "deploy-rook", true, "deploy rook on kubernetes") + flag.IntVar(&deployTimeout, "deploy-timeout", 10, "timeout to wait for created kubernetes resources") + + // Register framework flags, then handle flags + framework.HandleFlags() + framework.AfterReadingAllFlags(&framework.TestContext) + + formRookURL(RookVersion) + fmt.Println("timeout for deploytimeout ", deployTimeout) +} + +//BeforeSuite deploys the rook-operator and ceph cluster +var _ = BeforeSuite(func() { + if rookRequired { + deployRook() + } +}) + +//AfterSuite removes the rook-operator and ceph cluster +var _ = AfterSuite(func() { + if rookRequired { + tearDownRook() + } +}) + +func TestE2E(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "E2e Suite") +} diff --git a/e2e/rbd.go b/e2e/rbd.go new file mode 100644 index 000000000..664e3e2c6 --- /dev/null +++ b/e2e/rbd.go @@ -0,0 +1,79 @@ +package e2e + +import ( + "time" + + . "github.com/onsi/ginkgo" // nolint + + "k8s.io/kubernetes/test/e2e/framework" +) + +var ( + rbdProvisioner = "csi-rbdplugin-provisioner.yaml" + rbdProvisionerRBAC = "csi-provisioner-rbac.yaml" + rbdNodePlugin = "csi-rbdplugin.yaml" + rbdNodePluginRBAC = "csi-nodeplugin-rbac.yaml" + rbdConfigMap = "csi-config-map.yaml" +) + +var ( + rbdDirPath = "../deploy/rbd/kubernetes/" + + rbdExamplePath = "../examples/rbd/" +) + +func deployRBDPlugin() { + // deploy provisioner + framework.RunKubectlOrDie("create", "-f", rbdDirPath+rbdProvisioner) + framework.RunKubectlOrDie("apply", "-f", rbdDirPath+rbdProvisionerRBAC) + //deploy nodeplugin + framework.RunKubectlOrDie("create", "-f", rbdDirPath+rbdNodePlugin) + framework.RunKubectlOrDie("apply", "-f", rbdDirPath+rbdNodePluginRBAC) +} + +var _ = Describe("RBD", func() { + f := framework.NewDefaultFramework("rbd") + //deploy RBD CSI + BeforeEach(func() { + createRBDPool() + createRBDConfigMap(f.ClientSet, f) + deployRBDPlugin() + createRBDStorageClass(f.ClientSet, f) + createRBDSecret(f.ClientSet, f) + }) + + AfterEach(func() { + rbdFiles := getFilesinDirectory(rbdDirPath) + for _, file := range rbdFiles { + res, err := framework.RunKubectl("delete", "-f", rbdDirPath+file.Name()) + framework.Logf("failed to delete resource in %s with err %v", res, err) + } + deleteRBDPool() + deleteSecret(rbdExamplePath + "secret.yaml") + deleteStorageClass(rbdExamplePath + "storageclass.yaml") + }) + + Context("Test RBD CSI", func() { + It("Test RBD CSI", func() { + By("checking provisioner statefulset is running") + timeout := time.Duration(deployTimeout) * time.Minute + err := framework.WaitForStatefulSetReplicasReady("csi-rbdplugin-provisioner", "default", f.ClientSet, 1*time.Second, timeout) + if err != nil { + Fail(err.Error()) + } + + By("checking nodeplugin deamonsets is running") + err = waitForDaemonSets("csi-rbdplugin", "default", f.ClientSet, deployTimeout) + if err != nil { + Fail(err.Error()) + } + + By("create a PVC and Bind it to an app", func() { + pvcPath := rbdExamplePath + "pvc.yaml" + appPath := rbdExamplePath + "pod.yaml" + validatePVCAndAppBinding(pvcPath, appPath, f) + }) + }) + }) + +}) diff --git a/e2e/utils.go b/e2e/utils.go new file mode 100644 index 000000000..abb5e1a5e --- /dev/null +++ b/e2e/utils.go @@ -0,0 +1,482 @@ +package e2e + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "strings" + "time" + + apierrs "k8s.io/apimachinery/pkg/api/errors" + + . "github.com/onsi/ginkgo" // nolint + . "github.com/onsi/gomega" // nolint + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + scv1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + utilyaml "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/kubernetes" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/client/conditions" + "k8s.io/kubernetes/test/e2e/framework" + testutils "k8s.io/kubernetes/test/utils" +) + +func getFilesinDirectory(path string) []os.FileInfo { + files, err := ioutil.ReadDir(path) + if err != nil { + framework.ExpectNoError(err) + } + return files +} + +var poll = 2 * time.Second + +func waitForDaemonSets(name, ns string, c clientset.Interface, t int) error { + timeout := time.Duration(t) * time.Minute + start := time.Now() + framework.Logf("Waiting up to %v for all daemonsets in namespace '%s' to start", + timeout, ns) + + return wait.PollImmediate(poll, timeout, func() (bool, error) { + ds, err := c.AppsV1().DaemonSets(ns).Get(name, metav1.GetOptions{}) + if err != nil { + framework.Logf("Error getting daemonsets in namespace: '%s': %v", ns, err) + if strings.Contains(err.Error(), "not found") { + return false, nil + } + if testutils.IsRetryableAPIError(err) { + return false, nil + } + return false, err + } + dNum := ds.Status.DesiredNumberScheduled + ready := ds.Status.NumberReady + framework.Logf("%d / %d pods ready in namespace '%s' in daemonset '%s' (%d seconds elapsed)", ready, dNum, ns, ds.ObjectMeta.Name, int(time.Since(start).Seconds())) + if ready != dNum { + return false, nil + } + + return true, nil + }) +} + +// Waits for the deployment to complete. + +func waitForDeploymentComplete(name, ns string, c clientset.Interface, t int) error { + var ( + deployment *apps.Deployment + reason string + err error + ) + timeout := time.Duration(t) * time.Minute + err = wait.PollImmediate(poll, timeout, func() (bool, error) { + deployment, err = c.AppsV1().Deployments(ns).Get(name, metav1.GetOptions{}) + if err != nil { + return false, err + } + + //TODO need to check rolling update + + // When the deployment status and its underlying resources reach the + // desired state, we're done + if deployment.Status.Replicas == deployment.Status.ReadyReplicas { + return true, nil + } + + reason = fmt.Sprintf("deployment status: %#v", deployment.Status) + framework.Logf(reason) + + return false, nil + }) + + if err == wait.ErrWaitTimeout { + err = fmt.Errorf("%s", reason) + } + if err != nil { + return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", name, err) + } + return nil +} + +func execCommandInToolBox(f *framework.Framework, c string) string { + + cmd := []string{"/bin/sh", "-c", c} + opt := metav1.ListOptions{ + LabelSelector: "app=rook-ceph-tools", + } + podList, err := f.PodClientNS(rookNS).List(opt) + framework.ExpectNoError(err) + Expect(podList.Items).NotTo(BeNil()) + Expect(err).Should(BeNil()) + + podPot := framework.ExecOptions{ + Command: cmd, + PodName: podList.Items[0].Name, + Namespace: rookNS, + ContainerName: podList.Items[0].Spec.Containers[0].Name, + Stdin: nil, + CaptureStdout: true, + CaptureStderr: true, + PreserveWhitespace: true, + } + stdOut, stdErr, err := f.ExecWithOptions(podPot) + Expect(err).Should(BeNil()) + Expect(stdErr).Should(BeEmpty()) + return stdOut +} + +func getMons(ns string, c kubernetes.Interface) []string { + opt := metav1.ListOptions{ + LabelSelector: "app=rook-ceph-mon", + } + svcList, err := c.CoreV1().Services(ns).List(opt) + Expect(err).Should(BeNil()) + services := make([]string, 0) + for _, svc := range svcList.Items { + s := fmt.Sprintf("%s.%s.svc.cluster.local:%d", svc.Name, svc.Namespace, svc.Spec.Ports[0].Port) + services = append(services, s) + } + return services +} + +func getStorageClass(c kubernetes.Interface, path string) scv1.StorageClass { + sc := scv1.StorageClass{} + err := unmarshal(path, &sc) + Expect(err).Should(BeNil()) + + mons := getMons(rookNS, c) + sc.Parameters["monitors"] = strings.Join(mons, ",") + return sc +} + +func createCephfsStorageClass(c kubernetes.Interface) { + scPath := fmt.Sprintf("%s/%s", cephfsExamplePath, "storageclass.yaml") + sc := getStorageClass(c, scPath) + sc.Parameters["pool"] = "myfs-data0" + _, err := c.StorageV1().StorageClasses().Create(&sc) + Expect(err).Should(BeNil()) +} + +func createRBDStorageClass(c kubernetes.Interface, f *framework.Framework) { + scPath := fmt.Sprintf("%s/%s", rbdExamplePath, "storageclass.yaml") + sc := getStorageClass(c, scPath) + delete(sc.Parameters, "userid") + sc.Parameters["pool"] = "replicapool" + + fsID := execCommandInToolBox(f, "ceph fsid") + //remove new line present in fsID + fsID = strings.Trim(fsID, "\n") + + sc.Parameters["clusterID"] = fsID + _, err := c.StorageV1().StorageClasses().Create(&sc) + Expect(err).Should(BeNil()) +} + +func createRBDConfigMap(c kubernetes.Interface, f *framework.Framework) { + path := rbdDirPath + rbdConfigMap + cm := v1.ConfigMap{} + err := unmarshal(path, &cm) + Expect(err).Should(BeNil()) + + fsID := execCommandInToolBox(f, "ceph fsid") + //remove new line present in fsID + fsID = strings.Trim(fsID, "\n") + //get mon list + mons := getMons(rookNS, c) + conmap := []struct { + Clusterid string `json:"clusterID"` + Monitors []string `json:"monitors"` + }{ + { + fsID, + mons, + }, + } + data, err := json.Marshal(conmap) + Expect(err).Should(BeNil()) + cm.Data["config.json"] = string(data) + _, err = c.CoreV1().ConfigMaps("default").Create(&cm) + Expect(err).Should(BeNil()) +} + +func getSecret(path string) v1.Secret { + sc := v1.Secret{} + err := unmarshal(path, &sc) + //discard corruptInputError + if err != nil { + if _, ok := err.(base64.CorruptInputError); !ok { + Expect(err).Should(BeNil()) + } + } + return sc +} + +func createCephfsSecret(c kubernetes.Interface, f *framework.Framework) { + scPath := fmt.Sprintf("%s/%s", cephfsExamplePath, "secret.yaml") + sc := getSecret(scPath) + adminKey := execCommandInToolBox(f, "ceph auth get-key client.admin") + sc.Data["adminID"] = []byte("admin") + sc.Data["adminKey"] = []byte(adminKey) + delete(sc.Data, "userID") + delete(sc.Data, "userKey") + _, err := c.CoreV1().Secrets("default").Create(&sc) + Expect(err).Should(BeNil()) +} + +func createRBDSecret(c kubernetes.Interface, f *framework.Framework) { + scPath := fmt.Sprintf("%s/%s", rbdExamplePath, "secret.yaml") + sc := getSecret(scPath) + adminKey := execCommandInToolBox(f, "ceph auth get-key client.admin") + sc.Data["admin"] = []byte(adminKey) + delete(sc.Data, "kubernetes") + _, err := c.CoreV1().Secrets("default").Create(&sc) + Expect(err).Should(BeNil()) +} + +func deleteSecret(scPath string) { + _, err := framework.RunKubectl("delete", "-f", scPath) + Expect(err).Should(BeNil()) +} + +func deleteStorageClass(scPath string) { + _, err := framework.RunKubectl("delete", "-f", scPath) + Expect(err).Should(BeNil()) +} + +func loadPVC(path string) *v1.PersistentVolumeClaim { + pvc := &v1.PersistentVolumeClaim{} + err := unmarshal(path, &pvc) + if err != nil { + return nil + } + return pvc +} + +func createPVCAndvalidatePV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, t int) error { + timeout := time.Duration(t) * time.Minute + pv := &v1.PersistentVolume{} + var err error + _, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(pvc) + Expect(err).Should(BeNil()) + name := pvc.Name + start := time.Now() + framework.Logf("Waiting up to %v to be in Bound state", pvc) + + return wait.PollImmediate(poll, timeout, func() (bool, error) { + framework.Logf("waiting for PVC %s (%d seconds elapsed)", pvc.Name, int(time.Since(start).Seconds())) + pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(name, metav1.GetOptions{}) + if err != nil { + framework.Logf("Error getting pvc in namespace: '%s': %v", pvc.Namespace, err) + if testutils.IsRetryableAPIError(err) { + return false, nil + } + if apierrs.IsNotFound(err) { + return false, nil + } + return false, err + } + if pvc.Spec.VolumeName == "" { + return false, nil + } + pv, err = c.CoreV1().PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{}) + if err != nil { + return false, err + } + if apierrs.IsNotFound(err) { + return false, nil + } + err = framework.WaitOnPVandPVC(c, pvc.Namespace, pv, pvc) + if err != nil { + return false, nil + } + return true, nil + }) +} + +func deletePVCAndValidatePV(c kubernetes.Interface, pvc *v1.PersistentVolumeClaim, t int) error { + timeout := time.Duration(t) * time.Minute + nameSpace := pvc.Namespace + name := pvc.Name + var err error + framework.Logf("Deleting PersistentVolumeClaim %v on namespace %v", name, nameSpace) + + pvc, err = c.CoreV1().PersistentVolumeClaims(nameSpace).Get(name, metav1.GetOptions{}) + if err != nil { + return err + } + pv, err := c.CoreV1().PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{}) + if err != nil { + return err + } + + err = c.CoreV1().PersistentVolumeClaims(nameSpace).Delete(name, &metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("delete of PVC %v failed: %v", name, err) + } + start := time.Now() + return wait.PollImmediate(poll, timeout, func() (bool, error) { + // Check that the PVC is really deleted. + framework.Logf("waiting for PVC %s in state %s to be deleted (%d seconds elapsed)", name, pvc.Status.String(), int(time.Since(start).Seconds())) + pvc, err = c.CoreV1().PersistentVolumeClaims(nameSpace).Get(name, metav1.GetOptions{}) + if err == nil { + return false, nil + } + if !apierrs.IsNotFound(err) { + return false, fmt.Errorf("get on deleted PVC %v failed with error other than \"not found\": %v", name, err) + } + + // Examine the pv.ClaimRef and UID. Expect nil values. + _, err = c.CoreV1().PersistentVolumes().Get(pv.Name, metav1.GetOptions{}) + if err == nil { + return false, nil + } + + if !apierrs.IsNotFound(err) { + return false, fmt.Errorf("delete PV %v failed with error other than \"not found\": %v", pv.Name, err) + } + + return true, nil + }) +} + +func loadApp(path string) *v1.Pod { + app := v1.Pod{} + err := unmarshal(path, &app) + if err != nil { + return nil + } + return &app +} + +func createApp(c kubernetes.Interface, app *v1.Pod, timeout int) error { + _, err := c.CoreV1().Pods(app.Namespace).Create(app) + if err != nil { + return err + } + return waitForPodInRunningState(app.Name, app.Namespace, c, timeout) +} + +func getPodName(ns string, c kubernetes.Interface, opt metav1.ListOptions) string { + ticker := time.NewTicker(1 * time.Second) + //TODO add stop logic + for range ticker.C { + podList, err := c.CoreV1().Pods(ns).List(opt) + framework.ExpectNoError(err) + Expect(podList.Items).NotTo(BeNil()) + Expect(err).Should(BeNil()) + + if len(podList.Items) != 0 { + return podList.Items[0].Name + } + } + return "" +} + +func waitForPodInRunningState(name, ns string, c kubernetes.Interface, t int) error { + timeout := time.Duration(t) * time.Minute + start := time.Now() + framework.Logf("Waiting up to %v to be in Running state", name) + return wait.PollImmediate(poll, timeout, func() (bool, error) { + pod, err := c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}) + if err != nil { + return false, err + } + switch pod.Status.Phase { + case v1.PodRunning: + return true, nil + case v1.PodFailed, v1.PodSucceeded: + return false, conditions.ErrPodCompleted + } + framework.Logf("%s app is in %s phase expected to be in Running state (%d seconds elapsed)", name, pod.Status.Phase, int(time.Since(start).Seconds())) + return false, nil + }) +} + +func deletePod(name, ns string, c kubernetes.Interface, t int) error { + timeout := time.Duration(t) * time.Minute + err := c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{}) + if err != nil { + return err + } + start := time.Now() + framework.Logf("Waiting for pod %v to be deleted", name) + return wait.PollImmediate(poll, timeout, func() (bool, error) { + _, err := c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}) + + if apierrs.IsNotFound(err) { + return true, nil + } + framework.Logf("%s app to be deleted (%d seconds elapsed)", name, int(time.Since(start).Seconds())) + if err != nil { + return false, err + } + return false, nil + }) +} + +func unmarshal(fileName string, obj interface{}) error { + f, err := ioutil.ReadFile(fileName) + if err != nil { + return err + } + data, err := utilyaml.ToJSON(f) + if err != nil { + return err + } + + err = json.Unmarshal(data, obj) + return err +} + +func checkCephPods(ns string, c kubernetes.Interface, count int, t int, opt metav1.ListOptions) error { + timeout := time.Duration(t) * time.Minute + start := time.Now() + + return wait.PollImmediate(poll, timeout, func() (bool, error) { + podList, err := c.CoreV1().Pods(ns).List(opt) + if err != nil { + return false, err + } + + framework.Logf("pod count is %d expected count %d (%d seconds elapsed)", len(podList.Items), count, int(time.Since(start).Seconds())) + + if len(podList.Items) >= count { + return true, nil + } + + return false, nil + }) + +} + +func validatePVCAndAppBinding(pvcPath, appPath string, f *framework.Framework) { + pvc := loadPVC(pvcPath) + pvc.Namespace = f.UniqueName + framework.Logf("The PVC template %+v", pvc) + err := createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout) + if err != nil { + Fail(err.Error()) + } + + app := loadApp(appPath) + app.Namespace = f.UniqueName + err = createApp(f.ClientSet, app, deployTimeout) + if err != nil { + Fail(err.Error()) + } + + err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout) + if err != nil { + Fail(err.Error()) + } + + err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout) + if err != nil { + Fail(err.Error()) + } +}