Changes to accommodate client-go changes and kube vendor update

to v1.18.0

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal
2020-04-14 12:34:33 +05:30
committed by mergify[bot]
parent 4c96ad3c85
commit 34fc1d847e
1083 changed files with 50505 additions and 155846 deletions

View File

@ -11,308 +11,6 @@
"ForbiddenPrefixes": [
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
]
},
{
"SelectorRegexp": "k8s[.]io/(api/|apimachinery/|apiextensions-apiserver/|apiserver/)",
"AllowedPrefixes": [
"k8s.io/api/apps/v1",
"k8s.io/api/apps/v1beta1",
"k8s.io/api/authentication/v1",
"k8s.io/api/authorization/v1beta1",
"k8s.io/api/autoscaling/v1",
"k8s.io/api/autoscaling/v2beta1",
"k8s.io/api/autoscaling/v2beta2",
"k8s.io/api/batch/v1",
"k8s.io/api/batch/v1beta1",
"k8s.io/api/certificates/v1beta1",
"k8s.io/api/core/v1",
"k8s.io/api/coordination/v1",
"k8s.io/api/discovery/v1beta1",
"k8s.io/api/extensions/v1beta1",
"k8s.io/api/policy/v1beta1",
"k8s.io/api/rbac/v1",
"k8s.io/api/storage/v1",
"k8s.io/apimachinery/pkg/api/equality",
"k8s.io/apimachinery/pkg/api/errors",
"k8s.io/apimachinery/pkg/api/meta",
"k8s.io/apimachinery/pkg/api/resource",
"k8s.io/apimachinery/pkg/apis/meta/v1",
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured",
"k8s.io/apimachinery/pkg/conversion",
"k8s.io/apimachinery/pkg/fields",
"k8s.io/apimachinery/pkg/labels",
"k8s.io/apimachinery/pkg/runtime",
"k8s.io/apimachinery/pkg/runtime/schema",
"k8s.io/apimachinery/pkg/runtime/serializer",
"k8s.io/apimachinery/pkg/types",
"k8s.io/apimachinery/pkg/util/clock",
"k8s.io/apimachinery/pkg/util/diff",
"k8s.io/apimachinery/pkg/util/errors",
"k8s.io/apimachinery/pkg/util/intstr",
"k8s.io/apimachinery/pkg/util/json",
"k8s.io/apimachinery/pkg/util/rand",
"k8s.io/apimachinery/pkg/util/runtime",
"k8s.io/apimachinery/pkg/util/sets",
"k8s.io/apimachinery/pkg/util/strategicpatch",
"k8s.io/apimachinery/pkg/util/uuid",
"k8s.io/apimachinery/pkg/util/wait",
"k8s.io/apimachinery/pkg/util/version",
"k8s.io/apimachinery/pkg/watch",
"k8s.io/apiserver/pkg/authentication/serviceaccount",
"k8s.io/apiserver/pkg/storage/names",
"k8s.io/apiserver/pkg/util/feature",
"k8s.io/apiextensions-apiserver/pkg/features",
"k8s.io/apimachinery/pkg/api/validation",
"k8s.io/apimachinery/pkg/apis/meta/internalversion",
"k8s.io/apimachinery/pkg/selection",
"k8s.io/apimachinery/pkg/util/validation",
"k8s.io/apimachinery/pkg/util/validation/field",
"k8s.io/apiserver/pkg/authentication/authenticator",
"k8s.io/apiserver/pkg/authentication/user",
"k8s.io/apiserver/pkg/features",
"k8s.io/apiserver/pkg/registry/generic",
"k8s.io/apimachinery/pkg/version",
"k8s.io/api/imagepolicy/v1alpha1",
"k8s.io/apiserver/pkg/admission",
"k8s.io/apiserver/pkg/storage",
"k8s.io/api/batch/v2alpha1",
"k8s.io/apiserver/pkg/registry/rest",
"k8s.io/api/scheduling/v1alpha1",
"k8s.io/api/admissionregistration/v1",
"k8s.io/api/admissionregistration/v1beta1",
"k8s.io/api/authorization/v1",
"k8s.io/api/settings/v1alpha1",
"k8s.io/api/admission/v1beta1",
"k8s.io/api/admission/v1",
"k8s.io/api/networking/v1",
"k8s.io/component-base/config",
"k8s.io/component-base/config/v1alpha1",
"k8s.io/api/scheduling/v1"
]
},
{
"SelectorRegexp": "github[.]com/",
"AllowedPrefixes": [
"github.com/cloudflare/cfssl/config",
"github.com/cloudflare/cfssl/helpers",
"github.com/cloudflare/cfssl/signer",
"github.com/cloudflare/cfssl/signer/local",
"github.com/davecgh/go-spew/spew",
"github.com/docker/distribution/reference",
"github.com/evanphx/json-patch",
"github.com/golang/groupcache/lru",
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud",
"github.com/google/gofuzz",
"github.com/prometheus/client_golang/prometheus",
"github.com/robfig/cron",
"github.com/spf13/pflag",
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/mock",
"github.com/stretchr/testify/require",
"github.com/google/gofuzz",
"github.com/golang/protobuf/ptypes/wrappers",
"github.com/golang/protobuf/proto",
"github.com/container-storage-interface/spec/lib/go/csi"
]
},
{
"SelectorRegexp": "k8s[.]io/client-go/",
"AllowedPrefixes": [
"k8s.io/client-go/util/keyutil",
"k8s.io/client-go/discovery",
"k8s.io/client-go/dynamic",
"k8s.io/client-go/informers",
"k8s.io/client-go/informers/apps/v1",
"k8s.io/client-go/informers/apps/v1beta1",
"k8s.io/client-go/informers/autoscaling/v1",
"k8s.io/client-go/informers/batch/v1",
"k8s.io/client-go/informers/certificates/v1beta1",
"k8s.io/client-go/informers/core/v1",
"k8s.io/client-go/informers/extensions/v1beta1",
"k8s.io/client-go/informers/policy/v1beta1",
"k8s.io/client-go/informers/rbac/v1",
"k8s.io/client-go/informers/storage/v1",
"k8s.io/client-go/kubernetes",
"k8s.io/client-go/kubernetes/fake",
"k8s.io/client-go/kubernetes/scheme",
"k8s.io/client-go/kubernetes/typed/apps/v1",
"k8s.io/client-go/kubernetes/typed/authentication/v1",
"k8s.io/client-go/kubernetes/typed/autoscaling/v1",
"k8s.io/client-go/kubernetes/typed/certificates/v1beta1",
"k8s.io/client-go/kubernetes/typed/core/v1",
"k8s.io/client-go/kubernetes/typed/policy/v1beta1",
"k8s.io/client-go/kubernetes/typed/rbac/v1",
"k8s.io/client-go/listers/apps/v1",
"k8s.io/client-go/listers/apps/v1beta1",
"k8s.io/client-go/listers/autoscaling/v1",
"k8s.io/client-go/listers/batch/v1",
"k8s.io/client-go/listers/certificates/v1beta1",
"k8s.io/client-go/listers/coordination/v1",
"k8s.io/client-go/listers/core/v1",
"k8s.io/client-go/listers/discovery/v1alpha1",
"k8s.io/client-go/listers/discovery/v1beta1",
"k8s.io/client-go/listers/coordination/v1",
"k8s.io/client-go/listers/extensions/v1beta1",
"k8s.io/client-go/listers/policy/v1beta1",
"k8s.io/client-go/listers/rbac/v1",
"k8s.io/client-go/listers/storage/v1",
"k8s.io/client-go/metadata",
"k8s.io/client-go/pkg/version",
"k8s.io/client-go/rest",
"k8s.io/client-go/scale",
"k8s.io/client-go/testing",
"k8s.io/client-go/tools/cache",
"k8s.io/client-go/tools/leaderelection/resourcelock",
"k8s.io/client-go/tools/pager",
"k8s.io/client-go/tools/record",
"k8s.io/client-go/tools/reference",
"k8s.io/client-go/tools/watch",
"k8s.io/client-go/transport",
"k8s.io/client-go/util/cert",
"k8s.io/client-go/util/flowcontrol",
"k8s.io/client-go/util/retry",
"k8s.io/client-go/util/testing",
"k8s.io/client-go/util/workqueue"
]
},
{
"SelectorRegexp": "k8s[.]io/kubernetes/pkg",
"AllowedPrefixes": [
"k8s.io/kubernetes/pkg/api/legacyscheme",
"k8s.io/kubernetes/pkg/api/v1/endpoints",
"k8s.io/kubernetes/pkg/api/v1/node",
"k8s.io/kubernetes/pkg/api/v1/pod",
"k8s.io/kubernetes/pkg/apis/apps/v1",
"k8s.io/kubernetes/pkg/apis/autoscaling",
"k8s.io/kubernetes/pkg/apis/certificates/v1beta1",
"k8s.io/kubernetes/pkg/apis/core",
"k8s.io/kubernetes/pkg/apis/core/helper",
"k8s.io/kubernetes/pkg/apis/core/install",
"k8s.io/kubernetes/pkg/apis/core/v1",
"k8s.io/kubernetes/pkg/apis/core/v1/helper",
"k8s.io/kubernetes/pkg/apis/core/validation",
"k8s.io/kubernetes/pkg/apis/discovery",
"k8s.io/kubernetes/pkg/apis/discovery/validation",
"k8s.io/kubernetes/pkg/cloudprovider",
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce",
"k8s.io/kubernetes/pkg/controller",
"k8s.io/kubernetes/pkg/controller/apis/config",
"k8s.io/kubernetes/pkg/controller/apis/config/v1alpha1",
"k8s.io/kubernetes/pkg/controller/certificates",
"k8s.io/kubernetes/pkg/controller/daemon",
"k8s.io/kubernetes/pkg/controller/daemon/util",
"k8s.io/kubernetes/pkg/controller/deployment",
"k8s.io/kubernetes/pkg/controller/deployment/util",
"k8s.io/kubernetes/pkg/controller/garbagecollector",
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly",
"k8s.io/kubernetes/pkg/controller/history",
"k8s.io/kubernetes/pkg/controller/job",
"k8s.io/kubernetes/pkg/controller/namespace",
"k8s.io/kubernetes/pkg/controller/namespace/deletion",
"k8s.io/kubernetes/pkg/controller/nodeipam",
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam",
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset",
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync",
"k8s.io/kubernetes/pkg/controller/nodelifecycle",
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler",
"k8s.io/kubernetes/pkg/controller/podautoscaler",
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics",
"k8s.io/kubernetes/pkg/controller/replicaset",
"k8s.io/kubernetes/pkg/controller/util/node",
"k8s.io/kubernetes/pkg/controller/volume/attachdetach",
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache",
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics",
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator",
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler",
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater",
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util",
"k8s.io/kubernetes/pkg/controller/volume/events",
"k8s.io/kubernetes/pkg/controller/volume/expand",
"k8s.io/kubernetes/pkg/controller/volume/expand/cache",
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume",
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics",
"k8s.io/kubernetes/pkg/features",
"k8s.io/kubernetes/pkg/kubectl/scheme",
"k8s.io/kubernetes/pkg/kubelet/apis",
"k8s.io/kubernetes/pkg/kubelet/events",
"k8s.io/kubernetes/pkg/kubelet/types",
"k8s.io/kubernetes/pkg/kubelet/util/format",
"k8s.io/kubernetes/pkg/quota",
"k8s.io/kubernetes/pkg/registry/core/secret",
"k8s.io/kubernetes/pkg/scheduler/algorithm",
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates",
"k8s.io/kubernetes/pkg/scheduler/nodeinfo",
"k8s.io/kubernetes/pkg/serviceaccount",
"k8s.io/kubernetes/pkg/util/goroutinemap",
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff",
"k8s.io/kubernetes/pkg/util/hash",
"k8s.io/kubernetes/pkg/util/labels",
"k8s.io/kubernetes/pkg/util/node",
"k8s.io/kubernetes/pkg/util/slice",
"k8s.io/kubernetes/pkg/util/taints",
"k8s.io/kubernetes/pkg/volume",
"k8s.io/kubernetes/pkg/volume/util",
"k8s.io/kubernetes/pkg/volume/util/operationexecutor",
"k8s.io/kubernetes/pkg/volume/util/recyclerclient",
"k8s.io/kubernetes/pkg/volume/util/subpath",
"k8s.io/kubernetes/pkg/volume/util/types",
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler",
"k8s.io/kubernetes/pkg/api/service",
"k8s.io/kubernetes/pkg/apis/scheduling",
"k8s.io/kubernetes/pkg/capabilities",
"k8s.io/kubernetes/pkg/master/ports",
"k8s.io/kubernetes/pkg/scheduler/api",
"k8s.io/kubernetes/pkg/scheduler/util",
"k8s.io/kubernetes/pkg/scheduler/listers",
"k8s.io/kubernetes/pkg/security/apparmor",
"k8s.io/kubernetes/pkg/util/parsers",
"k8s.io/kubernetes/pkg/fieldpath",
"k8s.io/kubernetes/pkg/scheduler/volumebinder",
"k8s.io/kubernetes/pkg/util/resizefs",
"k8s.io/kubernetes/pkg/apis/apps",
"k8s.io/kubernetes/pkg/scheduler/metrics"
]
},
{
"SelectorRegexp": "k8s[.]io/(metrics/|utils/|heapster/|kube-controller-manager/)",
"AllowedPrefixes": [
"k8s.io/heapster/metrics/api/v1/types",
"k8s.io/kube-controller-manager/config/v1alpha1",
"k8s.io/metrics/pkg/apis/custom_metrics/v1beta2",
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1",
"k8s.io/metrics/pkg/apis/metrics/v1alpha1",
"k8s.io/metrics/pkg/apis/metrics/v1beta1",
"k8s.io/metrics/pkg/client/clientset/versioned/scheme",
"k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1",
"k8s.io/metrics/pkg/client/custom_metrics",
"k8s.io/metrics/pkg/client/external_metrics",
"k8s.io/utils/exec",
"k8s.io/utils/integer",
"k8s.io/utils/io",
"k8s.io/utils/mount",
"k8s.io/utils/net",
"k8s.io/utils/nsenter",
"k8s.io/utils/path",
"k8s.io/utils/pointer",
"k8s.io/utils/strings"
]
},
{
"SelectorRegexp": "golang[.]org/",
"AllowedPrefixes": [
"golang.org/x/time/rate",
"golang.org/x/sys/unix",
"golang.org/x/oauth2",
"golang.org/x/net/context",
"google.golang.org/api/compute/v1",
"google.golang.org/api/googleapi",
"google.golang.org/api/compute/v0.alpha",
"google.golang.org/api/container/v1",
"google.golang.org/api/compute/v0.beta",
"google.golang.org/api/tpu/v1",
"golang.org/x/net/context",
"google.golang.org/grpc"
]
}
]
}

