rebase: update kubernetes to v1.20.0

updated kubernetes packages to latest
release.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2020-12-17 17:58:29 +05:30
committed by mergify[bot]
parent 4abe128bd8
commit 83559144b1
1624 changed files with 247222 additions and 160270 deletions

View File

@ -26,7 +26,6 @@ reviewers:
- pmorie
- janetkuo
- justinsb
- eparis
- soltysh
- jsafrane
- dims
@ -38,8 +37,6 @@ reviewers:
- ingvagabund
- resouer
- jessfraz
- david-mcmahon
- mfojtik
- mqliang
- sdminonne
- ncdc

View File

@ -69,6 +69,12 @@ type Config struct {
// question to this interface as a parameter. This is probably moot
// now that this functionality appears at a higher level.
RetryOnError bool
// Called whenever the ListAndWatch drops the connection with an error.
WatchErrorHandler WatchErrorHandler
// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64
}
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
@ -131,18 +137,22 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
// Returns true once this controller has completed an initial resource listing
@ -183,9 +193,11 @@ func (c *controller) processLoop() {
}
}
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
// ResourceEventHandler can handle notifications for events that
// happen to a resource. The events are informational only, so you
// can't return an error. The handlers MUST NOT modify the objects
// received; this concerns not only the top level of structure but all
// the data structures reachable from it.
// * OnAdd is called when an object is added.
// * OnUpdate is called when an object is modified. Note that oldObj is the
// last known state of the object-- it is possible that several changes
@ -205,7 +217,8 @@ type ResourceEventHandler interface {
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
// ResourceEventHandler. This adapter does not remove the prohibition against
// modifying the objects.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
@ -237,6 +250,7 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
// object that stops passing the filter after an update is considered a delete.
// Like the handlers, the filter MUST NOT modify the objects it is given.
type FilteringResourceEventHandler struct {
FilterFunc func(obj interface{}) bool
Handler ResourceEventHandler

View File

@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog"
"k8s.io/klog/v2"
)
// NewDeltaFIFO returns a Queue which can be used to process changes to items.
@ -41,7 +41,7 @@ import (
// affects error retrying.
// NOTE: It is possible to misuse this and cause a race when using an
// external known object source.
// Whether there is a potential race depends on how the comsumer
// Whether there is a potential race depends on how the consumer
// modifies knownObjects. In Pop(), process function is called under
// lock, so it is safe to update data structures in it that need to be
// in sync with the queue (e.g. knownObjects).
@ -99,7 +99,7 @@ type DeltaFIFOOptions struct {
EmitDeltaTypeReplaced bool
}
// NewDeltaFIFOWithOptions returns a Store which can be used process changes to
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
@ -144,7 +144,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
// will always return an object of type Deltas. List() returns
// the newest object from each accumulator in the FIFO.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
@ -160,14 +161,17 @@ type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas
// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
@ -180,11 +184,9 @@ type DeltaFIFO struct {
// Replace(), and Resync()
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
closed bool
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
@ -204,8 +206,8 @@ var (
// Close the queue.
func (f *DeltaFIFO) Close() {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.lock.Lock()
defer f.lock.Unlock()
f.closed = true
f.cond.Broadcast()
}
@ -226,7 +228,7 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
}
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
// or the first batch of items inserted by Replace() has been popped.
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
@ -283,6 +285,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
}
}
// exist in items and/or KnownObjects
return f.queueActionLocked(Deleted, obj)
}
@ -333,6 +336,11 @@ func dedupDeltas(deltas Deltas) Deltas {
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
// `a` and `b` are duplicates. Only keep the one returned from isDup().
// TODO: This extra array allocation and copy seems unnecessary if
// all we do to dedup is compare the new delta with the last element
// in `items`, which could be done by mutating `items` directly.
// Might be worth profiling and investigating if it is safe to optimize.
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
@ -369,8 +377,8 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
if err != nil {
return KeyError{obj, err}
}
newDeltas := append(f.items[id], Delta{actionType, obj})
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
@ -382,10 +390,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
} else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// But if somehow it ever does return an empty list, then
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
@ -447,20 +459,22 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
// IsClosed checks if the queue is closed
func (f *DeltaFIFO) IsClosed() bool {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.lock.Lock()
defer f.lock.Unlock()
return f.closed
}
// Pop blocks until an item is added to the queue, and then returns it. If
// Pop blocks until the queue has some items, and then returns one. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
// process should avoid expensive I/O operation so that other queue operations, i.e.
// Add() and Get(), won't be blocked for too long.
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
@ -472,7 +486,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
if f.closed {
return nil, ErrFIFOClosed
}
@ -485,7 +499,8 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
@ -521,6 +536,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
action = Replaced
}
// Add Sync/Replaced action for each new item.
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
@ -539,6 +555,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
if keys.Has(k) {
continue
}
// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
@ -650,7 +669,8 @@ type KeyLister interface {
// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
// GetByKey returns the value associated with the key, or sets exists=false.
GetByKey(key string) (value interface{}, exists bool, err error)
}
// DeltaType is the type of a change (addition, deletion, etc)
@ -713,10 +733,10 @@ func copyDeltas(d Deltas) Deltas {
return d2
}
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was missed. In this
// case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
// was deleted but the watch deletion event was missed while disconnected from
// apiserver. In this case we don't know the final "resting" state of the object, so
// there's a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}

