Enable leader election in v1.14+

Use Deployment with leader election instead of StatefulSet

Deployment behaves better when a node gets disconnected
from the rest of the cluster - new provisioner leader
is elected in ~15 seconds, while it may take up to
5 minutes for StatefulSet to start a new replica.

Refer: kubernetes-csi/external-provisioner@52d1fbc

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2019-07-25 12:19:44 +05:30
committed by mergify[bot]
parent 0786225937
commit 02bcb5f16a
112 changed files with 2485 additions and 40 deletions

View File

@ -6,6 +6,7 @@ import (
. "github.com/onsi/ginkgo" // nolint
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
)
@ -17,10 +18,15 @@ var (
cephfsNodePluginRBAC = "csi-nodeplugin-rbac.yaml"
cephfsDeploymentName = "csi-cephfsplugin-provisioner"
cephfsDeamonSetName = "csi-cephfsplugin"
cephfsDirPath = "../deploy/cephfs/kubernetes/"
cephfsDirPath = "../deploy/cephfs/kubernetes"
cephfsExamplePath = "../examples/cephfs/"
)
func updateCephfsDirPath(c clientset.Interface) {
version := getKubeVersionToDeploy(c)
cephfsDirPath = fmt.Sprintf("%s/%s/", cephfsDirPath, version)
}
func deployCephfsPlugin() {
// deploy provisioner
framework.RunKubectlOrDie("create", "-f", cephfsDirPath+cephfsProvisioner)
@ -34,8 +40,9 @@ var _ = Describe("cephfs", func() {
f := framework.NewDefaultFramework("cephfs")
// deploy cephfs CSI
BeforeEach(func() {
updateCephfsDirPath(f.ClientSet)
createFileSystem(f.ClientSet)
createConfigMap(f.ClientSet, f)
createConfigMap(cephfsDirPath, f.ClientSet, f)
deployCephfsPlugin()
createCephfsStorageClass(f.ClientSet, f)
createCephfsSecret(f.ClientSet, f)
@ -58,9 +65,16 @@ var _ = Describe("cephfs", func() {
It("Test cephfs CSI", func() {
pvcPath := cephfsExamplePath + "pvc.yaml"
appPath := cephfsExamplePath + "pod.yaml"
By("checking provisioner statefulset is running")
By("checking provisioner statefulset/deployment is running")
timeout := time.Duration(deployTimeout) * time.Minute
err := framework.WaitForStatefulSetReplicasReady(cephfsDeploymentName, namespace, f.ClientSet, 1*time.Second, timeout)
var err error
sts := deployProvAsSTS(f.ClientSet)
if sts {
err = framework.WaitForStatefulSetReplicasReady(cephfsDeploymentName, namespace, f.ClientSet, 1*time.Second, timeout)
} else {
err = waitForDeploymentComplete(cephfsDeploymentName, namespace, f.ClientSet, deployTimeout)
}
if err != nil {
Fail(err.Error())
}

View File

@ -6,6 +6,7 @@ import (
. "github.com/onsi/ginkgo" // nolint
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2elog "k8s.io/kubernetes/test/e2e/framework/log"
)
@ -15,14 +16,19 @@ var (
rbdProvisionerRBAC = "csi-provisioner-rbac.yaml"
rbdNodePlugin = "csi-rbdplugin.yaml"
rbdNodePluginRBAC = "csi-nodeplugin-rbac.yaml"
rbdConfigMap = "csi-config-map.yaml"
rbdDirPath = "../deploy/rbd/kubernetes/"
configMap = "csi-config-map.yaml"
rbdDirPath = "../deploy/rbd/kubernetes"
rbdExamplePath = "../examples/rbd/"
rbdDeploymentName = "csi-rbdplugin-provisioner"
rbdDaemonsetName = "csi-rbdplugin"
namespace = "default"
)
func updaterbdDirPath(c clientset.Interface) {
version := getKubeVersionToDeploy(c)
rbdDirPath = fmt.Sprintf("%s/%s/", rbdDirPath, version)
}
func deployRBDPlugin() {
// deploy provisioner
framework.RunKubectlOrDie("create", "-f", rbdDirPath+rbdProvisioner)
@ -36,8 +42,9 @@ var _ = Describe("RBD", func() {
f := framework.NewDefaultFramework("rbd")
// deploy RBD CSI
BeforeEach(func() {
updaterbdDirPath(f.ClientSet)
createRBDPool()
createConfigMap(f.ClientSet, f)
createConfigMap(rbdDirPath, f.ClientSet, f)
deployRBDPlugin()
createRBDStorageClass(f.ClientSet, f)
createRBDSecret(f.ClientSet, f)
@ -68,9 +75,15 @@ var _ = Describe("RBD", func() {
appClonePath := rbdExamplePath + "pod-restore.yaml"
snapshotPath := rbdExamplePath + "snapshot.yaml"
By("checking provisioner statefulset is running")
By("checking provisioner statefulset/deployment is running")
timeout := time.Duration(deployTimeout) * time.Minute
err := framework.WaitForStatefulSetReplicasReady(rbdDeploymentName, namespace, f.ClientSet, 1*time.Second, timeout)
var err error
sts := deployProvAsSTS(f.ClientSet)
if sts {
err = framework.WaitForStatefulSetReplicasReady(rbdDeploymentName, namespace, f.ClientSet, 1*time.Second, timeout)
} else {
err = waitForDeploymentComplete(rbdDeploymentName, namespace, f.ClientSet, deployTimeout)
}
if err != nil {
Fail(err.Error())
}

View File

@ -9,6 +9,8 @@ import (
"strings"
"time"
"k8s.io/klog"
"github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
snapClient "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/typed/volumesnapshot/v1alpha1"
. "github.com/onsi/ginkgo" // nolint
@ -45,6 +47,31 @@ type snapInfo struct {
Timestamp string `json:"timestamp"`
}
func deployProvAsSTS(c clientset.Interface) bool {
// kubeMinor to use deployment instead of statefulset for provisioner
const kubeMinor = "14"
v, err := c.Discovery().ServerVersion()
if err != nil {
klog.Errorf("failed to get server version with error %v", err)
return false
}
if v.Minor < kubeMinor {
return true
}
return false
}
func getKubeVersionToDeploy(c clientset.Interface) string {
sts := deployProvAsSTS(c)
version := ""
if sts {
version = "v1.13"
} else {
version = "v1.14+"
}
return version
}
func waitForDaemonSets(name, ns string, c clientset.Interface, t int) error {
timeout := time.Duration(t) * time.Minute
start := time.Now()
@ -97,7 +124,7 @@ func waitForDeploymentComplete(name, ns string, c clientset.Interface, t int) er
return true, nil
}
reason = fmt.Sprintf("deployment status: %#v", deployment.Status)
reason = fmt.Sprintf("deployment status: %#v", deployment.Status.String())
e2elog.Logf(reason)
return false, nil
@ -234,8 +261,8 @@ func createRBDSnapshotClass(f *framework.Framework) {
Expect(err).Should(BeNil())
}
func createConfigMap(c kubernetes.Interface, f *framework.Framework) {
path := rbdDirPath + rbdConfigMap
func createConfigMap(pluginPath string, c kubernetes.Interface, f *framework.Framework) {
path := pluginPath + configMap
cm := v1.ConfigMap{}
err := unmarshal(path, &cm)
Expect(err).Should(BeNil())