rebase: bump k8s.io/kubernetes in the k8s-dependencies group

Bumps the k8s-dependencies group with 1 update: [k8s.io/kubernetes](https://github.com/kubernetes/kubernetes).

Updates `k8s.io/kubernetes` from 1.32.3 to 1.33.0
- [Release notes](https://github.com/kubernetes/kubernetes/releases)
- [Commits](https://github.com/kubernetes/kubernetes/compare/v1.32.3...v1.33.0)

---
updated-dependencies:
- dependency-name: k8s.io/kubernetes
  dependency-version: 1.33.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: k8s-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
dependabot[bot]
2025-04-28 22:16:28 +00:00
committed by mergify[bot]
parent 4147d5d15a
commit 51895f8619
699 changed files with 51590 additions and 17096 deletions

View File

@ -17,7 +17,9 @@ limitations under the License.
package cache
import (
"context"
"errors"
clientgofeaturegate "k8s.io/client-go/features"
"sync"
"time"
@ -71,16 +73,15 @@ type Config struct {
// resync.
ShouldResync ShouldResyncFunc
// If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in
// question to this interface as a parameter. This is probably moot
// now that this functionality appears at a higher level.
RetryOnError bool
// Called whenever the ListAndWatch drops the connection with an error.
//
// Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging.
WatchErrorHandler WatchErrorHandler
// Called whenever the ListAndWatch drops the connection with an error
// and WatchErrorHandler is not set.
WatchErrorHandlerWithContext WatchErrorHandlerWithContext
// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64
}
@ -104,12 +105,21 @@ type controller struct {
// Controller is a low-level controller that is parameterized by a
// Config and used in sharedIndexInformer.
type Controller interface {
// Run does two things. One is to construct and run a Reflector
// RunWithContext does two things. One is to construct and run a Reflector
// to pump objects/notifications from the Config's ListerWatcher
// to the Config's Queue and possibly invoke the occasional Resync
// on that Queue. The other is to repeatedly Pop from the Queue
// and process with the Config's ProcessFunc. Both of these
// continue until `stopCh` is closed.
// continue until the context is canceled.
//
// It's an error to call RunWithContext more than once.
// RunWithContext blocks; call via go.
RunWithContext(ctx context.Context)
// Run does the same as RunWithContext with a stop channel instead of
// a context.
//
// Contextual logging: RunWithcontext should be used instead of Run in code which supports contextual logging.
Run(stopCh <-chan struct{})
// HasSynced delegates to the Config's Queue
@ -129,13 +139,16 @@ func New(c *Config) Controller {
return ctlr
}
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
// Run implements [Controller.Run].
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
c.RunWithContext(wait.ContextForChannel(stopCh))
}
// RunWithContext implements [Controller.RunWithContext].
func (c *controller) RunWithContext(ctx context.Context) {
defer utilruntime.HandleCrashWithContext(ctx)
go func() {
<-stopCh
<-ctx.Done()
c.config.Queue.Close()
}()
r := NewReflectorWithOptions(
@ -152,7 +165,11 @@ func (c *controller) Run(stopCh <-chan struct{}) {
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
r.watchErrorHandler = func(_ context.Context, r *Reflector, err error) {
c.config.WatchErrorHandler(r, err)
}
} else if c.config.WatchErrorHandlerWithContext != nil {
r.watchErrorHandler = c.config.WatchErrorHandlerWithContext
}
c.reflectorMutex.Lock()
@ -161,9 +178,9 @@ func (c *controller) Run(stopCh <-chan struct{}) {
var wg wait.Group
wg.StartWithChannel(stopCh, r.Run)
wg.StartWithContext(ctx, r.RunWithContext)
wait.Until(c.processLoop, time.Second, stopCh)
wait.UntilWithContext(ctx, c.processLoop, time.Second)
wg.Wait()
}
@ -185,22 +202,16 @@ func (c *controller) LastSyncResourceVersion() string {
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
func (c *controller) processLoop(ctx context.Context) {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
// TODO: Plumb through the ctx so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable.
_, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
@ -582,11 +593,17 @@ func newInformer(clientState Store, options InformerOptions) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: options.Transform,
})
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform)
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: options.Transform,
})
}
cfg := &Config{
Queue: fifo,
@ -594,7 +611,6 @@ func newInformer(clientState Store, options InformerOptions) Controller {
ObjectType: options.ObjectType,
FullResyncPeriod: options.ResyncPeriod,
MinWatchTimeout: options.MinWatchTimeout,
RetryOnError: false,
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {

View File

@ -55,6 +55,9 @@ type DeltaFIFOOptions struct {
// If set, will be called for objects before enqueueing them. Please
// see the comment on TransformFunc for details.
Transformer TransformFunc
// If set, log output will go to this logger instead of klog.Background().
Logger *klog.Logger
}
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
@ -136,6 +139,10 @@ type DeltaFIFO struct {
// Called with every object if non-nil.
transformer TransformFunc
// logger is a per-instance logger. This gets chosen when constructing
// the instance, with klog.Background() as default.
logger klog.Logger
}
// TransformFunc allows for transforming an object before it will be processed.
@ -253,6 +260,10 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer,
logger: klog.Background(),
}
if opts.Logger != nil {
f.logger = *opts.Logger
}
f.cond.L = &f.lock
return f
@ -358,43 +369,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
return f.queueActionLocked(Deleted, obj)
}
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
// present in the set, it is neither enqueued nor added to the set.
//
// This is useful in a single producer/consumer scenario so that the consumer can
// safely retry items without contending with the producer and potentially enqueueing
// stale items.
//
// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
// different from the Add/Update/Delete functions.
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
deltas, ok := obj.(Deltas)
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
id, err := f.KeyOf(deltas)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.addIfNotPresent(id, deltas)
return nil
}
// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
// already holds the fifo lock.
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
if _, exists := f.items[id]; exists {
return
}
f.queue = append(f.queue, id)
f.items[id] = deltas
f.cond.Broadcast()
}
// re-listing and watching can deliver the same update multiple times in any
// order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {
@ -487,71 +461,16 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del
// when given a non-empty list (as it is here).
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
// List returns a list of all the items; it returns the object
// from the most recent Delta.
// You should treat the items returned inside the deltas as immutable.
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item.Newest().Object)
}
return list
}
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
func (f *DeltaFIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
for _, key := range f.queue {
list = append(list, key)
}
return list
}
// Get returns the complete list of deltas for the requested item,
// or sets exists=false.
// You should treat the items returned inside the deltas as immutable.
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
// GetByKey returns the complete list of deltas for the requested item,
// setting exists=false if that list is empty.
// You should treat the items returned inside the deltas as immutable.
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
if exists {
// Copy item's slice so operations on this slice
// won't interfere with the object we return.
d = copyDeltas(d)
}
return d, exists, nil
}
// IsClosed checks if the queue is closed
func (f *DeltaFIFO) IsClosed() bool {
f.lock.Lock()
@ -565,9 +484,7 @@ func (f *DeltaFIFO) IsClosed() bool {
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
// in it that need to be in sync with the queue (e.g. knownKeys).
// process should avoid expensive I/O operation so that other queue operations, i.e.
// Add() and Get(), won't be blocked for too long.
//
@ -597,7 +514,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id)
continue
}
delete(f.items, id)
@ -614,10 +531,6 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
@ -694,10 +607,10 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
@ -737,10 +650,10 @@ func (f *DeltaFIFO) Resync() error {
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key)
return nil
} else if !exists {
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key)
return nil
}

