rebase: update kubernetes dep to 1.24.0

As kubernetes 1.24.0 is released, updating
kubernetes dependencies to 1.24.0

updates: #3086

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2022-05-05 08:17:06 +05:30
committed by mergify[bot]
parent fc1529f268
commit c4f79d455f
959 changed files with 80055 additions and 27456 deletions

View File

@ -1,38 +1,28 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- thockin
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
- caesarxuchao
- liggitt
- ncdc
- thockin
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
- caesarxuchao
- liggitt
- ncdc
reviewers:
- thockin
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
- brendandburns
- derekwaynecarr
- caesarxuchao
- mikedanese
- liggitt
- davidopp
- pmorie
- janetkuo
- justinsb
- soltysh
- jsafrane
- dims
- hongchaodeng
- krousey
- xiang90
- ingvagabund
- resouer
- jessfraz
- mfojtik
- sdminonne
- ncdc
- thockin
- lavalamp
- smarterclayton
- wojtek-t
- deads2k
- derekwaynecarr
- caesarxuchao
- mikedanese
- liggitt
- janetkuo
- justinsb
- soltysh
- jsafrane
- dims
- ingvagabund
- ncdc

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache
import (
"errors"
"sync"
"time"
@ -370,8 +371,8 @@ type TransformFunc func(interface{}) (interface{}, error)
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
// the event notifications to be faulty.
// The given transform function will be called on all objects before they will
// put put into the Store and corresponding Add/Modify/Delete handlers will
// be invokved for them.
// put into the Store and corresponding Add/Modify/Delete handlers will
// be invoked for them.
func NewTransformingInformer(
lw ListerWatcher,
objType runtime.Object,
@ -406,6 +407,49 @@ func NewTransformingIndexerInformer(
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
}
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
transformer TransformFunc,
deltas Deltas,
) error {
// from oldest to newest
for _, d := range deltas {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
handler.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
handler.OnDelete(obj)
}
}
return nil
}
// newInformer returns a controller for populating the store while also
// providing event notifications.
//
@ -444,38 +488,10 @@ func newInformer(
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
obj := d.Object
if transformer != nil {
var err error
obj, err = transformer(obj)
if err != nil {
return err
}
}
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
h.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
h.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(obj)
}
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
}
return nil
return errors.New("object given as Process argument is not Deltas")
},
}
return New(cfg)

View File

@ -231,7 +231,7 @@ var (
// Used to indicate that watching stopped because of a signal from the stop
// channel passed in from a client of the reflector.
errorStopRequested = errors.New("Stop requested")
errorStopRequested = errors.New("stop requested")
)
// resyncChan returns a channel which will receive something when a resync is
@ -258,7 +258,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
@ -319,7 +319,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
panic(r)
case <-listCh:
}
initTrace.Step("Objects listed", trace.Field{"error", err})
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err)
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
@ -401,7 +401,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache
import (
"errors"
"fmt"
"sync"
"time"
@ -180,6 +181,20 @@ type SharedInformer interface {
// The handler should return quickly - any expensive processing should be
// offloaded.
SetWatchErrorHandler(handler WatchErrorHandler) error
// The TransformFunc is called for each object which is about to be stored.
//
// This function is intended for you to take the opportunity to
// remove, transform, or normalize fields. One use case is to strip unused
// metadata fields out of objects to save on RAM cost.
//
// Must be set before starting the informer.
//
// Note: Since the object given to the handler may be already shared with
// other goroutines, it is advisable to copy the object being
// transform before mutating it at all and returning the copy to prevent
// data races.
SetTransform(handler TransformFunc) error
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
@ -244,7 +259,7 @@ func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheS
return false
}
klog.Infof("Caches are synced for %s ", controllerName)
klog.Infof("Caches are synced for %s", controllerName)
return true
}
@ -318,6 +333,8 @@ type sharedIndexInformer struct {
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
transform TransformFunc
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@ -365,6 +382,18 @@ func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) er
return nil
}
func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
s.transform = handler
return nil
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
@ -538,45 +567,47 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
}
return errors.New("object given as Process argument is not Deltas")
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj}, false)
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
isSync := false
// If is a Sync event, isSync should be true
// If is a Replaced event, isSync is true if resource version is unchanged.
// If RV is unchanged: this is a Sync/Replaced event, so isSync is true
if accessor, err := meta.Accessor(new); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
return nil
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.cacheMutationDetector.AddObject(new)
s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
s.processor.distribute(deleteNotification{oldObj: old}, false)
}
// sharedProcessor has a collection of processorListener and can

View File

@ -71,11 +71,7 @@ type threadSafeMap struct {
}
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
c.Update(key, obj)
}
func (c *threadSafeMap) Update(key string, obj interface{}) {