2019-05-31 09:45:11 +00:00
/ *
Copyright 2014 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package cache
import (
2019-06-24 09:08:09 +00:00
"context"
2019-05-31 09:45:11 +00:00
"errors"
"fmt"
"io"
"math/rand"
2023-08-17 05:15:28 +00:00
"os"
2019-05-31 09:45:11 +00:00
"reflect"
2023-06-01 16:58:10 +00:00
"strings"
2019-05-31 09:45:11 +00:00
"sync"
"time"
2020-04-14 07:04:33 +00:00
apierrors "k8s.io/apimachinery/pkg/api/errors"
2019-05-31 09:45:11 +00:00
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2020-01-14 10:38:55 +00:00
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2019-05-31 09:45:11 +00:00
"k8s.io/apimachinery/pkg/runtime"
2020-01-14 10:38:55 +00:00
"k8s.io/apimachinery/pkg/runtime/schema"
2019-05-31 09:45:11 +00:00
"k8s.io/apimachinery/pkg/util/naming"
2020-01-14 10:38:55 +00:00
utilnet "k8s.io/apimachinery/pkg/util/net"
2019-05-31 09:45:11 +00:00
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
2019-06-24 09:08:09 +00:00
"k8s.io/client-go/tools/pager"
2020-12-17 12:28:29 +00:00
"k8s.io/klog/v2"
2021-12-08 13:50:47 +00:00
"k8s.io/utils/clock"
2023-06-01 16:58:10 +00:00
"k8s.io/utils/pointer"
2024-05-15 06:54:18 +00:00
"k8s.io/utils/ptr"
2019-05-31 09:45:11 +00:00
"k8s.io/utils/trace"
)
2020-01-14 10:38:55 +00:00
const defaultExpectedTypeName = "<unspecified>"
2019-05-31 09:45:11 +00:00
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
2020-01-14 10:38:55 +00:00
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
2023-06-01 16:58:10 +00:00
typeDescription string
2020-04-14 07:04:33 +00:00
// An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and
// `"kind"` must also be right.
2019-05-31 09:45:11 +00:00
expectedType reflect . Type
2020-01-14 10:38:55 +00:00
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK * schema . GroupVersionKind
2019-05-31 09:45:11 +00:00
// The destination to sync up with the watch source
store Store
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
2020-04-14 07:04:33 +00:00
// backoff manages backoff of ListWatch
backoffManager wait . BackoffManager
2023-08-17 05:15:28 +00:00
resyncPeriod time . Duration
2019-05-31 09:45:11 +00:00
// clock allows tests to manipulate time
clock clock . Clock
2020-04-14 07:04:33 +00:00
// paginatedResult defines whether pagination should be forced for list calls.
// It is set based on the result of the initial list call.
paginatedResult bool
2019-05-31 09:45:11 +00:00
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
2020-07-24 14:20:51 +00:00
// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
isLastSyncResourceVersionUnavailable bool
2019-05-31 09:45:11 +00:00
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync . RWMutex
2023-06-01 16:58:10 +00:00
// Called whenever the ListAndWatch drops the connection with an error.
watchErrorHandler WatchErrorHandler
2019-06-24 09:08:09 +00:00
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
2020-04-14 07:04:33 +00:00
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from watch cache.
// NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
2019-06-24 09:08:09 +00:00
WatchListPageSize int64
2023-06-01 16:58:10 +00:00
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func ( ) bool
// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
MaxInternalErrorRetryDuration time . Duration
// UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server.
// Streaming has the primary advantage of using fewer server's resources to fetch data.
//
// The old behaviour establishes a LIST request which gets data in chunks.
// Paginated list is less efficient and depending on the actual size of objects
// might result in an increased memory consumption of the APIServer.
//
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
2024-05-15 06:54:18 +00:00
//
// TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work.
UseWatchList * bool
2020-12-17 12:28:29 +00:00
}
// ResourceVersionUpdater is an interface that allows store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
type ResourceVersionUpdater interface {
// UpdateResourceVersion is called each time current resource version of the reflector
// is updated.
UpdateResourceVersion ( resourceVersion string )
}
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
type WatchErrorHandler func ( r * Reflector , err error )
// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
func DefaultWatchErrorHandler ( r * Reflector , err error ) {
switch {
case isExpiredError ( err ) :
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
2023-06-01 16:58:10 +00:00
klog . V ( 4 ) . Infof ( "%s: watch of %v closed with: %v" , r . name , r . typeDescription , err )
2020-12-17 12:28:29 +00:00
case err == io . EOF :
// watch closed normally
case err == io . ErrUnexpectedEOF :
2023-06-01 16:58:10 +00:00
klog . V ( 1 ) . Infof ( "%s: Watch for %v closed with unexpected EOF: %v" , r . name , r . typeDescription , err )
2020-12-17 12:28:29 +00:00
default :
2023-06-01 16:58:10 +00:00
utilruntime . HandleError ( fmt . Errorf ( "%s: Failed to watch %v: %v" , r . name , r . typeDescription , err ) )
2020-12-17 12:28:29 +00:00
}
2019-05-31 09:45:11 +00:00
}
var (
// We try to spread the load on apiserver by setting timeouts for
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
minWatchTimeout = 5 * time . Minute
)
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector ( lw ListerWatcher , expectedType interface { } , resyncPeriod time . Duration ) ( indexer Indexer , reflector * Reflector ) {
2019-06-24 09:08:09 +00:00
indexer = NewIndexer ( MetaNamespaceKeyFunc , Indexers { NamespaceIndex : MetaNamespaceIndexFunc } )
2019-05-31 09:45:11 +00:00
reflector = NewReflector ( lw , expectedType , indexer , resyncPeriod )
return indexer , reflector
}
2023-06-01 16:58:10 +00:00
// NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
// that is outside this package. See NewReflectorWithOptions for further information.
func NewReflector ( lw ListerWatcher , expectedType interface { } , store Store , resyncPeriod time . Duration ) * Reflector {
return NewReflectorWithOptions ( lw , expectedType , store , ReflectorOptions { ResyncPeriod : resyncPeriod } )
}
// NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
// information.
func NewNamedReflector ( name string , lw ListerWatcher , expectedType interface { } , store Store , resyncPeriod time . Duration ) * Reflector {
return NewReflectorWithOptions ( lw , expectedType , store , ReflectorOptions { Name : name , ResyncPeriod : resyncPeriod } )
}
// ReflectorOptions configures a Reflector.
type ReflectorOptions struct {
// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
// in the call stack that is outside this package.
Name string
// TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
// using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
// "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
// are set, the type description is the string encoding of those. Otherwise, the type description is set to the
// go type of expectedType..
TypeDescription string
// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
// (do not resync).
ResyncPeriod time . Duration
// Clock allows tests to control time. If unset defaults to clock.RealClock{}
Clock clock . Clock
}
// NewReflectorWithOptions creates a new Reflector object which will keep the
2020-04-14 07:04:33 +00:00
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
2023-06-01 16:58:10 +00:00
func NewReflectorWithOptions ( lw ListerWatcher , expectedType interface { } , store Store , options ReflectorOptions ) * Reflector {
reflectorClock := options . Clock
if reflectorClock == nil {
reflectorClock = clock . RealClock { }
}
2019-05-31 09:45:11 +00:00
r := & Reflector {
2023-06-01 16:58:10 +00:00
name : options . Name ,
resyncPeriod : options . ResyncPeriod ,
typeDescription : options . TypeDescription ,
listerWatcher : lw ,
store : store ,
2020-04-14 07:04:33 +00:00
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
2023-08-17 05:15:28 +00:00
backoffManager : wait . NewExponentialBackoffManager ( 800 * time . Millisecond , 30 * time . Second , 2 * time . Minute , 2.0 , 1.0 , reflectorClock ) ,
clock : reflectorClock ,
watchErrorHandler : WatchErrorHandler ( DefaultWatchErrorHandler ) ,
expectedType : reflect . TypeOf ( expectedType ) ,
2019-05-31 09:45:11 +00:00
}
2023-06-01 16:58:10 +00:00
if r . name == "" {
r . name = naming . GetNameFromCallsite ( internalPackages ... )
}
if r . typeDescription == "" {
r . typeDescription = getTypeDescriptionFromObject ( expectedType )
}
if r . expectedGVK == nil {
r . expectedGVK = getExpectedGVKFromObject ( expectedType )
}
2024-05-15 06:54:18 +00:00
// don't overwrite UseWatchList if already set
// because the higher layers (e.g. storage/cacher) disabled it on purpose
if r . UseWatchList == nil {
if s := os . Getenv ( "ENABLE_CLIENT_GO_WATCH_LIST_ALPHA" ) ; len ( s ) > 0 {
r . UseWatchList = ptr . To ( true )
}
2023-08-17 05:15:28 +00:00
}
2019-05-31 09:45:11 +00:00
return r
}
2023-06-01 16:58:10 +00:00
func getTypeDescriptionFromObject ( expectedType interface { } ) string {
if expectedType == nil {
return defaultExpectedTypeName
2020-01-14 10:38:55 +00:00
}
2023-06-01 16:58:10 +00:00
reflectDescription := reflect . TypeOf ( expectedType ) . String ( )
2020-01-14 10:38:55 +00:00
2023-06-01 16:58:10 +00:00
obj , ok := expectedType . ( * unstructured . Unstructured )
if ! ok {
return reflectDescription
2020-01-14 10:38:55 +00:00
}
2023-06-01 16:58:10 +00:00
gvk := obj . GroupVersionKind ( )
if gvk . Empty ( ) {
return reflectDescription
}
return gvk . String ( )
}
func getExpectedGVKFromObject ( expectedType interface { } ) * schema . GroupVersionKind {
obj , ok := expectedType . ( * unstructured . Unstructured )
if ! ok {
return nil
}
gvk := obj . GroupVersionKind ( )
if gvk . Empty ( ) {
return nil
}
return & gvk
2020-01-14 10:38:55 +00:00
}
2019-05-31 09:45:11 +00:00
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = [ ] string { "client-go/tools/cache/" }
2020-04-14 07:04:33 +00:00
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
2019-05-31 09:45:11 +00:00
// Run will exit when stopCh is closed.
func ( r * Reflector ) Run ( stopCh <- chan struct { } ) {
2023-06-01 16:58:10 +00:00
klog . V ( 3 ) . Infof ( "Starting reflector %s (%s) from %s" , r . typeDescription , r . resyncPeriod , r . name )
2020-04-14 07:04:33 +00:00
wait . BackoffUntil ( func ( ) {
2019-05-31 09:45:11 +00:00
if err := r . ListAndWatch ( stopCh ) ; err != nil {
2020-12-17 12:28:29 +00:00
r . watchErrorHandler ( r , err )
2019-05-31 09:45:11 +00:00
}
2020-04-14 07:04:33 +00:00
} , r . backoffManager , true , stopCh )
2023-06-01 16:58:10 +00:00
klog . V ( 3 ) . Infof ( "Stopping reflector %s (%s) from %s" , r . typeDescription , r . resyncPeriod , r . name )
2019-05-31 09:45:11 +00:00
}
var (
// nothing will ever be sent down this channel
neverExitWatch <- chan time . Time = make ( chan time . Time )
// Used to indicate that watching stopped because of a signal from the stop
// channel passed in from a client of the reflector.
2022-05-05 02:47:06 +00:00
errorStopRequested = errors . New ( "stop requested" )
2019-05-31 09:45:11 +00:00
)
// resyncChan returns a channel which will receive something when a resync is
// required, and a cleanup function.
func ( r * Reflector ) resyncChan ( ) ( <- chan time . Time , func ( ) bool ) {
if r . resyncPeriod == 0 {
return neverExitWatch , func ( ) bool { return false }
}
// The cleanup function is required: imagine the scenario where watches
// always fail so we end up listing frequently. Then, if we don't
// manually stop the timer, we could end up with many timers active
// concurrently.
t := r . clock . NewTimer ( r . resyncPeriod )
return t . C ( ) , t . Stop
}
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func ( r * Reflector ) ListAndWatch ( stopCh <- chan struct { } ) error {
2023-06-01 16:58:10 +00:00
klog . V ( 3 ) . Infof ( "Listing and watching %v from %s" , r . typeDescription , r . name )
var err error
var w watch . Interface
2024-05-15 06:54:18 +00:00
useWatchList := ptr . Deref ( r . UseWatchList , false )
fallbackToList := ! useWatchList
2020-04-14 07:04:33 +00:00
2024-05-15 06:54:18 +00:00
if useWatchList {
2023-06-01 16:58:10 +00:00
w , err = r . watchList ( stopCh )
if w == nil && err == nil {
// stopCh was closed
return nil
}
if err != nil {
2023-12-20 12:23:59 +00:00
klog . Warningf ( "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v" , err )
2023-06-01 16:58:10 +00:00
fallbackToList = true
2023-12-20 12:23:59 +00:00
// ensure that we won't accidentally pass some garbage down the watch.
2023-06-01 16:58:10 +00:00
w = nil
}
}
if fallbackToList {
err = r . list ( stopCh )
if err != nil {
return err
}
2019-05-31 09:45:11 +00:00
}
2023-12-20 12:23:59 +00:00
klog . V ( 2 ) . Infof ( "Caches populated for %v from %s" , r . typeDescription , r . name )
2019-05-31 09:45:11 +00:00
resyncerrc := make ( chan error , 1 )
cancelCh := make ( chan struct { } )
defer close ( cancelCh )
2023-06-01 16:58:10 +00:00
go r . startResync ( stopCh , cancelCh , resyncerrc )
return r . watch ( w , stopCh , resyncerrc )
}
// startResync periodically calls r.store.Resync() method.
// Note that this method is blocking and should be
// called in a separate goroutine.
func ( r * Reflector ) startResync ( stopCh <- chan struct { } , cancelCh <- chan struct { } , resyncerrc chan error ) {
resyncCh , cleanup := r . resyncChan ( )
defer func ( ) {
cleanup ( ) // Call the last one written into cleanup
} ( )
for {
select {
case <- resyncCh :
case <- stopCh :
return
case <- cancelCh :
return
}
if r . ShouldResync == nil || r . ShouldResync ( ) {
klog . V ( 4 ) . Infof ( "%s: forcing resync" , r . name )
if err := r . store . Resync ( ) ; err != nil {
resyncerrc <- err
2019-05-31 09:45:11 +00:00
return
}
}
2023-06-01 16:58:10 +00:00
cleanup ( )
resyncCh , cleanup = r . resyncChan ( )
}
}
2019-05-31 09:45:11 +00:00
2023-06-01 16:58:10 +00:00
// watch simply starts a watch request with the server.
func ( r * Reflector ) watch ( w watch . Interface , stopCh <- chan struct { } , resyncerrc chan error ) error {
var err error
2022-08-24 02:24:25 +00:00
retry := NewRetryWithDeadline ( r . MaxInternalErrorRetryDuration , time . Minute , apierrors . IsInternalError , r . clock )
2023-06-01 16:58:10 +00:00
2019-05-31 09:45:11 +00:00
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <- stopCh :
2023-12-20 12:23:59 +00:00
// we can only end up here when the stopCh
// was closed after a successful watchlist or list request
if w != nil {
w . Stop ( )
}
2019-05-31 09:45:11 +00:00
return nil
default :
}
2020-07-24 14:20:51 +00:00
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r . clock . Now ( )
2023-06-01 16:58:10 +00:00
if w == nil {
timeoutSeconds := int64 ( minWatchTimeout . Seconds ( ) * ( rand . Float64 ( ) + 1.0 ) )
options := metav1 . ListOptions {
ResourceVersion : r . LastSyncResourceVersion ( ) ,
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds : & timeoutSeconds ,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks : true ,
}
w , err = r . listerWatcher . Watch ( options )
if err != nil {
if canRetry := isWatchErrorRetriable ( err ) ; canRetry {
klog . V ( 4 ) . Infof ( "%s: watch of %v returned %v - backing off" , r . name , r . typeDescription , err )
select {
case <- stopCh :
return nil
2023-08-17 05:15:28 +00:00
case <- r . backoffManager . Backoff ( ) . C ( ) :
2023-06-01 16:58:10 +00:00
continue
}
}
return err
2019-05-31 09:45:11 +00:00
}
}
2023-06-01 16:58:10 +00:00
err = watchHandler ( start , w , r . store , r . expectedType , r . expectedGVK , r . name , r . typeDescription , r . setLastSyncResourceVersion , nil , r . clock , resyncerrc , stopCh )
// Ensure that watch will not be reused across iterations.
w . Stop ( )
w = nil
2022-08-24 02:24:25 +00:00
retry . After ( err )
if err != nil {
2019-05-31 09:45:11 +00:00
if err != errorStopRequested {
2020-01-14 10:38:55 +00:00
switch {
2020-04-14 07:04:33 +00:00
case isExpiredError ( err ) :
2020-07-24 14:20:51 +00:00
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
2020-04-14 07:04:33 +00:00
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
2023-06-01 16:58:10 +00:00
klog . V ( 4 ) . Infof ( "%s: watch of %v closed with: %v" , r . name , r . typeDescription , err )
2021-08-09 07:19:24 +00:00
case apierrors . IsTooManyRequests ( err ) :
2023-06-01 16:58:10 +00:00
klog . V ( 2 ) . Infof ( "%s: watch of %v returned 429 - backing off" , r . name , r . typeDescription )
select {
case <- stopCh :
return nil
2023-08-17 05:15:28 +00:00
case <- r . backoffManager . Backoff ( ) . C ( ) :
2023-06-01 16:58:10 +00:00
continue
}
2022-08-24 02:24:25 +00:00
case apierrors . IsInternalError ( err ) && retry . ShouldRetry ( ) :
2023-06-01 16:58:10 +00:00
klog . V ( 2 ) . Infof ( "%s: retrying watch of %v internal error: %v" , r . name , r . typeDescription , err )
2022-08-24 02:24:25 +00:00
continue
2020-01-14 10:38:55 +00:00
default :
2023-06-01 16:58:10 +00:00
klog . Warningf ( "%s: watch of %v ended with: %v" , r . name , r . typeDescription , err )
2020-01-14 10:38:55 +00:00
}
2019-05-31 09:45:11 +00:00
}
return nil
}
}
}
2022-08-24 02:24:25 +00:00
// list simply lists all items and records a resource version obtained from the server at the moment of the call.
// the resource version can be used for further progress notification (aka. watch).
func ( r * Reflector ) list ( stopCh <- chan struct { } ) error {
var resourceVersion string
options := metav1 . ListOptions { ResourceVersion : r . relistResourceVersion ( ) }
initTrace := trace . New ( "Reflector ListAndWatch" , trace . Field { Key : "name" , Value : r . name } )
defer initTrace . LogIfLong ( 10 * time . Second )
var list runtime . Object
var paginatedResult bool
var err error
listCh := make ( chan struct { } , 1 )
panicCh := make ( chan interface { } , 1 )
go func ( ) {
defer func ( ) {
if r := recover ( ) ; r != nil {
panicCh <- r
}
} ( )
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager . New ( pager . SimplePageFunc ( func ( opts metav1 . ListOptions ) ( runtime . Object , error ) {
return r . listerWatcher . List ( opts )
} ) )
switch {
case r . WatchListPageSize != 0 :
pager . PageSize = r . WatchListPageSize
case r . paginatedResult :
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
case options . ResourceVersion != "" && options . ResourceVersion != "0" :
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
//
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
pager . PageSize = 0
}
2023-08-17 05:15:28 +00:00
list , paginatedResult , err = pager . ListWithAlloc ( context . Background ( ) , options )
2022-08-24 02:24:25 +00:00
if isExpiredError ( err ) || isTooLargeResourceVersionError ( err ) {
r . setIsLastSyncResourceVersionUnavailable ( true )
// Retry immediately if the resource version used to list is unavailable.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, the full list might fail because the
// resource version it is listing at is expired or the cache may not yet be synced to the provided
// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
// the reflector makes forward progress.
2023-08-17 05:15:28 +00:00
list , paginatedResult , err = pager . ListWithAlloc ( context . Background ( ) , metav1 . ListOptions { ResourceVersion : r . relistResourceVersion ( ) } )
2022-08-24 02:24:25 +00:00
}
close ( listCh )
} ( )
select {
case <- stopCh :
return nil
case r := <- panicCh :
panic ( r )
case <- listCh :
}
initTrace . Step ( "Objects listed" , trace . Field { Key : "error" , Value : err } )
if err != nil {
2023-06-01 16:58:10 +00:00
klog . Warningf ( "%s: failed to list %v: %v" , r . name , r . typeDescription , err )
return fmt . Errorf ( "failed to list %v: %w" , r . typeDescription , err )
2022-08-24 02:24:25 +00:00
}
// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
//
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
if options . ResourceVersion == "0" && paginatedResult {
r . paginatedResult = true
}
r . setIsLastSyncResourceVersionUnavailable ( false ) // list was successful
listMetaInterface , err := meta . ListAccessor ( list )
if err != nil {
return fmt . Errorf ( "unable to understand list result %#v: %v" , list , err )
}
resourceVersion = listMetaInterface . GetResourceVersion ( )
initTrace . Step ( "Resource version extracted" )
2023-08-17 05:15:28 +00:00
items , err := meta . ExtractListWithAlloc ( list )
2022-08-24 02:24:25 +00:00
if err != nil {
return fmt . Errorf ( "unable to understand list result %#v (%v)" , list , err )
}
initTrace . Step ( "Objects extracted" )
if err := r . syncWith ( items , resourceVersion ) ; err != nil {
return fmt . Errorf ( "unable to sync list result: %v" , err )
}
initTrace . Step ( "SyncWith done" )
r . setLastSyncResourceVersion ( resourceVersion )
initTrace . Step ( "Resource version updated" )
return nil
}
2023-06-01 16:58:10 +00:00
// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
func ( r * Reflector ) watchList ( stopCh <- chan struct { } ) ( watch . Interface , error ) {
var w watch . Interface
var err error
var temporaryStore Store
var resourceVersion string
// TODO(#115478): see if this function could be turned
// into a method and see if error handling
// could be unified with the r.watch method
isErrorRetriableWithSideEffectsFn := func ( err error ) bool {
if canRetry := isWatchErrorRetriable ( err ) ; canRetry {
klog . V ( 2 ) . Infof ( "%s: watch-list of %v returned %v - backing off" , r . name , r . typeDescription , err )
2023-08-17 05:15:28 +00:00
<- r . backoffManager . Backoff ( ) . C ( )
2023-06-01 16:58:10 +00:00
return true
}
if isExpiredError ( err ) || isTooLargeResourceVersionError ( err ) {
// we tried to re-establish a watch request but the provided RV
// has either expired or it is greater than the server knows about.
// In that case we reset the RV and
// try to get a consistent snapshot from the watch cache (case 1)
r . setIsLastSyncResourceVersionUnavailable ( true )
return true
}
return false
}
initTrace := trace . New ( "Reflector WatchList" , trace . Field { Key : "name" , Value : r . name } )
defer initTrace . LogIfLong ( 10 * time . Second )
for {
select {
case <- stopCh :
return nil , nil
default :
}
resourceVersion = ""
lastKnownRV := r . rewatchResourceVersion ( )
temporaryStore = NewStore ( DeletionHandlingMetaNamespaceKeyFunc )
// TODO(#115478): large "list", slow clients, slow network, p&f
// might slow down streaming and eventually fail.
// maybe in such a case we should retry with an increased timeout?
timeoutSeconds := int64 ( minWatchTimeout . Seconds ( ) * ( rand . Float64 ( ) + 1.0 ) )
options := metav1 . ListOptions {
ResourceVersion : lastKnownRV ,
AllowWatchBookmarks : true ,
SendInitialEvents : pointer . Bool ( true ) ,
ResourceVersionMatch : metav1 . ResourceVersionMatchNotOlderThan ,
TimeoutSeconds : & timeoutSeconds ,
}
start := r . clock . Now ( )
w , err = r . listerWatcher . Watch ( options )
if err != nil {
if isErrorRetriableWithSideEffectsFn ( err ) {
continue
}
return nil , err
}
bookmarkReceived := pointer . Bool ( false )
err = watchHandler ( start , w , temporaryStore , r . expectedType , r . expectedGVK , r . name , r . typeDescription ,
func ( rv string ) { resourceVersion = rv } ,
bookmarkReceived ,
r . clock , make ( chan error ) , stopCh )
if err != nil {
w . Stop ( ) // stop and retry with clean state
if err == errorStopRequested {
return nil , nil
}
if isErrorRetriableWithSideEffectsFn ( err ) {
continue
}
return nil , err
}
if * bookmarkReceived {
break
}
}
// We successfully got initial state from watch-list confirmed by the
// "k8s.io/initial-events-end" bookmark.
initTrace . Step ( "Objects streamed" , trace . Field { Key : "count" , Value : len ( temporaryStore . List ( ) ) } )
r . setIsLastSyncResourceVersionUnavailable ( false )
2023-12-20 12:23:59 +00:00
// we utilize the temporaryStore to ensure independence from the current store implementation.
// as of today, the store is implemented as a queue and will be drained by the higher-level
// component as soon as it finishes replacing the content.
checkWatchListConsistencyIfRequested ( stopCh , r . name , resourceVersion , r . listerWatcher , temporaryStore )
2023-06-01 16:58:10 +00:00
if err = r . store . Replace ( temporaryStore . List ( ) , resourceVersion ) ; err != nil {
return nil , fmt . Errorf ( "unable to sync watch-list result: %v" , err )
}
initTrace . Step ( "SyncWith done" )
r . setLastSyncResourceVersion ( resourceVersion )
return w , nil
}
2019-05-31 09:45:11 +00:00
// syncWith replaces the store's items with the given list.
func ( r * Reflector ) syncWith ( items [ ] runtime . Object , resourceVersion string ) error {
found := make ( [ ] interface { } , 0 , len ( items ) )
for _ , item := range items {
found = append ( found , item )
}
return r . store . Replace ( found , resourceVersion )
}
2022-08-24 02:24:25 +00:00
// watchHandler watches w and sets setLastSyncResourceVersion
func watchHandler ( start time . Time ,
w watch . Interface ,
store Store ,
expectedType reflect . Type ,
expectedGVK * schema . GroupVersionKind ,
name string ,
expectedTypeName string ,
setLastSyncResourceVersion func ( string ) ,
2023-06-01 16:58:10 +00:00
exitOnInitialEventsEndBookmark * bool ,
2022-08-24 02:24:25 +00:00
clock clock . Clock ,
errc chan error ,
stopCh <- chan struct { } ,
) error {
2019-05-31 09:45:11 +00:00
eventCount := 0
2023-06-01 16:58:10 +00:00
if exitOnInitialEventsEndBookmark != nil {
// set it to false just in case somebody
// made it positive
* exitOnInitialEventsEndBookmark = false
}
2019-05-31 09:45:11 +00:00
loop :
for {
select {
case <- stopCh :
return errorStopRequested
case err := <- errc :
return err
case event , ok := <- w . ResultChan ( ) :
if ! ok {
break loop
}
if event . Type == watch . Error {
2020-04-14 07:04:33 +00:00
return apierrors . FromObject ( event . Object )
2019-05-31 09:45:11 +00:00
}
2022-08-24 02:24:25 +00:00
if expectedType != nil {
if e , a := expectedType , reflect . TypeOf ( event . Object ) ; e != a {
utilruntime . HandleError ( fmt . Errorf ( "%s: expected type %v, but watch event object had type %v" , name , e , a ) )
2020-01-14 10:38:55 +00:00
continue
}
}
2022-08-24 02:24:25 +00:00
if expectedGVK != nil {
if e , a := * expectedGVK , event . Object . GetObjectKind ( ) . GroupVersionKind ( ) ; e != a {
utilruntime . HandleError ( fmt . Errorf ( "%s: expected gvk %v, but watch event object had gvk %v" , name , e , a ) )
2020-01-14 10:38:55 +00:00
continue
}
2019-05-31 09:45:11 +00:00
}
meta , err := meta . Accessor ( event . Object )
if err != nil {
2022-08-24 02:24:25 +00:00
utilruntime . HandleError ( fmt . Errorf ( "%s: unable to understand watch event %#v" , name , event ) )
2019-05-31 09:45:11 +00:00
continue
}
2022-08-24 02:24:25 +00:00
resourceVersion := meta . GetResourceVersion ( )
2019-05-31 09:45:11 +00:00
switch event . Type {
case watch . Added :
2022-08-24 02:24:25 +00:00
err := store . Add ( event . Object )
2019-05-31 09:45:11 +00:00
if err != nil {
2022-08-24 02:24:25 +00:00
utilruntime . HandleError ( fmt . Errorf ( "%s: unable to add watch event object (%#v) to store: %v" , name , event . Object , err ) )
2019-05-31 09:45:11 +00:00
}
case watch . Modified :
2022-08-24 02:24:25 +00:00
err := store . Update ( event . Object )
2019-05-31 09:45:11 +00:00
if err != nil {
2022-08-24 02:24:25 +00:00
utilruntime . HandleError ( fmt . Errorf ( "%s: unable to update watch event object (%#v) to store: %v" , name , event . Object , err ) )
2019-05-31 09:45:11 +00:00
}
case watch . Deleted :
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
2022-08-24 02:24:25 +00:00
err := store . Delete ( event . Object )
2019-05-31 09:45:11 +00:00
if err != nil {
2022-08-24 02:24:25 +00:00
utilruntime . HandleError ( fmt . Errorf ( "%s: unable to delete watch event object (%#v) from store: %v" , name , event . Object , err ) )
2019-05-31 09:45:11 +00:00
}
2019-06-24 09:08:09 +00:00
case watch . Bookmark :
// A `Bookmark` means watch has synced here, just update the resourceVersion
2023-12-20 12:23:59 +00:00
if meta . GetAnnotations ( ) [ "k8s.io/initial-events-end" ] == "true" {
2023-06-01 16:58:10 +00:00
if exitOnInitialEventsEndBookmark != nil {
* exitOnInitialEventsEndBookmark = true
}
}
2019-05-31 09:45:11 +00:00
default :
2022-08-24 02:24:25 +00:00
utilruntime . HandleError ( fmt . Errorf ( "%s: unable to understand watch event %#v" , name , event ) )
2019-05-31 09:45:11 +00:00
}
2022-08-24 02:24:25 +00:00
setLastSyncResourceVersion ( resourceVersion )
if rvu , ok := store . ( ResourceVersionUpdater ) ; ok {
rvu . UpdateResourceVersion ( resourceVersion )
2020-12-17 12:28:29 +00:00
}
2019-05-31 09:45:11 +00:00
eventCount ++
2023-06-01 16:58:10 +00:00
if exitOnInitialEventsEndBookmark != nil && * exitOnInitialEventsEndBookmark {
watchDuration := clock . Since ( start )
klog . V ( 4 ) . Infof ( "exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v" , name , eventCount , watchDuration )
return nil
}
2019-05-31 09:45:11 +00:00
}
}
2022-08-24 02:24:25 +00:00
watchDuration := clock . Since ( start )
2019-05-31 09:45:11 +00:00
if watchDuration < 1 * time . Second && eventCount == 0 {
2022-08-24 02:24:25 +00:00
return fmt . Errorf ( "very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received" , name )
2019-05-31 09:45:11 +00:00
}
2022-08-24 02:24:25 +00:00
klog . V ( 4 ) . Infof ( "%s: Watch close - %v total %v items received" , name , expectedTypeName , eventCount )
2019-05-31 09:45:11 +00:00
return nil
}
// LastSyncResourceVersion is the resource version observed when last sync with the underlying store
// The value returned is not synchronized with access to the underlying store and is not thread-safe
func ( r * Reflector ) LastSyncResourceVersion ( ) string {
r . lastSyncResourceVersionMutex . RLock ( )
defer r . lastSyncResourceVersionMutex . RUnlock ( )
return r . lastSyncResourceVersion
}
func ( r * Reflector ) setLastSyncResourceVersion ( v string ) {
r . lastSyncResourceVersionMutex . Lock ( )
defer r . lastSyncResourceVersionMutex . Unlock ( )
r . lastSyncResourceVersion = v
}
2020-04-14 07:04:33 +00:00
// relistResourceVersion determines the resource version the reflector should list or relist from.
// Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
// versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
// in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
// etcd via a quorum read.
func ( r * Reflector ) relistResourceVersion ( ) string {
r . lastSyncResourceVersionMutex . RLock ( )
defer r . lastSyncResourceVersionMutex . RUnlock ( )
2020-07-24 14:20:51 +00:00
if r . isLastSyncResourceVersionUnavailable {
2020-04-14 07:04:33 +00:00
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
2020-07-24 14:20:51 +00:00
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
2020-04-14 07:04:33 +00:00
// to the latest available ResourceVersion, using a consistent read from etcd.
return ""
}
if r . lastSyncResourceVersion == "" {
// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
// be served from the watch cache if it is enabled.
return "0"
}
return r . lastSyncResourceVersion
}
2023-06-01 16:58:10 +00:00
// rewatchResourceVersion determines the resource version the reflector should start streaming from.
func ( r * Reflector ) rewatchResourceVersion ( ) string {
r . lastSyncResourceVersionMutex . RLock ( )
defer r . lastSyncResourceVersionMutex . RUnlock ( )
if r . isLastSyncResourceVersionUnavailable {
// initial stream should return data at the most recent resource version.
// the returned data must be consistent i.e. as if served from etcd via a quorum read
return ""
}
return r . lastSyncResourceVersion
}
2020-07-24 14:20:51 +00:00
// setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
// "expired" or "too large resource version" error.
func ( r * Reflector ) setIsLastSyncResourceVersionUnavailable ( isUnavailable bool ) {
2020-04-14 07:04:33 +00:00
r . lastSyncResourceVersionMutex . Lock ( )
defer r . lastSyncResourceVersionMutex . Unlock ( )
2020-07-24 14:20:51 +00:00
r . isLastSyncResourceVersionUnavailable = isUnavailable
2020-04-14 07:04:33 +00:00
}
func isExpiredError ( err error ) bool {
// In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
// apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
// and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
return apierrors . IsResourceExpired ( err ) || apierrors . IsGone ( err )
}
2020-07-24 14:20:51 +00:00
func isTooLargeResourceVersionError ( err error ) bool {
2020-12-17 12:28:29 +00:00
if apierrors . HasStatusCause ( err , metav1 . CauseTypeResourceVersionTooLarge ) {
return true
}
// In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
// metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
// version is larger than the largest currently available resource version. To ensure backward
// compatibility with these server versions we also need to detect the error based on the content
// of the error message field.
if ! apierrors . IsTimeout ( err ) {
return false
}
apierr , ok := err . ( apierrors . APIStatus )
if ! ok || apierr == nil || apierr . Status ( ) . Details == nil {
return false
}
for _ , cause := range apierr . Status ( ) . Details . Causes {
// Matches the message returned by api server 1.17.0-1.18.5 for this error condition
if cause . Message == "Too large resource version" {
return true
}
}
2023-06-01 16:58:10 +00:00
// Matches the message returned by api server before 1.17.0
if strings . Contains ( apierr . Status ( ) . Message , "Too large resource version" ) {
return true
}
return false
}
// isWatchErrorRetriable determines if it is safe to retry
// a watch error retrieved from the server.
func isWatchErrorRetriable ( err error ) bool {
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
// Do the same for "429" errors.
if utilnet . IsConnectionRefused ( err ) || apierrors . IsTooManyRequests ( err ) {
return true
}
2020-12-17 12:28:29 +00:00
return false
2020-07-24 14:20:51 +00:00
}