View File

@ -21,4 +21,4 @@ limitations under the License.
// list currently available nodes), and one that additionally acts as
// a FIFO queue (for example, to allow a scheduler to process incoming
// pods).
package cache // import "k8s.io/client-go/tools/cache"
package cache

View File

@ -27,30 +27,16 @@ import (
// It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(obj interface{}, isInInitialList bool) error
// ErrRequeue may be returned by a PopProcessFunc to safely requeue
// the current item. The value of Err will be returned from Pop.
type ErrRequeue struct {
// Err is returned by the Pop function
Err error
}
// ErrFIFOClosed used when FIFO is closed
var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")
func (e ErrRequeue) Error() string {
if e.Err == nil {
return "the popped item should be requeued without returning an error"
}
return e.Err.Error()
}
// Queue extends Store with a collection of Store keys to "process".
// Queue extends ReflectorStore with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {
Store
ReflectorStore
// Pop blocks until there is at least one key to process or the
// Queue is closed. In the latter case Pop returns with an error.
@ -64,11 +50,6 @@ type Queue interface {
// Pop.
Pop(PopProcessFunc) (interface{}, error)
// AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
AddIfNotPresent(interface{}) error
// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, AddIfNotPresent,
@ -177,36 +158,6 @@ func (f *FIFO) Add(obj interface{}) error {
return nil
}
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
// present in the set, it is neither enqueued nor added to the set.
//
// This is useful in a single producer/consumer scenario so that the consumer can
// safely retry items without contending with the producer and potentially enqueueing
// stale items.
func (f *FIFO) AddIfNotPresent(obj interface{}) error {
id, err := f.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.addIfNotPresent(id, obj)
return nil
}
// addIfNotPresent assumes the fifo lock is already held and adds the provided
// item to the queue under id if it does not already exist.
func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
f.populated = true
if _, exists := f.items[id]; exists {
return
}
f.queue = append(f.queue, id)
f.items[id] = obj
f.cond.Broadcast()
}
// Update is the same as Add in this implementation.
func (f *FIFO) Update(obj interface{}) error {
return f.Add(obj)
@ -227,46 +178,6 @@ func (f *FIFO) Delete(obj interface{}) error {
return err
}
// List returns a list of all the items.
func (f *FIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item)
}
return list
}
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
func (f *FIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.items))
for key := range f.items {
list = append(list, key)
}
return list
}
// Get returns the requested item, or sets exists=false.
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.keyFunc(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
// GetByKey returns the requested item, or sets exists=false.
func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
item, exists = f.items[key]
return item, exists, nil
}
// IsClosed checks if the queue is closed
func (f *FIFO) IsClosed() bool {
f.lock.Lock()
@ -307,10 +218,6 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
}
delete(f.items, id)
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}

View File

@ -62,7 +62,12 @@ func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selec
items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace})
if err != nil {
// Ignore error; do slow search without index.
klog.Warningf("can not retrieve list of objects using index : %v", err)
//
// ListAllByNamespace is called by generated code
// (k8s.io/client-go/listers) and probably not worth converting
// to contextual logging, which would require changing all of
// those APIs.
klog.TODO().Info("Warning: can not retrieve list of objects using index", "err", err)
for _, m := range indexer.List() {
metadata, err := meta.Accessor(m)
if err != nil {

View File

@ -27,50 +27,160 @@ import (
)
// Lister is any object that knows how to perform an initial list.
//
// Ideally, all implementations of Lister should also implement ListerWithContext.
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
//
// Deprecated: use ListerWithContext.ListWithContext instead.
List(options metav1.ListOptions) (runtime.Object, error)
}
// ListerWithContext is any object that knows how to perform an initial list.
type ListerWithContext interface {
// ListWithContext should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error)
}
func ToListerWithContext(l Lister) ListerWithContext {
if l, ok := l.(ListerWithContext); ok {
return l
}
return listerWrapper{
parent: l,
}
}
type listerWrapper struct {
parent Lister
}
func (l listerWrapper) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
return l.parent.List(options)
}
// Watcher is any object that knows how to start a watch on a resource.
//
// Ideally, all implementations of Watcher should also implement WatcherWithContext.
type Watcher interface {
// Watch should begin a watch at the specified version.
//
// If Watch returns an error, it should handle its own cleanup, including
// but not limited to calling Stop() on the watch, if one was constructed.
// This allows the caller to ignore the watch, if the error is non-nil.
//
// Deprecated: use WatcherWithContext.WatchWithContext instead.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// WatcherWithContext is any object that knows how to start a watch on a resource.
type WatcherWithContext interface {
// WatchWithContext should begin a watch at the specified version.
//
// If Watch returns an error, it should handle its own cleanup, including
// but not limited to calling Stop() on the watch, if one was constructed.
// This allows the caller to ignore the watch, if the error is non-nil.
WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error)
}
func ToWatcherWithContext(w Watcher) WatcherWithContext {
if w, ok := w.(WatcherWithContext); ok {
return w
}
return watcherWrapper{
parent: w,
}
}
type watcherWrapper struct {
parent Watcher
}
func (l watcherWrapper) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
return l.parent.Watch(options)
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
//
// Ideally, all implementations of ListerWatcher should also implement ListerWatcherWithContext.
type ListerWatcher interface {
Lister
Watcher
}
// ListerWatcherWithContext is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcherWithContext interface {
ListerWithContext
WatcherWithContext
}
func ToListerWatcherWithContext(lw ListerWatcher) ListerWatcherWithContext {
if lw, ok := lw.(ListerWatcherWithContext); ok {
return lw
}
return listerWatcherWrapper{
ListerWithContext: ToListerWithContext(lw),
WatcherWithContext: ToWatcherWithContext(lw),
}
}
type listerWatcherWrapper struct {
ListerWithContext
WatcherWithContext
}
// ListFunc knows how to list resources
//
// Deprecated: use ListWithContextFunc instead.
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// ListWithContextFunc knows how to list resources
type ListWithContextFunc func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources
//
// Deprecated: use WatchFuncWithContext instead.
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// WatchFuncWithContext knows how to watch resources
type WatchFuncWithContext func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources.
// It satisfies the ListerWatcher and ListerWatcherWithContext interfaces.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
// ListFunc or ListWithContextFunc must be set. Same for WatchFunc and WatchFuncWithContext.
// ListWithContextFunc and WatchFuncWithContext are preferred if
// a context is available, otherwise ListFunc and WatchFunc.
//
// NewFilteredListWatchFromClient sets all of the functions to ensure that callers
// which only know about ListFunc and WatchFunc continue to work.
type ListWatch struct {
ListFunc ListFunc
// Deprecated: use ListWithContext instead.
ListFunc ListFunc
// Deprecated: use WatchWithContext instead.
WatchFunc WatchFunc
ListWithContextFunc ListWithContextFunc
WatchFuncWithContext WatchFuncWithContext
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
var (
_ ListerWatcher = &ListWatch{}
_ ListerWatcherWithContext = &ListWatch{}
)
// Getter interface knows how to access Get method from RESTClient.
type Getter interface {
Get() *restclient.Request
}
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
// For backward compatibility, all function fields are populated.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
@ -81,6 +191,7 @@ func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSe
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
// For backward compatibility, all function fields are populated.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
@ -88,7 +199,7 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(context.TODO()).
Do(context.Background()).
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
@ -98,19 +209,70 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(context.TODO())
Watch(context.Background())
}
listFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(ctx).
Get()
}
watchFuncWithContext := func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(ctx)
}
return &ListWatch{
ListFunc: listFunc,
WatchFunc: watchFunc,
ListWithContextFunc: listFuncWithContext,
WatchFuncWithContext: watchFuncWithContext,
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
// List a set of apiserver resources
//
// Deprecated: use ListWatchWithContext.ListWithContext instead.
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
// ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication.
if lw.ListFunc != nil {
return lw.ListFunc(options)
}
return lw.ListWithContextFunc(context.Background(), options)
}
// List a set of apiserver resources
func (lw *ListWatch) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
// ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication.
if lw.ListWithContextFunc != nil {
return lw.ListWithContextFunc(ctx, options)
}
return lw.ListFunc(options)
}
// Watch a set of apiserver resources
//
// Deprecated: use ListWatchWithContext.WatchWithContext instead.
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
if lw.WatchFunc != nil {
return lw.WatchFunc(options)
}
return lw.WatchFuncWithContext(context.Background(), options)
}
// Watch a set of apiserver resources
func (lw *ListWatch) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
if lw.WatchFuncWithContext != nil {
return lw.WatchFuncWithContext(ctx, options)
}
return lw.WatchFunc(options)
}