View File

@ -14,7 +14,6 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/controller/testutil:go_default_library",
"//pkg/securitycontext:go_default_library",
@ -31,6 +30,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",

View File

@ -22,7 +22,7 @@ import (
"time"
v1authenticationapi "k8s.io/api/authentication/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -120,11 +120,11 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return b.CoreClient.Secrets(b.Namespace).List(options)
return b.CoreClient.Secrets(b.Namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return b.CoreClient.Secrets(b.Namespace).Watch(options)
return b.CoreClient.Secrets(b.Namespace).Watch(context.TODO(), options)
},
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@ -157,7 +157,7 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro
if !valid {
klog.Warningf("secret %s contained an invalid API token for %s/%s", secret.Name, sa.Namespace, sa.Name)
// try to delete the secret containing the invalid token
if err := b.CoreClient.Secrets(secret.Namespace).Delete(secret.Name, &metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
if err := b.CoreClient.Secrets(secret.Namespace).Delete(context.TODO(), secret.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
klog.Warningf("error deleting secret %s containing invalid API token for %s/%s: %v", secret.Name, sa.Namespace, sa.Name, err)
}
// continue watching for good tokens
@ -186,7 +186,7 @@ func (b SAControllerClientBuilder) getAuthenticatedConfig(sa *v1.ServiceAccount,
// Try token review first
tokenReview := &v1authenticationapi.TokenReview{Spec: v1authenticationapi.TokenReviewSpec{Token: token}}
if tokenResult, err := b.AuthenticationClient.TokenReviews().Create(tokenReview); err == nil {
if tokenResult, err := b.AuthenticationClient.TokenReviews().Create(context.TODO(), tokenReview, metav1.CreateOptions{}); err == nil {
if !tokenResult.Status.Authenticated {
klog.Warningf("Token for %s/%s did not authenticate correctly", sa.Namespace, sa.Name)
return nil, false, nil
@ -207,7 +207,7 @@ func (b SAControllerClientBuilder) getAuthenticatedConfig(sa *v1.ServiceAccount,
if err != nil {
return nil, false, err
}
err = client.Get().AbsPath("/apis").Do().Error()
err = client.Get().AbsPath("/apis").Do(context.TODO()).Error()
if apierrors.IsUnauthorized(err) {
klog.Warningf("Token for %s/%s did not authenticate correctly: %v", sa.Namespace, sa.Name, err)
return nil, false, nil

View File

@ -17,6 +17,7 @@ limitations under the License.
package controller
import (
"context"
"fmt"
"net/http"
"sync"
@ -25,6 +26,7 @@ import (
"golang.org/x/oauth2"
v1authenticationapi "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
apiserverserviceaccount "k8s.io/apiserver/pkg/authentication/serviceaccount"
@ -174,11 +176,11 @@ func (ts *tokenSourceImpl) Token() (*oauth2.Token, error) {
return false, nil
}
tr, inErr := ts.coreClient.ServiceAccounts(ts.namespace).CreateToken(ts.serviceAccountName, &v1authenticationapi.TokenRequest{
tr, inErr := ts.coreClient.ServiceAccounts(ts.namespace).CreateToken(context.TODO(), ts.serviceAccountName, &v1authenticationapi.TokenRequest{
Spec: v1authenticationapi.TokenRequestSpec{
ExpirationSeconds: utilpointer.Int64Ptr(ts.expirationSeconds),
},
})
}, metav1.CreateOptions{})
if inErr != nil {
klog.Warningf("get token failed: %v", inErr)
return false, nil

View File

@ -17,6 +17,7 @@ limitations under the License.
package controller
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
@ -419,7 +420,7 @@ type RealRSControl struct {
var _ RSControlInterface = &RealRSControl{}
func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) error {
_, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(name, types.StrategicMergePatchType, data)
_, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return err
}
@ -439,7 +440,7 @@ type RealControllerRevisionControl struct {
var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{}
func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name string, data []byte) error {
_, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(name, types.StrategicMergePatchType, data)
_, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return err
}
@ -536,7 +537,7 @@ func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v
}
func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
_, err := r.KubeClient.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, data)
_, err := r.KubeClient.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return err
}
@ -576,7 +577,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT
if len(labels.Set(pod.Labels)) == 0 {
return fmt.Errorf("unable to create pods, no labels")
}
newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
// only send an event if the namespace isn't terminating
if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
@ -601,7 +602,7 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime
return fmt.Errorf("object does not have ObjectMeta, %v", err)
}
klog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID)
if err := r.KubeClient.CoreV1().Pods(namespace).Delete(podID, nil); err != nil && !apierrors.IsNotFound(err) {
if err := r.KubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), podID, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
return fmt.Errorf("unable to delete pods: %v", err)
}
@ -1013,10 +1014,10 @@ func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taints ...*v
// First we try getting node from the API server cache, as it's cheaper. If it fails
// we get it from etcd to be sure to have fresh data.
if firstTry {
oldNode, err = c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
firstTry = false
} else {
oldNode, err = c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}
if err != nil {
return err
@ -1070,10 +1071,10 @@ func RemoveTaintOffNode(c clientset.Interface, nodeName string, node *v1.Node, t
// First we try getting node from the API server cache, as it's cheaper. If it fails
// we get it from etcd to be sure to have fresh data.
if firstTry {
oldNode, err = c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
firstTry = false
} else {
oldNode, err = c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}
if err != nil {
return err
@ -1118,7 +1119,7 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
}
_, err = c.CoreV1().Nodes().Patch(nodeName, types.StrategicMergePatchType, patchBytes)
_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
return err
}
@ -1147,10 +1148,10 @@ func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la
// First we try getting node from the API server cache, as it's cheaper. If it fails
// we get it from etcd to be sure to have fresh data.
if firstTry {
node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
firstTry = false
} else {
node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
}
if err != nil {
return err
@ -1177,7 +1178,7 @@ func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la
if err != nil {
return fmt.Errorf("failed to create a two-way merge patch: %v", err)
}
if _, err := kubeClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes); err != nil {
if _, err := kubeClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("failed to patch the node: %v", err)
}
return nil
@ -1185,7 +1186,7 @@ func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la
}
func getOrCreateServiceAccount(coreClient v1core.CoreV1Interface, namespace, name string) (*v1.ServiceAccount, error) {
sa, err := coreClient.ServiceAccounts(namespace).Get(name, metav1.GetOptions{})
sa, err := coreClient.ServiceAccounts(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err == nil {
return sa, nil
}
@ -1195,17 +1196,17 @@ func getOrCreateServiceAccount(coreClient v1core.CoreV1Interface, namespace, nam
// Create the namespace if we can't verify it exists.
// Tolerate errors, since we don't know whether this component has namespace creation permissions.
if _, err := coreClient.Namespaces().Get(namespace, metav1.GetOptions{}); apierrors.IsNotFound(err) {
if _, err = coreClient.Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}); err != nil && !apierrors.IsAlreadyExists(err) {
if _, err := coreClient.Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}); apierrors.IsNotFound(err) {
if _, err = coreClient.Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
klog.Warningf("create non-exist namespace %s failed:%v", namespace, err)
}
}
// Create the service account
sa, err = coreClient.ServiceAccounts(namespace).Create(&v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}})
sa, err = coreClient.ServiceAccounts(namespace).Create(context.TODO(), &v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
// If we're racing to init and someone else already created it, re-fetch
return coreClient.ServiceAccounts(namespace).Get(name, metav1.GetOptions{})
return coreClient.ServiceAccounts(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
return sa, err
}

View File

@ -18,11 +18,13 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/apps/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/integer:go_default_library",
],
@ -46,6 +48,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
],

View File

@ -17,6 +17,7 @@ limitations under the License.
package util
import (
"context"
"fmt"
"math"
"sort"
@ -24,18 +25,19 @@ import (
"strings"
"time"
"k8s.io/klog"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/utils/integer"
@ -544,7 +546,7 @@ func GetNewReplicaSet(deployment *apps.Deployment, c appsclient.AppsV1Interface)
// RsListFromClient returns an rsListFunc that wraps the given client.
func RsListFromClient(c appsclient.AppsV1Interface) RsListFunc {
return func(namespace string, options metav1.ListOptions) ([]*apps.ReplicaSet, error) {
rsList, err := c.ReplicaSets(namespace).List(options)
rsList, err := c.ReplicaSets(namespace).List(context.TODO(), options)
if err != nil {
return nil, err
}
@ -912,3 +914,38 @@ func HasProgressDeadline(d *apps.Deployment) bool {
func HasRevisionHistoryLimit(d *apps.Deployment) bool {
return d.Spec.RevisionHistoryLimit != nil && *d.Spec.RevisionHistoryLimit != math.MaxInt32
}
// GetDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet. Only the one specified in the ReplicaSet's ControllerRef
// will actually manage it.
// Returns an error only if no matching Deployments are found.
func GetDeploymentsForReplicaSet(deploymentLister appslisters.DeploymentLister, rs *apps.ReplicaSet) ([]*apps.Deployment, error) {
if len(rs.Labels) == 0 {
return nil, fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)
}
// TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label
dList, err := deploymentLister.Deployments(rs.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
var deployments []*apps.Deployment
for _, d := range dList {
selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) {
continue
}
deployments = append(deployments, d)
}
if len(deployments) == 0 {
return nil, fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels)
}
return deployments, nil
}

View File

@ -22,6 +22,7 @@ limitations under the License.
package nodelifecycle
import (
"context"
"fmt"
"strings"
"sync"
@ -128,7 +129,7 @@ const (
retrySleepTime = 20 * time.Millisecond
nodeNameKeyIndex = "spec.nodeName"
// podUpdateWorkerSizes assumes that in most cases pod will be handled by monitorNodeHealth pass.
// Pod update workes will only handle lagging cache pods. 4 workes should be enough.
// Pod update workers will only handle lagging cache pods. 4 workers should be enough.
podUpdateWorkerSize = 4
)
@ -350,10 +351,6 @@ type Controller struct {
// tainted nodes, if they're not tolerated.
runTaintManager bool
// if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
// taints instead of evicting Pods itself.
useTaintBasedEvictions bool
nodeUpdateQueue workqueue.Interface
podUpdateQueue workqueue.RateLimitingInterface
}
@ -374,7 +371,6 @@ func NewNodeLifecycleController(
largeClusterThreshold int32,
unhealthyZoneThreshold float32,
runTaintManager bool,
useTaintBasedEvictions bool,
) (*Controller, error) {
if kubeClient == nil {
@ -415,13 +411,9 @@ func NewNodeLifecycleController(
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
nodeUpdateQueue: workqueue.NewNamed("node_lifecycle_controller"),
podUpdateQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"),
}
if useTaintBasedEvictions {
klog.Infof("Controller is using taint based evictions.")
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
@ -579,7 +571,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh)
}
if nc.useTaintBasedEvictions {
if nc.runTaintManager {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
@ -767,9 +759,7 @@ func (nc *Controller) doEvictionPass() {
// monitorNodeHealth verifies node health are constantly updated by kubelet, and
// if not, post "NodeReady==ConditionUnknown".
// For nodes who are not ready or not reachable for a long period of time.
// This function will taint them if TaintBasedEvictions feature was enabled.
// Otherwise, it would evict it directly.
// This function will taint nodes who are not ready or not reachable for a long period of time.
func (nc *Controller) monitorNodeHealth() error {
// We are listing nodes from local cache as we can tolerate some small delays
// comparing to state from etcd and there is eventual consistency anyway.
@ -788,7 +778,7 @@ func (nc *Controller) monitorNodeHealth() error {
nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.useTaintBasedEvictions {
if nc.runTaintManager {
nc.markNodeAsReachable(added[i])
} else {
nc.cancelPodEviction(added[i])
@ -813,7 +803,7 @@ func (nc *Controller) monitorNodeHealth() error {
return true, nil
}
name := node.Name
node, err = nc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
node, err = nc.kubeClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed while getting a Node to retry updating node health. Probably Node %s was deleted.", name)
return false, err
@ -842,7 +832,7 @@ func (nc *Controller) monitorNodeHealth() error {
}
continue
}
if nc.useTaintBasedEvictions {
if nc.runTaintManager {
nc.processTaintBaseEviction(node, &observedReadyCondition)
} else {
if err := nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod, pods); err != nil {
@ -893,7 +883,7 @@ func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondi
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
klog.Errorf("Failed to instantly swap NotReadyTaint to UnreachableTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node) {
klog.V(2).Infof("Node %v is unresponsive as of %v. Adding it to the Taint queue.",
@ -1148,7 +1138,7 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
_, currentReadyCondition = nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
if !apiequality.Semantic.DeepEqual(currentReadyCondition, &observedReadyCondition) {
if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(node); err != nil {
if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Error updating node %s: %v", node.Name, err)
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
@ -1208,7 +1198,7 @@ func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.Nod
if allAreFullyDisrupted {
klog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
if nc.useTaintBasedEvictions {
if nc.runTaintManager {
_, err := nc.markNodeAsReachable(nodes[i])
if err != nil {
klog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
@ -1219,7 +1209,7 @@ func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.Nod
}
// We stop all evictions.
for k := range nc.zoneStates {
if nc.useTaintBasedEvictions {
if nc.runTaintManager {
nc.zoneNoExecuteTainter[k].SwapLimiter(0)
} else {
nc.zonePodEvictor[k].SwapLimiter(0)
@ -1331,7 +1321,7 @@ func (nc *Controller) processPod(podItem podUpdateItem) {
pods := []*v1.Pod{pod}
// In taint-based eviction mode, only node updates are processed by NodeLifecycleController.
// Pods are processed by TaintManager.
if !nc.useTaintBasedEvictions {
if !nc.runTaintManager {
if err := nc.processNoTaintBaseEviction(node, currentReadyCondition, nc.nodeMonitorGracePeriod, pods); err != nil {
klog.Warningf("Unable to process pod %+v eviction from node %v: %v.", podItem, nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
@ -1350,13 +1340,13 @@ func (nc *Controller) processPod(podItem podUpdateItem) {
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
switch state {
case stateNormal:
if nc.useTaintBasedEvictions {
if nc.runTaintManager {
nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
} else {
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
}
case statePartialDisruption:
if nc.useTaintBasedEvictions {
if nc.runTaintManager {
nc.zoneNoExecuteTainter[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
} else {
@ -1364,7 +1354,7 @@ func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneStat
nc.enterPartialDisruptionFunc(zoneSize))
}
case stateFullDisruption:
if nc.useTaintBasedEvictions {
if nc.runTaintManager {
nc.zoneNoExecuteTainter[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
} else {
@ -1430,7 +1420,7 @@ func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
zone := utilnode.GetZoneKey(node)
if _, found := nc.zoneStates[zone]; !found {
nc.zoneStates[zone] = stateInitial
if !nc.useTaintBasedEvictions {
if !nc.runTaintManager {
nc.zonePodEvictor[zone] =
scheduler.NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduler
import (
"context"
"fmt"
"hash/fnv"
"io"
@ -108,7 +109,7 @@ func deletePodHandler(c clientset.Interface, emitEventFunc func(types.Namespaced
}
var err error
for i := 0; i < retries; i++ {
err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
err = c.CoreV1().Pods(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
if err == nil {
break
}

View File

@ -137,7 +137,7 @@ func (q *TimedWorkerQueue) CancelWork(key string) bool {
}
// GetWorkerUnsafe returns a TimedWorker corresponding to the given key.
// Unsafe method - workers have attached goroutines which can fire afater this function is called.
// Unsafe method - workers have attached goroutines which can fire after this function is called.
func (q *TimedWorkerQueue) GetWorkerUnsafe(key string) *TimedWorker {
q.Lock()
defer q.Unlock()

View File

@ -17,6 +17,7 @@ limitations under the License.
package node
import (
"context"
"fmt"
"strings"
@ -79,7 +80,7 @@ func DeletePods(kubeClient clientset.Interface, pods []*v1.Pod, recorder record.
klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name)
recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
if apierrors.IsNotFound(err) {
// NotFound error means that pod was already deleted.
// There is nothing left to do with this pod.
@ -109,7 +110,7 @@ func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeNa
var updatedPod *v1.Pod
var err error
if updatedPod, err = kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil {
if updatedPod, err = kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
return nil, err
}
return updatedPod, nil
@ -136,7 +137,7 @@ func MarkPodsNotReady(kubeClient clientset.Interface, pods []*v1.Pod, nodeName s
break
}
klog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
_, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
_, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// NotFound error means that pod was already deleted.
@ -159,10 +160,11 @@ func MarkPodsNotReady(kubeClient clientset.Interface, pods []*v1.Pod, nodeName s
// RecordNodeEvent records a event related to a node.
func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
ref := &v1.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeUID),
Namespace: "",
APIVersion: "v1",
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeUID),
Namespace: "",
}
klog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
@ -171,10 +173,11 @@ func RecordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
// RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam)
func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) {
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: node.UID,
Namespace: "",
APIVersion: "v1",
Kind: "Node",
Name: node.Name,
UID: node.UID,
Namespace: "",
}
klog.V(2).Infof("Recording status change %s event message for node %s", newStatus, node.Name)
// TODO: This requires a transaction, either both node status is updated

View File

@ -64,6 +64,13 @@ const (
// recognize dynamically provisioned PVs in its decisions).
AnnDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"
// AnnMigratedTo annotation is added to a PVC and PV that is supposed to be
// dynamically provisioned/deleted by by its corresponding CSI driver
// through the CSIMigration feature flags. When this annotation is set the
// Kubernetes components will "stand-down" and the external-provisioner will
// act on the objects
AnnMigratedTo = "pv.kubernetes.io/migrated-to"
// AnnStorageProvisioner annotation is added to a PVC that is supposed to be dynamically
// provisioned. Its value is name of volume plugin that is supposed to provision
// a volume for this PVC.
@ -204,9 +211,14 @@ func FindMatchingVolume(
// Skip volumes in the excluded list
continue
}
if volume.Spec.ClaimRef != nil && !IsVolumeBoundToClaim(volume, claim) {
continue
}
volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
// filter out mismatching volumeModes
if CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
continue
@ -223,6 +235,8 @@ func FindMatchingVolume(
if node != nil {
// Scheduler path, check that the PV NodeAffinity
// is satisfied by the node
// volumeutil.CheckNodeAffinity is the most expensive call in this loop.
// We should check cheaper conditions first or consider optimizing this function.
err := volumeutil.CheckNodeAffinity(volume, node.Labels)
if err != nil {
nodeAffinityValid = false
@ -230,13 +244,6 @@ func FindMatchingVolume(
}
if IsVolumeBoundToClaim(volume, claim) {
// this claim and volume are pre-bound; return
// the volume if the size request is satisfied,
// otherwise continue searching for a match
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
// If PV node affinity is invalid, return no match.
// This means the prebound PV (and therefore PVC)
// is not suitable for this node.
@ -256,7 +263,6 @@ func FindMatchingVolume(
// filter out:
// - volumes in non-available phase
// - volumes bound to another claim
// - volumes whose labels don't match the claim's selector, if specified
// - volumes in Class that is not requested
// - volumes whose NodeAffinity does not match the node
@ -266,8 +272,6 @@ func FindMatchingVolume(
// them now has high chance of encountering unnecessary failures
// due to API conflicts.
continue
} else if volume.Spec.ClaimRef != nil {
continue
} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
continue
}
@ -286,11 +290,9 @@ func FindMatchingVolume(
}
}
if volumeQty.Cmp(requestedQty) >= 0 {
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
}
@ -305,23 +307,6 @@ func FindMatchingVolume(
// CheckVolumeModeMismatches is a convenience method that checks volumeMode for PersistentVolume
// and PersistentVolumeClaims
func CheckVolumeModeMismatches(pvcSpec *v1.PersistentVolumeClaimSpec, pvSpec *v1.PersistentVolumeSpec) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
if pvcSpec.VolumeMode != nil && *pvcSpec.VolumeMode == v1.PersistentVolumeBlock {
// Block PVC does not match anything when the feature is off. We explicitly want
// to prevent binding block PVC to filesystem PV.
// The PVC should be ignored by PV controller.
return true
}
if pvSpec.VolumeMode != nil && *pvSpec.VolumeMode == v1.PersistentVolumeBlock {
// Block PV does not match anything when the feature is off. We explicitly want
// to prevent binding block PV to filesystem PVC.
// The PV should be ignored by PV controller.
return true
}
// Both PV + PVC are not block.
return false
}
// In HA upgrades, we cannot guarantee that the apiserver is on a version >= controller-manager.
// So we default a nil volumeMode to filesystem
requestedVolumeMode := v1.PersistentVolumeFilesystem

View File

@ -45,7 +45,6 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/persistentvolume/testing:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",

View File

@ -127,7 +127,7 @@ func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
return c.indexFunc(objInfo.latestObj)
}
// NewAssumeCache creates an assume cache for genernal objects.
// NewAssumeCache creates an assume cache for general objects.
func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
c := &assumeCache{
description: description,

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduling
import (
"context"
"fmt"
"sort"
"strings"
@ -44,6 +45,24 @@ import (
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
// ConflictReason is used for the special strings which explain why
// volume binding is impossible for a node.
type ConflictReason string
// ConflictReasons contains all reasons that explain why volume binding is impossible for a node.
type ConflictReasons []ConflictReason
func (reasons ConflictReasons) Len() int { return len(reasons) }
func (reasons ConflictReasons) Less(i, j int) bool { return reasons[i] < reasons[j] }
func (reasons ConflictReasons) Swap(i, j int) { reasons[i], reasons[j] = reasons[j], reasons[i] }
const (
// ErrReasonBindConflict is used for VolumeBindingNoMatch predicate error.
ErrReasonBindConflict ConflictReason = "node(s) didn't find available persistent volumes to bind"
// ErrReasonNodeConflict is used for VolumeNodeAffinityConflict predicate error.
ErrReasonNodeConflict ConflictReason = "node(s) had volume node affinity conflict"
)
// InTreeToCSITranslator contains methods required to check migratable status
// and perform translations from InTree PV's to CSI
type InTreeToCSITranslator interface {
@ -82,11 +101,11 @@ type SchedulerVolumeBinder interface {
// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
// Otherwise, it tries to find an available PV to bind to the PVC.
//
// It returns true if all of the Pod's PVCs have matching PVs or can be dynamic provisioned,
// and returns true if bound volumes satisfy the PV NodeAffinity.
// It returns an error when something went wrong or a list of reasons why the node is
// (currently) not usable for the pod.
//
// This function is called by the volume binding scheduler predicate and can be called in parallel
FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)
FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error)
// AssumePodVolumes will:
// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
@ -112,6 +131,9 @@ type SchedulerVolumeBinder interface {
// GetBindingsCache returns the cache used (if any) to store volume binding decisions.
GetBindingsCache() PodBindingCache
// DeletePodBindings will delete pod's bindingDecisions in podBindingCache.
DeletePodBindings(pod *v1.Pod)
}
type volumeBinder struct {
@ -162,18 +184,40 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
return b.podBindingCache
}
// DeletePodBindings will delete pod's bindingDecisions in podBindingCache.
func (b *volumeBinder) DeletePodBindings(pod *v1.Pod) {
cache := b.podBindingCache
if pod != nil {
cache.DeleteBindings(pod)
}
}
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
// That's necessary because some operations will need to pass in to the predicate fake node objects.
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error) {
podName := getPodName(pod)
// Warning: Below log needs high verbosity as it can be printed several times (#60933).
klog.V(5).Infof("FindPodVolumes for pod %q, node %q", podName, node.Name)
// Initialize to true for pods that don't have volumes
unboundVolumesSatisfied = true
boundVolumesSatisfied = true
// Initialize to true for pods that don't have volumes. These
// booleans get translated into reason strings when the function
// returns without an error.
unboundVolumesSatisfied := true
boundVolumesSatisfied := true
defer func() {
if err != nil {
return
}
if !boundVolumesSatisfied {
reasons = append(reasons, ErrReasonNodeConflict)
}
if !unboundVolumesSatisfied {
reasons = append(reasons, ErrReasonBindConflict)
}
}()
start := time.Now()
defer func() {
metrics.VolumeSchedulingStageLatency.WithLabelValues("predicate").Observe(time.Since(start).Seconds())
@ -209,19 +253,19 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
// volumes can get bound/provisioned in between calls.
boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
if err != nil {
return false, false, err
return nil, err
}
// Immediate claims should be bound
if len(unboundClaimsImmediate) > 0 {
return false, false, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
}
// Check PV node affinity on bound volumes
if len(boundClaims) > 0 {
boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
if err != nil {
return false, false, err
return nil, err
}
}
@ -236,8 +280,9 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
for _, claim := range claimsToBind {
if selectedNode, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
if selectedNode != node.Name {
// Fast path, skip unmatched node
return false, boundVolumesSatisfied, nil
// Fast path, skip unmatched node.
unboundVolumesSatisfied = false
return
}
claimsToProvision = append(claimsToProvision, claim)
} else {
@ -250,7 +295,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
var unboundClaims []*v1.PersistentVolumeClaim
unboundVolumesSatisfied, matchedBindings, unboundClaims, err = b.findMatchingVolumes(pod, claimsToFindMatching, node)
if err != nil {
return false, false, err
return nil, err
}
claimsToProvision = append(claimsToProvision, unboundClaims...)
}
@ -259,12 +304,12 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
if len(claimsToProvision) > 0 {
unboundVolumesSatisfied, provisionedClaims, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
if err != nil {
return false, false, err
return nil, err
}
}
}
return unboundVolumesSatisfied, boundVolumesSatisfied, nil
return
}
// AssumePodVolumes will take the cached matching PVs and PVCs to provision
@ -423,7 +468,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
// TODO: does it hurt if we make an api call and nothing needs to be updated?
claimKey := claimToClaimKey(binding.pvc)
klog.V(2).Infof("claim %q bound to volume %q", claimKey, binding.pv.Name)
newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(binding.pv)
newPV, err := b.kubeClient.CoreV1().PersistentVolumes().Update(context.TODO(), binding.pv, metav1.UpdateOptions{})
if err != nil {
klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", binding.pv.Name, claimKey, err)
return err
@ -438,7 +483,7 @@ func (b *volumeBinder) bindAPIUpdate(podName string, bindings []*bindingInfo, cl
// PV controller is expect to signal back by removing related annotations if actual provisioning fails
for i, claim = range claimsToProvision {
klog.V(5).Infof("bindAPIUpdate: Pod %q, PVC %q", podName, getPVCName(claim))
newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim)
newClaim, err := b.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{})
if err != nil {
return err
}

View File

@ -20,12 +20,11 @@ import "k8s.io/api/core/v1"
// FakeVolumeBinderConfig holds configurations for fake volume binder.
type FakeVolumeBinderConfig struct {
AllBound bool
FindUnboundSatsified bool
FindBoundSatsified bool
FindErr error
AssumeErr error
BindErr error
AllBound bool
FindReasons ConflictReasons
FindErr error
AssumeErr error
BindErr error
}
// NewFakeVolumeBinder sets up all the caches needed for the scheduler to make
@ -44,8 +43,8 @@ type FakeVolumeBinder struct {
}
// FindPodVolumes implements SchedulerVolumeBinder.FindPodVolumes.
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatsified bool, err error) {
return b.config.FindUnboundSatsified, b.config.FindBoundSatsified, b.config.FindErr
func (b *FakeVolumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (reasons ConflictReasons, err error) {
return b.config.FindReasons, b.config.FindErr
}
// AssumePodVolumes implements SchedulerVolumeBinder.AssumePodVolumes.
@ -64,3 +63,6 @@ func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
func (b *FakeVolumeBinder) GetBindingsCache() PodBindingCache {
return nil
}
// DeletePodBindings implements SchedulerVolumeBinder.DeletePodBindings.
func (b *FakeVolumeBinder) DeletePodBindings(pod *v1.Pod) {}