View File

@ -21,7 +21,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/klog"
"k8s.io/klog/v2"
)
// ExpirationCache implements the store interface

View File

@ -71,8 +71,8 @@ type Queue interface {
// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, Update, or Delete;
// otherwise the first batch is empty.
// operation if that happened before any Add, AddIfNotPresent,
// Update, or Delete; otherwise the first batch is empty.
HasSynced() bool
// Close the queue
@ -128,8 +128,7 @@ type FIFO struct {
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
closed bool
}
var (
@ -138,14 +137,14 @@ var (
// Close the queue.
func (f *FIFO) Close() {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.lock.Lock()
defer f.lock.Unlock()
f.closed = true
f.cond.Broadcast()
}
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
// or the first batch of items inserted by Replace() has been popped.
func (f *FIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
@ -262,8 +261,8 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
// IsClosed checks if the queue is closed
func (f *FIFO) IsClosed() bool {
f.closedLock.Lock()
defer f.closedLock.Unlock()
f.lock.Lock()
defer f.lock.Unlock()
if f.closed {
return true
}
@ -284,7 +283,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
if f.closed {
return nil, ErrFIFOClosed
}

View File

@ -17,7 +17,7 @@ limitations under the License.
package cache
import (
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"

View File

@ -22,7 +22,7 @@ import (
"sync"
"time"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"

View File

@ -24,7 +24,7 @@ import (
"sync"
"time"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"

View File

@ -39,7 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/pager"
"k8s.io/klog"
"k8s.io/klog/v2"
"k8s.io/utils/trace"
)
@ -69,6 +69,8 @@ type Reflector struct {
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
// initConnBackoffManager manages backoff the initial connection with the Watch calll of ListAndWatch.
initConnBackoffManager wait.BackoffManager
resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
@ -95,6 +97,46 @@ type Reflector struct {
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
WatchListPageSize int64
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
}
// ResourceVersionUpdater is an interface that allows store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
type ResourceVersionUpdater interface {
// UpdateResourceVersion is called each time current resource version of the reflector
// is updated.
UpdateResourceVersion(resourceVersion string)
}
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
type WatchErrorHandler func(r *Reflector, err error)
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
func DefaultWatchErrorHandler(r *Reflector, err error) {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
}
var (
@ -135,9 +177,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
}
r.setExpectedType(expectedType)
return r
@ -175,7 +219,7 @@ func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
@ -276,7 +320,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
@ -297,17 +341,17 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
@ -369,28 +413,15 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
// If that's the case begin exponentially backing off and resend watch request.
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
<-r.initConnBackoffManager.Backoff().C()
continue
}
return nil
return err
}
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
@ -485,6 +516,9 @@ loop:
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++
}
}
@ -551,5 +585,26 @@ func isExpiredError(err error) bool {
}
func isTooLargeResourceVersionError(err error) bool {
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
return true
}
// In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
// metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
// version is larger than the largest currently available resource version. To ensure backward
// compatibility with these server versions we also need to detect the error based on the content
// of the error message field.
if !apierrors.IsTimeout(err) {
return false
}
apierr, ok := err.(apierrors.APIStatus)
if !ok || apierr == nil || apierr.Status().Details == nil {
return false
}
for _, cause := range apierr.Status().Details.Causes {
// Matches the message returned by api server 1.17.0-1.18.5 for this error condition
if cause.Message == "Too large resource version" {
return true
}
}
return false
}

View File

@ -28,35 +28,37 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/buffer"
"k8s.io/klog"
"k8s.io/klog/v2"
)
// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects. An object is identified by its API group, kind/resource,
// namespace, and name; the `ObjectMeta.UID` is not part of an
// object's ID as far as this contract is concerned. One
// namespace (if any), and name; the `ObjectMeta.UID` is not part of
// an object's ID as far as this contract is concerned. One
// SharedInformer provides linkage to objects of a particular API
// group and kind/resource. The linked object collection of a
// SharedInformer may be further restricted to one namespace and/or by
// label selector and/or field selector.
// SharedInformer may be further restricted to one namespace (if
// applicable) and/or by label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// An object state is either "absent" or present with a
// ResourceVersion and other appropriate content.
// An object state is either (1) present with a ResourceVersion and
// other appropriate content or (2) "absent".
//
// A SharedInformer maintains a local cache, exposed by GetStore() and
// by GetIndexer() in the case of an indexed informer, of the state of
// each relevant object. This cache is eventually consistent with the
// authoritative state. This means that, unless prevented by
// persistent communication problems, if ever a particular object ID X
// is authoritatively associated with a state S then for every
// SharedInformer I whose collection includes (X, S) eventually either
// (1) I's cache associates X with S or a later state of X, (2) I is
// stopped, or (3) the authoritative state service for X terminates.
// To be formally complete, we say that the absent state meets any
// restriction by label selector or field selector.
// A SharedInformer maintains a local cache --- exposed by GetStore(),
// by GetIndexer() in the case of an indexed informer, and possibly by
// machinery involved in creating and/or accessing the informer --- of
// the state of each relevant object. This cache is eventually
// consistent with the authoritative state. This means that, unless
// prevented by persistent communication problems, if ever a
// particular object ID X is authoritatively associated with a state S
// then for every SharedInformer I whose collection includes (X, S)
// eventually either (1) I's cache associates X with S or a later
// state of X, (2) I is stopped, or (3) the authoritative state
// service for X terminates. To be formally complete, we say that the
// absent state meets any restriction by label selector or field
// selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
@ -98,7 +100,7 @@ import (
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update. A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the object existing in the cache at the time that
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
@ -163,6 +165,21 @@ type SharedInformer interface {
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// There's only one handler, so if you call this multiple times, last one
// wins; calling after the informer has been started returns an error.
//
// The handler is intended for visibility, not to e.g. pause the consumers.
// The handler should return quickly - any expensive processing should be
// offloaded.
SetWatchErrorHandler(handler WatchErrorHandler) error
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
@ -298,6 +315,9 @@ type sharedIndexInformer struct {
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
@ -333,6 +353,18 @@ type deleteNotification struct {
oldObj interface{}
}
func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
s.watchErrorHandler = handler
return nil
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
@ -349,7 +381,8 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
@ -452,13 +485,13 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
if s.started {
klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update