View File

@ -60,7 +60,7 @@ type ResourceVersionComparator interface {
// If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
// in the underlying store. This is only safe if your use of the cache can handle mutation entries
// remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
func NewIntegerResourceVersionMutationCache(logger klog.Logger, backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
return &mutationCache{
backingCache: backingCache,
indexer: indexer,
@ -68,6 +68,7 @@ func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer,
comparator: etcdObjectVersioner{},
ttl: ttl,
includeAdds: includeAdds,
logger: logger,
}
}
@ -75,6 +76,7 @@ func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer,
// since you can't distinguish between, "didn't observe create" and "was deleted after create",
// if the key is missing from the backing cache, we always return it as missing
type mutationCache struct {
logger klog.Logger
lock sync.Mutex
backingCache Store
indexer Indexer
@ -157,7 +159,7 @@ func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, er
}
elements, err := fn(updated)
if err != nil {
klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
c.logger.V(4).Info("Unable to calculate an index entry for mutation cache entry", "key", key, "err", err)
continue
}
for _, inIndex := range elements {
@ -204,7 +206,7 @@ func (c *mutationCache) Mutation(obj interface{}) {
key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
// this is a "nice to have", so failures shouldn't do anything weird
utilruntime.HandleError(err)
utilruntime.HandleErrorWithLogger(c.logger, err, "DeletionHandlingMetaNamespaceKeyFunc")
return
}

View File

@ -50,6 +50,7 @@ func NewCacheMutationDetector(name string) MutationDetector {
if !mutationDetectionEnabled {
return dummyMutationDetector{}
}
//nolint:logcheck // This code shouldn't be used in production.
klog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute}
}

View File

@ -55,6 +55,28 @@ var (
defaultMinWatchTimeout = 5 * time.Minute
)
// ReflectorStore is the subset of cache.Store that the reflector uses
type ReflectorStore interface {
// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error
// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default, it will be a file:line if possible.
@ -72,9 +94,9 @@ type Reflector struct {
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store
store ReflectorStore
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
listerWatcher ListerWatcherWithContext
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
resyncPeriod time.Duration
@ -95,7 +117,7 @@ type Reflector struct {
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
watchErrorHandler WatchErrorHandlerWithContext
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
@ -150,20 +172,32 @@ type ResourceVersionUpdater interface {
// should be offloaded.
type WatchErrorHandler func(r *Reflector, err error)
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
func DefaultWatchErrorHandler(r *Reflector, err error) {
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
type WatchErrorHandlerWithContext func(ctx context.Context, r *Reflector, err error)
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandlerWithContext.
func DefaultWatchErrorHandler(ctx context.Context, r *Reflector, err error) {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
klog.FromContext(ctx).V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
klog.FromContext(ctx).V(1).Info("Watch closed with unexpected EOF", "reflector", r.name, "type", r.typeDescription, "err", err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
utilruntime.HandleErrorWithContext(ctx, err, "Failed to watch", "reflector", r.name, "type", r.typeDescription)
}
}
@ -177,13 +211,13 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
// NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
// that is outside this package. See NewReflectorWithOptions for further information.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
func NewReflector(lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}
// NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
// information.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
}
@ -222,7 +256,7 @@ type ReflectorOptions struct {
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store ReflectorStore, options ReflectorOptions) *Reflector {
reflectorClock := options.Clock
if reflectorClock == nil {
reflectorClock = clock.RealClock{}
@ -236,14 +270,14 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store S
resyncPeriod: options.ResyncPeriod,
minWatchTimeout: minWatchTimeout,
typeDescription: options.TypeDescription,
listerWatcher: lw,
listerWatcher: ToListerWatcherWithContext(lw),
store: store,
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
clock: reflectorClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
expectedType: reflect.TypeOf(expectedType),
}
@ -309,14 +343,24 @@ var internalPackages = []string{"client-go/tools/cache/"}
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
//
// Contextual logging: RunWithContext should be used instead of Run in code which supports contextual logging.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
r.RunWithContext(wait.ContextForChannel(stopCh))
}
// RunWithContext repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when the context is canceled.
func (r *Reflector) RunWithContext(ctx context.Context) {
logger := klog.FromContext(ctx)
logger.V(3).Info("Starting reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
if err := r.ListAndWatchWithContext(ctx); err != nil {
r.watchErrorHandler(ctx, r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}, r.backoffManager, true, ctx.Done())
logger.V(3).Info("Stopping reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name)
}
var (
@ -345,21 +389,31 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
//
// Contextual logging: ListAndWatchWithContext should be used instead of ListAndWatch in code which supports contextual logging.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
return r.ListAndWatchWithContext(wait.ContextForChannel(stopCh))
}
// ListAndWatchWithContext first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatchWithContext didn't even try to initialize watch.
func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
logger := klog.FromContext(ctx)
logger.V(3).Info("Listing and watching", "type", r.typeDescription, "reflector", r.name)
var err error
var w watch.Interface
useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList
if useWatchList {
w, err = r.watchList(stopCh)
w, err = r.watchList(ctx)
if w == nil && err == nil {
// stopCh was closed
return nil
}
if err != nil {
klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
logger.Error(err, "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking")
fallbackToList = true
// ensure that we won't accidentally pass some garbage down the watch.
w = nil
@ -367,20 +421,21 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
}
if fallbackToList {
err = r.list(stopCh)
err = r.list(ctx)
if err != nil {
return err
}
}
klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)
return r.watchWithResync(w, stopCh)
logger.V(2).Info("Caches populated", "type", r.typeDescription, "reflector", r.name)
return r.watchWithResync(ctx, w)
}
// startResync periodically calls r.store.Resync() method.
// Note that this method is blocking and should be
// called in a separate goroutine.
func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
func (r *Reflector) startResync(ctx context.Context, resyncerrc chan error) {
logger := klog.FromContext(ctx)
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
@ -388,13 +443,11 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
case <-ctx.Done():
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
logger.V(4).Info("Forcing resync", "reflector", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
@ -406,16 +459,27 @@ func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}
}
// watchWithResync runs watch with startResync in the background.
func (r *Reflector) watchWithResync(w watch.Interface, stopCh <-chan struct{}) error {
func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) error {
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(w, stopCh, resyncerrc)
cancelCtx, cancel := context.WithCancel(ctx)
// Waiting for completion of the goroutine is relevant for race detector.
// Without this, there is a race between "this function returns + code
// waiting for it" and "goroutine does something".
var wg wait.Group
defer func() {
cancel()
wg.Wait()
}()
wg.Start(func() {
r.startResync(cancelCtx, resyncerrc)
})
return r.watch(ctx, w, resyncerrc)
}
// watch simply starts a watch request with the server.
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
stopCh := ctx.Done()
logger := klog.FromContext(ctx)
var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
@ -448,10 +512,10 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
AllowWatchBookmarks: true,
}
w, err = r.listerWatcher.Watch(options)
w, err = r.listerWatcher.WatchWithContext(ctx, options)
if err != nil {
if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
select {
case <-stopCh:
return nil
@ -463,8 +527,8 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
}
}
err = handleWatch(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc, stopCh)
err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc)
// Ensure that watch will not be reused across iterations.
w.Stop()
w = nil
@ -476,9 +540,9 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
logger.V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err)
case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
logger.V(2).Info("Watch returned 429 - backing off", "reflector", r.name, "type", r.typeDescription)
select {
case <-stopCh:
return nil
@ -486,10 +550,10 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
continue
}
case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
logger.V(2).Info("Retrying watch after internal error", "reflector", r.name, "type", r.typeDescription, "err", err)
continue
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
logger.Info("Warning: watch ended with error", "reflector", r.name, "type", r.typeDescription, "err", err)
}
}
return nil
@ -499,7 +563,7 @@ func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc
// list simply lists all items and records a resource version obtained from the server at the moment of the call.
// the resource version can be used for further progress notification (aka. watch).
func (r *Reflector) list(stopCh <-chan struct{}) error {
func (r *Reflector) list(ctx context.Context) error {
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
@ -519,7 +583,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
return r.listerWatcher.ListWithContext(ctx, opts)
}))
switch {
case r.WatchListPageSize != 0:
@ -558,7 +622,7 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
close(listCh)
}()
select {
case <-stopCh:
case <-ctx.Done():
return nil
case r := <-panicCh:
panic(r)
@ -566,7 +630,6 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
}
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
}
@ -624,7 +687,9 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
stopCh := ctx.Done()
logger := klog.FromContext(ctx)
var w watch.Interface
var err error
var temporaryStore Store
@ -634,7 +699,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// could be unified with the r.watch method
isErrorRetriableWithSideEffectsFn := func(err error) bool {
if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
logger.V(2).Info("watch-list failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
<-r.backoffManager.Backoff().C()
return true
}
@ -674,16 +739,16 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
}
start := r.clock.Now()
w, err = r.listerWatcher.Watch(options)
w, err = r.listerWatcher.WatchWithContext(ctx, options)
if err != nil {
if isErrorRetriableWithSideEffectsFn(err) {
continue
}
return nil, err
}
watchListBookmarkReceived, err := handleListWatch(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
watchListBookmarkReceived, err := handleListWatch(ctx, start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string) { resourceVersion = rv },
r.clock, make(chan error), stopCh)
r.clock, make(chan error))
if err != nil {
w.Stop() // stop and retry with clean state
if errors.Is(err, errorStopRequested) {
@ -706,7 +771,7 @@ func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
// we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content.
checkWatchListDataConsistencyIfRequested(wait.ContextForChannel(stopCh), r.name, resourceVersion, wrapListFuncWithContext(r.listerWatcher.List), temporaryStore.List)
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List)
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
@ -731,6 +796,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
// retry. If successful, the watcher will be left open after receiving the
// initial set of objects, to allow watching for future events.
func handleListWatch(
ctx context.Context,
start time.Time,
w watch.Interface,
store Store,
@ -741,20 +807,20 @@ func handleListWatch(
setLastSyncResourceVersion func(string),
clock clock.Clock,
errCh chan error,
stopCh <-chan struct{},
) (bool, error) {
exitOnWatchListBookmarkReceived := true
return handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh)
return handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh)
}
// handleListWatch consumes events from w, updates the Store, and records the
// last seen ResourceVersion, to allow continuing from that ResourceVersion on
// retry. The watcher will always be stopped on exit.
func handleWatch(
ctx context.Context,
start time.Time,
w watch.Interface,
store Store,
store ReflectorStore,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
@ -762,11 +828,10 @@ func handleWatch(
setLastSyncResourceVersion func(string),
clock clock.Clock,
errCh chan error,
stopCh <-chan struct{},
) error {
exitOnWatchListBookmarkReceived := false
_, err := handleAnyWatch(start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh, stopCh)
_, err := handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName,
setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh)
return err
}
@ -779,9 +844,11 @@ func handleWatch(
// The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is
// true and watchListBookmarkReceived is true. This allows the same watch stream
// to be re-used by the caller to continue watching for new events.
func handleAnyWatch(start time.Time,
func handleAnyWatch(
ctx context.Context,
start time.Time,
w watch.Interface,
store Store,
store ReflectorStore,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
@ -790,17 +857,17 @@ func handleAnyWatch(start time.Time,
exitOnWatchListBookmarkReceived bool,
clock clock.Clock,
errCh chan error,
stopCh <-chan struct{},
) (bool, error) {
watchListBookmarkReceived := false
eventCount := 0
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnWatchListBookmarkReceived)
logger := klog.FromContext(ctx)
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived)
defer initialEventsEndBookmarkWarningTicker.Stop()
loop:
for {
select {
case <-stopCh:
case <-ctx.Done():
return watchListBookmarkReceived, errorStopRequested
case err := <-errCh:
return watchListBookmarkReceived, err
@ -813,19 +880,19 @@ loop:
}
if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object type", "reflector", name, "expectedType", e, "actualType", a)
continue
}
}
if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object gvk", "reflector", name, "expectedGVK", e, "actualGVK", a)
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
utilruntime.HandleErrorWithContext(ctx, err, "Unable to understand watch event", "reflector", name, "event", event)
continue
}
resourceVersion := meta.GetResourceVersion()
@ -833,12 +900,12 @@ loop:
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object)
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", "reflector", name, "object", event.Object)
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
@ -846,7 +913,7 @@ loop:
// to change this.
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object)
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
@ -854,7 +921,7 @@ loop:
watchListBookmarkReceived = true
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event)
}
setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
@ -863,7 +930,7 @@ loop:
eventCount++
if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
watchDuration := clock.Since(start)
klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration)
return watchListBookmarkReceived, nil
}
initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now())
@ -876,7 +943,7 @@ loop:
if watchDuration < 1*time.Second && eventCount == 0 {
return watchListBookmarkReceived, fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
klog.FromContext(ctx).V(4).Info("Watch close", "reflector", name, "type", expectedTypeName, "totalItems", eventCount)
return watchListBookmarkReceived, nil
}
@ -990,13 +1057,6 @@ func isWatchErrorRetriable(err error) bool {
return false
}
// wrapListFuncWithContext simply wraps ListFunction into another function that accepts a context and ignores it.
func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
return func(_ context.Context, options metav1.ListOptions) (runtime.Object, error) {
return listFn(options)
}
}
// initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event
// which marks the end of the watch stream, has not been received within the defined tick interval.
//
@ -1004,8 +1064,9 @@ func wrapListFuncWithContext(listFn ListFunc) func(ctx context.Context, options
// The methods exposed by this type are not thread-safe.
type initialEventsEndBookmarkTicker struct {
clock.Ticker
clock clock.Clock
name string
clock clock.Clock
name string
logger klog.Logger
watchStart time.Time
tickInterval time.Duration
@ -1019,15 +1080,15 @@ type initialEventsEndBookmarkTicker struct {
// Note that the caller controls whether to call t.C() and t.Stop().
//
// In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method.
func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived)
func newInitialEventsEndBookmarkTicker(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
return newInitialEventsEndBookmarkTickerInternal(logger, name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived)
}
func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
func newInitialEventsEndBookmarkTickerInternal(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
clockWithTicker, ok := c.(clock.WithTicker)
if !ok || !exitOnWatchListBookmarkReceived {
if exitOnWatchListBookmarkReceived {
klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested")
logger.Info("Warning: clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested")
}
return &initialEventsEndBookmarkTicker{
Ticker: &noopTicker{},
@ -1038,6 +1099,7 @@ func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watch
Ticker: clockWithTicker.NewTicker(tickInterval),
clock: c,
name: name,
logger: logger,
watchStart: watchStart,
tickInterval: tickInterval,
}
@ -1049,7 +1111,7 @@ func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObse
func (t *initialEventsEndBookmarkTicker) warnIfExpired() {
if err := t.produceWarningIfExpired(); err != nil {
klog.Warning(err)
t.logger.Info("Warning: event bookmark expired", "err", err)
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cache
import (
"context"
"errors"
"fmt"
"sync"
@ -29,6 +30,7 @@ import (
"k8s.io/client-go/tools/cache/synctrack"
"k8s.io/utils/buffer"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"k8s.io/klog/v2"
@ -142,6 +144,8 @@ type SharedInformer interface {
// It returns a registration handle for the handler that can be used to
// remove the handler again, or to tell if the handler is synced (has
// seen every item in the initial list).
//
// Contextual logging: AddEventHandlerWithOptions together with a logger in the options should be used instead of AddEventHandler in code which supports contextual logging.
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means
@ -159,7 +163,12 @@ type SharedInformer interface {
// be competing load and scheduling noise.
// It returns a registration handle for the handler that can be used to remove
// the handler again and an error if the handler cannot be added.
//
// Contextual logging: AddEventHandlerWithOptions together with a logger in the options should be used instead of AddEventHandlerWithResyncPeriod in code which supports contextual logging.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithOptions is a variant of AddEventHandlerWithResyncPeriod where
// all optional parameters are passed in a struct.
AddEventHandlerWithOptions(handler ResourceEventHandler, options HandlerOptions) (ResourceEventHandlerRegistration, error)
// RemoveEventHandler removes a formerly added event handler given by
// its registration handle.
// This function is guaranteed to be idempotent, and thread-safe.
@ -170,7 +179,12 @@ type SharedInformer interface {
GetController() Controller
// Run starts and runs the shared informer, returning after it stops.
// The informer will be stopped when stopCh is closed.
//
// Contextual logging: RunWithContext should be used instead of Run in code which uses contextual logging.
Run(stopCh <-chan struct{})
// RunWithContext starts and runs the shared informer, returning after it stops.
// The informer will be stopped when the context is canceled.
RunWithContext(ctx context.Context)
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
@ -197,8 +211,14 @@ type SharedInformer interface {
// The handler is intended for visibility, not to e.g. pause the consumers.
// The handler should return quickly - any expensive processing should be
// offloaded.
//
// Contextual logging: SetWatchErrorHandlerWithContext should be used instead of SetWatchErrorHandler in code which supports contextual logging.
SetWatchErrorHandler(handler WatchErrorHandler) error
// SetWatchErrorHandlerWithContext is a variant of SetWatchErrorHandler where
// the handler is passed an additional context parameter.
SetWatchErrorHandlerWithContext(handler WatchErrorHandlerWithContext) error
// The TransformFunc is called for each object which is about to be stored.
//
// This function is intended for you to take the opportunity to
@ -228,6 +248,21 @@ type ResourceEventHandlerRegistration interface {
HasSynced() bool
}
// Optional configuration options for [SharedInformer.AddEventHandlerWithOptions].
// May be left empty.
type HandlerOptions struct {
// Logger overrides the default klog.Background() logger.
Logger *klog.Logger
// ResyncPeriod requests a certain resync period from an informer. Zero
// means the handler does not care about resyncs. Not all informers do
// resyncs, even if requested. See
// [SharedInformer.AddEventHandlerWithResyncPeriod] for details.
//
// If nil, the default resync period of the shared informer is used.
ResyncPeriod *time.Duration
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
SharedInformer
@ -309,15 +344,38 @@ const (
// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
// indicating that the caller identified by name is waiting for syncs, followed by
// either a successful or failed sync.
//
// Contextual logging: WaitForNamedCacheSyncWithContext should be used instead of WaitForNamedCacheSync in code which supports contextual logging.
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
klog.Infof("Waiting for caches to sync for %s", controllerName)
klog.Background().Info("Waiting for caches to sync", "controller", controllerName)
if !WaitForCacheSync(stopCh, cacheSyncs...) {
utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
utilruntime.HandleErrorWithContext(context.Background(), nil, "Unable to sync caches", "controller", controllerName)
return false
}
klog.Infof("Caches are synced for %s", controllerName)
klog.Background().Info("Caches are synced", "controller", controllerName)
return true
}
// WaitForNamedCacheSyncWithContext is a wrapper around WaitForCacheSyncWithContext that generates log messages
// indicating that the caller is waiting for syncs, followed by either a successful or failed sync.
//
// Contextual logging can be used to identify the caller in those log messages. The log level is zero,
// the same as in [WaitForNamedCacheSync]. If this is too verbose, then store a logger with an increased
// threshold in the context:
//
// WaitForNamedCacheSyncWithContext(klog.NewContext(ctx, logger.V(5)), ...)
func WaitForNamedCacheSyncWithContext(ctx context.Context, cacheSyncs ...InformerSynced) bool {
logger := klog.FromContext(ctx)
logger.Info("Waiting for caches to sync")
if !WaitForCacheSync(ctx.Done(), cacheSyncs...) {
utilruntime.HandleErrorWithContext(ctx, nil, "Unable to sync caches")
return false
}
logger.Info("Caches are synced")
return true
}
@ -389,7 +447,7 @@ type sharedIndexInformer struct {
blockDeltas sync.Mutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
watchErrorHandler WatchErrorHandlerWithContext
transform TransformFunc
}
@ -403,6 +461,9 @@ type dummyController struct {
informer *sharedIndexInformer
}
func (v *dummyController) RunWithContext(context.Context) {
}
func (v *dummyController) Run(stopCh <-chan struct{}) {
}
@ -433,6 +494,12 @@ type deleteNotification struct {
}
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
return s.SetWatchErrorHandlerWithContext(func(_ context.Context, r *Reflector, err error) {
handler(r, err)
})
}
func (s *sharedIndexInformer) SetWatchErrorHandlerWithContext(handler WatchErrorHandlerWithContext) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
@ -457,10 +524,15 @@ func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
s.RunWithContext(wait.ContextForChannel(stopCh))
}
func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
defer utilruntime.HandleCrashWithContext(ctx)
logger := klog.FromContext(ctx)
if s.HasStarted() {
klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
logger.Info("Warning: the sharedIndexInformer has started, run more than once is not allowed")
return
}
@ -468,11 +540,16 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
var fifo Queue
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
fifo = NewRealFIFO(MetaNamespaceKeyFunc, s.indexer, s.transform)
} else {
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
}
cfg := &Config{
Queue: fifo,
@ -480,11 +557,10 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
ObjectType: s.objectType,
ObjectDescription: s.objectDescription,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
Process: s.HandleDeltas,
WatchErrorHandlerWithContext: s.watchErrorHandler,
}
s.controller = New(cfg)
@ -492,20 +568,24 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
// Separate stop context because Processor should be stopped strictly after controller.
// Cancelation in the parent context is ignored and all values are passed on,
// including - but not limited to - a logger.
processorStopCtx, stopProcessor := context.WithCancelCause(context.WithoutCancel(ctx))
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer wg.Wait() // Wait for Processor to stop
defer stopProcessor(errors.New("informer is stopping")) // Tell Processor to stop
// TODO: extend the MutationDetector interface so that it optionally
// has a RunWithContext method that we can use here.
wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run)
wg.StartWithContext(processorStopCtx, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
s.controller.RunWithContext(ctx)
}
func (s *sharedIndexInformer) HasStarted() bool {
@ -558,19 +638,19 @@ func (s *sharedIndexInformer) GetController() Controller {
}
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
return s.AddEventHandlerWithOptions(handler, HandlerOptions{})
}
func determineResyncPeriod(desired, check time.Duration) time.Duration {
func determineResyncPeriod(logger klog.Logger, desired, check time.Duration) time.Duration {
if desired == 0 {
return desired
}
if check == 0 {
klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
logger.Info("Warning: the specified resyncPeriod is invalid because this shared informer doesn't support resyncing", "desired", desired)
return 0
}
if desired < check {
klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
logger.Info("Warning: the specified resyncPeriod is being increased to the minimum resyncCheckPeriod", "desired", desired, "resyncCheckPeriod", check)
return check
}
return desired
@ -579,6 +659,10 @@ func determineResyncPeriod(desired, check time.Duration) time.Duration {
const minimumResyncPeriod = 1 * time.Second
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
return s.AddEventHandlerWithOptions(handler, HandlerOptions{ResyncPeriod: &resyncPeriod})
}
func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHandler, options HandlerOptions) (ResourceEventHandlerRegistration, error) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
@ -586,27 +670,30 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
}
logger := ptr.Deref(options.Logger, klog.Background())
resyncPeriod := ptr.Deref(options.ResyncPeriod, s.defaultEventHandlerResyncPeriod)
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
logger.Info("Warning: resync period is too small. Changing it to the minimum allowed value", "resyncPeriod", resyncPeriod, "minimumResyncPeriod", minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
if s.started {
klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
logger.Info("Warning: resync period is smaller than resync check period and the informer has already started. Changing it to the resync check period", "resyncPeriod", resyncPeriod, "resyncCheckPeriod", s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
s.processor.resyncCheckPeriodChanged(logger, resyncPeriod)
}
}
}
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
if !s.started {
return s.processor.addListener(listener), nil
@ -648,7 +735,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
// safe to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}
@ -670,7 +757,7 @@ func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
}
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
// safe to distribute the notification
s.cacheMutationDetector.AddObject(new)
s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
}
@ -678,7 +765,7 @@ func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnDelete(old interface{}) {
// Invocation of this function is locked under s.blockDeltas, so it is
// save to distribute the notification
// safe to distribute the notification
s.processor.distribute(deleteNotification{oldObj: old}, false)
}
@ -794,7 +881,7 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
}
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func (p *sharedProcessor) run(ctx context.Context) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
@ -804,7 +891,7 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
}
p.listenersStarted = true
}()
<-stopCh
<-ctx.Done()
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
@ -844,13 +931,13 @@ func (p *sharedProcessor) shouldResync() bool {
return resyncNeeded
}
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncCheckPeriod time.Duration) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for listener := range p.listeners {
resyncPeriod := determineResyncPeriod(
listener.requestedResyncPeriod, resyncCheckPeriod)
logger, listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod)
}
}
@ -867,6 +954,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
logger klog.Logger
nextCh chan interface{}
addCh chan interface{}
@ -910,8 +998,9 @@ func (p *processorListener) HasSynced() bool {
return p.syncTracker.HasSynced()
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
ret := &processorListener{
logger: logger,
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
@ -934,7 +1023,7 @@ func (p *processorListener) add(notification interface{}) {
}
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer utilruntime.HandleCrashWithLogger(p.logger)
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
@ -966,11 +1055,21 @@ func (p *processorListener) pop() {
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
//
// This only applies if utilruntime is configured to not panic, which is not the default.
sleepAfterCrash := false
for next := range p.nextCh {
if sleepAfterCrash {
// Sleep before processing the next item.
time.Sleep(time.Second)
}
func() {
// Gets reset below, but only if we get that far.
sleepAfterCrash = true
defer utilruntime.HandleCrashWithLogger(p.logger)
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
@ -982,15 +1081,14 @@ func (p *processorListener) run() {
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
utilruntime.HandleErrorWithLogger(p.logger, nil, "unrecognized notification", "notificationType", fmt.Sprintf("%T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
sleepAfterCrash = false
}()
}
}
// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
p.resyncLock.Lock()

407
vendor/k8s.io/client-go/tools/cache/the_real_fifo.go generated vendored Normal file
View File

@ -0,0 +1,407 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utiltrace "k8s.io/utils/trace"
"sync"
"time"
)
// RealFIFO is a Queue in which every notification from the Reflector is passed
// in order to the Queue via Pop.
// This means that it
// 1. delivers notifications for items that have been deleted
// 2. delivers multiple notifications per item instead of simply the most recent value
type RealFIFO struct {
lock sync.RWMutex
cond sync.Cond
items []Delta
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known" --- affecting Delete(),
// Replace(), and Resync()
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRUD operations.
closed bool
// Called with every object if non-nil.
transformer TransformFunc
}
var (
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
)
// Close the queue.
func (f *RealFIFO) Close() {
f.lock.Lock()
defer f.lock.Unlock()
f.closed = true
f.cond.Broadcast()
}
// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
// DeletedFinalStateUnknown objects.
func (f *RealFIFO) keyOf(obj interface{}) (string, error) {
if d, ok := obj.(Deltas); ok {
if len(d) == 0 {
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
obj = d.Newest().Object
}
if d, ok := obj.(Delta); ok {
obj = d.Object
}
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return f.keyFunc(obj)
}
// HasSynced returns true if an Add/Update/Delete are called first,
// or the first batch of items inserted by Replace() has been popped.
func (f *RealFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
// ignoring lint to reduce delta to the original for review. It's ok adjust later.
//
//lint:file-ignore ST1003: should not use underscores in Go names
func (f *RealFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
// addToItems_locked appends to the delta list.
func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bool, obj interface{}) error {
// we must be able to read the keys in order to determine whether the knownObjcts and the items
// in this FIFO overlap
_, err := f.keyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// Every object comes through this code path once, so this is a good
// place to call the transform func.
//
// If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync,
// then the object have already gone through the transformer.
//
// If the objects already present in the cache are passed to Replace(),
// the transformer must be idempotent to avoid re-mutating them,
// or coordinate with all readers from the cache to avoid data races.
// Default informers do not pass existing objects to Replace.
if f.transformer != nil {
_, isTombstone := obj.(DeletedFinalStateUnknown)
if !isTombstone && !skipTransform {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
}
f.items = append(f.items, Delta{
Type: deltaActionType,
Object: obj,
})
f.cond.Broadcast()
return nil
}
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *RealFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
retErr := f.addToItems_locked(Added, false, obj)
return retErr
}
// Update is the same as Add in this implementation.
func (f *RealFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
retErr := f.addToItems_locked(Updated, false, obj)
return retErr
}
// Delete removes an item. It doesn't add it to the queue, because
// this implementation assumes the consumer only cares about the objects,
// not the order in which they were created/added.
func (f *RealFIFO) Delete(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
retErr := f.addToItems_locked(Deleted, false, obj)
return retErr
}
// IsClosed checks if the queue is closed
func (f *RealFIFO) IsClosed() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.closed
}
// Pop waits until an item is ready and processes it. If multiple items are
// ready, they are returned in the order in which they were added/updated.
// The item is removed from the queue (and the store) before it is processed.
// process function is called under lock, so it is safe
// update data structures in it that need to be in sync with the queue.
func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for len(f.items) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
item := f.items[0]
// The underlying array still exists and references this object, so the object will not be garbage collected unless we zero the reference.
f.items[0] = Delta{}
f.items = f.items[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// Only log traces if the queue depth is greater than 10 and it takes more than
// 100 milliseconds to process one item from the queue.
// Queue depth never goes high because processing an item is locking the queue,
// and new items can't be added until processing finish.
// https://github.com/kubernetes/kubernetes/issues/103789
if len(f.items) > 10 {
id, _ := f.keyOf(item)
trace := utiltrace.New("RealFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: len(f.items)},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
// we wrap in Deltas here to be compatible with preview Pop functions and those interpreting the return value.
err := process(Deltas{item}, isInInitialList)
return Deltas{item}, err
}
// Replace
// 1. finds those items in f.items that are not in newItems and creates synthetic deletes for them
// 2. finds items in knownObjects that are not in newItems and creates synthetic deletes for them
// 3. adds the newItems to the queue
func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
// determine the keys of everything we're adding. We cannot add the items until after the synthetic deletes have been
// created for items that don't existing in newItems
newKeys := sets.Set[string]{}
for _, obj := range newItems {
key, err := f.keyOf(obj)
if err != nil {
return KeyError{obj, err}
}
newKeys.Insert(key)
}
queuedItems := f.items
queuedKeys := []string{}
lastQueuedItemForKey := map[string]Delta{}
for _, queuedItem := range queuedItems {
queuedKey, err := f.keyOf(queuedItem.Object)
if err != nil {
return KeyError{queuedItem.Object, err}
}
if _, seen := lastQueuedItemForKey[queuedKey]; !seen {
queuedKeys = append(queuedKeys, queuedKey)
}
lastQueuedItemForKey[queuedKey] = queuedItem
}
// all the deletes already in the queue are important. There are two cases
// 1. queuedItems has delete for key/X and newItems has replace for key/X. This means the queued UID was deleted and a new one was created.
// 2. queuedItems has a delete for key/X and newItems does NOT have key/X. This means the queued item was deleted.
// Do deletion detection against objects in the queue.
for _, queuedKey := range queuedKeys {
if newKeys.Has(queuedKey) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
lastQueuedItem := lastQueuedItemForKey[queuedKey]
// if we've already got the item marked as deleted, no need to add another delete
if lastQueuedItem.Type == Deleted {
continue
}
// if we got here, then the last entry we have for the queued item is *not* a deletion and we need to add a delete
deletedObj := lastQueuedItem.Object
retErr := f.addToItems_locked(Deleted, true, DeletedFinalStateUnknown{
Key: queuedKey,
Obj: deletedObj,
})
if retErr != nil {
return fmt.Errorf("couldn't enqueue object: %w", retErr)
}
}
// Detect deletions for objects not present in the queue, but present in KnownObjects
knownKeys := f.knownObjects.ListKeys()
for _, knownKey := range knownKeys {
if newKeys.Has(knownKey) { // still present
continue
}
if _, inQueuedItems := lastQueuedItemForKey[knownKey]; inQueuedItems { // already added delete for these
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(knownKey)
if err != nil {
deletedObj = nil
utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", knownKey, err))
} else if !exists {
deletedObj = nil
utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", knownKey))
}
retErr := f.addToItems_locked(Deleted, false, DeletedFinalStateUnknown{
Key: knownKey,
Obj: deletedObj,
})
if retErr != nil {
return fmt.Errorf("couldn't enqueue object: %w", retErr)
}
}
// now that we have the deletes we need for items, we can add the newItems to the items queue
for _, obj := range newItems {
retErr := f.addToItems_locked(Replaced, false, obj)
if retErr != nil {
return fmt.Errorf("couldn't enqueue object: %w", retErr)
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = len(f.items)
}
return nil
}
// Resync will ensure that every object in the Store has its key in the queue.
// This should be a no-op, because that property is maintained by all operations.
func (f *RealFIFO) Resync() error {
// TODO this cannot logically be done by the FIFO, it can only be done by the indexer
f.lock.Lock()
defer f.lock.Unlock()
if f.knownObjects == nil {
return nil
}
keysInQueue := sets.Set[string]{}
for _, item := range f.items {
key, err := f.keyOf(item.Object)
if err != nil {
return KeyError{item, err}
}
keysInQueue.Insert(key)
}
knownKeys := f.knownObjects.ListKeys()
for _, knownKey := range knownKeys {
// If we are doing Resync() and there is already an event queued for that object,
// we ignore the Resync for it. This is to avoid the race, in which the resync
// comes with the previous value of object (since queueing an event for the object
// doesn't trigger changing the underlying store <knownObjects>.
if keysInQueue.Has(knownKey) {
continue
}
knownObj, exists, err := f.knownObjects.GetByKey(knownKey)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err))
continue
} else if !exists {
utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey))
continue
}
retErr := f.addToItems_locked(Sync, true, knownObj)
if retErr != nil {
return fmt.Errorf("couldn't queue object: %w", err)
}
}
return nil
}
// NewRealFIFO returns a Store which can be used to queue up items to
// process.
func NewRealFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter, transformer TransformFunc) *RealFIFO {
if knownObjects == nil {
panic("coding error: knownObjects must be provided")
}
f := &RealFIFO{
items: make([]Delta, 0, 10),
keyFunc: keyFunc,
knownObjects: knownObjects,
transformer: transformer,
}
f.cond.L = &f.lock
return f
}

View File

@ -16,4 +16,4 @@ limitations under the License.
// +k8s:deepcopy-gen=package
package api // import "k8s.io/client-go/tools/clientcmd/api"
package api

View File

@ -18,4 +18,4 @@ limitations under the License.
// +k8s:deepcopy-gen=package
// +k8s:defaulter-gen=Kind
package v1 // import "k8s.io/client-go/tools/clientcmd/api/v1"
package v1

View File

@ -34,4 +34,4 @@ Sample usage from merged .kubeconfig files (local directory, home directory)
client, err := metav1.New(config)
// ...
*/
package clientcmd // import "k8s.io/client-go/tools/clientcmd"
package clientcmd

View File

@ -22,14 +22,14 @@ import (
"time"
v1 "k8s.io/api/coordination/v1"
v1alpha2 "k8s.io/api/coordination/v1alpha2"
v1beta1 "k8s.io/api/coordination/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
coordinationv1alpha2client "k8s.io/client-go/kubernetes/typed/coordination/v1alpha2"
coordinationv1beta1client "k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
@ -43,7 +43,7 @@ type CacheSyncWaiter interface {
}
type LeaseCandidate struct {
leaseClient coordinationv1alpha2client.LeaseCandidateInterface
leaseClient coordinationv1beta1client.LeaseCandidateInterface
leaseCandidateInformer cache.SharedIndexInformer
informerFactory informers.SharedInformerFactory
hasSynced cache.InformerSynced
@ -84,10 +84,10 @@ func NewCandidate(clientset kubernetes.Interface,
options.FieldSelector = fieldSelector
}),
)
leaseCandidateInformer := informerFactory.Coordination().V1alpha2().LeaseCandidates().Informer()
leaseCandidateInformer := informerFactory.Coordination().V1beta1().LeaseCandidates().Informer()
lc := &LeaseCandidate{
leaseClient: clientset.CoordinationV1alpha2().LeaseCandidates(candidateNamespace),
leaseClient: clientset.CoordinationV1beta1().LeaseCandidates(candidateNamespace),
leaseCandidateInformer: leaseCandidateInformer,
informerFactory: informerFactory,
name: candidateName,
@ -102,7 +102,7 @@ func NewCandidate(clientset kubernetes.Interface,
h, err := leaseCandidateInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
if leasecandidate, ok := newObj.(*v1alpha2.LeaseCandidate); ok {
if leasecandidate, ok := newObj.(*v1beta1.LeaseCandidate); ok {
if leasecandidate.Spec.PingTime != nil && leasecandidate.Spec.PingTime.After(leasecandidate.Spec.RenewTime.Time) {
lc.enqueueLease()
}
@ -184,13 +184,13 @@ func (c *LeaseCandidate) ensureLease(ctx context.Context) error {
return nil
}
func (c *LeaseCandidate) newLeaseCandidate() *v1alpha2.LeaseCandidate {
lc := &v1alpha2.LeaseCandidate{
func (c *LeaseCandidate) newLeaseCandidate() *v1beta1.LeaseCandidate {
lc := &v1beta1.LeaseCandidate{
ObjectMeta: metav1.ObjectMeta{
Name: c.name,
Namespace: c.namespace,
},
Spec: v1alpha2.LeaseCandidateSpec{
Spec: v1beta1.LeaseCandidateSpec{
LeaseName: c.leaseName,
BinaryVersion: c.binaryVersion,
EmulationVersion: c.emulationVersion,

View File

@ -16,4 +16,4 @@ limitations under the License.
// Package record has all client logic for recording and reporting
// "k8s.io/api/core/v1".Event events.
package record // import "k8s.io/client-go/tools/record"
package record

View File

@ -489,7 +489,7 @@ func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map
}
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
Name: util.GenerateEventName(ref.Name, t.UnixNano()),
Namespace: namespace,
Annotations: annotations,
},

View File

@ -17,10 +17,14 @@ limitations under the License.
package util
import (
"fmt"
"net/http"
"github.com/google/uuid"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation"
)
// ValidateEventType checks that eventtype is an expected type of event
@ -38,3 +42,16 @@ func IsKeyNotFoundError(err error) bool {
return statusErr != nil && statusErr.Status().Code == http.StatusNotFound
}
// GenerateEventName generates a valid Event name from the referenced name and the passed UNIX timestamp.
// The referenced Object name may not be a valid name for Events and cause the Event to fail
// to be created, so we need to generate a new one in that case.
// Ref: https://issues.k8s.io/127594
func GenerateEventName(refName string, unixNano int64) string {
name := fmt.Sprintf("%s.%x", refName, unixNano)
if errs := apimachineryvalidation.NameIsDNSSubdomain(name, false); len(errs) > 0 {
// Using an uuid guarantees uniqueness and correctness
name = uuid.New().String()
}
return name
}