rebase: update controller-runtime

update controller-runtime to latest
release.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2024-08-19 10:02:11 +02:00 committed by mergify[bot]
parent 5a66991bb3
commit dbbca6ebf8
28 changed files with 739 additions and 361 deletions

4
go.mod
View File

@ -42,7 +42,7 @@ require (
k8s.io/mount-utils v0.31.0 k8s.io/mount-utils v0.31.0
k8s.io/pod-security-admission v0.31.0 k8s.io/pod-security-admission v0.31.0
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
sigs.k8s.io/controller-runtime v0.18.5 sigs.k8s.io/controller-runtime v0.19.0
) )
require ( require (
@ -172,7 +172,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.30.1 // indirect k8s.io/apiextensions-apiserver v0.31.0 // indirect
k8s.io/apiserver v0.31.0 // indirect k8s.io/apiserver v0.31.0 // indirect
k8s.io/component-base v0.31.0 // indirect k8s.io/component-base v0.31.0 // indirect
k8s.io/component-helpers v0.31.0 // indirect k8s.io/component-helpers v0.31.0 // indirect

4
go.sum
View File

@ -3483,8 +3483,8 @@ rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
sigs.k8s.io/controller-runtime v0.2.2/go.mod h1:9dyohw3ZtoXQuV1e766PHUn+cmrRCIcBh6XIMFNMZ+I= sigs.k8s.io/controller-runtime v0.2.2/go.mod h1:9dyohw3ZtoXQuV1e766PHUn+cmrRCIcBh6XIMFNMZ+I=
sigs.k8s.io/controller-runtime v0.18.5 h1:nTHio/W+Q4aBlQMgbnC5hZb4IjIidyrizMai9P6n4Rk= sigs.k8s.io/controller-runtime v0.19.0 h1:nWVM7aq+Il2ABxwiCizrVDSlmDcshi9llbaFbC0ji/Q=
sigs.k8s.io/controller-runtime v0.18.5/go.mod h1:TVoGrfdpbA9VRFaRnKgk9P5/atA0pMwq+f+msb9M8Sg= sigs.k8s.io/controller-runtime v0.19.0/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=

6
vendor/modules.txt vendored
View File

@ -1000,7 +1000,7 @@ k8s.io/api/storage/v1
k8s.io/api/storage/v1alpha1 k8s.io/api/storage/v1alpha1
k8s.io/api/storage/v1beta1 k8s.io/api/storage/v1beta1
k8s.io/api/storagemigration/v1alpha1 k8s.io/api/storagemigration/v1alpha1
# k8s.io/apiextensions-apiserver v0.30.1 => k8s.io/apiextensions-apiserver v0.31.0 # k8s.io/apiextensions-apiserver v0.31.0 => k8s.io/apiextensions-apiserver v0.31.0
## explicit; go 1.22.0 ## explicit; go 1.22.0
k8s.io/apiextensions-apiserver/pkg/apis/apiextensions k8s.io/apiextensions-apiserver/pkg/apis/apiextensions
k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1 k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1
@ -1767,7 +1767,7 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client
# sigs.k8s.io/controller-runtime v0.18.5 # sigs.k8s.io/controller-runtime v0.19.0
## explicit; go 1.22.0 ## explicit; go 1.22.0
sigs.k8s.io/controller-runtime/pkg/cache sigs.k8s.io/controller-runtime/pkg/cache
sigs.k8s.io/controller-runtime/pkg/cache/internal sigs.k8s.io/controller-runtime/pkg/cache/internal
@ -1797,12 +1797,12 @@ sigs.k8s.io/controller-runtime/pkg/manager/signals
sigs.k8s.io/controller-runtime/pkg/metrics sigs.k8s.io/controller-runtime/pkg/metrics
sigs.k8s.io/controller-runtime/pkg/metrics/server sigs.k8s.io/controller-runtime/pkg/metrics/server
sigs.k8s.io/controller-runtime/pkg/predicate sigs.k8s.io/controller-runtime/pkg/predicate
sigs.k8s.io/controller-runtime/pkg/ratelimiter
sigs.k8s.io/controller-runtime/pkg/reconcile sigs.k8s.io/controller-runtime/pkg/reconcile
sigs.k8s.io/controller-runtime/pkg/recorder sigs.k8s.io/controller-runtime/pkg/recorder
sigs.k8s.io/controller-runtime/pkg/source sigs.k8s.io/controller-runtime/pkg/source
sigs.k8s.io/controller-runtime/pkg/webhook sigs.k8s.io/controller-runtime/pkg/webhook
sigs.k8s.io/controller-runtime/pkg/webhook/admission sigs.k8s.io/controller-runtime/pkg/webhook/admission
sigs.k8s.io/controller-runtime/pkg/webhook/admission/metrics
sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics
# sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd # sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd
## explicit; go 1.18 ## explicit; go 1.18

View File

@ -117,8 +117,8 @@ type Informer interface {
// This function is guaranteed to be idempotent and thread-safe. // This function is guaranteed to be idempotent and thread-safe.
RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error
// AddIndexers adds indexers to this store. If this is called after there is already data // AddIndexers adds indexers to this store. It is valid to add indexers
// in the store, the results are undefined. // after an informer was started.
AddIndexers(indexers toolscache.Indexers) error AddIndexers(indexers toolscache.Indexers) error
// HasSynced return true if the informers underlying store has synced. // HasSynced return true if the informers underlying store has synced.

View File

@ -72,7 +72,10 @@ func IsObjectNamespaced(obj runtime.Object, scheme *runtime.Scheme, restmapper m
// IsGVKNamespaced returns true if the object having the provided // IsGVKNamespaced returns true if the object having the provided
// GVK is namespace scoped. // GVK is namespace scoped.
func IsGVKNamespaced(gvk schema.GroupVersionKind, restmapper meta.RESTMapper) (bool, error) { func IsGVKNamespaced(gvk schema.GroupVersionKind, restmapper meta.RESTMapper) (bool, error) {
restmapping, err := restmapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}) // Fetch the RESTMapping using the complete GVK. If we exclude the Version, the Version set
// will be populated using the cached Group if available. This can lead to failures updating
// the cache with new Versions of CRDs registered at runtime.
restmapping, err := restmapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
if err != nil { if err != nil {
return false, fmt.Errorf("failed to get restmapping: %w", err) return false, fmt.Errorf("failed to get restmapping: %w", err)
} }

View File

@ -50,28 +50,10 @@ type Options struct {
// Cache, if provided, is used to read objects from the cache. // Cache, if provided, is used to read objects from the cache.
Cache *CacheOptions Cache *CacheOptions
// WarningHandler is used to configure the warning handler responsible for
// surfacing and handling warnings messages sent by the API server.
WarningHandler WarningHandlerOptions
// DryRun instructs the client to only perform dry run requests. // DryRun instructs the client to only perform dry run requests.
DryRun *bool DryRun *bool
} }
// WarningHandlerOptions are options for configuring a
// warning handler for the client which is responsible
// for surfacing API Server warnings.
type WarningHandlerOptions struct {
// SuppressWarnings decides if the warnings from the
// API server are suppressed or surfaced in the client.
SuppressWarnings bool
// AllowDuplicateLogs does not deduplicate the to-be
// logged surfaced warnings messages. See
// log.WarningHandlerOptions for considerations
// regarding deduplication
AllowDuplicateLogs bool
}
// CacheOptions are options for creating a cache-backed client. // CacheOptions are options for creating a cache-backed client.
type CacheOptions struct { type CacheOptions struct {
// Reader is a cache-backed reader that will be used to read objects from the cache. // Reader is a cache-backed reader that will be used to read objects from the cache.
@ -91,6 +73,12 @@ type NewClientFunc func(config *rest.Config, options Options) (Client, error)
// New returns a new Client using the provided config and Options. // New returns a new Client using the provided config and Options.
// //
// By default, the client surfaces warnings returned by the server. To
// suppress warnings, set config.WarningHandler = rest.NoWarnings{}. To
// define custom behavior, implement the rest.WarningHandler interface.
// See [sigs.k8s.io/controller-runtime/pkg/log.KubeAPIWarningLogger] for
// an example.
//
// The client's read behavior is determined by Options.Cache. // The client's read behavior is determined by Options.Cache.
// If either Options.Cache or Options.Cache.Reader is nil, // If either Options.Cache or Options.Cache.Reader is nil,
// the client reads directly from the API server. // the client reads directly from the API server.
@ -124,15 +112,14 @@ func newClient(config *rest.Config, options Options) (*client, error) {
config.UserAgent = rest.DefaultKubernetesUserAgent() config.UserAgent = rest.DefaultKubernetesUserAgent()
} }
if config.WarningHandler == nil {
// By default, we de-duplicate and surface warnings. // By default, we de-duplicate and surface warnings.
config.WarningHandler = log.NewKubeAPIWarningLogger( config.WarningHandler = log.NewKubeAPIWarningLogger(
log.Log.WithName("KubeAPIWarningLogger"), log.Log.WithName("KubeAPIWarningLogger"),
log.KubeAPIWarningLoggerOptions{ log.KubeAPIWarningLoggerOptions{
Deduplicate: !options.WarningHandler.AllowDuplicateLogs, Deduplicate: true,
}, },
) )
if options.WarningHandler.SuppressWarnings {
config.WarningHandler = rest.NoWarnings{}
} }
// Use the rest HTTP client for the provided config if unset // Use the rest HTTP client for the provided config if unset

View File

@ -0,0 +1,106 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// WithFieldValidation wraps a Client and configures field validation, by
// default, for all write requests from this client. Users can override field
// validation for individual write requests.
func WithFieldValidation(c Client, validation FieldValidation) Client {
return &clientWithFieldValidation{
validation: validation,
client: c,
Reader: c,
}
}
type clientWithFieldValidation struct {
validation FieldValidation
client Client
Reader
}
func (c *clientWithFieldValidation) Create(ctx context.Context, obj Object, opts ...CreateOption) error {
return c.client.Create(ctx, obj, append([]CreateOption{c.validation}, opts...)...)
}
func (c *clientWithFieldValidation) Update(ctx context.Context, obj Object, opts ...UpdateOption) error {
return c.client.Update(ctx, obj, append([]UpdateOption{c.validation}, opts...)...)
}
func (c *clientWithFieldValidation) Patch(ctx context.Context, obj Object, patch Patch, opts ...PatchOption) error {
return c.client.Patch(ctx, obj, patch, append([]PatchOption{c.validation}, opts...)...)
}
func (c *clientWithFieldValidation) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error {
return c.client.Delete(ctx, obj, opts...)
}
func (c *clientWithFieldValidation) DeleteAllOf(ctx context.Context, obj Object, opts ...DeleteAllOfOption) error {
return c.client.DeleteAllOf(ctx, obj, opts...)
}
func (c *clientWithFieldValidation) Scheme() *runtime.Scheme { return c.client.Scheme() }
func (c *clientWithFieldValidation) RESTMapper() meta.RESTMapper { return c.client.RESTMapper() }
func (c *clientWithFieldValidation) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) {
return c.client.GroupVersionKindFor(obj)
}
func (c *clientWithFieldValidation) IsObjectNamespaced(obj runtime.Object) (bool, error) {
return c.client.IsObjectNamespaced(obj)
}
func (c *clientWithFieldValidation) Status() StatusWriter {
return &subresourceClientWithFieldValidation{
validation: c.validation,
subresourceWriter: c.client.Status(),
}
}
func (c *clientWithFieldValidation) SubResource(subresource string) SubResourceClient {
srClient := c.client.SubResource(subresource)
return &subresourceClientWithFieldValidation{
validation: c.validation,
subresourceWriter: srClient,
SubResourceReader: srClient,
}
}
type subresourceClientWithFieldValidation struct {
validation FieldValidation
subresourceWriter SubResourceWriter
SubResourceReader
}
func (c *subresourceClientWithFieldValidation) Create(ctx context.Context, obj Object, subresource Object, opts ...SubResourceCreateOption) error {
return c.subresourceWriter.Create(ctx, obj, subresource, append([]SubResourceCreateOption{c.validation}, opts...)...)
}
func (c *subresourceClientWithFieldValidation) Update(ctx context.Context, obj Object, opts ...SubResourceUpdateOption) error {
return c.subresourceWriter.Update(ctx, obj, append([]SubResourceUpdateOption{c.validation}, opts...)...)
}
func (c *subresourceClientWithFieldValidation) Patch(ctx context.Context, obj Object, patch Patch, opts ...SubResourcePatchOption) error {
return c.subresourceWriter.Patch(ctx, obj, patch, append([]SubResourcePatchOption{c.validation}, opts...)...)
}

View File

@ -169,6 +169,39 @@ func (f FieldOwner) ApplyToSubResourceUpdate(opts *SubResourceUpdateOptions) {
opts.FieldManager = string(f) opts.FieldManager = string(f)
} }
// FieldValidation configures field validation for the given requests.
type FieldValidation string
// ApplyToPatch applies this configuration to the given patch options.
func (f FieldValidation) ApplyToPatch(opts *PatchOptions) {
opts.FieldValidation = string(f)
}
// ApplyToCreate applies this configuration to the given create options.
func (f FieldValidation) ApplyToCreate(opts *CreateOptions) {
opts.FieldValidation = string(f)
}
// ApplyToUpdate applies this configuration to the given update options.
func (f FieldValidation) ApplyToUpdate(opts *UpdateOptions) {
opts.FieldValidation = string(f)
}
// ApplyToSubResourcePatch applies this configuration to the given patch options.
func (f FieldValidation) ApplyToSubResourcePatch(opts *SubResourcePatchOptions) {
opts.FieldValidation = string(f)
}
// ApplyToSubResourceCreate applies this configuration to the given create options.
func (f FieldValidation) ApplyToSubResourceCreate(opts *SubResourceCreateOptions) {
opts.FieldValidation = string(f)
}
// ApplyToSubResourceUpdate applies this configuration to the given update options.
func (f FieldValidation) ApplyToSubResourceUpdate(opts *SubResourceUpdateOptions) {
opts.FieldValidation = string(f)
}
// }}} // }}}
// {{{ Create Options // {{{ Create Options
@ -187,6 +220,24 @@ type CreateOptions struct {
// this request. It must be set with server-side apply. // this request. It must be set with server-side apply.
FieldManager string FieldManager string
// fieldValidation instructs the server on how to handle
// objects in the request (POST/PUT/PATCH) containing unknown
// or duplicate fields. Valid values are:
// - Ignore: This will ignore any unknown fields that are silently
// dropped from the object, and will ignore all but the last duplicate
// field that the decoder encounters. This is the default behavior
// prior to v1.23.
// - Warn: This will send a warning via the standard warning response
// header for each unknown field that is dropped from the object, and
// for each duplicate field that is encountered. The request will
// still succeed if there are no other errors, and will only persist
// the last of any duplicate fields. This is the default in v1.23+
// - Strict: This will fail the request with a BadRequest error if
// any unknown fields would be dropped from the object, or if any
// duplicate fields are present. The error returned from the server
// will contain all unknown and duplicate fields encountered.
FieldValidation string
// Raw represents raw CreateOptions, as passed to the API server. // Raw represents raw CreateOptions, as passed to the API server.
Raw *metav1.CreateOptions Raw *metav1.CreateOptions
} }
@ -203,6 +254,7 @@ func (o *CreateOptions) AsCreateOptions() *metav1.CreateOptions {
o.Raw.DryRun = o.DryRun o.Raw.DryRun = o.DryRun
o.Raw.FieldManager = o.FieldManager o.Raw.FieldManager = o.FieldManager
o.Raw.FieldValidation = o.FieldValidation
return o.Raw return o.Raw
} }
@ -223,6 +275,9 @@ func (o *CreateOptions) ApplyToCreate(co *CreateOptions) {
if o.FieldManager != "" { if o.FieldManager != "" {
co.FieldManager = o.FieldManager co.FieldManager = o.FieldManager
} }
if o.FieldValidation != "" {
co.FieldValidation = o.FieldValidation
}
if o.Raw != nil { if o.Raw != nil {
co.Raw = o.Raw co.Raw = o.Raw
} }
@ -679,6 +734,24 @@ type UpdateOptions struct {
// this request. It must be set with server-side apply. // this request. It must be set with server-side apply.
FieldManager string FieldManager string
// fieldValidation instructs the server on how to handle
// objects in the request (POST/PUT/PATCH) containing unknown
// or duplicate fields. Valid values are:
// - Ignore: This will ignore any unknown fields that are silently
// dropped from the object, and will ignore all but the last duplicate
// field that the decoder encounters. This is the default behavior
// prior to v1.23.
// - Warn: This will send a warning via the standard warning response
// header for each unknown field that is dropped from the object, and
// for each duplicate field that is encountered. The request will
// still succeed if there are no other errors, and will only persist
// the last of any duplicate fields. This is the default in v1.23+
// - Strict: This will fail the request with a BadRequest error if
// any unknown fields would be dropped from the object, or if any
// duplicate fields are present. The error returned from the server
// will contain all unknown and duplicate fields encountered.
FieldValidation string
// Raw represents raw UpdateOptions, as passed to the API server. // Raw represents raw UpdateOptions, as passed to the API server.
Raw *metav1.UpdateOptions Raw *metav1.UpdateOptions
} }
@ -695,6 +768,7 @@ func (o *UpdateOptions) AsUpdateOptions() *metav1.UpdateOptions {
o.Raw.DryRun = o.DryRun o.Raw.DryRun = o.DryRun
o.Raw.FieldManager = o.FieldManager o.Raw.FieldManager = o.FieldManager
o.Raw.FieldValidation = o.FieldValidation
return o.Raw return o.Raw
} }
@ -717,6 +791,9 @@ func (o *UpdateOptions) ApplyToUpdate(uo *UpdateOptions) {
if o.FieldManager != "" { if o.FieldManager != "" {
uo.FieldManager = o.FieldManager uo.FieldManager = o.FieldManager
} }
if o.FieldValidation != "" {
uo.FieldValidation = o.FieldValidation
}
if o.Raw != nil { if o.Raw != nil {
uo.Raw = o.Raw uo.Raw = o.Raw
} }
@ -745,6 +822,24 @@ type PatchOptions struct {
// this request. It must be set with server-side apply. // this request. It must be set with server-side apply.
FieldManager string FieldManager string
// fieldValidation instructs the server on how to handle
// objects in the request (POST/PUT/PATCH) containing unknown
// or duplicate fields. Valid values are:
// - Ignore: This will ignore any unknown fields that are silently
// dropped from the object, and will ignore all but the last duplicate
// field that the decoder encounters. This is the default behavior
// prior to v1.23.
// - Warn: This will send a warning via the standard warning response
// header for each unknown field that is dropped from the object, and
// for each duplicate field that is encountered. The request will
// still succeed if there are no other errors, and will only persist
// the last of any duplicate fields. This is the default in v1.23+
// - Strict: This will fail the request with a BadRequest error if
// any unknown fields would be dropped from the object, or if any
// duplicate fields are present. The error returned from the server
// will contain all unknown and duplicate fields encountered.
FieldValidation string
// Raw represents raw PatchOptions, as passed to the API server. // Raw represents raw PatchOptions, as passed to the API server.
Raw *metav1.PatchOptions Raw *metav1.PatchOptions
} }
@ -771,6 +866,7 @@ func (o *PatchOptions) AsPatchOptions() *metav1.PatchOptions {
o.Raw.DryRun = o.DryRun o.Raw.DryRun = o.DryRun
o.Raw.Force = o.Force o.Raw.Force = o.Force
o.Raw.FieldManager = o.FieldManager o.Raw.FieldManager = o.FieldManager
o.Raw.FieldValidation = o.FieldValidation
return o.Raw return o.Raw
} }
@ -787,6 +883,9 @@ func (o *PatchOptions) ApplyToPatch(po *PatchOptions) {
if o.FieldManager != "" { if o.FieldManager != "" {
po.FieldManager = o.FieldManager po.FieldManager = o.FieldManager
} }
if o.FieldValidation != "" {
po.FieldValidation = o.FieldValidation
}
if o.Raw != nil { if o.Raw != nil {
po.Raw = o.Raw po.Raw = o.Raw
} }

View File

@ -20,6 +20,12 @@ import "time"
// Controller contains configuration options for a controller. // Controller contains configuration options for a controller.
type Controller struct { type Controller struct {
// SkipNameValidation allows skipping the name validation that ensures that every controller name is unique.
// Unique controller names are important to get unique metrics and logs for a controller.
// Can be overwritten for a controller via the SkipNameValidation setting on the controller.
// Defaults to false if SkipNameValidation setting on controller and Manager are unset.
SkipNameValidation *bool
// GroupKindConcurrency is a map from a Kind to the number of concurrent reconciliation // GroupKindConcurrency is a map from a Kind to the number of concurrent reconciliation
// allowed for that controller. // allowed for that controller.
// //
@ -40,7 +46,8 @@ type Controller struct {
CacheSyncTimeout time.Duration CacheSyncTimeout time.Duration
// RecoverPanic indicates whether the panic caused by reconcile should be recovered. // RecoverPanic indicates whether the panic caused by reconcile should be recovered.
// Defaults to the Controller.RecoverPanic setting from the Manager if unset. // Can be overwritten for a controller via the RecoverPanic setting on the controller.
// Defaults to true if RecoverPanic setting on controller and Manager are unset.
RecoverPanic *bool RecoverPanic *bool
// NeedLeaderElection indicates whether the controller needs to use leader election. // NeedLeaderElection indicates whether the controller needs to use leader election.

View File

@ -27,13 +27,21 @@ import (
"sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source" "sigs.k8s.io/controller-runtime/pkg/source"
) )
// Options are the arguments for creating a new Controller. // Options are the arguments for creating a new Controller.
type Options struct { type Options = TypedOptions[reconcile.Request]
// TypedOptions are the arguments for creating a new Controller.
type TypedOptions[request comparable] struct {
// SkipNameValidation allows skipping the name validation that ensures that every controller name is unique.
// Unique controller names are important to get unique metrics and logs for a controller.
// Defaults to the Controller.SkipNameValidation setting from the Manager if unset.
// Defaults to false if Controller.SkipNameValidation setting from the Manager is also unset.
SkipNameValidation *bool
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int MaxConcurrentReconciles int
@ -43,6 +51,7 @@ type Options struct {
// RecoverPanic indicates whether the panic caused by reconcile should be recovered. // RecoverPanic indicates whether the panic caused by reconcile should be recovered.
// Defaults to the Controller.RecoverPanic setting from the Manager if unset. // Defaults to the Controller.RecoverPanic setting from the Manager if unset.
// Defaults to true if Controller.RecoverPanic setting from the Manager is also unset.
RecoverPanic *bool RecoverPanic *bool
// NeedLeaderElection indicates whether the controller needs to use leader election. // NeedLeaderElection indicates whether the controller needs to use leader election.
@ -50,12 +59,12 @@ type Options struct {
NeedLeaderElection *bool NeedLeaderElection *bool
// Reconciler reconciles an object // Reconciler reconciles an object
Reconciler reconcile.Reconciler Reconciler reconcile.TypedReconciler[request]
// RateLimiter is used to limit how frequently requests may be queued. // RateLimiter is used to limit how frequently requests may be queued.
// Defaults to MaxOfRateLimiter which has both overall and per-item rate limiting. // Defaults to MaxOfRateLimiter which has both overall and per-item rate limiting.
// The overall is a token bucket and the per-item is exponential. // The overall is a token bucket and the per-item is exponential.
RateLimiter ratelimiter.RateLimiter RateLimiter workqueue.TypedRateLimiter[request]
// NewQueue constructs the queue for this controller once the controller is ready to start. // NewQueue constructs the queue for this controller once the controller is ready to start.
// With NewQueue a custom queue implementation can be used, e.g. a priority queue to prioritize with which // With NewQueue a custom queue implementation can be used, e.g. a priority queue to prioritize with which
@ -67,23 +76,26 @@ type Options struct {
// //
// NOTE: LOW LEVEL PRIMITIVE! // NOTE: LOW LEVEL PRIMITIVE!
// Only use a custom NewQueue if you know what you are doing. // Only use a custom NewQueue if you know what you are doing.
NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]
// LogConstructor is used to construct a logger used for this controller and passed // LogConstructor is used to construct a logger used for this controller and passed
// to each reconciliation via the context field. // to each reconciliation via the context field.
LogConstructor func(request *reconcile.Request) logr.Logger LogConstructor func(request *request) logr.Logger
} }
// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests // Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item. // from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item.
// Work typically is reads and writes Kubernetes objects to make the system state match the state specified // Work typically is reads and writes Kubernetes objects to make the system state match the state specified
// in the object Spec. // in the object Spec.
type Controller interface { type Controller = TypedController[reconcile.Request]
// TypedController implements an API.
type TypedController[request comparable] interface {
// Reconciler is called to reconcile an object by Namespace/Name // Reconciler is called to reconcile an object by Namespace/Name
reconcile.Reconciler reconcile.TypedReconciler[request]
// Watch watches the provided Source. // Watch watches the provided Source.
Watch(src source.Source) error Watch(src source.TypedSource[request]) error
// Start starts the controller. Start blocks until the context is closed or a // Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting. // controller has an error starting.
@ -95,8 +107,17 @@ type Controller interface {
// New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have // New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have
// been synced before the Controller is Started. // been synced before the Controller is Started.
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func New(name string, mgr manager.Manager, options Options) (Controller, error) { func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options) return NewTyped(name, mgr, options)
}
// NewTyped returns a new typed controller registered with the Manager,
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewTyped[request comparable](name string, mgr manager.Manager, options TypedOptions[request]) (TypedController[request], error) {
c, err := NewTypedUnmanaged(name, mgr, options)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -107,7 +128,16 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
// NewUnmanaged returns a new controller without adding it to the manager. The // NewUnmanaged returns a new controller without adding it to the manager. The
// caller is responsible for starting the returned controller. // caller is responsible for starting the returned controller.
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) { func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
return NewTypedUnmanaged(name, mgr, options)
}
// NewTypedUnmanaged returns a new typed controller without adding it to the manager.
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, options TypedOptions[request]) (TypedController[request], error) {
if options.Reconciler == nil { if options.Reconciler == nil {
return nil, fmt.Errorf("must specify Reconciler") return nil, fmt.Errorf("must specify Reconciler")
} }
@ -116,13 +146,23 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
return nil, fmt.Errorf("must specify Name for Controller") return nil, fmt.Errorf("must specify Name for Controller")
} }
if options.SkipNameValidation == nil {
options.SkipNameValidation = mgr.GetControllerOptions().SkipNameValidation
}
if options.SkipNameValidation == nil || !*options.SkipNameValidation {
if err := checkName(name); err != nil {
return nil, err
}
}
if options.LogConstructor == nil { if options.LogConstructor == nil {
log := mgr.GetLogger().WithValues( log := mgr.GetLogger().WithValues(
"controller", name, "controller", name,
) )
options.LogConstructor = func(req *reconcile.Request) logr.Logger { options.LogConstructor = func(in *request) logr.Logger {
log := log log := log
if req != nil { if req, ok := any(in).(*reconcile.Request); ok && req != nil {
log = log.WithValues( log = log.WithValues(
"object", klog.KRef(req.Namespace, req.Name), "object", klog.KRef(req.Namespace, req.Name),
"namespace", req.Namespace, "name", req.Name, "namespace", req.Namespace, "name", req.Name,
@ -149,12 +189,12 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
} }
if options.RateLimiter == nil { if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultControllerRateLimiter() options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
} }
if options.NewQueue == nil { if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface { options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
return workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{ return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
Name: controllerName, Name: controllerName,
}) })
} }
@ -169,7 +209,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
} }
// Create controller with dependencies set // Create controller with dependencies set
return &controller.Controller{ return &controller.Controller[request]{
Do: options.Reconciler, Do: options.Reconciler,
RateLimiter: options.RateLimiter, RateLimiter: options.RateLimiter,
NewQueue: options.NewQueue, NewQueue: options.NewQueue,

View File

@ -14,9 +14,30 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
/* package controller
Package ratelimiter defines rate limiters used by Controllers to limit how frequently requests may be queued.
Typical rate limiters that can be used are implemented in client-go's workqueue package. import (
*/ "fmt"
package ratelimiter "sync"
"k8s.io/apimachinery/pkg/util/sets"
)
var nameLock sync.Mutex
var usedNames sets.Set[string]
func checkName(name string) error {
nameLock.Lock()
defer nameLock.Unlock()
if usedNames == nil {
usedNames = sets.Set[string]{}
}
if usedNames.Has(name) {
return fmt.Errorf("controller with name %s already exists. Controller names must be unique to avoid multiple controllers reporting to the same metric", name)
}
usedNames.Insert(name)
return nil
}

View File

@ -37,26 +37,26 @@ type GenericEvent = TypedGenericEvent[client.Object]
// TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated // TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler. // by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedCreateEvent[T any] struct { type TypedCreateEvent[object any] struct {
// Object is the object from the event // Object is the object from the event
Object T Object object
} }
// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated // TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler. // by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedUpdateEvent[T any] struct { type TypedUpdateEvent[object any] struct {
// ObjectOld is the object from the event // ObjectOld is the object from the event
ObjectOld T ObjectOld object
// ObjectNew is the object from the event // ObjectNew is the object from the event
ObjectNew T ObjectNew object
} }
// TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated // TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler. // by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedDeleteEvent[T any] struct { type TypedDeleteEvent[object any] struct {
// Object is the object from the event // Object is the object from the event
Object T Object object
// DeleteStateUnknown is true if the Delete event was missed but we identified the object // DeleteStateUnknown is true if the Delete event was missed but we identified the object
// as having been deleted. // as having been deleted.
@ -66,7 +66,7 @@ type TypedDeleteEvent[T any] struct {
// TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster). // TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an // TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.TypedEventHandler. // handler.TypedEventHandler.
type TypedGenericEvent[T any] struct { type TypedGenericEvent[object any] struct {
// Object is the object from the event // Object is the object from the event
Object T Object object
} }

View File

@ -44,10 +44,10 @@ type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object]
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource. // Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
// //
// TypedEnqueueRequestForObject is experimental and subject to future change. // TypedEnqueueRequestForObject is experimental and subject to future change.
type TypedEnqueueRequestForObject[T client.Object] struct{} type TypedEnqueueRequestForObject[object client.Object] struct{}
// Create implements EventHandler. // Create implements EventHandler.
func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
if isNil(evt.Object) { if isNil(evt.Object) {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return return
@ -59,7 +59,7 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
} }
// Update implements EventHandler. // Update implements EventHandler.
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
switch { switch {
case !isNil(evt.ObjectNew): case !isNil(evt.ObjectNew):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
@ -77,7 +77,7 @@ func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.
} }
// Delete implements EventHandler. // Delete implements EventHandler.
func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
if isNil(evt.Object) { if isNil(evt.Object) {
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return return
@ -89,7 +89,7 @@ func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.
} }
// Generic implements EventHandler. // Generic implements EventHandler.
func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
if isNil(evt.Object) { if isNil(evt.Object) {
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
return return

View File

@ -27,13 +27,13 @@ import (
// MapFunc is the signature required for enqueueing requests from a generic function. // MapFunc is the signature required for enqueueing requests from a generic function.
// This type is usually used with EnqueueRequestsFromMapFunc when registering an event handler. // This type is usually used with EnqueueRequestsFromMapFunc when registering an event handler.
type MapFunc = TypedMapFunc[client.Object] type MapFunc = TypedMapFunc[client.Object, reconcile.Request]
// TypedMapFunc is the signature required for enqueueing requests from a generic function. // TypedMapFunc is the signature required for enqueueing requests from a generic function.
// This type is usually used with EnqueueRequestsFromTypedMapFunc when registering an event handler. // This type is usually used with EnqueueRequestsFromTypedMapFunc when registering an event handler.
// //
// TypedMapFunc is experimental and subject to future change. // TypedMapFunc is experimental and subject to future change.
type TypedMapFunc[T any] func(context.Context, T) []reconcile.Request type TypedMapFunc[object any, request comparable] func(context.Context, object) []request
// EnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection // EnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection
// of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects // of reconcile.Requests on each Event. The reconcile.Requests may be for an arbitrary set of objects
@ -61,46 +61,62 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
// objects and both sets of Requests are enqueue. // objects and both sets of Requests are enqueue.
// //
// TypedEnqueueRequestsFromMapFunc is experimental and subject to future change. // TypedEnqueueRequestsFromMapFunc is experimental and subject to future change.
func TypedEnqueueRequestsFromMapFunc[T any](fn TypedMapFunc[T]) TypedEventHandler[T] { func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request] {
return &enqueueRequestsFromMapFunc[T]{ return &enqueueRequestsFromMapFunc[object, request]{
toRequests: fn, toRequests: fn,
} }
} }
var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object]{} var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object, reconcile.Request]{}
type enqueueRequestsFromMapFunc[T any] struct { type enqueueRequestsFromMapFunc[object any, request comparable] struct {
// Mapper transforms the argument into a slice of keys to be reconciled // Mapper transforms the argument into a slice of keys to be reconciled
toRequests TypedMapFunc[T] toRequests TypedMapFunc[object, request]
} }
// Create implements EventHandler. // Create implements EventHandler.
func (e *enqueueRequestsFromMapFunc[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { func (e *enqueueRequestsFromMapFunc[object, request]) Create(
reqs := map[reconcile.Request]empty{} ctx context.Context,
evt event.TypedCreateEvent[object],
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs) e.mapAndEnqueue(ctx, q, evt.Object, reqs)
} }
// Update implements EventHandler. // Update implements EventHandler.
func (e *enqueueRequestsFromMapFunc[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { func (e *enqueueRequestsFromMapFunc[object, request]) Update(
reqs := map[reconcile.Request]empty{} ctx context.Context,
evt event.TypedUpdateEvent[object],
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs) e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs) e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
} }
// Delete implements EventHandler. // Delete implements EventHandler.
func (e *enqueueRequestsFromMapFunc[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { func (e *enqueueRequestsFromMapFunc[object, request]) Delete(
reqs := map[reconcile.Request]empty{} ctx context.Context,
evt event.TypedDeleteEvent[object],
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs) e.mapAndEnqueue(ctx, q, evt.Object, reqs)
} }
// Generic implements EventHandler. // Generic implements EventHandler.
func (e *enqueueRequestsFromMapFunc[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { func (e *enqueueRequestsFromMapFunc[object, request]) Generic(
reqs := map[reconcile.Request]empty{} ctx context.Context,
evt event.TypedGenericEvent[object],
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs) e.mapAndEnqueue(ctx, q, evt.Object, reqs)
} }
func (e *enqueueRequestsFromMapFunc[T]) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object T, reqs map[reconcile.Request]empty) { func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) {
for _, req := range e.toRequests(ctx, object) { for _, req := range e.toRequests(ctx, o) {
_, ok := reqs[req] _, ok := reqs[req]
if !ok { if !ok {
q.Add(req) q.Add(req)

View File

@ -61,8 +61,8 @@ func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, owne
// - a handler.typedEnqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true. // - a handler.typedEnqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
// //
// TypedEnqueueRequestForOwner is experimental and subject to future change. // TypedEnqueueRequestForOwner is experimental and subject to future change.
func TypedEnqueueRequestForOwner[T client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) TypedEventHandler[T] { func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) TypedEventHandler[object, reconcile.Request] {
e := &enqueueRequestForOwner[T]{ e := &enqueueRequestForOwner[object]{
ownerType: ownerType, ownerType: ownerType,
mapper: mapper, mapper: mapper,
} }
@ -86,7 +86,7 @@ type enqueueRequestForOwnerInterface interface {
setIsController(bool) setIsController(bool)
} }
type enqueueRequestForOwner[T client.Object] struct { type enqueueRequestForOwner[object client.Object] struct {
// ownerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared. // ownerType is the type of the Owner object to look for in OwnerReferences. Only Group and Kind are compared.
ownerType runtime.Object ownerType runtime.Object
@ -100,12 +100,12 @@ type enqueueRequestForOwner[T client.Object] struct {
mapper meta.RESTMapper mapper meta.RESTMapper
} }
func (e *enqueueRequestForOwner[T]) setIsController(isController bool) { func (e *enqueueRequestForOwner[object]) setIsController(isController bool) {
e.isController = isController e.isController = isController
} }
// Create implements EventHandler. // Create implements EventHandler.
func (e *enqueueRequestForOwner[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { func (e *enqueueRequestForOwner[object]) Create(ctx context.Context, evt event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
reqs := map[reconcile.Request]empty{} reqs := map[reconcile.Request]empty{}
e.getOwnerReconcileRequest(evt.Object, reqs) e.getOwnerReconcileRequest(evt.Object, reqs)
for req := range reqs { for req := range reqs {
@ -114,7 +114,7 @@ func (e *enqueueRequestForOwner[T]) Create(ctx context.Context, evt event.TypedC
} }
// Update implements EventHandler. // Update implements EventHandler.
func (e *enqueueRequestForOwner[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { func (e *enqueueRequestForOwner[object]) Update(ctx context.Context, evt event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
reqs := map[reconcile.Request]empty{} reqs := map[reconcile.Request]empty{}
e.getOwnerReconcileRequest(evt.ObjectOld, reqs) e.getOwnerReconcileRequest(evt.ObjectOld, reqs)
e.getOwnerReconcileRequest(evt.ObjectNew, reqs) e.getOwnerReconcileRequest(evt.ObjectNew, reqs)
@ -124,7 +124,7 @@ func (e *enqueueRequestForOwner[T]) Update(ctx context.Context, evt event.TypedU
} }
// Delete implements EventHandler. // Delete implements EventHandler.
func (e *enqueueRequestForOwner[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { func (e *enqueueRequestForOwner[object]) Delete(ctx context.Context, evt event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
reqs := map[reconcile.Request]empty{} reqs := map[reconcile.Request]empty{}
e.getOwnerReconcileRequest(evt.Object, reqs) e.getOwnerReconcileRequest(evt.Object, reqs)
for req := range reqs { for req := range reqs {
@ -133,7 +133,7 @@ func (e *enqueueRequestForOwner[T]) Delete(ctx context.Context, evt event.TypedD
} }
// Generic implements EventHandler. // Generic implements EventHandler.
func (e *enqueueRequestForOwner[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { func (e *enqueueRequestForOwner[object]) Generic(ctx context.Context, evt event.TypedGenericEvent[object], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
reqs := map[reconcile.Request]empty{} reqs := map[reconcile.Request]empty{}
e.getOwnerReconcileRequest(evt.Object, reqs) e.getOwnerReconcileRequest(evt.Object, reqs)
for req := range reqs { for req := range reqs {
@ -143,7 +143,7 @@ func (e *enqueueRequestForOwner[T]) Generic(ctx context.Context, evt event.Typed
// parseOwnerTypeGroupKind parses the OwnerType into a Group and Kind and caches the result. Returns false // parseOwnerTypeGroupKind parses the OwnerType into a Group and Kind and caches the result. Returns false
// if the OwnerType could not be parsed using the scheme. // if the OwnerType could not be parsed using the scheme.
func (e *enqueueRequestForOwner[T]) parseOwnerTypeGroupKind(scheme *runtime.Scheme) error { func (e *enqueueRequestForOwner[object]) parseOwnerTypeGroupKind(scheme *runtime.Scheme) error {
// Get the kinds of the type // Get the kinds of the type
kinds, _, err := scheme.ObjectKinds(e.ownerType) kinds, _, err := scheme.ObjectKinds(e.ownerType)
if err != nil { if err != nil {
@ -163,10 +163,10 @@ func (e *enqueueRequestForOwner[T]) parseOwnerTypeGroupKind(scheme *runtime.Sche
// getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile // getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile
// owners of object that match e.OwnerType. // owners of object that match e.OwnerType.
func (e *enqueueRequestForOwner[T]) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) { func (e *enqueueRequestForOwner[object]) getOwnerReconcileRequest(obj metav1.Object, result map[reconcile.Request]empty) {
// Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested // Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested
// by the user // by the user
for _, ref := range e.getOwnersReferences(object) { for _, ref := range e.getOwnersReferences(obj) {
// Parse the Group out of the OwnerReference to compare it to what was parsed out of the requested OwnerType // Parse the Group out of the OwnerReference to compare it to what was parsed out of the requested OwnerType
refGV, err := schema.ParseGroupVersion(ref.APIVersion) refGV, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil { if err != nil {
@ -192,7 +192,7 @@ func (e *enqueueRequestForOwner[T]) getOwnerReconcileRequest(object metav1.Objec
return return
} }
if mapping.Scope.Name() != meta.RESTScopeNameRoot { if mapping.Scope.Name() != meta.RESTScopeNameRoot {
request.Namespace = object.GetNamespace() request.Namespace = obj.GetNamespace()
} }
result[request] = empty{} result[request] = empty{}
@ -203,17 +203,17 @@ func (e *enqueueRequestForOwner[T]) getOwnerReconcileRequest(object metav1.Objec
// getOwnersReferences returns the OwnerReferences for an object as specified by the enqueueRequestForOwner // getOwnersReferences returns the OwnerReferences for an object as specified by the enqueueRequestForOwner
// - if IsController is true: only take the Controller OwnerReference (if found) // - if IsController is true: only take the Controller OwnerReference (if found)
// - if IsController is false: take all OwnerReferences. // - if IsController is false: take all OwnerReferences.
func (e *enqueueRequestForOwner[T]) getOwnersReferences(object metav1.Object) []metav1.OwnerReference { func (e *enqueueRequestForOwner[object]) getOwnersReferences(obj metav1.Object) []metav1.OwnerReference {
if object == nil { if obj == nil {
return nil return nil
} }
// If not filtered as Controller only, then use all the OwnerReferences // If not filtered as Controller only, then use all the OwnerReferences
if !e.isController { if !e.isController {
return object.GetOwnerReferences() return obj.GetOwnerReferences()
} }
// If filtered to a Controller, only take the Controller OwnerReference // If filtered to a Controller, only take the Controller OwnerReference
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { if ownerRef := metav1.GetControllerOf(obj); ownerRef != nil {
return []metav1.OwnerReference{*ownerRef} return []metav1.OwnerReference{*ownerRef}
} }
// No Controller OwnerReference found // No Controller OwnerReference found

View File

@ -22,6 +22,7 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
) )
// EventHandler enqueues reconcile.Requests in response to events (e.g. Pod Create). EventHandlers map an Event // EventHandler enqueues reconcile.Requests in response to events (e.g. Pod Create). EventHandlers map an Event
@ -42,7 +43,7 @@ import (
// //
// Unless you are implementing your own EventHandler, you can ignore the functions on the EventHandler interface. // Unless you are implementing your own EventHandler, you can ignore the functions on the EventHandler interface.
// Most users shouldn't need to implement their own EventHandler. // Most users shouldn't need to implement their own EventHandler.
type EventHandler TypedEventHandler[client.Object] type EventHandler = TypedEventHandler[client.Object, reconcile.Request]
// TypedEventHandler enqueues reconcile.Requests in response to events (e.g. Pod Create). TypedEventHandlers map an Event // TypedEventHandler enqueues reconcile.Requests in response to events (e.g. Pod Create). TypedEventHandlers map an Event
// for one object to trigger Reconciles for either the same object or different objects - e.g. if there is an // for one object to trigger Reconciles for either the same object or different objects - e.g. if there is an
@ -64,70 +65,70 @@ type EventHandler TypedEventHandler[client.Object]
// Most users shouldn't need to implement their own TypedEventHandler. // Most users shouldn't need to implement their own TypedEventHandler.
// //
// TypedEventHandler is experimental and subject to future change. // TypedEventHandler is experimental and subject to future change.
type TypedEventHandler[T any] interface { type TypedEventHandler[object any, request comparable] interface {
// Create is called in response to a create event - e.g. Pod Creation. // Create is called in response to a create event - e.g. Pod Creation.
Create(context.Context, event.TypedCreateEvent[T], workqueue.RateLimitingInterface) Create(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request])
// Update is called in response to an update event - e.g. Pod Updated. // Update is called in response to an update event - e.g. Pod Updated.
Update(context.Context, event.TypedUpdateEvent[T], workqueue.RateLimitingInterface) Update(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request])
// Delete is called in response to a delete event - e.g. Pod Deleted. // Delete is called in response to a delete event - e.g. Pod Deleted.
Delete(context.Context, event.TypedDeleteEvent[T], workqueue.RateLimitingInterface) Delete(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request])
// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
// external trigger request - e.g. reconcile Autoscaling, or a Webhook. // external trigger request - e.g. reconcile Autoscaling, or a Webhook.
Generic(context.Context, event.TypedGenericEvent[T], workqueue.RateLimitingInterface) Generic(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
} }
var _ EventHandler = Funcs{} var _ EventHandler = Funcs{}
// Funcs implements eventhandler. // Funcs implements eventhandler.
type Funcs = TypedFuncs[client.Object] type Funcs = TypedFuncs[client.Object, reconcile.Request]
// TypedFuncs implements eventhandler. // TypedFuncs implements eventhandler.
// //
// TypedFuncs is experimental and subject to future change. // TypedFuncs is experimental and subject to future change.
type TypedFuncs[T any] struct { type TypedFuncs[object any, request comparable] struct {
// Create is called in response to an add event. Defaults to no-op. // Create is called in response to an add event. Defaults to no-op.
// RateLimitingInterface is used to enqueue reconcile.Requests. // RateLimitingInterface is used to enqueue reconcile.Requests.
CreateFunc func(context.Context, event.TypedCreateEvent[T], workqueue.RateLimitingInterface) CreateFunc func(context.Context, event.TypedCreateEvent[object], workqueue.TypedRateLimitingInterface[request])
// Update is called in response to an update event. Defaults to no-op. // Update is called in response to an update event. Defaults to no-op.
// RateLimitingInterface is used to enqueue reconcile.Requests. // RateLimitingInterface is used to enqueue reconcile.Requests.
UpdateFunc func(context.Context, event.TypedUpdateEvent[T], workqueue.RateLimitingInterface) UpdateFunc func(context.Context, event.TypedUpdateEvent[object], workqueue.TypedRateLimitingInterface[request])
// Delete is called in response to a delete event. Defaults to no-op. // Delete is called in response to a delete event. Defaults to no-op.
// RateLimitingInterface is used to enqueue reconcile.Requests. // RateLimitingInterface is used to enqueue reconcile.Requests.
DeleteFunc func(context.Context, event.TypedDeleteEvent[T], workqueue.RateLimitingInterface) DeleteFunc func(context.Context, event.TypedDeleteEvent[object], workqueue.TypedRateLimitingInterface[request])
// GenericFunc is called in response to a generic event. Defaults to no-op. // GenericFunc is called in response to a generic event. Defaults to no-op.
// RateLimitingInterface is used to enqueue reconcile.Requests. // RateLimitingInterface is used to enqueue reconcile.Requests.
GenericFunc func(context.Context, event.TypedGenericEvent[T], workqueue.RateLimitingInterface) GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
} }
// Create implements EventHandler. // Create implements EventHandler.
func (h TypedFuncs[T]) Create(ctx context.Context, e event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) { func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.CreateFunc != nil { if h.CreateFunc != nil {
h.CreateFunc(ctx, e, q) h.CreateFunc(ctx, e, q)
} }
} }
// Delete implements EventHandler. // Delete implements EventHandler.
func (h TypedFuncs[T]) Delete(ctx context.Context, e event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) { func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.DeleteFunc != nil { if h.DeleteFunc != nil {
h.DeleteFunc(ctx, e, q) h.DeleteFunc(ctx, e, q)
} }
} }
// Update implements EventHandler. // Update implements EventHandler.
func (h TypedFuncs[T]) Update(ctx context.Context, e event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) { func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.UpdateFunc != nil { if h.UpdateFunc != nil {
h.UpdateFunc(ctx, e, q) h.UpdateFunc(ctx, e, q)
} }
} }
// Generic implements EventHandler. // Generic implements EventHandler.
func (h TypedFuncs[T]) Generic(ctx context.Context, e event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) { func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedGenericEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.GenericFunc != nil { if h.GenericFunc != nil {
h.GenericFunc(ctx, e, q) h.GenericFunc(ctx, e, q)
} }

View File

@ -31,13 +31,12 @@ import (
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log" logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source" "sigs.k8s.io/controller-runtime/pkg/source"
) )
// Controller implements controller.Controller. // Controller implements controller.Controller.
type Controller struct { type Controller[request comparable] struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. // Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string Name string
@ -47,19 +46,19 @@ type Controller struct {
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and // Reconciler is a function that can be called at any time with the Name / Namespace of an object and
// ensures that the state of the system matches the state specified in the object. // ensures that the state of the system matches the state specified in the object.
// Defaults to the DefaultReconcileFunc. // Defaults to the DefaultReconcileFunc.
Do reconcile.Reconciler Do reconcile.TypedReconciler[request]
// RateLimiter is used to limit how frequently requests may be queued into the work queue. // RateLimiter is used to limit how frequently requests may be queued into the work queue.
RateLimiter ratelimiter.RateLimiter RateLimiter workqueue.TypedRateLimiter[request]
// NewQueue constructs the queue for this controller once the controller is ready to start. // NewQueue constructs the queue for this controller once the controller is ready to start.
// This is a func because the standard Kubernetes work queues start themselves immediately, which // This is a func because the standard Kubernetes work queues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly. // leads to goroutine leaks if something calls controller.New repeatedly.
NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]
// Queue is an listeningQueue that listens for events from Informers and adds object keys to // Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing // the Queue for processing
Queue workqueue.RateLimitingInterface Queue workqueue.TypedRateLimitingInterface[request]
// mu is used to synchronize Controller setup // mu is used to synchronize Controller setup
mu sync.Mutex mu sync.Mutex
@ -79,15 +78,16 @@ type Controller struct {
CacheSyncTimeout time.Duration CacheSyncTimeout time.Duration
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []source.Source startWatches []source.TypedSource[request]
// LogConstructor is used to construct a logger to then log messages to users during reconciliation, // LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started. // or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it // Note: LogConstructor has to be able to handle nil requests as we are also using it
// outside the context of a reconciliation. // outside the context of a reconciliation.
LogConstructor func(request *reconcile.Request) logr.Logger LogConstructor func(request *request) logr.Logger
// RecoverPanic indicates whether the panic caused by reconcile should be recovered. // RecoverPanic indicates whether the panic caused by reconcile should be recovered.
// Defaults to true.
RecoverPanic *bool RecoverPanic *bool
// LeaderElected indicates whether the controller is leader elected or always running. // LeaderElected indicates whether the controller is leader elected or always running.
@ -95,12 +95,14 @@ type Controller struct {
} }
// Reconcile implements reconcile.Reconciler. // Reconcile implements reconcile.Reconciler.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { func (c *Controller[request]) Reconcile(ctx context.Context, req request) (_ reconcile.Result, err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
if c.RecoverPanic != nil && *c.RecoverPanic { ctrlmetrics.ReconcilePanics.WithLabelValues(c.Name).Inc()
if c.RecoverPanic == nil || *c.RecoverPanic {
for _, fn := range utilruntime.PanicHandlers { for _, fn := range utilruntime.PanicHandlers {
fn(r) fn(ctx, r)
} }
err = fmt.Errorf("panic: %v [recovered]", r) err = fmt.Errorf("panic: %v [recovered]", r)
return return
@ -115,7 +117,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ re
} }
// Watch implements controller.Controller. // Watch implements controller.Controller.
func (c *Controller) Watch(src source.Source) error { func (c *Controller[request]) Watch(src source.TypedSource[request]) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@ -132,7 +134,7 @@ func (c *Controller) Watch(src source.Source) error {
} }
// NeedLeaderElection implements the manager.LeaderElectionRunnable interface. // NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
func (c *Controller) NeedLeaderElection() bool { func (c *Controller[request]) NeedLeaderElection() bool {
if c.LeaderElected == nil { if c.LeaderElected == nil {
return true return true
} }
@ -140,7 +142,7 @@ func (c *Controller) NeedLeaderElection() bool {
} }
// Start implements controller.Controller. // Start implements controller.Controller.
func (c *Controller) Start(ctx context.Context) error { func (c *Controller[request]) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling // use an IIFE to get proper lock handling
// but lock outside to get proper handling of the queue shutdown // but lock outside to get proper handling of the queue shutdown
c.mu.Lock() c.mu.Lock()
@ -240,7 +242,7 @@ func (c *Controller) Start(ctx context.Context) error {
// processNextWorkItem will read a single work item off the workqueue and // processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler. // attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool { func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get() obj, shutdown := c.Queue.Get()
if shutdown { if shutdown {
// Stop working // Stop working
@ -269,35 +271,25 @@ const (
labelSuccess = "success" labelSuccess = "success"
) )
func (c *Controller) initMetrics() { func (c *Controller[request]) initMetrics() {
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Add(0)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Add(0) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Add(0)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Add(0) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Add(0)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Add(0) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Add(0)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Add(0) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Add(0)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Add(0)
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Add(0)
ctrlmetrics.ReconcilePanics.WithLabelValues(c.Name).Add(0)
ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles)) ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles))
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0)
} }
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) {
// Update metrics after processing each item // Update metrics after processing each item
reconcileStartTS := time.Now() reconcileStartTS := time.Now()
defer func() { defer func() {
c.updateMetrics(time.Since(reconcileStartTS)) c.updateMetrics(time.Since(reconcileStartTS))
}() }()
// Make sure that the object is a valid request.
req, ok := obj.(reconcile.Request)
if !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.Queue.Forget(obj)
c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
// Return true, don't take a break
return
}
log := c.LogConstructor(&req) log := c.LogConstructor(&req)
reconcileID := uuid.NewUUID() reconcileID := uuid.NewUUID()
@ -328,7 +320,7 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// along with a non-nil error. But this is intended as // along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due // We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter // to result.RequestAfter
c.Queue.Forget(obj) c.Queue.Forget(req)
c.Queue.AddAfter(req, result.RequeueAfter) c.Queue.AddAfter(req, result.RequeueAfter)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue: case result.Requeue:
@ -339,18 +331,18 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
log.V(5).Info("Reconcile successful") log.V(5).Info("Reconcile successful")
// Finally, if no error occurs we Forget this item so it does not // Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens. // get queued again until another change happens.
c.Queue.Forget(obj) c.Queue.Forget(req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
} }
} }
// GetLogger returns this controller's logger. // GetLogger returns this controller's logger.
func (c *Controller) GetLogger() logr.Logger { func (c *Controller[request]) GetLogger() logr.Logger {
return c.LogConstructor(nil) return c.LogConstructor(nil)
} }
// updateMetrics updates prometheus metrics within the controller. // updateMetrics updates prometheus metrics within the controller.
func (c *Controller) updateMetrics(reconcileTime time.Duration) { func (c *Controller[request]) updateMetrics(reconcileTime time.Duration) {
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds()) ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
} }

View File

@ -46,6 +46,13 @@ var (
Help: "Total number of terminal reconciliation errors per controller", Help: "Total number of terminal reconciliation errors per controller",
}, []string{"controller"}) }, []string{"controller"})
// ReconcilePanics is a prometheus counter metrics which holds the total
// number of panics from the Reconciler.
ReconcilePanics = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_reconcile_panics_total",
Help: "Total number of reconciliation panics per controller",
}, []string{"controller"})
// ReconcileTime is a prometheus metric which keeps track of the duration // ReconcileTime is a prometheus metric which keeps track of the duration
// of reconciliations. // of reconciliations.
ReconcileTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ ReconcileTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
@ -75,6 +82,7 @@ func init() {
ReconcileTotal, ReconcileTotal,
ReconcileErrors, ReconcileErrors,
TerminalReconcileErrors, TerminalReconcileErrors,
ReconcilePanics,
ReconcileTime, ReconcileTime,
WorkerCount, WorkerCount,
ActiveWorkers, ActiveWorkers,

View File

@ -33,8 +33,12 @@ import (
var log = logf.RuntimeLog.WithName("source").WithName("EventHandler") var log = logf.RuntimeLog.WithName("source").WithName("EventHandler")
// NewEventHandler creates a new EventHandler. // NewEventHandler creates a new EventHandler.
func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.TypedEventHandler[T], predicates []predicate.TypedPredicate[T]) *EventHandler[T] { func NewEventHandler[object client.Object, request comparable](
return &EventHandler[T]{ ctx context.Context,
queue workqueue.TypedRateLimitingInterface[request],
handler handler.TypedEventHandler[object, request],
predicates []predicate.TypedPredicate[object]) *EventHandler[object, request] {
return &EventHandler[object, request]{
ctx: ctx, ctx: ctx,
handler: handler, handler: handler,
queue: queue, queue: queue,
@ -43,19 +47,19 @@ func NewEventHandler[T client.Object](ctx context.Context, queue workqueue.RateL
} }
// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface. // EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
type EventHandler[T client.Object] struct { type EventHandler[object client.Object, request comparable] struct {
// ctx stores the context that created the event handler // ctx stores the context that created the event handler
// that is used to propagate cancellation signals to each handler function. // that is used to propagate cancellation signals to each handler function.
ctx context.Context ctx context.Context
handler handler.TypedEventHandler[T] handler handler.TypedEventHandler[object, request]
queue workqueue.RateLimitingInterface queue workqueue.TypedRateLimitingInterface[request]
predicates []predicate.TypedPredicate[T] predicates []predicate.TypedPredicate[object]
} }
// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs // HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs
// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 // TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27
func (e *EventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs { func (e *EventHandler[object, request]) HandlerFuncs() cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{ return cache.ResourceEventHandlerFuncs{
AddFunc: e.OnAdd, AddFunc: e.OnAdd,
UpdateFunc: e.OnUpdate, UpdateFunc: e.OnUpdate,
@ -64,11 +68,11 @@ func (e *EventHandler[T]) HandlerFuncs() cache.ResourceEventHandlerFuncs {
} }
// OnAdd creates CreateEvent and calls Create on EventHandler. // OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler[T]) OnAdd(obj interface{}) { func (e *EventHandler[object, request]) OnAdd(obj interface{}) {
c := event.TypedCreateEvent[T]{} c := event.TypedCreateEvent[object]{}
// Pull Object out of the object // Pull Object out of the object
if o, ok := obj.(T); ok { if o, ok := obj.(object); ok {
c.Object = o c.Object = o
} else { } else {
log.Error(nil, "OnAdd missing Object", log.Error(nil, "OnAdd missing Object",
@ -89,10 +93,10 @@ func (e *EventHandler[T]) OnAdd(obj interface{}) {
} }
// OnUpdate creates UpdateEvent and calls Update on EventHandler. // OnUpdate creates UpdateEvent and calls Update on EventHandler.
func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) { func (e *EventHandler[object, request]) OnUpdate(oldObj, newObj interface{}) {
u := event.TypedUpdateEvent[T]{} u := event.TypedUpdateEvent[object]{}
if o, ok := oldObj.(T); ok { if o, ok := oldObj.(object); ok {
u.ObjectOld = o u.ObjectOld = o
} else { } else {
log.Error(nil, "OnUpdate missing ObjectOld", log.Error(nil, "OnUpdate missing ObjectOld",
@ -101,7 +105,7 @@ func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) {
} }
// Pull Object out of the object // Pull Object out of the object
if o, ok := newObj.(T); ok { if o, ok := newObj.(object); ok {
u.ObjectNew = o u.ObjectNew = o
} else { } else {
log.Error(nil, "OnUpdate missing ObjectNew", log.Error(nil, "OnUpdate missing ObjectNew",
@ -122,8 +126,8 @@ func (e *EventHandler[T]) OnUpdate(oldObj, newObj interface{}) {
} }
// OnDelete creates DeleteEvent and calls Delete on EventHandler. // OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e *EventHandler[T]) OnDelete(obj interface{}) { func (e *EventHandler[object, request]) OnDelete(obj interface{}) {
d := event.TypedDeleteEvent[T]{} d := event.TypedDeleteEvent[object]{}
// Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a // Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a
// DeleteFinalStateUnknown struct, so the object needs to be pulled out. // DeleteFinalStateUnknown struct, so the object needs to be pulled out.
@ -149,7 +153,7 @@ func (e *EventHandler[T]) OnDelete(obj interface{}) {
} }
// Pull Object out of the object // Pull Object out of the object
if o, ok := obj.(T); ok { if o, ok := obj.(object); ok {
d.Object = o d.Object = o
} else { } else {
log.Error(nil, "OnDelete missing Object", log.Error(nil, "OnDelete missing Object",

View File

@ -19,16 +19,16 @@ import (
) )
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). // Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind[T client.Object] struct { type Kind[object client.Object, request comparable] struct {
// Type is the type of object to watch. e.g. &v1.Pod{} // Type is the type of object to watch. e.g. &v1.Pod{}
Type T Type object
// Cache used to watch APIs // Cache used to watch APIs
Cache cache.Cache Cache cache.Cache
Handler handler.TypedEventHandler[T] Handler handler.TypedEventHandler[object, request]
Predicates []predicate.TypedPredicate[T] Predicates []predicate.TypedPredicate[object]
// startedErr may contain an error if one was encountered during startup. If its closed and does not // startedErr may contain an error if one was encountered during startup. If its closed and does not
// contain an error, startup and syncing finished. // contain an error, startup and syncing finished.
@ -38,7 +38,7 @@ type Kind[T client.Object] struct {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests. // to enqueue reconcile.Requests.
func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[request]) error {
if isNil(ks.Type) { if isNil(ks.Type) {
return fmt.Errorf("must create Kind with a non-nil object") return fmt.Errorf("must create Kind with a non-nil object")
} }
@ -102,7 +102,7 @@ func (ks *Kind[T]) Start(ctx context.Context, queue workqueue.RateLimitingInterf
return nil return nil
} }
func (ks *Kind[T]) String() string { func (ks *Kind[object, request]) String() string {
if !isNil(ks.Type) { if !isNil(ks.Type) {
return fmt.Sprintf("kind source: %T", ks.Type) return fmt.Sprintf("kind source: %T", ks.Type)
} }
@ -111,7 +111,7 @@ func (ks *Kind[T]) String() string {
// WaitForSync implements SyncingSource to allow controllers to wait with starting // WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced. // workers until the cache is synced.
func (ks *Kind[T]) WaitForSync(ctx context.Context) error { func (ks *Kind[object, request]) WaitForSync(ctx context.Context) error {
select { select {
case err := <-ks.startedErr: case err := <-ks.startedErr:
return err return err

View File

@ -351,6 +351,16 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
// Initialize the internal context. // Initialize the internal context.
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
// Leader elector must be created before defer that contains engageStopProcedure function
// https://github.com/kubernetes-sigs/controller-runtime/issues/2873
var leaderElector *leaderelection.LeaderElector
if cm.resourceLock != nil {
leaderElector, err = cm.initLeaderElector()
if err != nil {
return fmt.Errorf("failed during initialization leader election process: %w", err)
}
}
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
stopComplete := make(chan struct{}) stopComplete := make(chan struct{})
defer close(stopComplete) defer close(stopComplete)
@ -433,20 +443,23 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
{ {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cm.leaderElectionCancel = cancel cm.leaderElectionCancel = cancel
if leaderElector != nil {
// Start the leader elector process
go func() { go func() {
if cm.resourceLock != nil { leaderElector.Run(ctx)
if err := cm.startLeaderElection(ctx); err != nil { <-ctx.Done()
cm.errChan <- err close(cm.leaderElectionStopped)
} }()
} else { } else {
go func() {
// Treat not having leader election enabled the same as being elected. // Treat not having leader election enabled the same as being elected.
if err := cm.startLeaderElectionRunnables(); err != nil { if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err cm.errChan <- err
} }
close(cm.elected) close(cm.elected)
}
}() }()
} }
}
ready = true ready = true
cm.Unlock() cm.Unlock()
@ -494,8 +507,8 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
cm.internalCancel() cm.internalCancel()
}) })
select { select {
case err, ok := <-cm.errChan: case err := <-cm.errChan:
if ok { if !errors.Is(err, context.Canceled) {
cm.logger.Error(err, "error received after stop sequence was engaged") cm.logger.Error(err, "error received after stop sequence was engaged")
} }
case <-stopComplete: case <-stopComplete:
@ -564,12 +577,8 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
return nil return nil
} }
func (cm *controllerManager) startLeaderElectionRunnables() error { func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector, error) {
return cm.runnables.LeaderElection.Start(cm.internalCtx) leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
}
func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock, Lock: cm.resourceLock,
LeaseDuration: cm.leaseDuration, LeaseDuration: cm.leaseDuration,
RenewDeadline: cm.renewDeadline, RenewDeadline: cm.renewDeadline,
@ -599,16 +608,14 @@ func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error
Name: cm.leaderElectionID, Name: cm.leaderElectionID,
}) })
if err != nil { if err != nil {
return err return nil, err
} }
// Start the leader elector process return leaderElector, nil
go func() { }
l.Run(ctx)
<-ctx.Done() func (cm *controllerManager) startLeaderElectionRunnables() error {
close(cm.leaderElectionStopped) return cm.runnables.LeaderElection.Start(cm.internalCtx)
}()
return nil
} }
func (cm *controllerManager) Elected() <-chan struct{} { func (cm *controllerManager) Elected() <-chan struct{} {

View File

@ -42,27 +42,27 @@ var (
Subsystem: WorkQueueSubsystem, Subsystem: WorkQueueSubsystem,
Name: DepthKey, Name: DepthKey,
Help: "Current depth of workqueue", Help: "Current depth of workqueue",
}, []string{"name"}) }, []string{"name", "controller"})
adds = prometheus.NewCounterVec(prometheus.CounterOpts{ adds = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem, Subsystem: WorkQueueSubsystem,
Name: AddsKey, Name: AddsKey,
Help: "Total number of adds handled by workqueue", Help: "Total number of adds handled by workqueue",
}, []string{"name"}) }, []string{"name", "controller"})
latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem, Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey, Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested", Help: "How long in seconds an item stays in workqueue before being requested",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name"}) }, []string{"name", "controller"})
workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem, Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey, Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.", Help: "How long in seconds processing an item from workqueue takes.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name"}) }, []string{"name", "controller"})
unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{ unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem, Subsystem: WorkQueueSubsystem,
@ -71,20 +71,20 @@ var (
"is in progress and hasn't been observed by work_duration. Large " + "is in progress and hasn't been observed by work_duration. Large " +
"values indicate stuck threads. One can deduce the number of stuck " + "values indicate stuck threads. One can deduce the number of stuck " +
"threads by observing the rate at which this increases.", "threads by observing the rate at which this increases.",
}, []string{"name"}) }, []string{"name", "controller"})
longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{ longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem, Subsystem: WorkQueueSubsystem,
Name: LongestRunningProcessorKey, Name: LongestRunningProcessorKey,
Help: "How many seconds has the longest running " + Help: "How many seconds has the longest running " +
"processor for workqueue been running.", "processor for workqueue been running.",
}, []string{"name"}) }, []string{"name", "controller"})
retries = prometheus.NewCounterVec(prometheus.CounterOpts{ retries = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem, Subsystem: WorkQueueSubsystem,
Name: RetriesKey, Name: RetriesKey,
Help: "Total number of retries handled by workqueue", Help: "Total number of retries handled by workqueue",
}, []string{"name"}) }, []string{"name", "controller"})
) )
func init() { func init() {
@ -102,29 +102,29 @@ func init() {
type workqueueMetricsProvider struct{} type workqueueMetricsProvider struct{}
func (workqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { func (workqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
return depth.WithLabelValues(name) return depth.WithLabelValues(name, name)
} }
func (workqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { func (workqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
return adds.WithLabelValues(name) return adds.WithLabelValues(name, name)
} }
func (workqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { func (workqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
return latency.WithLabelValues(name) return latency.WithLabelValues(name, name)
} }
func (workqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { func (workqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
return workDuration.WithLabelValues(name) return workDuration.WithLabelValues(name, name)
} }
func (workqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { func (workqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
return unfinished.WithLabelValues(name) return unfinished.WithLabelValues(name, name)
} }
func (workqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { func (workqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
return longestRunningProcessor.WithLabelValues(name) return longestRunningProcessor.WithLabelValues(name, name)
} }
func (workqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { func (workqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
return retries.WithLabelValues(name) return retries.WithLabelValues(name, name)
} }

View File

@ -33,18 +33,18 @@ var log = logf.RuntimeLog.WithName("predicate").WithName("eventFilters")
type Predicate = TypedPredicate[client.Object] type Predicate = TypedPredicate[client.Object]
// TypedPredicate filters events before enqueuing the keys. // TypedPredicate filters events before enqueuing the keys.
type TypedPredicate[T any] interface { type TypedPredicate[object any] interface {
// Create returns true if the Create event should be processed // Create returns true if the Create event should be processed
Create(event.TypedCreateEvent[T]) bool Create(event.TypedCreateEvent[object]) bool
// Delete returns true if the Delete event should be processed // Delete returns true if the Delete event should be processed
Delete(event.TypedDeleteEvent[T]) bool Delete(event.TypedDeleteEvent[object]) bool
// Update returns true if the Update event should be processed // Update returns true if the Update event should be processed
Update(event.TypedUpdateEvent[T]) bool Update(event.TypedUpdateEvent[object]) bool
// Generic returns true if the Generic event should be processed // Generic returns true if the Generic event should be processed
Generic(event.TypedGenericEvent[T]) bool Generic(event.TypedGenericEvent[object]) bool
} }
var _ Predicate = Funcs{} var _ Predicate = Funcs{}
@ -59,22 +59,22 @@ var _ Predicate = not[client.Object]{}
type Funcs = TypedFuncs[client.Object] type Funcs = TypedFuncs[client.Object]
// TypedFuncs is a function that implements TypedPredicate. // TypedFuncs is a function that implements TypedPredicate.
type TypedFuncs[T any] struct { type TypedFuncs[object any] struct {
// Create returns true if the Create event should be processed // Create returns true if the Create event should be processed
CreateFunc func(event.TypedCreateEvent[T]) bool CreateFunc func(event.TypedCreateEvent[object]) bool
// Delete returns true if the Delete event should be processed // Delete returns true if the Delete event should be processed
DeleteFunc func(event.TypedDeleteEvent[T]) bool DeleteFunc func(event.TypedDeleteEvent[object]) bool
// Update returns true if the Update event should be processed // Update returns true if the Update event should be processed
UpdateFunc func(event.TypedUpdateEvent[T]) bool UpdateFunc func(event.TypedUpdateEvent[object]) bool
// Generic returns true if the Generic event should be processed // Generic returns true if the Generic event should be processed
GenericFunc func(event.TypedGenericEvent[T]) bool GenericFunc func(event.TypedGenericEvent[object]) bool
} }
// Create implements Predicate. // Create implements Predicate.
func (p TypedFuncs[T]) Create(e event.TypedCreateEvent[T]) bool { func (p TypedFuncs[object]) Create(e event.TypedCreateEvent[object]) bool {
if p.CreateFunc != nil { if p.CreateFunc != nil {
return p.CreateFunc(e) return p.CreateFunc(e)
} }
@ -82,7 +82,7 @@ func (p TypedFuncs[T]) Create(e event.TypedCreateEvent[T]) bool {
} }
// Delete implements Predicate. // Delete implements Predicate.
func (p TypedFuncs[T]) Delete(e event.TypedDeleteEvent[T]) bool { func (p TypedFuncs[object]) Delete(e event.TypedDeleteEvent[object]) bool {
if p.DeleteFunc != nil { if p.DeleteFunc != nil {
return p.DeleteFunc(e) return p.DeleteFunc(e)
} }
@ -90,7 +90,7 @@ func (p TypedFuncs[T]) Delete(e event.TypedDeleteEvent[T]) bool {
} }
// Update implements Predicate. // Update implements Predicate.
func (p TypedFuncs[T]) Update(e event.TypedUpdateEvent[T]) bool { func (p TypedFuncs[object]) Update(e event.TypedUpdateEvent[object]) bool {
if p.UpdateFunc != nil { if p.UpdateFunc != nil {
return p.UpdateFunc(e) return p.UpdateFunc(e)
} }
@ -98,7 +98,7 @@ func (p TypedFuncs[T]) Update(e event.TypedUpdateEvent[T]) bool {
} }
// Generic implements Predicate. // Generic implements Predicate.
func (p TypedFuncs[T]) Generic(e event.TypedGenericEvent[T]) bool { func (p TypedFuncs[object]) Generic(e event.TypedGenericEvent[object]) bool {
if p.GenericFunc != nil { if p.GenericFunc != nil {
return p.GenericFunc(e) return p.GenericFunc(e)
} }
@ -128,35 +128,38 @@ func NewPredicateFuncs(filter func(object client.Object) bool) Funcs {
// NewTypedPredicateFuncs returns a predicate funcs that applies the given filter function // NewTypedPredicateFuncs returns a predicate funcs that applies the given filter function
// on CREATE, UPDATE, DELETE and GENERIC events. For UPDATE events, the filter is applied // on CREATE, UPDATE, DELETE and GENERIC events. For UPDATE events, the filter is applied
// to the new object. // to the new object.
func NewTypedPredicateFuncs[T any](filter func(object T) bool) TypedFuncs[T] { func NewTypedPredicateFuncs[object any](filter func(object object) bool) TypedFuncs[object] {
return TypedFuncs[T]{ return TypedFuncs[object]{
CreateFunc: func(e event.TypedCreateEvent[T]) bool { CreateFunc: func(e event.TypedCreateEvent[object]) bool {
return filter(e.Object) return filter(e.Object)
}, },
UpdateFunc: func(e event.TypedUpdateEvent[T]) bool { UpdateFunc: func(e event.TypedUpdateEvent[object]) bool {
return filter(e.ObjectNew) return filter(e.ObjectNew)
}, },
DeleteFunc: func(e event.TypedDeleteEvent[T]) bool { DeleteFunc: func(e event.TypedDeleteEvent[object]) bool {
return filter(e.Object) return filter(e.Object)
}, },
GenericFunc: func(e event.TypedGenericEvent[T]) bool { GenericFunc: func(e event.TypedGenericEvent[object]) bool {
return filter(e.Object) return filter(e.Object)
}, },
} }
} }
// ResourceVersionChangedPredicate implements a default update predicate function on resource version change. // ResourceVersionChangedPredicate implements a default update predicate function on resource version change.
type ResourceVersionChangedPredicate struct { type ResourceVersionChangedPredicate = TypedResourceVersionChangedPredicate[client.Object]
Funcs
// TypedResourceVersionChangedPredicate implements a default update predicate function on resource version change.
type TypedResourceVersionChangedPredicate[T metav1.Object] struct {
TypedFuncs[T]
} }
// Update implements default UpdateEvent filter for validating resource version change. // Update implements default UpdateEvent filter for validating resource version change.
func (ResourceVersionChangedPredicate) Update(e event.UpdateEvent) bool { func (TypedResourceVersionChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool {
if e.ObjectOld == nil { if isNil(e.ObjectOld) {
log.Error(nil, "Update event has no old object to update", "event", e) log.Error(nil, "Update event has no old object to update", "event", e)
return false return false
} }
if e.ObjectNew == nil { if isNil(e.ObjectNew) {
log.Error(nil, "Update event has no new object to update", "event", e) log.Error(nil, "Update event has no new object to update", "event", e)
return false return false
} }
@ -198,12 +201,12 @@ type GenerationChangedPredicate = TypedGenerationChangedPredicate[client.Object]
// //
// * With this predicate, any update events with writes only to the status field will not be reconciled. // * With this predicate, any update events with writes only to the status field will not be reconciled.
// So in the event that the status block is overwritten or wiped by someone else the controller will not self-correct to restore the correct status. // So in the event that the status block is overwritten or wiped by someone else the controller will not self-correct to restore the correct status.
type TypedGenerationChangedPredicate[T metav1.Object] struct { type TypedGenerationChangedPredicate[object metav1.Object] struct {
TypedFuncs[T] TypedFuncs[object]
} }
// Update implements default UpdateEvent filter for validating generation change. // Update implements default UpdateEvent filter for validating generation change.
func (TypedGenerationChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool { func (TypedGenerationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool {
if isNil(e.ObjectOld) { if isNil(e.ObjectOld) {
log.Error(nil, "Update event has no old object to update", "event", e) log.Error(nil, "Update event has no old object to update", "event", e)
return false return false
@ -231,12 +234,12 @@ func (TypedGenerationChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bo
type AnnotationChangedPredicate = TypedAnnotationChangedPredicate[client.Object] type AnnotationChangedPredicate = TypedAnnotationChangedPredicate[client.Object]
// TypedAnnotationChangedPredicate implements a default update predicate function on annotation change. // TypedAnnotationChangedPredicate implements a default update predicate function on annotation change.
type TypedAnnotationChangedPredicate[T metav1.Object] struct { type TypedAnnotationChangedPredicate[object metav1.Object] struct {
TypedFuncs[T] TypedFuncs[object]
} }
// Update implements default UpdateEvent filter for validating annotation change. // Update implements default UpdateEvent filter for validating annotation change.
func (TypedAnnotationChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool { func (TypedAnnotationChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool {
if isNil(e.ObjectOld) { if isNil(e.ObjectOld) {
log.Error(nil, "Update event has no old object to update", "event", e) log.Error(nil, "Update event has no old object to update", "event", e)
return false return false
@ -265,12 +268,12 @@ func (TypedAnnotationChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bo
type LabelChangedPredicate = TypedLabelChangedPredicate[client.Object] type LabelChangedPredicate = TypedLabelChangedPredicate[client.Object]
// TypedLabelChangedPredicate implements a default update predicate function on label change. // TypedLabelChangedPredicate implements a default update predicate function on label change.
type TypedLabelChangedPredicate[T metav1.Object] struct { type TypedLabelChangedPredicate[object metav1.Object] struct {
TypedFuncs[T] TypedFuncs[object]
} }
// Update implements default UpdateEvent filter for checking label change. // Update implements default UpdateEvent filter for checking label change.
func (TypedLabelChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool { func (TypedLabelChangedPredicate[object]) Update(e event.TypedUpdateEvent[object]) bool {
if isNil(e.ObjectOld) { if isNil(e.ObjectOld) {
log.Error(nil, "Update event has no old object to update", "event", e) log.Error(nil, "Update event has no old object to update", "event", e)
return false return false
@ -284,15 +287,15 @@ func (TypedLabelChangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool {
} }
// And returns a composite predicate that implements a logical AND of the predicates passed to it. // And returns a composite predicate that implements a logical AND of the predicates passed to it.
func And[T any](predicates ...TypedPredicate[T]) TypedPredicate[T] { func And[object any](predicates ...TypedPredicate[object]) TypedPredicate[object] {
return and[T]{predicates} return and[object]{predicates}
} }
type and[T any] struct { type and[object any] struct {
predicates []TypedPredicate[T] predicates []TypedPredicate[object]
} }
func (a and[T]) Create(e event.TypedCreateEvent[T]) bool { func (a and[object]) Create(e event.TypedCreateEvent[object]) bool {
for _, p := range a.predicates { for _, p := range a.predicates {
if !p.Create(e) { if !p.Create(e) {
return false return false
@ -301,7 +304,7 @@ func (a and[T]) Create(e event.TypedCreateEvent[T]) bool {
return true return true
} }
func (a and[T]) Update(e event.TypedUpdateEvent[T]) bool { func (a and[object]) Update(e event.TypedUpdateEvent[object]) bool {
for _, p := range a.predicates { for _, p := range a.predicates {
if !p.Update(e) { if !p.Update(e) {
return false return false
@ -310,7 +313,7 @@ func (a and[T]) Update(e event.TypedUpdateEvent[T]) bool {
return true return true
} }
func (a and[T]) Delete(e event.TypedDeleteEvent[T]) bool { func (a and[object]) Delete(e event.TypedDeleteEvent[object]) bool {
for _, p := range a.predicates { for _, p := range a.predicates {
if !p.Delete(e) { if !p.Delete(e) {
return false return false
@ -319,7 +322,7 @@ func (a and[T]) Delete(e event.TypedDeleteEvent[T]) bool {
return true return true
} }
func (a and[T]) Generic(e event.TypedGenericEvent[T]) bool { func (a and[object]) Generic(e event.TypedGenericEvent[object]) bool {
for _, p := range a.predicates { for _, p := range a.predicates {
if !p.Generic(e) { if !p.Generic(e) {
return false return false
@ -329,15 +332,15 @@ func (a and[T]) Generic(e event.TypedGenericEvent[T]) bool {
} }
// Or returns a composite predicate that implements a logical OR of the predicates passed to it. // Or returns a composite predicate that implements a logical OR of the predicates passed to it.
func Or[T any](predicates ...TypedPredicate[T]) TypedPredicate[T] { func Or[object any](predicates ...TypedPredicate[object]) TypedPredicate[object] {
return or[T]{predicates} return or[object]{predicates}
} }
type or[T any] struct { type or[object any] struct {
predicates []TypedPredicate[T] predicates []TypedPredicate[object]
} }
func (o or[T]) Create(e event.TypedCreateEvent[T]) bool { func (o or[object]) Create(e event.TypedCreateEvent[object]) bool {
for _, p := range o.predicates { for _, p := range o.predicates {
if p.Create(e) { if p.Create(e) {
return true return true
@ -346,7 +349,7 @@ func (o or[T]) Create(e event.TypedCreateEvent[T]) bool {
return false return false
} }
func (o or[T]) Update(e event.TypedUpdateEvent[T]) bool { func (o or[object]) Update(e event.TypedUpdateEvent[object]) bool {
for _, p := range o.predicates { for _, p := range o.predicates {
if p.Update(e) { if p.Update(e) {
return true return true
@ -355,7 +358,7 @@ func (o or[T]) Update(e event.TypedUpdateEvent[T]) bool {
return false return false
} }
func (o or[T]) Delete(e event.TypedDeleteEvent[T]) bool { func (o or[object]) Delete(e event.TypedDeleteEvent[object]) bool {
for _, p := range o.predicates { for _, p := range o.predicates {
if p.Delete(e) { if p.Delete(e) {
return true return true
@ -364,7 +367,7 @@ func (o or[T]) Delete(e event.TypedDeleteEvent[T]) bool {
return false return false
} }
func (o or[T]) Generic(e event.TypedGenericEvent[T]) bool { func (o or[object]) Generic(e event.TypedGenericEvent[object]) bool {
for _, p := range o.predicates { for _, p := range o.predicates {
if p.Generic(e) { if p.Generic(e) {
return true return true
@ -374,27 +377,27 @@ func (o or[T]) Generic(e event.TypedGenericEvent[T]) bool {
} }
// Not returns a predicate that implements a logical NOT of the predicate passed to it. // Not returns a predicate that implements a logical NOT of the predicate passed to it.
func Not[T any](predicate TypedPredicate[T]) TypedPredicate[T] { func Not[object any](predicate TypedPredicate[object]) TypedPredicate[object] {
return not[T]{predicate} return not[object]{predicate}
} }
type not[T any] struct { type not[object any] struct {
predicate TypedPredicate[T] predicate TypedPredicate[object]
} }
func (n not[T]) Create(e event.TypedCreateEvent[T]) bool { func (n not[object]) Create(e event.TypedCreateEvent[object]) bool {
return !n.predicate.Create(e) return !n.predicate.Create(e)
} }
func (n not[T]) Update(e event.TypedUpdateEvent[T]) bool { func (n not[object]) Update(e event.TypedUpdateEvent[object]) bool {
return !n.predicate.Update(e) return !n.predicate.Update(e)
} }
func (n not[T]) Delete(e event.TypedDeleteEvent[T]) bool { func (n not[object]) Delete(e event.TypedDeleteEvent[object]) bool {
return !n.predicate.Delete(e) return !n.predicate.Delete(e)
} }
func (n not[T]) Generic(e event.TypedGenericEvent[T]) bool { func (n not[object]) Generic(e event.TypedGenericEvent[object]) bool {
return !n.predicate.Generic(e) return !n.predicate.Generic(e)
} }

View File

@ -1,30 +0,0 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ratelimiter
import "time"
// RateLimiter is an identical interface of client-go workqueue RateLimiter.
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}

View File

@ -89,7 +89,14 @@ driven by actual cluster state read from the apiserver or a local cache.
For example if responding to a Pod Delete Event, the Request won't contain that a Pod was deleted, For example if responding to a Pod Delete Event, the Request won't contain that a Pod was deleted,
instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing. instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing.
*/ */
type Reconciler interface { type Reconciler = TypedReconciler[Request]
// TypedReconciler implements an API for a specific Resource by Creating, Updating or Deleting Kubernetes
// objects, or by making changes to systems external to the cluster (e.g. cloudproviders, github, etc).
//
// The request type is what event handlers put into the workqueue. The workqueue then de-duplicates identical
// requests.
type TypedReconciler[request comparable] interface {
// Reconcile performs a full reconciliation for the object referred to by the Request. // Reconcile performs a full reconciliation for the object referred to by the Request.
// //
// If the returned error is non-nil, the Result is ignored and the request will be // If the returned error is non-nil, the Result is ignored and the request will be
@ -101,40 +108,45 @@ type Reconciler interface {
// //
// If the error is nil and result.RequeueAfter is zero and result.Requeue is true, the request // If the error is nil and result.RequeueAfter is zero and result.Requeue is true, the request
// will be requeued using exponential backoff. // will be requeued using exponential backoff.
Reconcile(context.Context, Request) (Result, error) Reconcile(context.Context, request) (Result, error)
} }
// Func is a function that implements the reconcile interface. // Func is a function that implements the reconcile interface.
type Func func(context.Context, Request) (Result, error) type Func = TypedFunc[Request]
// TypedFunc is a function that implements the reconcile interface.
type TypedFunc[request comparable] func(context.Context, request) (Result, error)
var _ Reconciler = Func(nil) var _ Reconciler = Func(nil)
// Reconcile implements Reconciler. // Reconcile implements Reconciler.
func (r Func) Reconcile(ctx context.Context, o Request) (Result, error) { return r(ctx, o) } func (r TypedFunc[request]) Reconcile(ctx context.Context, req request) (Result, error) {
return r(ctx, req)
}
// ObjectReconciler is a specialized version of Reconciler that acts on instances of client.Object. Each reconciliation // ObjectReconciler is a specialized version of Reconciler that acts on instances of client.Object. Each reconciliation
// event gets the associated object from Kubernetes before passing it to Reconcile. An ObjectReconciler can be used in // event gets the associated object from Kubernetes before passing it to Reconcile. An ObjectReconciler can be used in
// Builder.Complete by calling AsReconciler. See Reconciler for more details. // Builder.Complete by calling AsReconciler. See Reconciler for more details.
type ObjectReconciler[T client.Object] interface { type ObjectReconciler[object client.Object] interface {
Reconcile(context.Context, T) (Result, error) Reconcile(context.Context, object) (Result, error)
} }
// AsReconciler creates a Reconciler based on the given ObjectReconciler. // AsReconciler creates a Reconciler based on the given ObjectReconciler.
func AsReconciler[T client.Object](client client.Client, rec ObjectReconciler[T]) Reconciler { func AsReconciler[object client.Object](client client.Client, rec ObjectReconciler[object]) Reconciler {
return &objectReconcilerAdapter[T]{ return &objectReconcilerAdapter[object]{
objReconciler: rec, objReconciler: rec,
client: client, client: client,
} }
} }
type objectReconcilerAdapter[T client.Object] struct { type objectReconcilerAdapter[object client.Object] struct {
objReconciler ObjectReconciler[T] objReconciler ObjectReconciler[object]
client client.Client client client.Client
} }
// Reconcile implements Reconciler. // Reconcile implements Reconciler.
func (a *objectReconcilerAdapter[T]) Reconcile(ctx context.Context, req Request) (Result, error) { func (a *objectReconcilerAdapter[object]) Reconcile(ctx context.Context, req Request) (Result, error) {
o := reflect.New(reflect.TypeOf(*new(T)).Elem()).Interface().(T) o := reflect.New(reflect.TypeOf(*new(object)).Elem()).Interface().(object)
if err := a.client.Get(ctx, req.NamespacedName, o); err != nil { if err := a.client.Get(ctx, req.NamespacedName, o); err != nil {
return Result{}, client.IgnoreNotFound(err) return Result{}, client.IgnoreNotFound(err)
} }

View File

@ -28,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/handler"
internal "sigs.k8s.io/controller-runtime/pkg/internal/source" internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/predicate"
@ -41,45 +42,74 @@ import (
// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls). // * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls).
// //
// Users may build their own Source implementations. // Users may build their own Source implementations.
type Source interface { type Source = TypedSource[reconcile.Request]
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests. // TypedSource is a generic source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
Start(context.Context, workqueue.RateLimitingInterface) error // which should be processed by event.EventHandlers to enqueue a request.
//
// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
//
// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls).
//
// Users may build their own Source implementations.
type TypedSource[request comparable] interface {
// Start is internal and should be called only by the Controller to start the source.
// Start must be non-blocking.
Start(context.Context, workqueue.TypedRateLimitingInterface[request]) error
} }
// SyncingSource is a source that needs syncing prior to being usable. The controller // SyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers. // will call its WaitForSync prior to starting workers.
type SyncingSource interface { type SyncingSource = TypedSyncingSource[reconcile.Request]
Source
// TypedSyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers.
type TypedSyncingSource[request comparable] interface {
TypedSource[request]
WaitForSync(ctx context.Context) error WaitForSync(ctx context.Context) error
} }
// Kind creates a KindSource with the given cache provider. // Kind creates a KindSource with the given cache provider.
func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource { func Kind[object client.Object](
return &internal.Kind[T]{ cache cache.Cache,
Type: object, obj object,
handler handler.TypedEventHandler[object, reconcile.Request],
predicates ...predicate.TypedPredicate[object],
) SyncingSource {
return TypedKind(cache, obj, handler, predicates...)
}
// TypedKind creates a KindSource with the given cache provider.
func TypedKind[object client.Object, request comparable](
cache cache.Cache,
obj object,
handler handler.TypedEventHandler[object, request],
predicates ...predicate.TypedPredicate[object],
) TypedSyncingSource[request] {
return &internal.Kind[object, request]{
Type: obj,
Cache: cache, Cache: cache,
Handler: handler, Handler: handler,
Predicates: predicates, Predicates: predicates,
} }
} }
var _ Source = &channel[string]{} var _ Source = &channel[string, reconcile.Request]{}
// ChannelOpt allows to configure a source.Channel. // ChannelOpt allows to configure a source.Channel.
type ChannelOpt[T any] func(*channel[T]) type ChannelOpt[object any, request comparable] func(*channel[object, request])
// WithPredicates adds the configured predicates to a source.Channel. // WithPredicates adds the configured predicates to a source.Channel.
func WithPredicates[T any](p ...predicate.TypedPredicate[T]) ChannelOpt[T] { func WithPredicates[object any, request comparable](p ...predicate.TypedPredicate[object]) ChannelOpt[object, request] {
return func(c *channel[T]) { return func(c *channel[object, request]) {
c.predicates = append(c.predicates, p...) c.predicates = append(c.predicates, p...)
} }
} }
// WithBufferSize configures the buffer size for a source.Channel. By // WithBufferSize configures the buffer size for a source.Channel. By
// default, the buffer size is 1024. // default, the buffer size is 1024.
func WithBufferSize[T any](bufferSize int) ChannelOpt[T] { func WithBufferSize[object any, request comparable](bufferSize int) ChannelOpt[object, request] {
return func(c *channel[T]) { return func(c *channel[object, request]) {
c.bufferSize = &bufferSize c.bufferSize = &bufferSize
} }
} }
@ -87,8 +117,23 @@ func WithBufferSize[T any](bufferSize int) ChannelOpt[T] {
// Channel is used to provide a source of events originating outside the cluster // Channel is used to provide a source of events originating outside the cluster
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external // (e.g. GitHub Webhook callback). Channel requires the user to wire the external
// source (e.g. http handler) to write GenericEvents to the underlying channel. // source (e.g. http handler) to write GenericEvents to the underlying channel.
func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], opts ...ChannelOpt[T]) Source { func Channel[object any](
c := &channel[T]{ source <-chan event.TypedGenericEvent[object],
handler handler.TypedEventHandler[object, reconcile.Request],
opts ...ChannelOpt[object, reconcile.Request],
) Source {
return TypedChannel[object, reconcile.Request](source, handler, opts...)
}
// TypedChannel is used to provide a source of events originating outside the cluster
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
// source (e.g. http handler) to write GenericEvents to the underlying channel.
func TypedChannel[object any, request comparable](
source <-chan event.TypedGenericEvent[object],
handler handler.TypedEventHandler[object, request],
opts ...ChannelOpt[object, request],
) TypedSource[request] {
c := &channel[object, request]{
source: source, source: source,
handler: handler, handler: handler,
} }
@ -99,34 +144,34 @@ func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.Ty
return c return c
} }
type channel[T any] struct { type channel[object any, request comparable] struct {
// once ensures the event distribution goroutine will be performed only once // once ensures the event distribution goroutine will be performed only once
once sync.Once once sync.Once
// source is the source channel to fetch GenericEvents // source is the source channel to fetch GenericEvents
source <-chan event.TypedGenericEvent[T] source <-chan event.TypedGenericEvent[object]
handler handler.TypedEventHandler[T] handler handler.TypedEventHandler[object, request]
predicates []predicate.TypedPredicate[T] predicates []predicate.TypedPredicate[object]
bufferSize *int bufferSize *int
// dest is the destination channels of the added event handlers // dest is the destination channels of the added event handlers
dest []chan event.TypedGenericEvent[T] dest []chan event.TypedGenericEvent[object]
// destLock is to ensure the destination channels are safely added/removed // destLock is to ensure the destination channels are safely added/removed
destLock sync.Mutex destLock sync.Mutex
} }
func (cs *channel[T]) String() string { func (cs *channel[object, request]) String() string {
return fmt.Sprintf("channel source: %p", cs) return fmt.Sprintf("channel source: %p", cs)
} }
// Start implements Source and should only be called by the Controller. // Start implements Source and should only be called by the Controller.
func (cs *channel[T]) Start( func (cs *channel[object, request]) Start(
ctx context.Context, ctx context.Context,
queue workqueue.RateLimitingInterface, queue workqueue.TypedRateLimitingInterface[request],
) error { ) error {
// Source should have been specified by the user. // Source should have been specified by the user.
if cs.source == nil { if cs.source == nil {
@ -140,7 +185,7 @@ func (cs *channel[T]) Start(
cs.bufferSize = ptr.To(1024) cs.bufferSize = ptr.To(1024)
} }
dst := make(chan event.TypedGenericEvent[T], *cs.bufferSize) dst := make(chan event.TypedGenericEvent[object], *cs.bufferSize)
cs.destLock.Lock() cs.destLock.Lock()
cs.dest = append(cs.dest, dst) cs.dest = append(cs.dest, dst)
@ -174,7 +219,7 @@ func (cs *channel[T]) Start(
return nil return nil
} }
func (cs *channel[T]) doStop() { func (cs *channel[object, request]) doStop() {
cs.destLock.Lock() cs.destLock.Lock()
defer cs.destLock.Unlock() defer cs.destLock.Unlock()
@ -183,7 +228,7 @@ func (cs *channel[T]) doStop() {
} }
} }
func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) { func (cs *channel[object, request]) distribute(evt event.TypedGenericEvent[object]) {
cs.destLock.Lock() cs.destLock.Lock()
defer cs.destLock.Unlock() defer cs.destLock.Unlock()
@ -197,7 +242,7 @@ func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) {
} }
} }
func (cs *channel[T]) syncLoop(ctx context.Context) { func (cs *channel[object, request]) syncLoop(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -228,7 +273,7 @@ var _ Source = &Informer{}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer // Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests. // to enqueue reconcile.Requests.
func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
// Informer should have been specified by the user. // Informer should have been specified by the user.
if is.Informer == nil { if is.Informer == nil {
return fmt.Errorf("must specify Informer.Informer") return fmt.Errorf("must specify Informer.Informer")
@ -251,13 +296,16 @@ func (is *Informer) String() string {
var _ Source = Func(nil) var _ Source = Func(nil)
// Func is a function that implements Source. // Func is a function that implements Source.
type Func func(context.Context, workqueue.RateLimitingInterface) error type Func = TypedFunc[reconcile.Request]
// TypedFunc is a function that implements Source.
type TypedFunc[request comparable] func(context.Context, workqueue.TypedRateLimitingInterface[request]) error
// Start implements Source. // Start implements Source.
func (f Func) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { func (f TypedFunc[request]) Start(ctx context.Context, queue workqueue.TypedRateLimitingInterface[request]) error {
return f(ctx, queue) return f(ctx, queue)
} }
func (f Func) String() string { func (f TypedFunc[request]) String() string {
return fmt.Sprintf("func source: %p", f) return fmt.Sprintf("func source: %p", f)
} }

View File

@ -0,0 +1,39 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
var (
// WebhookPanics is a prometheus counter metrics which holds the total
// number of panics from webhooks.
WebhookPanics = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_webhook_panics_total",
Help: "Total number of webhook panics",
}, []string{})
)
func init() {
metrics.Registry.MustRegister(
WebhookPanics,
)
// Init metric.
WebhookPanics.WithLabelValues().Add(0)
}

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/json"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2" "k8s.io/klog/v2"
admissionmetrics "sigs.k8s.io/controller-runtime/pkg/webhook/admission/metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log" logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics" "sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics"
@ -123,7 +124,8 @@ type Webhook struct {
Handler Handler Handler Handler
// RecoverPanic indicates whether the panic caused by webhook should be recovered. // RecoverPanic indicates whether the panic caused by webhook should be recovered.
RecoverPanic bool // Defaults to true.
RecoverPanic *bool
// WithContextFunc will allow you to take the http.Request.Context() and // WithContextFunc will allow you to take the http.Request.Context() and
// add any additional information such as passing the request path or // add any additional information such as passing the request path or
@ -141,8 +143,9 @@ type Webhook struct {
} }
// WithRecoverPanic takes a bool flag which indicates whether the panic caused by webhook should be recovered. // WithRecoverPanic takes a bool flag which indicates whether the panic caused by webhook should be recovered.
// Defaults to true.
func (wh *Webhook) WithRecoverPanic(recoverPanic bool) *Webhook { func (wh *Webhook) WithRecoverPanic(recoverPanic bool) *Webhook {
wh.RecoverPanic = recoverPanic wh.RecoverPanic = &recoverPanic
return wh return wh
} }
@ -151,17 +154,26 @@ func (wh *Webhook) WithRecoverPanic(recoverPanic bool) *Webhook {
// If the webhook is validating type, it delegates the AdmissionRequest to each handler and // If the webhook is validating type, it delegates the AdmissionRequest to each handler and
// deny the request if anyone denies. // deny the request if anyone denies.
func (wh *Webhook) Handle(ctx context.Context, req Request) (response Response) { func (wh *Webhook) Handle(ctx context.Context, req Request) (response Response) {
if wh.RecoverPanic {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
admissionmetrics.WebhookPanics.WithLabelValues().Inc()
if wh.RecoverPanic == nil || *wh.RecoverPanic {
for _, fn := range utilruntime.PanicHandlers { for _, fn := range utilruntime.PanicHandlers {
fn(r) fn(ctx, r)
} }
response = Errored(http.StatusInternalServerError, fmt.Errorf("panic: %v [recovered]", r)) response = Errored(http.StatusInternalServerError, fmt.Errorf("panic: %v [recovered]", r))
// Note: We explicitly have to set the response UID. Usually that is done via resp.Complete below,
// but if we encounter a panic in wh.Handler.Handle we are never going to reach resp.Complete.
response.UID = req.UID
return return
} }
}()
log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("Observed a panic in webhook: %v", r))
panic(r)
} }
}()
reqLog := wh.getLogger(&req) reqLog := wh.getLogger(&req)
ctx = logf.IntoContext(ctx, reqLog) ctx = logf.IntoContext(ctx, reqLog)
@ -169,7 +181,10 @@ func (wh *Webhook) Handle(ctx context.Context, req Request) (response Response)
resp := wh.Handler.Handle(ctx, req) resp := wh.Handler.Handle(ctx, req)
if err := resp.Complete(req); err != nil { if err := resp.Complete(req); err != nil {
reqLog.Error(err, "unable to encode response") reqLog.Error(err, "unable to encode response")
return Errored(http.StatusInternalServerError, errUnableToEncodeResponse) resp := Errored(http.StatusInternalServerError, errUnableToEncodeResponse)
// Note: We explicitly have to set the response UID. Usually that is done via resp.Complete.
resp.UID = req.UID
return resp
} }
return resp return resp