/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"hash/fnv"
	"math"
	"sync"
	"sync/atomic"
	"time"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	clientretry "k8s.io/client-go/util/retry"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/apis/core/helper"
	_ "k8s.io/kubernetes/pkg/apis/core/install"
	"k8s.io/kubernetes/pkg/apis/core/validation"
	"k8s.io/kubernetes/pkg/features"
	hashutil "k8s.io/kubernetes/pkg/util/hash"
	taintutils "k8s.io/kubernetes/pkg/util/taints"
	"k8s.io/utils/clock"
	"k8s.io/utils/integer"

	"k8s.io/klog/v2"
)
const (
// If a watch drops a delete event for a pod, it'll take this long
// before a dormant controller waiting for those packets is woken up anyway. It is
// specifically targeted at the case where some problem prevents an update
// of expectations, without it the controller could stay asleep forever. This should
// be set based on the expected latency of watch events.
//
// Currently a controller can service (create *and* observe the watch events for said
// creation) about 10 pods a second, so it takes about 1 min to service
// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
// latency/pod at the scale of 3000 pods over 100 nodes.
	ExpectationsTimeout = 5 * time.Minute
// When batching pod creates, SlowStartInitialBatchSize is the size of the
// initial batch. The size of each successive batch is twice the size of
// the previous batch. For example, for a value of 1, batch sizes would be
// 1, 2, 4, 8, ... and for a value of 10, batch sizes would be
// 10, 20, 40, 80, ... Setting the value higher means that quota denials
// will result in more doomed API calls and associated event spam. Setting
// the value lower will result in more API call round trip periods for
// large batches.
//
// Given a number of pods to start "N":
// The number of doomed calls per sync once quota is exceeded is given by:
// min(N,SlowStartInitialBatchSize)
// The number of batches is given by:
// 1+floor(log_2(ceil(N/SlowStartInitialBatchSize)))
SlowStartInitialBatchSize = 1
)
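// A worked example of the slow-start math above (editor's illustration of how
// callers, e.g. the ReplicaSet controller's slowStartBatch, consume this
// constant): with SlowStartInitialBatchSize = 1 and N = 25 pods to create, the
// batch sizes are 1, 2, 4, 8, and then the remaining 10, i.e.
// 1+floor(log_2(ceil(25/1))) = 1+4 = 5 batches. A minimal doubling loop under
// those assumptions:
//
//	remaining := 25
//	for batchSize := integer.IntMin(remaining, SlowStartInitialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
//		// issue batchSize creations concurrently; stop doubling on the first error
//		remaining -= batchSize
//	}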
// UpdateTaintBackoff is the retry backoff used when updating node taints.
var UpdateTaintBackoff = wait.Backoff{
	Steps:    5,
	Duration: 100 * time.Millisecond,
	Jitter:   1.0,
}

// UpdateLabelBackoff is the retry backoff used when updating node labels.
var UpdateLabelBackoff = wait.Backoff{
	Steps:    5,
	Duration: 100 * time.Millisecond,
	Jitter:   1.0,
}
var (
	KeyFunc           = cache.DeletionHandlingMetaNamespaceKeyFunc
	podPhaseToOrdinal = map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
)
// ResyncPeriodFunc returns the resync period for a controller.
type ResyncPeriodFunc func() time.Duration

// NoResyncPeriodFunc returns 0 for resyncPeriod in case resyncing is not needed.
func NoResyncPeriodFunc() time.Duration {
	return 0
}

// StaticResyncPeriodFunc returns the resync period specified.
func StaticResyncPeriodFunc(resyncPeriod time.Duration) ResyncPeriodFunc {
	return func() time.Duration {
		return resyncPeriod
	}
}
// Expectations are a way for controllers to tell the controller manager what they expect. e.g.:
// ControllerExpectations: {
// controller1: expects 2 adds in 2 minutes
// controller2: expects 2 dels in 2 minutes
// controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met
// }
//
// Implementation:
// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion
// ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller
//
// * Once set expectations can only be lowered
// * A controller isn't synced till its expectations are either fulfilled, or expire
// * Controllers that don't set expectations will get woken up for every matching controllee
// ExpKeyFunc to parse out the key from a ControlleeExpectation
var ExpKeyFunc = func(obj interface{}) (string, error) {
	if e, ok := obj.(*ControlleeExpectations); ok {
		return e.key, nil
	}
	return "", fmt.Errorf("could not find key for obj %#v", obj)
}
// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations.
// Only abstracted out for testing.
// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
// types of controllers, because the keys might conflict across types.
type ControllerExpectationsInterface interface {
	GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
	SatisfiedExpectations(controllerKey string) bool
	DeleteExpectations(controllerKey string)
	SetExpectations(controllerKey string, add, del int) error
	ExpectCreations(controllerKey string, adds int) error
	ExpectDeletions(controllerKey string, dels int) error
	CreationObserved(controllerKey string)
	DeletionObserved(controllerKey string)
	RaiseExpectations(controllerKey string, add, del int)
	LowerExpectations(controllerKey string, add, del int)
}
// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
type ControllerExpectations struct {
	cache.Store
}
// GetExpectations returns the ControlleeExpectations of the given controller.
func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
	exp, exists, err := r.GetByKey(controllerKey)
	if err == nil && exists {
		return exp.(*ControlleeExpectations), true, nil
	}
	return nil, false, err
}
// DeleteExpectations deletes the expectations of the given controller from the TTLStore.
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
	if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
		if err := r.Delete(exp); err != nil {
			klog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
		}
	}
}
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
	if exp, exists, err := r.GetExpectations(controllerKey); exists {
		if exp.Fulfilled() {
			klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
			return true
		} else if exp.isExpired() {
			klog.V(4).Infof("Controller expectations expired %#v", exp)
			return true
		} else {
			klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
			return false
		}
	} else if err != nil {
		klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
	} else {
		// When a new controller is created, it doesn't have expectations.
		// When it doesn't see expected watch events for > TTL, the expectations expire.
		//	- In this case it wakes up, creates/deletes controllees, and sets expectations again.
		// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
		//	- In this case it continues without setting expectations till it needs to create/delete controllees.
		klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
	}
	// Trigger a sync if we either encountered an error (which shouldn't happen since we're
	// getting from local store) or this controller hasn't established expectations.
	return true
}
// TODO: Extend ExpirationCache to support explicit expiration.
// TODO: Make this possible to disable in tests.
// TODO: Support injection of clock.
func (exp *ControlleeExpectations) isExpired() bool {
	return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}
// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
	exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
	klog.V(4).Infof("Setting expectations %#v", exp)
	return r.Add(exp)
}

// ExpectCreations registers expectations for the given number of creations.
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
	return r.SetExpectations(controllerKey, adds, 0)
}

// ExpectDeletions registers expectations for the given number of deletions.
func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error {
	return r.SetExpectations(controllerKey, 0, dels)
}
// LowerExpectations decrements the expectation counts of the given controller.
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
		exp.Add(int64(-add), int64(-del))
		// The expectations might've been modified since the update on the previous line.
		klog.V(4).Infof("Lowered expectations %#v", exp)
	}
}

// RaiseExpectations increments the expectation counts of the given controller.
func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
		exp.Add(int64(add), int64(del))
		// The expectations might've been modified since the update on the previous line.
		klog.V(4).Infof("Raised expectations %#v", exp)
	}
}
// CreationObserved atomically decrements the `add` expectation count of the given controller.
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
	r.LowerExpectations(controllerKey, 1, 0)
}

// DeletionObserved atomically decrements the `del` expectation count of the given controller.
func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
	r.LowerExpectations(controllerKey, 0, 1)
}
// ControlleeExpectations track controllee creates/deletes.
type ControlleeExpectations struct {
	// Important: Since these two int64 fields are using sync/atomic, they have to be at the top
	// of the struct due to a bug on 32-bit platforms.
	// See: https://golang.org/pkg/sync/atomic/ for more information
	add       int64
	del       int64
	key       string
	timestamp time.Time
}
// Add increments the add and del counters.
func (e *ControlleeExpectations) Add(add, del int64) {
	atomic.AddInt64(&e.add, add)
	atomic.AddInt64(&e.del, del)
}
// Fulfilled returns true if this expectation has been fulfilled.
func (e *ControlleeExpectations) Fulfilled() bool {
	// TODO: think about why this line being atomic doesn't matter
	return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}
// GetExpectations returns the add and del expectations of the controllee.
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
	return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
}
// NewControllerExpectations returns a store for ControllerExpectations.
func NewControllerExpectations() *ControllerExpectations {
	return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
}
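// A minimal usage sketch (editor's illustration; rsKey is a hypothetical
// controller key): the controller records how many creations it is about to
// request, decrements as watch events are observed, and only computes a fresh
// diff once expectations are satisfied or expired.
//
//	expectations := NewControllerExpectations()
//	_ = expectations.ExpectCreations(rsKey, 3) // about to create 3 pods
//	expectations.CreationObserved(rsKey)       // watch saw one create; 2 remain
//	if expectations.SatisfiedExpectations(rsKey) {
//		// safe to sync: all expected events arrived (or the TTL expired)
//	}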
// UIDSetKeyFunc to parse out the key from a UIDSet.
var UIDSetKeyFunc = func(obj interface{}) (string, error) {
	if u, ok := obj.(*UIDSet); ok {
		return u.key, nil
	}
	return "", fmt.Errorf("could not find key for obj %#v", obj)
}
// UIDSet holds a key and a set of UIDs. Used by the
// UIDTrackingControllerExpectations to remember which UIDs it has seen or is
// still waiting for.
type UIDSet struct {
	sets.String
	key string
}
// UIDTrackingControllerExpectations tracks the UID of the pods it deletes.
// This cache is needed over plain old expectations to safely handle graceful
// deletion. The desired behavior is to treat an update that sets the
// DeletionTimestamp on an object as a delete. To do so consistently, one needs
// to remember the expected deletes so they aren't double counted.
// TODO: Track creates as well (#22599)
type UIDTrackingControllerExpectations struct {
	ControllerExpectationsInterface
	// TODO: There is a much nicer way to do this that involves a single store,
	// a lock per entry, and a ControlleeExpectationsInterface type.
	uidStoreLock sync.Mutex
	// Store used for the UIDs associated with any expectation tracked via the
	// ControllerExpectationsInterface.
	uidStore cache.Store
}
// GetUIDs is a convenience method to avoid exposing the set of expected uids.
// The returned set is not thread safe, all modifications must be made holding
// the uidStoreLock.
func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
	if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
		return uid.(*UIDSet).String
	}
	return nil
}
// ExpectDeletions records expectations for the given deleteKeys, against the given controller.
func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
	expectedUIDs := sets.NewString()
	for _, k := range deletedKeys {
		expectedUIDs.Insert(k)
	}
	klog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
	u.uidStoreLock.Lock()
	defer u.uidStoreLock.Unlock()

	if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
		klog.Errorf("Clobbering existing delete keys: %+v", existing)
	}
	if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
		return err
	}
	return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
}
// DeletionObserved records the given deleteKey as a deletion, for the given rc.
func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
	u.uidStoreLock.Lock()
	defer u.uidStoreLock.Unlock()

	uids := u.GetUIDs(rcKey)
	if uids != nil && uids.Has(deleteKey) {
		klog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
		u.ControllerExpectationsInterface.DeletionObserved(rcKey)
		uids.Delete(deleteKey)
	}
}
// DeleteExpectations deletes the UID set and invokes DeleteExpectations on the
// underlying ControllerExpectationsInterface.
func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
	u.uidStoreLock.Lock()
	defer u.uidStoreLock.Unlock()

	u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
	if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
		if err := u.uidStore.Delete(uidExp); err != nil {
			klog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
		}
	}
}
// NewUIDTrackingControllerExpectations returns a wrapper around
// ControllerExpectations that is aware of deleteKeys.
func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
	return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
}
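// Usage sketch (editor's illustration; rsKey and the pod keys are
// hypothetical): deletions are tracked per key so that a graceful-deletion
// update (one that sets DeletionTimestamp) is counted as a delete exactly once.
//
//	uidExp := NewUIDTrackingControllerExpectations(NewControllerExpectations())
//	_ = uidExp.ExpectDeletions(rsKey, []string{"ns/pod-a", "ns/pod-b"})
//	uidExp.DeletionObserved(rsKey, "ns/pod-a") // decrements only if the key was expected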
// Reasons for pod events
const (
// FailedCreatePodReason is added in an event and in a replica set condition
// when a pod for a replica set fails to be created.
FailedCreatePodReason = "FailedCreate"
// SuccessfulCreatePodReason is added in an event when a pod for a replica set
// is successfully created.
SuccessfulCreatePodReason = "SuccessfulCreate"
// FailedDeletePodReason is added in an event and in a replica set condition
// when a pod for a replica set fails to be deleted.
FailedDeletePodReason = "FailedDelete"
// SuccessfulDeletePodReason is added in an event when a pod for a replica set
// is successfully deleted.
SuccessfulDeletePodReason = "SuccessfulDelete"
)
// RSControlInterface is an interface that knows how to add or delete
// ReplicaSets, as well as increment or decrement them. It is used
// by the deployment controller to ease testing of actions that it takes.
type RSControlInterface interface {
	PatchReplicaSet(ctx context.Context, namespace, name string, data []byte) error
}
// RealRSControl is the default implementation of RSControlInterface.
type RealRSControl struct {
	KubeClient clientset.Interface
	Recorder   record.EventRecorder
}

var _ RSControlInterface = &RealRSControl{}
func (r RealRSControl) PatchReplicaSet(ctx context.Context, namespace, name string, data []byte) error {
	_, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
	return err
}
// TODO: merge the controller revision interface in controller_history.go with this one
// ControllerRevisionControlInterface is an interface that knows how to patch
// ControllerRevisions, as well as increment or decrement them. It is used
// by the daemonset controller to ease testing of actions that it takes.
type ControllerRevisionControlInterface interface {
	PatchControllerRevision(ctx context.Context, namespace, name string, data []byte) error
}
// RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface.
type RealControllerRevisionControl struct {
	KubeClient clientset.Interface
}

var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{}
func (r RealControllerRevisionControl) PatchControllerRevision(ctx context.Context, namespace, name string, data []byte) error {
	_, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
	return err
}
// PodControlInterface is an interface that knows how to add or delete pods;
// it exists as an interface to allow testing.
type PodControlInterface interface {
	// CreatePods creates new pods according to the spec, and sets object as the pod's controller.
	CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
	// CreatePodsWithGenerateName creates new pods according to the spec, sets object as the pod's controller and sets pod's generateName.
	CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error
	// DeletePod deletes the pod identified by podID.
	DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error
	// PatchPod patches the pod.
	PatchPod(ctx context.Context, namespace, name string, data []byte) error
}
// RealPodControl is the default implementation of PodControlInterface.
type RealPodControl struct {
	KubeClient clientset.Interface
	Recorder   record.EventRecorder
}

var _ PodControlInterface = &RealPodControl{}
func getPodsLabelSet(template *v1.PodTemplateSpec) labels.Set {
	desiredLabels := make(labels.Set)
	for k, v := range template.Labels {
		desiredLabels[k] = v
	}
	return desiredLabels
}

func getPodsFinalizers(template *v1.PodTemplateSpec) []string {
	desiredFinalizers := make([]string, len(template.Finalizers))
	copy(desiredFinalizers, template.Finalizers)
	return desiredFinalizers
}

func getPodsAnnotationSet(template *v1.PodTemplateSpec) labels.Set {
	desiredAnnotations := make(labels.Set)
	for k, v := range template.Annotations {
		desiredAnnotations[k] = v
	}
	return desiredAnnotations
}

func getPodsPrefix(controllerName string) string {
	// use the dash (if the name isn't too long) to make the pod name a bit prettier
	prefix := fmt.Sprintf("%s-", controllerName)
	if len(validation.ValidatePodName(prefix, true)) != 0 {
		prefix = controllerName
	}
	return prefix
}
func validateControllerRef(controllerRef *metav1.OwnerReference) error {
	if controllerRef == nil {
		return fmt.Errorf("controllerRef is nil")
	}
	if len(controllerRef.APIVersion) == 0 {
		return fmt.Errorf("controllerRef has empty APIVersion")
	}
	if len(controllerRef.Kind) == 0 {
		return fmt.Errorf("controllerRef has empty Kind")
	}
	if controllerRef.Controller == nil || !*controllerRef.Controller {
		return fmt.Errorf("controllerRef.Controller is not set to true")
	}
	if controllerRef.BlockOwnerDeletion == nil || !*controllerRef.BlockOwnerDeletion {
		return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set")
	}
	return nil
}
func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	return r.CreatePodsWithGenerateName(ctx, namespace, template, controllerObject, controllerRef, "")
}

func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error {
	if err := validateControllerRef(controllerRef); err != nil {
		return err
	}
	pod, err := GetPodFromTemplate(template, controllerObject, controllerRef)
	if err != nil {
		return err
	}
	if len(generateName) > 0 {
		pod.ObjectMeta.GenerateName = generateName
	}
	return r.createPods(ctx, namespace, pod, controllerObject)
}
func (r RealPodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error {
	_, err := r.KubeClient.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
	return err
}
// GetPodFromTemplate builds a pod object from the given template, parent
// object, and controller reference.
func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Object, controllerRef *metav1.OwnerReference) (*v1.Pod, error) {
	desiredLabels := getPodsLabelSet(template)
	desiredFinalizers := getPodsFinalizers(template)
	desiredAnnotations := getPodsAnnotationSet(template)
	accessor, err := meta.Accessor(parentObject)
	if err != nil {
		return nil, fmt.Errorf("parentObject does not have ObjectMeta, %v", err)
	}
	prefix := getPodsPrefix(accessor.GetName())

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Labels:       desiredLabels,
			Annotations:  desiredAnnotations,
			GenerateName: prefix,
			Finalizers:   desiredFinalizers,
		},
	}
	if controllerRef != nil {
		pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)
	}
	pod.Spec = *template.Spec.DeepCopy()
	return pod, nil
}
func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error {
	if len(labels.Set(pod.Labels)) == 0 {
		return fmt.Errorf("unable to create pods, no labels")
	}
	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
	if err != nil {
		// only send an event if the namespace isn't terminating
		if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
		}
		return err
	}
	accessor, err := meta.Accessor(object)
	if err != nil {
		klog.Errorf("parentObject does not have ObjectMeta, %v", err)
		return nil
	}
	klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)
	return nil
}
func (r RealPodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error {
	accessor, err := meta.Accessor(object)
	if err != nil {
		return fmt.Errorf("object does not have ObjectMeta, %v", err)
	}
	klog.V(2).InfoS("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID))
	if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil {
		if apierrors.IsNotFound(err) {
			klog.V(4).Infof("pod %v/%v has already been deleted.", namespace, podID)
			return err
		}
		r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
		return fmt.Errorf("unable to delete pods: %v", err)
	}
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)
	return nil
}
// FakePodControl is a fake PodControlInterface for testing; it records the
// templates, controller refs, deletions, and patches it receives.
type FakePodControl struct {
	sync.Mutex
	Templates       []v1.PodTemplateSpec
	ControllerRefs  []metav1.OwnerReference
	DeletePodName   []string
	Patches         [][]byte
	Err             error
	CreateLimit     int
	CreateCallCount int
}

var _ PodControlInterface = &FakePodControl{}
func (f *FakePodControl) PatchPod(ctx context.Context, namespace, name string, data []byte) error {
	f.Lock()
	defer f.Unlock()
	f.Patches = append(f.Patches, data)
	if f.Err != nil {
		return f.Err
	}
	return nil
}
func (f *FakePodControl) CreatePods(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	return f.CreatePodsWithGenerateName(ctx, namespace, spec, object, controllerRef, "")
}
func (f *FakePodControl) CreatePodsWithGenerateName(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error {
	f.Lock()
	defer f.Unlock()
	f.CreateCallCount++
	if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
		return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
	}
	spec.GenerateName = generateNamePrefix
	f.Templates = append(f.Templates, *spec)
	f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
	if f.Err != nil {
		return f.Err
	}
	return nil
}
func (f *FakePodControl) DeletePod(ctx context.Context, namespace string, podID string, object runtime.Object) error {
	f.Lock()
	defer f.Unlock()
	f.DeletePodName = append(f.DeletePodName, podID)
	if f.Err != nil {
		return f.Err
	}
	return nil
}
func (f *FakePodControl) Clear() {
	f.Lock()
	defer f.Unlock()
	f.DeletePodName = []string{}
	f.Templates = []v1.PodTemplateSpec{}
	f.ControllerRefs = []metav1.OwnerReference{}
	f.Patches = [][]byte{}
	f.CreateLimit = 0
	f.CreateCallCount = 0
}
// ByLogging allows custom sorting of pods so the best one can be picked for getting its logs.
type ByLogging []*v1.Pod

func (s ByLogging) Len() int      { return len(s) }
func (s ByLogging) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s ByLogging) Less(i, j int) bool {
	// 1. assigned < unassigned
	if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
		return len(s[i].Spec.NodeName) > 0
	}
	// 2. PodRunning < PodUnknown < PodPending
	if s[i].Status.Phase != s[j].Status.Phase {
		return podPhaseToOrdinal[s[i].Status.Phase] > podPhaseToOrdinal[s[j].Status.Phase]
	}
	// 3. ready < not ready
	if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
		return podutil.IsPodReady(s[i])
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 4. Been ready for more time < less time < empty time
	if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) {
		readyTime1 := podReadyTime(s[i])
		readyTime2 := podReadyTime(s[j])
		if !readyTime1.Equal(readyTime2) {
			return afterOrZero(readyTime2, readyTime1)
		}
	}
	// 5. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
		return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
	}
	// 6. older pods < newer pods < empty timestamp pods
	if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
		return afterOrZero(&s[j].CreationTimestamp, &s[i].CreationTimestamp)
	}
	return false
}
// ActivePods type allows custom sorting of pods so a controller can pick the best ones to delete.
type ActivePods []*v1.Pod

func (s ActivePods) Len() int      { return len(s) }
func (s ActivePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s ActivePods) Less(i, j int) bool {
	// 1. Unassigned < assigned
	// If only one of the pods is unassigned, the unassigned one is smaller
	if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
		return len(s[i].Spec.NodeName) == 0
	}
	// 2. PodPending < PodUnknown < PodRunning
	if podPhaseToOrdinal[s[i].Status.Phase] != podPhaseToOrdinal[s[j].Status.Phase] {
		return podPhaseToOrdinal[s[i].Status.Phase] < podPhaseToOrdinal[s[j].Status.Phase]
	}
	// 3. Not ready < ready
	// If only one of the pods is not ready, the not ready one is smaller
	if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
		return !podutil.IsPodReady(s[i])
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 4. Been ready for empty time < less time < more time
	// If both pods are ready, the latest ready one is smaller
	if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) {
		readyTime1 := podReadyTime(s[i])
		readyTime2 := podReadyTime(s[j])
		if !readyTime1.Equal(readyTime2) {
			return afterOrZero(readyTime1, readyTime2)
		}
	}
	// 5. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
		return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
	}
	// 6. Empty creation time pods < newer pods < older pods
	if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
		return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
	}
	return false
}
// ActivePodsWithRanks is a sortable list of pods and a list of corresponding
// ranks which will be considered during sorting. The two lists must have equal
// length. After sorting, the pods will be ordered as follows, applying each
// rule in turn until one matches:
//
//  1. If only one of the pods is assigned to a node, the pod that is not
//     assigned comes before the pod that is.
//  2. If the pods' phases differ, a pending pod comes before a pod whose phase
//     is unknown, and a pod whose phase is unknown comes before a running pod.
//  3. If exactly one of the pods is ready, the pod that is not ready comes
//     before the ready pod.
//  4. If controller.kubernetes.io/pod-deletion-cost annotation is set, then
//     the pod with the lower value will come first.
//  5. If the pods' ranks differ, the pod with greater rank comes before the pod
//     with lower rank.
//  6. If both pods are ready but have not been ready for the same amount of
//     time, the pod that has been ready for a shorter amount of time comes
//     before the pod that has been ready for longer.
//  7. If one pod has a container that has restarted more than any container in
//     the other pod, the pod with the container with more restarts comes
//     before the other pod.
//  8. If the pods' creation times differ, the pod that was created more recently
//     comes before the older pod.
//
// In 6 and 8, times are compared in a logarithmic scale. This allows a level
// of randomness among equivalent Pods when sorting. If two pods have the same
// logarithmic rank, they are sorted by UUID to provide a pseudorandom order.
//
// If none of these rules matches, the second pod comes before the first pod.
//
// The intention of this ordering is to put pods that should be preferred for
// deletion first in the list.
type ActivePodsWithRanks struct {
	// Pods is a list of pods.
	Pods []*v1.Pod

	// Rank is a ranking of pods. This ranking is used during sorting when
	// comparing two pods that are both scheduled, in the same phase, and
	// having the same ready status.
	Rank []int

	// Now is a reference timestamp for doing logarithmic timestamp comparisons.
	// If zero, comparison happens without scaling.
	Now metav1.Time
}

func (s ActivePodsWithRanks) Len() int {
	return len(s.Pods)
}

func (s ActivePodsWithRanks) Swap(i, j int) {
	s.Pods[i], s.Pods[j] = s.Pods[j], s.Pods[i]
	s.Rank[i], s.Rank[j] = s.Rank[j], s.Rank[i]
}
// Less compares two pods with corresponding ranks and returns true if the first
// one should be preferred for deletion.
func (s ActivePodsWithRanks) Less(i, j int) bool {
	// 1. Unassigned < assigned
	// If only one of the pods is unassigned, the unassigned one is smaller
	if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) {
		return len(s.Pods[i].Spec.NodeName) == 0
	}
	// 2. PodPending < PodUnknown < PodRunning
	if podPhaseToOrdinal[s.Pods[i].Status.Phase] != podPhaseToOrdinal[s.Pods[j].Status.Phase] {
		return podPhaseToOrdinal[s.Pods[i].Status.Phase] < podPhaseToOrdinal[s.Pods[j].Status.Phase]
	}
	// 3. Not ready < ready
	// If only one of the pods is not ready, the not ready one is smaller
	if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) {
		return !podutil.IsPodReady(s.Pods[i])
	}
	// 4. lower pod-deletion-cost < higher pod-deletion-cost
	if utilfeature.DefaultFeatureGate.Enabled(features.PodDeletionCost) {
		pi, _ := helper.GetDeletionCostFromPodAnnotations(s.Pods[i].Annotations)
		pj, _ := helper.GetDeletionCostFromPodAnnotations(s.Pods[j].Annotations)
		if pi != pj {
			return pi < pj
		}
	}
	// 5. Doubled up < not doubled up
	// If one of the two pods is on the same node as one or more additional
	// ready pods that belong to the same replicaset, whichever pod has more
	// colocated ready pods is less
	if s.Rank[i] != s.Rank[j] {
		return s.Rank[i] > s.Rank[j]
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 6. Been ready for empty time < less time < more time
	// If both pods are ready, the latest ready one is smaller
	if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) {
		readyTime1 := podReadyTime(s.Pods[i])
		readyTime2 := podReadyTime(s.Pods[j])
		if !readyTime1.Equal(readyTime2) {
			if !utilfeature.DefaultFeatureGate.Enabled(features.LogarithmicScaleDown) {
				return afterOrZero(readyTime1, readyTime2)
			}
			if s.Now.IsZero() || readyTime1.IsZero() || readyTime2.IsZero() {
				return afterOrZero(readyTime1, readyTime2)
			}
			rankDiff := logarithmicRankDiff(*readyTime1, *readyTime2, s.Now)
			if rankDiff == 0 {
				return s.Pods[i].UID < s.Pods[j].UID
			}
			return rankDiff < 0
		}
	}
	// 7. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) {
		return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j])
	}
	// 8. Empty creation time pods < newer pods < older pods
	if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) {
		if !utilfeature.DefaultFeatureGate.Enabled(features.LogarithmicScaleDown) {
			return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
		}
		if s.Now.IsZero() || s.Pods[i].CreationTimestamp.IsZero() || s.Pods[j].CreationTimestamp.IsZero() {
			return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
		}
		rankDiff := logarithmicRankDiff(s.Pods[i].CreationTimestamp, s.Pods[j].CreationTimestamp, s.Now)
		if rankDiff == 0 {
			return s.Pods[i].UID < s.Pods[j].UID
		}
		return rankDiff < 0
	}
	return false
}
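// Sorting sketch (editor's illustration; pods and ranks are hypothetical
// inputs computed by the caller, e.g. counts of colocated ready pods): a
// controller scaling down sorts so the preferred deletion candidates come
// first.
//
//	podsWithRanks := ActivePodsWithRanks{Pods: pods, Rank: ranks, Now: metav1.Now()}
//	sort.Sort(podsWithRanks)
//	// podsWithRanks.Pods[0] is now the pod to delete first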
// afterOrZero checks if time t1 is after time t2; if one of them
// is zero, the zero time is seen as after non-zero time.
func afterOrZero(t1, t2 *metav1.Time) bool {
	if t1.Time.IsZero() || t2.Time.IsZero() {
		return t1.Time.IsZero()
	}
	return t1.After(t2.Time)
}
// logarithmicRankDiff calculates the base-2 logarithmic ranks of 2 timestamps,
// compared to the current timestamp
func logarithmicRankDiff(t1, t2, now metav1.Time) int64 {
	d1 := now.Sub(t1.Time)
	d2 := now.Sub(t2.Time)
	r1 := int64(-1)
	r2 := int64(-1)
	if d1 > 0 {
		r1 = int64(math.Log2(float64(d1)))
	}
	if d2 > 0 {
		r2 = int64(math.Log2(float64(d2)))
	}
	return r1 - r2
}
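// Worked example (editor's illustration): the durations are in nanoseconds, so
// for a fixed now, pods created 100s and 130s ago both land in bucket
// floor(log2(1.0e11)) = floor(log2(1.3e11)) = 36; logarithmicRankDiff returns
// 0 and the caller falls back to comparing UIDs. A pod created 400s ago lands
// in bucket 38 and therefore ranks as strictly older.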
func podReadyTime(pod *v1.Pod) *metav1.Time {
	if podutil.IsPodReady(pod) {
		for _, c := range pod.Status.Conditions {
			// we only care about pod ready conditions
			if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
				return &c.LastTransitionTime
			}
		}
	}
	return &metav1.Time{}
}

func maxContainerRestarts(pod *v1.Pod) int {
	maxRestarts := 0
	for _, c := range pod.Status.ContainerStatuses {
		maxRestarts = integer.IntMax(maxRestarts, int(c.RestartCount))
	}
	return maxRestarts
}
// FilterActivePods returns pods that have not terminated.
func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
	var result []*v1.Pod
	for _, p := range pods {
		if IsPodActive(p) {
			result = append(result, p)
		} else {
			klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
				p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
		}
	}
	return result
}
// IsPodActive returns true if the pod has not succeeded, has not failed, and
// is not marked for deletion.
func IsPodActive(p *v1.Pod) bool {
	return v1.PodSucceeded != p.Status.Phase &&
		v1.PodFailed != p.Status.Phase &&
		p.DeletionTimestamp == nil
}
// FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
func FilterActiveReplicaSets(replicaSets []*apps.ReplicaSet) []*apps.ReplicaSet {
	activeFilter := func(rs *apps.ReplicaSet) bool {
		return rs != nil && *(rs.Spec.Replicas) > 0
	}
	return FilterReplicaSets(replicaSets, activeFilter)
}

type filterRS func(rs *apps.ReplicaSet) bool
// FilterReplicaSets returns replica sets that are filtered by filterFn (all returned ones should match filterFn).
func FilterReplicaSets(RSes []*apps.ReplicaSet, filterFn filterRS) []*apps.ReplicaSet {
	var filtered []*apps.ReplicaSet
	for i := range RSes {
		if filterFn(RSes[i]) {
			filtered = append(filtered, RSes[i])
		}
	}
	return filtered
}
// PodKey returns a key unique to the given pod within a cluster.
// It's used so we consistently use the same key scheme in this module.
// It does exactly what cache.MetaNamespaceKeyFunc would have done
// except there's no possibility for error since we know the exact type.
func PodKey(pod *v1.Pod) string {
	return fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
}
// ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
type ControllersByCreationTimestamp []*v1.ReplicationController

func (o ControllersByCreationTimestamp) Len() int      { return len(o) }
func (o ControllersByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ControllersByCreationTimestamp) Less(i, j int) bool {
	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}
// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker.
type ReplicaSetsByCreationTimestamp []*apps.ReplicaSet

func (o ReplicaSetsByCreationTimestamp) Len() int      { return len(o) }
func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}
// ReplicaSetsBySizeOlder sorts a list of ReplicaSet by size in descending order, using their creation timestamp or name as a tie breaker.
// By using the creation timestamp, this sorts from old to new replica sets.
type ReplicaSetsBySizeOlder []*apps.ReplicaSet

func (o ReplicaSetsBySizeOlder) Len() int      { return len(o) }
func (o ReplicaSetsBySizeOlder) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsBySizeOlder) Less(i, j int) bool {
	if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
		return ReplicaSetsByCreationTimestamp(o).Less(i, j)
	}
	return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
}
// ReplicaSetsBySizeNewer sorts a list of ReplicaSet by size in descending order, using their creation timestamp or name as a tie breaker.
// By using the creation timestamp, this sorts from new to old replica sets.
type ReplicaSetsBySizeNewer []*apps.ReplicaSet

func (o ReplicaSetsBySizeNewer) Len() int      { return len(o) }
func (o ReplicaSetsBySizeNewer) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsBySizeNewer) Less(i, j int) bool {
	if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
		return ReplicaSetsByCreationTimestamp(o).Less(j, i)
	}
	return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
}
// AddOrUpdateTaintOnNode adds the given taints to the node. If any taint was
// actually added or updated, it issues an API call to update the node;
// otherwise no API call is made. Returns an error if one occurs.
func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName string, taints ...*v1.Taint) error {
	if len(taints) == 0 {
		return nil
	}
	firstTry := true
	return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
		var err error
		var oldNode *v1.Node
		// First we try getting node from the API server cache, as it's cheaper. If it fails
		// we get it from etcd to be sure to have fresh data.
		if firstTry {
			oldNode, err = c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{ResourceVersion: "0"})
			firstTry = false
		} else {
			oldNode, err = c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		}
		if err != nil {
			return err
		}

		var newNode *v1.Node
		oldNodeCopy := oldNode
		updated := false
		for _, taint := range taints {
			curNewNode, ok, err := taintutils.AddOrUpdateTaint(oldNodeCopy, taint)
			if err != nil {
				return fmt.Errorf("failed to update taint of node")
			}
			updated = updated || ok
			newNode = curNewNode
			oldNodeCopy = curNewNode
		}
		if !updated {
			return nil
		}
		return PatchNodeTaints(ctx, c, nodeName, oldNode, newNode)
	})
}
// RemoveTaintOffNode is for cleaning up taints temporarily added to a node;
// it won't fail if the target taint doesn't exist or has already been removed.
// If passed a node, it first checks whether there is anything to be done; if
// the taint is not present, it won't issue any API calls.
func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName string, node *v1.Node, taints ...*v1.Taint) error {
	if len(taints) == 0 {
		return nil
	}
	// Short circuit for limiting amount of API calls.
	if node != nil {
		match := false
		for _, taint := range taints {
			if taintutils.TaintExists(node.Spec.Taints, taint) {
				match = true
				break
			}
		}
		if !match {
			return nil
		}
	}

	firstTry := true
	return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
		var err error
		var oldNode *v1.Node
		// First we try getting node from the API server cache, as it's cheaper. If it fails
		// we get it from etcd to be sure to have fresh data.
		if firstTry {
			oldNode, err = c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{ResourceVersion: "0"})
			firstTry = false
		} else {
			oldNode, err = c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		}
		if err != nil {
			return err
		}

		var newNode *v1.Node
		oldNodeCopy := oldNode
		updated := false
		for _, taint := range taints {
			curNewNode, ok, err := taintutils.RemoveTaint(oldNodeCopy, taint)
			if err != nil {
				return fmt.Errorf("failed to remove taint of node")
			}
			updated = updated || ok
			newNode = curNewNode
			oldNodeCopy = curNewNode
		}
		if !updated {
			return nil
		}
		return PatchNodeTaints(ctx, c, nodeName, oldNode, newNode)
	})
}
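// Usage sketch (editor's illustration; the client, node object, and taint
// value are hypothetical): add a NoSchedule taint during maintenance and
// remove it afterwards. Both helpers short-circuit without issuing API calls
// when nothing would change.
//
//	taint := &v1.Taint{Key: "example.com/maintenance", Effect: v1.TaintEffectNoSchedule}
//	_ = AddOrUpdateTaintOnNode(ctx, client, "node-1", taint)
//	_ = RemoveTaintOffNode(ctx, client, "node-1", node, taint)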
// PatchNodeTaints patches node's taints.
func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
	// Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints.
	// This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons.
	// Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal.
	oldNodeNoRV := oldNode.DeepCopy()
	oldNodeNoRV.ResourceVersion = ""
	oldDataNoRV, err := json.Marshal(&oldNodeNoRV)
	if err != nil {
		return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err)
	}

	newTaints := newNode.Spec.Taints
	newNodeClone := oldNode.DeepCopy()
	newNodeClone.Spec.Taints = newTaints
	newData, err := json.Marshal(newNodeClone)
	if err != nil {
		return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{})
	if err != nil {
		return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
	}

	_, err = c.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	return err
}
// ComputeHash returns a hash value calculated from pod template and
// a collisionCount to avoid hash collision. The hash will be safe encoded to
// avoid bad words.
func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int32) string {
	podTemplateSpecHasher := fnv.New32a()
	hashutil.DeepHashObject(podTemplateSpecHasher, *template)

	// Add collisionCount in the hash if it exists.
	if collisionCount != nil {
		collisionCountBytes := make([]byte, 8)
		binary.LittleEndian.PutUint32(collisionCountBytes, uint32(*collisionCount))
		podTemplateSpecHasher.Write(collisionCountBytes)
	}

	return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32()))
}
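// Usage sketch (editor's illustration, based on how the deployment controller
// derives the pod-template-hash; the deployment variable is hypothetical):
//
//	hash := ComputeHash(&deployment.Spec.Template, deployment.Status.CollisionCount)
//	rsName := deployment.Name + "-" + hash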
// AddOrUpdateLabelsOnNode updates the labels on the given node, retrying on
// conflict.
func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, labelsToUpdate map[string]string) error {
	firstTry := true
	return clientretry.RetryOnConflict(UpdateLabelBackoff, func() error {
		var err error
		var node *v1.Node
		// First we try getting node from the API server cache, as it's cheaper. If it fails
		// we get it from etcd to be sure to have fresh data.
		if firstTry {
			node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
			firstTry = false
		} else {
			node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		}
		if err != nil {
			return err
		}

		// Make a copy of the node and update the labels.
		newNode := node.DeepCopy()
		if newNode.Labels == nil {
			newNode.Labels = make(map[string]string)
		}
		for key, value := range labelsToUpdate {
			newNode.Labels[key] = value
		}

		oldData, err := json.Marshal(node)
		if err != nil {
			return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
		}
		newData, err := json.Marshal(newNode)
		if err != nil {
			return fmt.Errorf("failed to marshal the new node %#v: %v", newNode, err)
		}
		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
		if err != nil {
			return fmt.Errorf("failed to create a two-way merge patch: %v", err)
		}
		if _, err := kubeClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
			return fmt.Errorf("failed to patch the node: %v", err)
		}
		return nil
	})
}