diff --git a/go.mod b/go.mod index 087d1f5e2..aad3efb4b 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( require ( // sigs.k8s.io/controller-runtime wants this version, it gets replaced below k8s.io/client-go v12.0.0+incompatible - sigs.k8s.io/controller-runtime v0.20.4 + sigs.k8s.io/controller-runtime v0.21.0 ) replace k8s.io/client-go => k8s.io/client-go v0.33.0 diff --git a/go.sum b/go.sum index 5e8abe74e..7e87754d4 100644 --- a/go.sum +++ b/go.sum @@ -1395,8 +1395,8 @@ k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/controller-runtime v0.20.4 h1:X3c+Odnxz+iPTRobG4tp092+CvBU9UK0t/bRf+n0DGU= -sigs.k8s.io/controller-runtime v0.20.4/go.mod h1:xg2XB0K5ShQzAgsoujxuKN4LNXR2LfwwHsPj7Iaw+XY= +sigs.k8s.io/controller-runtime v0.21.0 h1:CYfjpEuicjUecRk+KAeyYh+ouUBn4llGyDYytIGcJS8= +sigs.k8s.io/controller-runtime v0.21.0/go.mod h1:OSg14+F65eWqIu4DceX7k/+QRAbTTvxeQSNSOQpukWM= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= diff --git a/vendor/modules.txt b/vendor/modules.txt index c2f88b4c2..a1378aad6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1178,8 +1178,8 @@ k8s.io/utils/path k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/trace -# sigs.k8s.io/controller-runtime v0.20.4 -## explicit; go 1.23.0 +# sigs.k8s.io/controller-runtime v0.21.0 +## explicit; go 1.24.0 sigs.k8s.io/controller-runtime/pkg/cache sigs.k8s.io/controller-runtime/pkg/cache/internal sigs.k8s.io/controller-runtime/pkg/certwatcher diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go index 8f14bfdbf..648d0d75b 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go @@ -113,6 +113,10 @@ type Informer interface { // the handler again and an error if the handler cannot be added. AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) + // AddEventHandlerWithOptions is a variant of AddEventHandlerWithResyncPeriod where + // all optional parameters are passed in as a struct. + AddEventHandlerWithOptions(handler toolscache.ResourceEventHandler, options toolscache.HandlerOptions) (toolscache.ResourceEventHandlerRegistration, error) + // RemoveEventHandler removes a previously added event handler given by // its registration handle. // This function is guaranteed to be idempotent and thread-safe. @@ -207,11 +211,11 @@ type Options struct { // to reduce the caches memory usage. DefaultTransform toolscache.TransformFunc - // DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called + // DefaultWatchErrorHandler will be used to set the WatchErrorHandler which is called // whenever ListAndWatch drops the connection with an error. // // After calling this handler, the informer will backoff and retry. - DefaultWatchErrorHandler toolscache.WatchErrorHandler + DefaultWatchErrorHandler toolscache.WatchErrorHandlerWithContext // DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy // for everything that doesn't specify this. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go index 81ee960b7..33ce8a830 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go @@ -174,7 +174,13 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli } runtimeObjs = append(runtimeObjs, outObj) } - return apimeta.SetList(out, runtimeObjs) + + if err := apimeta.SetList(out, runtimeObjs); err != nil { + return err + } + + out.SetContinue("continue-not-supported") + return nil } func byIndexes(indexer cache.Indexer, requires fields.Requirements, namespace string) ([]interface{}, error) { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go index 097ee7a45..4bf832b2d 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go @@ -25,21 +25,26 @@ import ( "sync" "time" + "github.com/go-logr/logr" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/internal/syncs" ) +var log = logf.RuntimeLog.WithName("cache") + // InformersOpts configures an InformerMap. type InformersOpts struct { HTTPClient *http.Client @@ -52,7 +57,7 @@ type InformersOpts struct { Transform cache.TransformFunc UnsafeDisableDeepCopy bool EnableWatchBookmarks bool - WatchErrorHandler cache.WatchErrorHandler + WatchErrorHandler cache.WatchErrorHandlerWithContext } // NewInformers creates a new InformersMap that can create informers under the hood. @@ -105,7 +110,8 @@ func (c *Cache) Start(stop <-chan struct{}) { // Stop on either the whole map stopping or just this informer being removed. internalStop, cancel := syncs.MergeChans(stop, c.stop) defer cancel() - c.Informer.Run(internalStop) + // Convert the stop channel to a context and then add the logger. + c.Informer.RunWithContext(logr.NewContext(wait.ContextForChannel(internalStop), log)) } type tracker struct { @@ -181,10 +187,10 @@ type Informers struct { // NewInformer allows overriding of the shared index informer constructor for testing. newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer - // WatchErrorHandler allows the shared index informer's + // watchErrorHandler allows the shared index informer's // watchErrorHandler to be set by overriding the options // or to use the default watchErrorHandler - watchErrorHandler cache.WatchErrorHandler + watchErrorHandler cache.WatchErrorHandlerWithContext } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -195,7 +201,7 @@ func (ip *Informers) Start(ctx context.Context) error { defer ip.mu.Unlock() if ip.started { - return errors.New("Informer already started") //nolint:stylecheck + return errors.New("informer already started") //nolint:stylecheck } // Set the context so it can be passed to informers that are added later @@ -359,16 +365,16 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O return nil, false, err } sharedIndexInformer := ip.newInformer(&cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { ip.selector.ApplyToList(&opts) - return listWatcher.ListFunc(opts) + return listWatcher.ListWithContextFunc(ctx, opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true // Watch needs to be set to true separately opts.AllowWatchBookmarks = ip.enableWatchBookmarks ip.selector.ApplyToList(&opts) - return listWatcher.WatchFunc(opts) + return listWatcher.WatchFuncWithContext(ctx, opts) }, }, obj, calculateResyncPeriod(ip.resync), cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, @@ -376,7 +382,7 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O // Set WatchErrorHandler on SharedIndexInformer if set if ip.watchErrorHandler != nil { - if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil { + if err := sharedIndexInformer.SetWatchErrorHandlerWithContext(ip.watchErrorHandler); err != nil { return nil, false, err } } @@ -441,21 +447,21 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob } resources := dynamicClient.Resource(mapping.Resource) return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { if namespace != "" { - return resources.Namespace(namespace).List(ip.ctx, opts) + return resources.Namespace(namespace).List(ctx, opts) } - return resources.List(ip.ctx, opts) + return resources.List(ctx, opts) }, // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true // Watch needs to be set to true separately opts.AllowWatchBookmarks = ip.enableWatchBookmarks if namespace != "" { - return resources.Namespace(namespace).Watch(ip.ctx, opts) + return resources.Namespace(namespace).Watch(ctx, opts) } - return resources.Watch(ip.ctx, opts) + return resources.Watch(ctx, opts) }, }, nil // @@ -475,15 +481,15 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob resources := metadataClient.Resource(mapping.Resource) return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { var ( list *metav1.PartialObjectMetadataList err error ) if namespace != "" { - list, err = resources.Namespace(namespace).List(ip.ctx, opts) + list, err = resources.Namespace(namespace).List(ctx, opts) } else { - list, err = resources.List(ip.ctx, opts) + list, err = resources.List(ctx, opts) } if list != nil { for i := range list.Items { @@ -493,14 +499,14 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob return list, err }, // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watcher watch.Interface, err error) { opts.Watch = true // Watch needs to be set to true separately opts.AllowWatchBookmarks = ip.enableWatchBookmarks if namespace != "" { - watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts) + watcher, err = resources.Namespace(namespace).Watch(ctx, opts) } else { - watcher, err = resources.Watch(ip.ctx, opts) + watcher, err = resources.Watch(ctx, opts) } if err != nil { return nil, err @@ -522,7 +528,7 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob return nil, err } return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { // Build the request. req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec) if namespace != "" { @@ -531,13 +537,13 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob // Create the resulting object, and execute the request. res := listObj.DeepCopyObject() - if err := req.Do(ip.ctx).Into(res); err != nil { + if err := req.Do(ctx).Into(res); err != nil { return nil, err } return res, nil }, // Setup the watch function - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true // Watch needs to be set to true separately opts.AllowWatchBookmarks = ip.enableWatchBookmarks @@ -547,7 +553,7 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob req.Namespace(namespace) } // Call the watch. - return req.Watch(ip.ctx) + return req.Watch(ctx) }, }, nil } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go index aeeeb6693..d7d7b0e7c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go @@ -249,6 +249,10 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, listOpts := client.ListOptions{} listOpts.ApplyOptions(opts) + if listOpts.Continue != "" { + return fmt.Errorf("continue list option is not supported by the cache") + } + isNamespaced, err := apiutil.IsObjectNamespaced(list, c.Scheme, c.RESTMapper) if err != nil { return err @@ -275,10 +279,7 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, return err } - allItems, err := apimeta.ExtractList(list) - if err != nil { - return err - } + allItems := []runtime.Object{} limitSet := listOpts.Limit > 0 @@ -316,7 +317,12 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, } listAccessor.SetResourceVersion(resourceVersion) - return apimeta.SetList(list, allItems) + if err := apimeta.SetList(list, allItems); err != nil { + return err + } + + list.SetContinue("continue-not-supported") + return nil } // multiNamespaceInformer knows how to handle interacting with the underlying informer across multiple namespaces. @@ -328,18 +334,11 @@ type handlerRegistration struct { handles map[string]toolscache.ResourceEventHandlerRegistration } -type syncer interface { - HasSynced() bool -} - // HasSynced asserts that the handler has been called for the full initial state of the informer. -// This uses syncer to be compatible between client-go 1.27+ and older versions when the interface changed. func (h handlerRegistration) HasSynced() bool { - for _, reg := range h.handles { - if s, ok := reg.(syncer); ok { - if !s.HasSynced() { - return false - } + for _, h := range h.handles { + if !h.HasSynced() { + return false } } return true @@ -381,6 +380,23 @@ func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolsca return handles, nil } +// AddEventHandlerWithOptions adds the handler with options to each namespaced informer. +func (i *multiNamespaceInformer) AddEventHandlerWithOptions(handler toolscache.ResourceEventHandler, options toolscache.HandlerOptions) (toolscache.ResourceEventHandlerRegistration, error) { + handles := handlerRegistration{ + handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)), + } + + for ns, informer := range i.namespaceToInformer { + registration, err := informer.AddEventHandlerWithOptions(handler, options) + if err != nil { + return nil, err + } + handles.handles[ns] = registration + } + + return handles, nil +} + // RemoveEventHandler removes a previously added event handler given by its registration handle. func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHandlerRegistration) error { handles, ok := h.(handlerRegistration) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go index 6d8744017..50b0ebf33 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go @@ -74,8 +74,8 @@ type NewClientFunc func(config *rest.Config, options Options) (Client, error) // New returns a new Client using the provided config and Options. // // By default, the client surfaces warnings returned by the server. To -// suppress warnings, set config.WarningHandler = rest.NoWarnings{}. To -// define custom behavior, implement the rest.WarningHandler interface. +// suppress warnings, set config.WarningHandlerWithContext = rest.NoWarnings{}. To +// define custom behavior, implement the rest.WarningHandlerWithContext interface. // See [sigs.k8s.io/controller-runtime/pkg/log.KubeAPIWarningLogger] for // an example. // @@ -112,10 +112,9 @@ func newClient(config *rest.Config, options Options) (*client, error) { config.UserAgent = rest.DefaultKubernetesUserAgent() } - if config.WarningHandler == nil { + if config.WarningHandler == nil && config.WarningHandlerWithContext == nil { // By default, we surface warnings. - config.WarningHandler = log.NewKubeAPIWarningLogger( - log.Log.WithName("KubeAPIWarningLogger"), + config.WarningHandlerWithContext = log.NewKubeAPIWarningLogger( log.KubeAPIWarningLoggerOptions{ Deduplicate: false, }, diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/config/config.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/config/config.go index 5f0a6d4b1..70389dfa9 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/config/config.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/config/config.go @@ -61,6 +61,9 @@ func RegisterFlags(fs *flag.FlagSet) { // If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running // in cluster and use the cluster provided kubeconfig. // +// The returned `*rest.Config` has client-side ratelimting disabled as we can rely on API priority and +// fairness. Set its QPS to a value equal or bigger than 0 to re-enable it. +// // It also applies saner defaults for QPS and burst based on the Kubernetes // controller manager defaults (20 QPS, 30 burst) // @@ -81,6 +84,9 @@ func GetConfig() (*rest.Config, error) { // If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running // in cluster and use the cluster provided kubeconfig. // +// The returned `*rest.Config` has client-side ratelimting disabled as we can rely on API priority and +// fairness. Set its QPS to a value equal or bigger than 0 to re-enable it. +// // It also applies saner defaults for QPS and burst based on the Kubernetes // controller manager defaults (20 QPS, 30 burst) // @@ -99,10 +105,9 @@ func GetConfigWithContext(context string) (*rest.Config, error) { return nil, err } if cfg.QPS == 0.0 { - cfg.QPS = 20.0 - } - if cfg.Burst == 0 { - cfg.Burst = 30 + // Disable client-side ratelimer by default, we can rely on + // API priority and fairness + cfg.QPS = -1 } return cfg, nil } @@ -170,6 +175,9 @@ func loadConfigWithContext(apiServerURL string, loader clientcmd.ClientConfigLoa // If --kubeconfig is set, will use the kubeconfig file at that location. Otherwise will assume running // in cluster and use the cluster provided kubeconfig. // +// The returned `*rest.Config` has client-side ratelimting disabled as we can rely on API priority and +// fairness. Set its QPS to a value equal or bigger than 0 to re-enable it. +// // Will log an error and exit if there is an error creating the rest.Config. func GetConfigOrDie() *rest.Config { config, err := GetConfig() diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go index 0b2aa0cb7..a5655593e 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go @@ -16,9 +16,15 @@ limitations under the License. package config -import "time" +import ( + "time" -// Controller contains configuration options for a controller. + "github.com/go-logr/logr" +) + +// Controller contains configuration options for controllers. It only includes options +// that makes sense for a set of controllers and is used for defaulting the options +// of multiple controllers. type Controller struct { // SkipNameValidation allows skipping the name validation that ensures that every controller name is unique. // Unique controller names are important to get unique metrics and logs for a controller. @@ -59,4 +65,7 @@ type Controller struct { // // Note: This flag is disabled by default until a future version. It's currently in beta. UsePriorityQueue *bool + + // Logger is the logger controllers should use. + Logger logr.Logger } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go index 5c5b249ef..9de959b48 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go @@ -26,6 +26,7 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -80,13 +81,53 @@ type TypedOptions[request comparable] struct { // Only use a custom NewQueue if you know what you are doing. NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] + // Logger will be used to build a default LogConstructor if unset. + Logger logr.Logger + // LogConstructor is used to construct a logger used for this controller and passed // to each reconciliation via the context field. LogConstructor func(request *request) logr.Logger + + // UsePriorityQueue configures the controllers queue to use the controller-runtime provided + // priority queue. + // + // Note: This flag is disabled by default until a future version. It's currently in beta. + UsePriorityQueue *bool } -// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests -// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item. +// DefaultFromConfig defaults the config from a config.Controller +func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller) { + if options.Logger.GetSink() == nil { + options.Logger = config.Logger + } + + if options.SkipNameValidation == nil { + options.SkipNameValidation = config.SkipNameValidation + } + + if options.MaxConcurrentReconciles <= 0 && config.MaxConcurrentReconciles > 0 { + options.MaxConcurrentReconciles = config.MaxConcurrentReconciles + } + + if options.CacheSyncTimeout == 0 && config.CacheSyncTimeout > 0 { + options.CacheSyncTimeout = config.CacheSyncTimeout + } + + if options.UsePriorityQueue == nil { + options.UsePriorityQueue = config.UsePriorityQueue + } + + if options.RecoverPanic == nil { + options.RecoverPanic = config.RecoverPanic + } + + if options.NeedLeaderElection == nil { + options.NeedLeaderElection = config.NeedLeaderElection + } +} + +// Controller implements an API. A Controller manages a work queue fed reconcile.Requests +// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item. // Work typically is reads and writes Kubernetes objects to make the system state match the state specified // in the object Spec. type Controller = TypedController[reconcile.Request] @@ -119,7 +160,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error) // // The name must be unique as it is used to identify the controller in metrics and logs. func NewTyped[request comparable](name string, mgr manager.Manager, options TypedOptions[request]) (TypedController[request], error) { - c, err := NewTypedUnmanaged(name, mgr, options) + options.DefaultFromConfig(mgr.GetControllerOptions()) + c, err := NewTypedUnmanaged(name, options) if err != nil { return nil, err } @@ -132,14 +174,14 @@ func NewTyped[request comparable](name string, mgr manager.Manager, options Type // caller is responsible for starting the returned controller. // // The name must be unique as it is used to identify the controller in metrics and logs. -func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) { - return NewTypedUnmanaged(name, mgr, options) +func NewUnmanaged(name string, options Options) (Controller, error) { + return NewTypedUnmanaged(name, options) } // NewTypedUnmanaged returns a new typed controller without adding it to the manager. // // The name must be unique as it is used to identify the controller in metrics and logs. -func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, options TypedOptions[request]) (TypedController[request], error) { +func NewTypedUnmanaged[request comparable](name string, options TypedOptions[request]) (TypedController[request], error) { if options.Reconciler == nil { return nil, fmt.Errorf("must specify Reconciler") } @@ -148,10 +190,6 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt return nil, fmt.Errorf("must specify Name for Controller") } - if options.SkipNameValidation == nil { - options.SkipNameValidation = mgr.GetControllerOptions().SkipNameValidation - } - if options.SkipNameValidation == nil || !*options.SkipNameValidation { if err := checkName(name); err != nil { return nil, err @@ -159,7 +197,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt } if options.LogConstructor == nil { - log := mgr.GetLogger().WithValues( + log := options.Logger.WithValues( "controller", name, ) options.LogConstructor = func(in *request) logr.Logger { @@ -175,23 +213,15 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt } if options.MaxConcurrentReconciles <= 0 { - if mgr.GetControllerOptions().MaxConcurrentReconciles > 0 { - options.MaxConcurrentReconciles = mgr.GetControllerOptions().MaxConcurrentReconciles - } else { - options.MaxConcurrentReconciles = 1 - } + options.MaxConcurrentReconciles = 1 } if options.CacheSyncTimeout == 0 { - if mgr.GetControllerOptions().CacheSyncTimeout != 0 { - options.CacheSyncTimeout = mgr.GetControllerOptions().CacheSyncTimeout - } else { - options.CacheSyncTimeout = 2 * time.Minute - } + options.CacheSyncTimeout = 2 * time.Minute } if options.RateLimiter == nil { - if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { + if ptr.Deref(options.UsePriorityQueue, false) { options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second) } else { options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]() @@ -200,9 +230,9 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt if options.NewQueue == nil { options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { - if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { + if ptr.Deref(options.UsePriorityQueue, false) { return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { - o.Log = mgr.GetLogger().WithValues("controller", controllerName) + o.Log = options.Logger.WithValues("controller", controllerName) o.RateLimiter = rateLimiter }) } @@ -212,14 +242,6 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt } } - if options.RecoverPanic == nil { - options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic - } - - if options.NeedLeaderElection == nil { - options.NeedLeaderElection = mgr.GetControllerOptions().NeedLeaderElection - } - // Create controller with dependencies set return &controller.Controller[request]{ Do: options.Reconciler, diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go index 36626646f..967a252df 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go @@ -6,6 +6,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/internal/metrics" ) // This file is mostly a copy of unexported code from @@ -14,8 +15,9 @@ import ( // The only two differences are the addition of mapLock in defaultQueueMetrics and converging retryMetrics into queueMetrics. type queueMetrics[T comparable] interface { - add(item T) - get(item T) + add(item T, priority int) + get(item T, priority int) + updateDepthWithPriorityMetric(oldPriority, newPriority int) done(item T) updateUnfinishedWork() retry() @@ -25,9 +27,9 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl if len(name) == 0 { return noMetrics[T]{} } - return &defaultQueueMetrics[T]{ + + dqm := &defaultQueueMetrics[T]{ clock: clock, - depth: mp.NewDepthMetric(name), adds: mp.NewAddsMetric(name), latency: mp.NewLatencyMetric(name), workDuration: mp.NewWorkDurationMetric(name), @@ -37,6 +39,13 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl processingStartTimes: map[T]time.Time{}, retries: mp.NewRetriesMetric(name), } + + if mpp, ok := mp.(metrics.MetricsProviderWithPriority); ok { + dqm.depthWithPriority = mpp.NewDepthMetricWithPriority(name) + } else { + dqm.depth = mp.NewDepthMetric(name) + } + return dqm } // defaultQueueMetrics expects the caller to lock before setting any metrics. @@ -44,7 +53,8 @@ type defaultQueueMetrics[T comparable] struct { clock clock.Clock // current depth of a workqueue - depth workqueue.GaugeMetric + depth workqueue.GaugeMetric + depthWithPriority metrics.DepthMetricWithPriority // total number of adds handled by a workqueue adds workqueue.CounterMetric // how long an item stays in a workqueue @@ -64,13 +74,17 @@ type defaultQueueMetrics[T comparable] struct { } // add is called for ready items only -func (m *defaultQueueMetrics[T]) add(item T) { +func (m *defaultQueueMetrics[T]) add(item T, priority int) { if m == nil { return } m.adds.Inc() - m.depth.Inc() + if m.depthWithPriority != nil { + m.depthWithPriority.Inc(priority) + } else { + m.depth.Inc() + } m.mapLock.Lock() defer m.mapLock.Unlock() @@ -80,12 +94,16 @@ func (m *defaultQueueMetrics[T]) add(item T) { } } -func (m *defaultQueueMetrics[T]) get(item T) { +func (m *defaultQueueMetrics[T]) get(item T, priority int) { if m == nil { return } - m.depth.Dec() + if m.depthWithPriority != nil { + m.depthWithPriority.Dec(priority) + } else { + m.depth.Dec() + } m.mapLock.Lock() defer m.mapLock.Unlock() @@ -97,6 +115,13 @@ func (m *defaultQueueMetrics[T]) get(item T) { } } +func (m *defaultQueueMetrics[T]) updateDepthWithPriorityMetric(oldPriority, newPriority int) { + if m.depthWithPriority != nil { + m.depthWithPriority.Dec(oldPriority) + m.depthWithPriority.Inc(newPriority) + } +} + func (m *defaultQueueMetrics[T]) done(item T) { if m == nil { return @@ -139,8 +164,9 @@ func (m *defaultQueueMetrics[T]) retry() { type noMetrics[T any] struct{} -func (noMetrics[T]) add(item T) {} -func (noMetrics[T]) get(item T) {} -func (noMetrics[T]) done(item T) {} -func (noMetrics[T]) updateUnfinishedWork() {} -func (noMetrics[T]) retry() {} +func (noMetrics[T]) add(item T, priority int) {} +func (noMetrics[T]) get(item T, priority int) {} +func (noMetrics[T]) updateDepthWithPriorityMetric(oldPriority, newPriority int) {} +func (noMetrics[T]) done(item T) {} +func (noMetrics[T]) updateUnfinishedWork() {} +func (noMetrics[T]) retry() {} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go index ff5dea902..c3f77a6f3 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go @@ -156,7 +156,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.items[key] = item w.queue.ReplaceOrInsert(item) if item.ReadyAt == nil { - w.metrics.add(key) + w.metrics.add(key, item.Priority) } w.addedCounter++ continue @@ -166,12 +166,16 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { // will affect the order - Just delete and re-add. item, _ := w.queue.Delete(w.items[key]) if o.Priority > item.Priority { + // Update depth metric only if the item in the queue was already added to the depth metric. + if item.ReadyAt == nil || w.becameReady.Has(key) { + w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority) + } item.Priority = o.Priority } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { if readyAt == nil && !w.becameReady.Has(key) { - w.metrics.add(key) + w.metrics.add(key, item.Priority) } item.ReadyAt = readyAt } @@ -223,7 +227,7 @@ func (w *priorityqueue[T]) spin() { return false } if !w.becameReady.Has(item.Key) { - w.metrics.add(item.Key) + w.metrics.add(item.Key, item.Priority) w.becameReady.Insert(item.Key) } } @@ -239,7 +243,7 @@ func (w *priorityqueue[T]) spin() { return true } - w.metrics.get(item.Key) + w.metrics.get(item.Key, item.Priority) w.locked.Insert(item.Key) w.waiters.Add(-1) delete(w.items, item.Key) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/event/event.go b/vendor/sigs.k8s.io/controller-runtime/pkg/event/event.go index 81229fc2d..82b1793f5 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/event/event.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/event/event.go @@ -40,6 +40,9 @@ type GenericEvent = TypedGenericEvent[client.Object] type TypedCreateEvent[object any] struct { // Object is the object from the event Object object + + // IsInInitialList is true if the Create event was triggered by the initial list. + IsInInitialList bool } // TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go index be97fa378..fe78f21a2 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue_mapped.go @@ -86,9 +86,8 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create( reqs := map[request]empty{} var lowPriority bool - if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) { - clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)} - if isObjectUnchanged(clientObjectEvent) { + if isPriorityQueue(q) && !isNil(evt.Object) { + if evt.IsInInitialList { lowPriority = true } } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go index 84a10ac07..29e755cbf 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go @@ -19,7 +19,6 @@ package handler import ( "context" "reflect" - "time" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" @@ -132,14 +131,8 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr // We already know that we have a priority queue, that event.Object implements // client.Object and that its not nil addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - // We construct a new event typed to client.Object because isObjectUnchanged - // is a generic and hence has to know at compile time the type of the event - // it gets. We only figure that out at runtime though, but we know for sure - // that it implements client.Object at this point so we can hardcode the event - // type to that. - evt := event.CreateEvent{Object: any(e.Object).(client.Object)} var priority int - if isObjectUnchanged(evt) { + if e.IsInInitialList { priority = LowPriority } q.(priorityqueue.PriorityQueue[request]).AddWithOpts( @@ -217,13 +210,6 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) { w.addFunc(item, w.TypedRateLimitingInterface) } -// isObjectUnchanged checks if the object in a create event is unchanged, for example because -// we got it in our initial listwatch. The heuristic it uses is to check if the object is older -// than one minute. -func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool { - return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute)) -} - // addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler // for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request] func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) { @@ -234,7 +220,7 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate } var priority int - if isObjectUnchanged(evt) { + if evt.IsInInitialList { priority = LowPriority } priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go index 3f8cfdaa0..9fa7ec71e 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go @@ -174,65 +174,12 @@ func (c *Controller[request]) Start(ctx context.Context) error { defer c.mu.Unlock() // TODO(pwittrock): Reconsider HandleCrash - defer utilruntime.HandleCrash() + defer utilruntime.HandleCrashWithLogger(c.LogConstructor(nil)) // NB(directxman12): launch the sources *before* trying to wait for the // caches to sync so that they have a chance to register their intended // caches. - errGroup := &errgroup.Group{} - for _, watch := range c.startWatches { - log := c.LogConstructor(nil) - _, ok := watch.(interface { - String() string - }) - - if !ok { - log = log.WithValues("source", fmt.Sprintf("%T", watch)) - } else { - log = log.WithValues("source", fmt.Sprintf("%s", watch)) - } - didStartSyncingSource := &atomic.Bool{} - errGroup.Go(func() error { - // Use a timeout for starting and syncing the source to avoid silently - // blocking startup indefinitely if it doesn't come up. - sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) - defer cancel() - - sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out - go func() { - defer close(sourceStartErrChan) - log.Info("Starting EventSource") - if err := watch.Start(ctx, c.Queue); err != nil { - sourceStartErrChan <- err - return - } - syncingSource, ok := watch.(source.TypedSyncingSource[request]) - if !ok { - return - } - didStartSyncingSource.Store(true) - if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { - err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err) - log.Error(err, "Could not wait for Cache to sync") - sourceStartErrChan <- err - } - }() - - select { - case err := <-sourceStartErrChan: - return err - case <-sourceStartCtx.Done(): - if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened - return <-sourceStartErrChan - } - if ctx.Err() != nil { // Don't return an error if the root context got cancelled - return nil - } - return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch) - } - }) - } - if err := errGroup.Wait(); err != nil { + if err := c.startEventSources(ctx); err != nil { return err } @@ -271,6 +218,65 @@ func (c *Controller[request]) Start(ctx context.Context) error { return nil } +// startEventSources launches all the sources registered with this controller and waits +// for them to sync. It returns an error if any of the sources fail to start or sync. +func (c *Controller[request]) startEventSources(ctx context.Context) error { + errGroup := &errgroup.Group{} + for _, watch := range c.startWatches { + log := c.LogConstructor(nil) + _, ok := watch.(interface { + String() string + }) + + if !ok { + log = log.WithValues("source", fmt.Sprintf("%T", watch)) + } else { + log = log.WithValues("source", fmt.Sprintf("%s", watch)) + } + didStartSyncingSource := &atomic.Bool{} + errGroup.Go(func() error { + // Use a timeout for starting and syncing the source to avoid silently + // blocking startup indefinitely if it doesn't come up. + sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + defer cancel() + + sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out + go func() { + defer close(sourceStartErrChan) + log.Info("Starting EventSource") + if err := watch.Start(ctx, c.Queue); err != nil { + sourceStartErrChan <- err + return + } + syncingSource, ok := watch.(source.TypedSyncingSource[request]) + if !ok { + return + } + didStartSyncingSource.Store(true) + if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { + err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err) + log.Error(err, "Could not wait for Cache to sync") + sourceStartErrChan <- err + } + }() + + select { + case err := <-sourceStartErrChan: + return err + case <-sourceStartCtx.Done(): + if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened + return <-sourceStartErrChan + } + if ctx.Err() != nil { // Don't return an error if the root context got cancelled + return nil + } + return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch) + } + }) + } + return errGroup.Wait() +} + // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool { @@ -354,7 +360,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, c.Queue.Forget(req) c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() - case result.Requeue: + case result.Requeue: //nolint: staticcheck // We have to handle it until it is removed log.V(5).Info("Reconcile done, requeueing") c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics/metrics.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics/metrics.go index fbf15669d..450e9ae25 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics/metrics.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics/metrics.go @@ -17,6 +17,8 @@ limitations under the License. package metrics import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -60,6 +62,9 @@ var ( Help: "Length of time per reconciliation per controller", Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60}, + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, }, []string{"controller"}) // WorkerCount is a prometheus metric which holds the number of @@ -88,7 +93,7 @@ func init() { ActiveWorkers, // expose process metrics like CPU, Memory, file descriptor usage etc. collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), - // expose Go runtime metrics like GC stats, memory stats etc. - collectors.NewGoCollector(), + // expose all Go runtime metrics like GC stats, memory stats etc. + collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll)), ) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/metrics/workqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/metrics/workqueue.go index 86da340af..402319817 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/metrics/workqueue.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/metrics/workqueue.go @@ -17,6 +17,9 @@ limitations under the License. package metrics import ( + "strconv" + "time" + "github.com/prometheus/client_golang/prometheus" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -42,8 +45,8 @@ var ( depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: WorkQueueSubsystem, Name: DepthKey, - Help: "Current depth of workqueue", - }, []string{"name", "controller"}) + Help: "Current depth of workqueue by workqueue and priority", + }, []string{"name", "controller", "priority"}) adds = prometheus.NewCounterVec(prometheus.CounterOpts{ Subsystem: WorkQueueSubsystem, @@ -52,17 +55,23 @@ var ( }, []string{"name", "controller"}) latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Subsystem: WorkQueueSubsystem, - Name: QueueLatencyKey, - Help: "How long in seconds an item stays in workqueue before being requested", - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), + Subsystem: WorkQueueSubsystem, + Name: QueueLatencyKey, + Help: "How long in seconds an item stays in workqueue before being requested", + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, }, []string{"name", "controller"}) workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Subsystem: WorkQueueSubsystem, - Name: WorkDurationKey, - Help: "How long in seconds processing an item from workqueue takes.", - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), + Subsystem: WorkQueueSubsystem, + Name: WorkDurationKey, + Help: "How long in seconds processing an item from workqueue takes.", + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, }, []string{"name", "controller"}) unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{ @@ -103,7 +112,7 @@ func init() { type WorkqueueMetricsProvider struct{} func (WorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { - return depth.WithLabelValues(name, name) + return depth.WithLabelValues(name, name, "") // no priority } func (WorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { @@ -129,3 +138,33 @@ func (WorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name str func (WorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { return retries.WithLabelValues(name, name) } + +type MetricsProviderWithPriority interface { + workqueue.MetricsProvider + + NewDepthMetricWithPriority(name string) DepthMetricWithPriority +} + +// DepthMetricWithPriority represents a depth metric with priority. +type DepthMetricWithPriority interface { + Inc(priority int) + Dec(priority int) +} + +var _ MetricsProviderWithPriority = WorkqueueMetricsProvider{} + +func (WorkqueueMetricsProvider) NewDepthMetricWithPriority(name string) DepthMetricWithPriority { + return &depthWithPriorityMetric{lvs: []string{name, name}} +} + +type depthWithPriorityMetric struct { + lvs []string +} + +func (g *depthWithPriorityMetric) Inc(priority int) { + depth.WithLabelValues(append(g.lvs, strconv.Itoa(priority))...).Inc() +} + +func (g *depthWithPriorityMetric) Dec(priority int) { + depth.WithLabelValues(append(g.lvs, strconv.Itoa(priority))...).Dec() +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/event_handler.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/event_handler.go index 38432a1a7..7cc8c5155 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/event_handler.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/event_handler.go @@ -32,6 +32,8 @@ import ( var log = logf.RuntimeLog.WithName("source").WithName("EventHandler") +var _ cache.ResourceEventHandler = &EventHandler[client.Object, any]{} + // NewEventHandler creates a new EventHandler. func NewEventHandler[object client.Object, request comparable]( ctx context.Context, @@ -57,19 +59,11 @@ type EventHandler[object client.Object, request comparable] struct { predicates []predicate.TypedPredicate[object] } -// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs -// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 -func (e *EventHandler[object, request]) HandlerFuncs() cache.ResourceEventHandlerFuncs { - return cache.ResourceEventHandlerFuncs{ - AddFunc: e.OnAdd, - UpdateFunc: e.OnUpdate, - DeleteFunc: e.OnDelete, - } -} - // OnAdd creates CreateEvent and calls Create on EventHandler. -func (e *EventHandler[object, request]) OnAdd(obj interface{}) { - c := event.TypedCreateEvent[object]{} +func (e *EventHandler[object, request]) OnAdd(obj interface{}, isInInitialList bool) { + c := event.TypedCreateEvent[object]{ + IsInInitialList: isInInitialList, + } // Pull Object out of the object if o, ok := obj.(object); ok { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go index 2fdfbde8e..285424452 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go @@ -10,7 +10,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -18,6 +20,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) +var logKind = logf.RuntimeLog.WithName("source").WithName("Kind") + // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). type Kind[object client.Object, request comparable] struct { // Type is the type of object to watch. e.g. &v1.Pod{} @@ -68,12 +72,12 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type kindMatchErr := &meta.NoKindMatchError{} switch { case errors.As(lastErr, &kindMatchErr): - log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", + logKind.Error(lastErr, "if kind is a CRD, it should be installed before calling Start", "kind", kindMatchErr.GroupKind) case runtime.IsNotRegisteredError(lastErr): - log.Error(lastErr, "kind must be registered to the Scheme") + logKind.Error(lastErr, "kind must be registered to the Scheme") default: - log.Error(lastErr, "failed to get informer from cache") + logKind.Error(lastErr, "failed to get informer from cache") } return false, nil // Retry. } @@ -87,7 +91,9 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type return } - _, err := i.AddEventHandler(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs()) + _, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{ + Logger: &logKind, + }) if err != nil { ks.startedErr <- err return diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/log/warning_handler.go b/vendor/sigs.k8s.io/controller-runtime/pkg/log/warning_handler.go index e9522632d..413b56d2e 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/log/warning_handler.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/log/warning_handler.go @@ -17,13 +17,12 @@ limitations under the License. package log import ( + "context" "sync" - - "github.com/go-logr/logr" ) // KubeAPIWarningLoggerOptions controls the behavior -// of a rest.WarningHandler constructed using NewKubeAPIWarningLogger(). +// of a rest.WarningHandlerWithContext constructed using NewKubeAPIWarningLogger(). type KubeAPIWarningLoggerOptions struct { // Deduplicate indicates a given warning message should only be written once. // Setting this to true in a long-running process handling many warnings can @@ -33,10 +32,8 @@ type KubeAPIWarningLoggerOptions struct { // KubeAPIWarningLogger is a wrapper around // a provided logr.Logger that implements the -// rest.WarningHandler interface. +// rest.WarningHandlerWithContext interface. type KubeAPIWarningLogger struct { - // logger is used to log responses with the warning header - logger logr.Logger // opts contain options controlling warning output opts KubeAPIWarningLoggerOptions // writtenLock gurads written @@ -46,9 +43,11 @@ type KubeAPIWarningLogger struct { written map[string]struct{} } -// HandleWarningHeader handles logging for responses from API server that are -// warnings with code being 299 and uses a logr.Logger for its logging purposes. -func (l *KubeAPIWarningLogger) HandleWarningHeader(code int, agent string, message string) { +// HandleWarningHeaderWithContext handles logging for responses from API server that are +// warnings with code being 299 and uses a logr.Logger from context for its logging purposes. +func (l *KubeAPIWarningLogger) HandleWarningHeaderWithContext(ctx context.Context, code int, _ string, message string) { + log := FromContext(ctx) + if code != 299 || len(message) == 0 { return } @@ -62,13 +61,13 @@ func (l *KubeAPIWarningLogger) HandleWarningHeader(code int, agent string, messa } l.written[message] = struct{}{} } - l.logger.Info(message) + log.Info(message) } -// NewKubeAPIWarningLogger returns an implementation of rest.WarningHandler that logs warnings -// with code = 299 to the provided logr.Logger. -func NewKubeAPIWarningLogger(l logr.Logger, opts KubeAPIWarningLoggerOptions) *KubeAPIWarningLogger { - h := &KubeAPIWarningLogger{logger: l, opts: opts} +// NewKubeAPIWarningLogger returns an implementation of rest.WarningHandlerWithContext that logs warnings +// with code = 299 to the logger passed into HandleWarningHeaderWithContext via the context. +func NewKubeAPIWarningLogger(opts KubeAPIWarningLoggerOptions) *KubeAPIWarningLogger { + h := &KubeAPIWarningLogger{opts: opts} if opts.Deduplicate { h.written = map[string]struct{}{} } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go index 92906fe6c..c3ae317b0 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go @@ -544,6 +544,10 @@ func setOptionsDefaults(options Options) Options { options.Logger = log.Log } + if options.Controller.Logger.GetSink() == nil { + options.Controller.Logger = options.Logger + } + if options.BaseContext == nil { options.BaseContext = defaultBaseContext } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go b/vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go index ee63f681c..c98b1864e 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go @@ -28,7 +28,17 @@ import ( // Result contains the result of a Reconciler invocation. type Result struct { - // Requeue tells the Controller to requeue the reconcile key. Defaults to false. + // Requeue tells the Controller to perform a ratelimited requeue + // using the workqueues ratelimiter. Defaults to false. + // + // This setting is deprecated as it causes confusion and there is + // no good reason to use it. When waiting for an external event to + // happen, either the duration until it is supposed to happen or an + // appropriate poll interval should be used, rather than an + // interval emitted by a ratelimiter whose purpose it is to control + // retry on error. + // + // Deprecated: Use `RequeueAfter` instead. Requeue bool // RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration. diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/source/source.go b/vendor/sigs.k8s.io/controller-runtime/pkg/source/source.go index 267a6470b..c2c2dc4e0 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/source/source.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/source/source.go @@ -22,11 +22,13 @@ import ( "fmt" "sync" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" internal "sigs.k8s.io/controller-runtime/pkg/internal/source" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -34,6 +36,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" ) +var logInformer = logf.RuntimeLog.WithName("source").WithName("Informer") + // Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc) // which should be processed by event.EventHandlers to enqueue reconcile.Requests. // @@ -282,7 +286,9 @@ func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimiting return errors.New("must specify Informer.Handler") } - _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs()) + _, err := is.Informer.AddEventHandlerWithOptions(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates), toolscache.HandlerOptions{ + Logger: &logInformer, + }) if err != nil { return err } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/multi.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/multi.go index 2f7820d04..ef9c45624 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/multi.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/multi.go @@ -31,6 +31,7 @@ type multiMutating []Handler func (hs multiMutating) Handle(ctx context.Context, req Request) Response { patches := []jsonpatch.JsonPatchOperation{} + warnings := []string{} for _, handler := range hs { resp := handler.Handle(ctx, req) if !resp.Allowed { @@ -42,6 +43,7 @@ func (hs multiMutating) Handle(ctx context.Context, req Request) Response { resp.PatchType, admissionv1.PatchTypeJSONPatch)) } patches = append(patches, resp.Patches...) + warnings = append(warnings, resp.Warnings...) } var err error marshaledPatch, err := json.Marshal(patches) @@ -55,6 +57,7 @@ func (hs multiMutating) Handle(ctx context.Context, req Request) Response { Code: http.StatusOK, }, Patch: marshaledPatch, + Warnings: warnings, PatchType: func() *admissionv1.PatchType { pt := admissionv1.PatchTypeJSONPatch; return &pt }(), }, } @@ -71,11 +74,13 @@ func MultiMutatingHandler(handlers ...Handler) Handler { type multiValidating []Handler func (hs multiValidating) Handle(ctx context.Context, req Request) Response { + warnings := []string{} for _, handler := range hs { resp := handler.Handle(ctx, req) if !resp.Allowed { return resp } + warnings = append(warnings, resp.Warnings...) } return Response{ AdmissionResponse: admissionv1.AdmissionResponse{ @@ -83,6 +88,7 @@ func (hs multiValidating) Handle(ctx context.Context, req Request) Response { Result: &metav1.Status{ Code: http.StatusOK, }, + Warnings: warnings, }, } } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics/metrics.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics/metrics.go index 557004908..f1e6ce68f 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics/metrics.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "net/http" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -30,8 +31,11 @@ var ( // of processing admission requests. RequestLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Name: "controller_runtime_webhook_latency_seconds", - Help: "Histogram of the latency of processing admission requests", + Name: "controller_runtime_webhook_latency_seconds", + Help: "Histogram of the latency of processing admission requests", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, }, []string{"webhook"}, )