rebase: bump the k8s-dependencies group with 2 updates

Bumps the k8s-dependencies group with 2 updates: [k8s.io/klog/v2](https://github.com/kubernetes/klog) and [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime).


Updates `k8s.io/klog/v2` from 2.120.0 to 2.120.1
- [Release notes](https://github.com/kubernetes/klog/releases)
- [Changelog](https://github.com/kubernetes/klog/blob/main/RELEASE.md)
- [Commits](https://github.com/kubernetes/klog/compare/v2.120.0...v2.120.1)

Updates `sigs.k8s.io/controller-runtime` from 0.16.3 to 0.17.0
- [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases)
- [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md)
- [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.16.3...v0.17.0)

---
updated-dependencies:
- dependency-name: k8s.io/klog/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: k8s-dependencies
- dependency-name: sigs.k8s.io/controller-runtime
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: k8s-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2024-02-01 13:08:54 +00:00
committed by mergify[bot]
parent 2217d106c4
commit ea86bf7d83
76 changed files with 16305 additions and 756 deletions

View File

@ -33,7 +33,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -83,6 +83,9 @@ type Informers interface {
// of the underlying object.
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)
// RemoveInformer removes an informer entry and stops it if it was running.
RemoveInformer(ctx context.Context, obj client.Object) error
// Start runs all the informers known to this cache until the context is closed.
// It blocks.
Start(ctx context.Context) error
@ -121,6 +124,8 @@ type Informer interface {
// HasSynced return true if the informers underlying store has synced.
HasSynced() bool
// IsStopped returns true if the informer has been stopped.
IsStopped() bool
}
// AllNamespaces should be used as the map key to deliminate namespace settings
@ -199,6 +204,12 @@ type Options struct {
// unless there is already one set in ByObject or DefaultNamespaces.
DefaultTransform toolscache.TransformFunc
// DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called
// whenever ListAndWatch drops the connection with an error.
//
// After calling this handler, the informer will backoff and retry.
DefaultWatchErrorHandler toolscache.WatchErrorHandler
// DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy
// for everything that doesn't specify this.
//
@ -369,7 +380,8 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
Field: config.FieldSelector,
},
Transform: config.Transform,
UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),
WatchErrorHandler: opts.DefaultWatchErrorHandler,
UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false),
NewInformer: opts.newInformer,
}),
readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
@ -400,7 +412,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Construct a new Mapper if unset
if opts.Mapper == nil {
var err error
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config, opts.HTTPClient)
opts.Mapper, err = apiutil.NewDynamicRESTMapper(config, opts.HTTPClient)
if err != nil {
return Options{}, fmt.Errorf("could not create RESTMapper from config: %w", err)
}

View File

@ -52,6 +52,14 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis
return cache.List(ctx, list, opts...)
}
func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error {
cache, err := dbt.cacheForObject(obj)
if err != nil {
return err
}
return cache.RemoveInformer(ctx, obj)
}
func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
cache, err := dbt.cacheForObject(obj)
if err != nil {

View File

@ -190,6 +190,17 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
}
// RemoveInformer deactivates and removes the informer from the cache.
func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
if err != nil {
return err
}
ic.Informers.Remove(gvk, obj)
return nil
}
// NeedLeaderElection implements the LeaderElectionRunnable interface
// to indicate that this can be started without requiring the leader lock.
func (ic *informerCache) NeedLeaderElection() bool {

View File

@ -23,6 +23,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -117,16 +118,14 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
switch {
case listOpts.FieldSelector != nil:
// TODO(directxman12): support more complicated field selectors by
// combining multiple indices, GetIndexers, etc
field, val, requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector)
requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector)
if !requiresExact {
return fmt.Errorf("non-exact field matches are not supported by the cache")
}
// list all objects by the field selector. If this is namespaced and we have one, ask for the
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
// namespace.
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
objs, err = byIndexes(c.indexer, listOpts.FieldSelector.Requirements(), listOpts.Namespace)
case listOpts.Namespace != "":
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
default:
@ -178,6 +177,54 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
return apimeta.SetList(out, runtimeObjs)
}
func byIndexes(indexer cache.Indexer, requires fields.Requirements, namespace string) ([]interface{}, error) {
var (
err error
objs []interface{}
vals []string
)
indexers := indexer.GetIndexers()
for idx, req := range requires {
indexName := FieldIndexName(req.Field)
indexedValue := KeyToNamespacedKey(namespace, req.Value)
if idx == 0 {
// we use first require to get snapshot data
// TODO(halfcrazy): use complicated index when client-go provides byIndexes
// https://github.com/kubernetes/kubernetes/issues/109329
objs, err = indexer.ByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}
if len(objs) == 0 {
return nil, nil
}
continue
}
fn, exist := indexers[indexName]
if !exist {
return nil, fmt.Errorf("index with name %s does not exist", indexName)
}
filteredObjects := make([]interface{}, 0, len(objs))
for _, obj := range objs {
vals, err = fn(obj)
if err != nil {
return nil, err
}
for _, val := range vals {
if val == indexedValue {
filteredObjects = append(filteredObjects, obj)
break
}
}
}
if len(filteredObjects) == 0 {
return nil, nil
}
objs = filteredObjects
}
return objs, nil
}
// objectKeyToStorageKey converts an object key to store key.
// It's akin to MetaNamespaceKeyFunc. It's separate from
// String to allow keeping the key format easily in sync with

View File

@ -36,6 +36,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
)
// InformersOpts configures an InformerMap.
@ -49,6 +50,7 @@ type InformersOpts struct {
Selector Selector
Transform cache.TransformFunc
UnsafeDisableDeepCopy bool
WatchErrorHandler cache.WatchErrorHandler
}
// NewInformers creates a new InformersMap that can create informers under the hood.
@ -76,6 +78,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
transform: options.Transform,
unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy,
newInformer: newInformer,
watchErrorHandler: options.WatchErrorHandler,
}
}
@ -86,6 +89,20 @@ type Cache struct {
// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader
// Stop can be used to stop this individual informer.
stop chan struct{}
}
// Start starts the informer managed by a MapEntry.
// Blocks until the informer stops. The informer can be stopped
// either individually (via the entry's stop channel) or globally
// via the provided stop argument.
func (c *Cache) Start(stop <-chan struct{}) {
// Stop on either the whole map stopping or just this informer being removed.
internalStop, cancel := syncs.MergeChans(stop, c.stop)
defer cancel()
c.Informer.Run(internalStop)
}
type tracker struct {
@ -159,6 +176,11 @@ type Informers struct {
// NewInformer allows overriding of the shared index informer constructor for testing.
newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
// WatchErrorHandler allows the shared index informer's
// watchErrorHandler to be set by overriding the options
// or to use the default watchErrorHandler
watchErrorHandler cache.WatchErrorHandler
}
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
@ -173,13 +195,13 @@ func (ip *Informers) Start(ctx context.Context) error {
// Start each informer
for _, i := range ip.tracker.Structured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Unstructured {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
for _, i := range ip.tracker.Metadata {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
// Set started to true so we immediately start any informers added later.
@ -194,7 +216,7 @@ func (ip *Informers) Start(ctx context.Context) error {
return nil
}
func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
// Don't start the informer in case we are already waiting for the items in
// the waitGroup to finish, since waitGroups don't support waiting and adding
// at the same time.
@ -205,7 +227,7 @@ func (ip *Informers) startInformerLocked(informer cache.SharedIndexInformer) {
ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
informer.Run(ip.ctx.Done())
cacheEntry.Start(ip.ctx.Done())
}()
}
@ -281,6 +303,21 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
return started, i, nil
}
// Remove removes an informer entry and stops it if it was running.
func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
ip.mu.Lock()
defer ip.mu.Unlock()
informerMap := ip.informersByType(obj)
entry, ok := informerMap[gvk]
if !ok {
return
}
close(entry.stop)
delete(informerMap, gvk)
}
func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
switch obj.(type) {
case runtime.Unstructured:
@ -323,6 +360,13 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
// Set WatchErrorHandler on SharedIndexInformer if set
if ip.watchErrorHandler != nil {
if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil {
return nil, false, err
}
}
// Check to see if there is a transformer for this gvk
if err := sharedIndexInformer.SetTransform(ip.transform); err != nil {
return nil, false, err
@ -342,13 +386,14 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
scopeName: mapping.Scope.Name(),
disableDeepCopy: ip.unsafeDisableDeepCopy,
},
stop: make(chan struct{}),
}
ip.informersByType(obj)[gvk] = i
// Start the informer in case the InformersMap has started, otherwise it will be
// started when the InformersMap starts.
if ip.started {
ip.startInformerLocked(i.Informer)
ip.startInformerLocked(i)
}
return i, ip.started, nil
}

View File

@ -109,6 +109,27 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object
return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
}
func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
// If the object is clusterscoped, get the informer from clusterCache,
// if not use the namespaced caches.
isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
if err != nil {
return err
}
if !isNamespaced {
return c.clusterCache.RemoveInformer(ctx, obj)
}
for _, cache := range c.namespaceToCache {
err := cache.RemoveInformer(ctx, obj)
if err != nil {
return err
}
}
return nil
}
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
// If the object is cluster scoped, get the informer from clusterCache,
// if not use the namespaced caches.
@ -391,3 +412,13 @@ func (i *multiNamespaceInformer) HasSynced() bool {
}
return true
}
// IsStopped checks if each namespaced informer has stopped, returns false if any are still running.
func (i *multiNamespaceInformer) IsStopped() bool {
for _, informer := range i.namespaceToInformer {
if stopped := informer.IsStopped(); !stopped {
return false
}
}
return true
}

View File

@ -31,11 +31,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
)
var (
@ -60,25 +58,6 @@ func AddToProtobufScheme(addToScheme func(*runtime.Scheme) error) error {
return addToScheme(protobufScheme)
}
// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery
// information fetched by a new client with the given config.
func NewDiscoveryRESTMapper(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) {
if httpClient == nil {
return nil, fmt.Errorf("httpClient must not be nil, consider using rest.HTTPClientFor(c) to create a client")
}
// Get a mapper
dc, err := discovery.NewDiscoveryClientForConfigAndClient(c, httpClient)
if err != nil {
return nil, err
}
gr, err := restmapper.GetAPIGroupResources(dc)
if err != nil {
return nil, err
}
return restmapper.NewDiscoveryRESTMapper(gr), nil
}
// IsObjectNamespaced returns true if the object is namespace scoped.
// For unstructured objects the gvk is found from the object itself.
func IsObjectNamespaced(obj runtime.Object, scheme *runtime.Scheme, restmapper meta.RESTMapper) (bool, error) {

View File

@ -21,6 +21,7 @@ import (
"net/http"
"sync"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -166,8 +167,10 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
if err != nil {
return err
}
for _, version := range apiGroup.Versions {
versions = append(versions, version.Version)
if apiGroup != nil {
for _, version := range apiGroup.Versions {
versions = append(versions, version.Version)
}
}
}
@ -254,17 +257,12 @@ func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error)
m.mu.Unlock()
// Looking in the cache again.
{
m.mu.RLock()
group, ok := m.apiGroups[groupName]
m.mu.RUnlock()
if ok {
return group, nil
}
}
m.mu.RLock()
defer m.mu.RUnlock()
// If there is still nothing, return an error.
return nil, fmt.Errorf("failed to find API group %q", groupName)
// Don't return an error here if the API group is not present.
// The reloaded RESTMapper will take care of returning a NoMatchError.
return m.apiGroups[groupName], nil
}
// fetchGroupVersionResources fetches the resources for the specified group and its versions.
@ -276,7 +274,7 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string
groupVersion := schema.GroupVersion{Group: groupName, Version: version}
apiResourceList, err := m.client.ServerResourcesForGroupVersion(groupVersion.String())
if err != nil {
if err != nil && !apierrors.IsNotFound(err) {
failedGroups[groupVersion] = err
}
if apiResourceList != nil {

View File

@ -90,11 +90,18 @@ type CacheOptions struct {
type NewClientFunc func(config *rest.Config, options Options) (Client, error)
// New returns a new Client using the provided config and Options.
// The returned client reads *and* writes directly from the server
// (it doesn't use object caches). It understands how to work with
// normal types (both custom resources and aggregated/built-in resources),
// as well as unstructured types.
//
// The client's read behavior is determined by Options.Cache.
// If either Options.Cache or Options.Cache.Reader is nil,
// the client reads directly from the API server.
// If both Options.Cache and Options.Cache.Reader are non-nil,
// the client reads from a local cache. However, specific
// resources can still be configured to bypass the cache based
// on Options.Cache.Unstructured and Options.Cache.DisableFor.
// Write operations are always performed directly on the API server.
//
// The client understands how to work with normal types (both custom resources
// and aggregated/built-in resources), as well as unstructured types.
// In the case of normal types, the scheme will be used to look up the
// corresponding group, version, and kind for the given type. In the
// case of unstructured types, the group, version, and kind will be extracted
@ -210,7 +217,8 @@ func newClient(config *rest.Config, options Options) (*client, error) {
var _ Client = &client{}
// client is a client.Client that reads and writes directly from/to an API server.
// client is a client.Client configured to either read from a local cache or directly from the API server.
// Write operations are always performed directly on the API server.
// It lazily initializes new clients at the time they are used.
type client struct {
typedClient typedClient

View File

@ -419,7 +419,7 @@ type ListOptions struct {
LabelSelector labels.Selector
// FieldSelector filters results by a particular field. In order
// to use this with cache-based implementations, restrict usage to
// a single field-value pair that's been added to the indexers.
// exact match field-value pair that's been added to the indexers.
FieldSelector fields.Selector
// Namespace represents the namespace to list for, or empty for
@ -514,7 +514,8 @@ type MatchingLabels map[string]string
func (m MatchingLabels) ApplyToList(opts *ListOptions) {
// TODO(directxman12): can we avoid reserializing this over and over?
if opts.LabelSelector == nil {
opts.LabelSelector = labels.NewSelector()
opts.LabelSelector = labels.SelectorFromValidatedSet(map[string]string(m))
return
}
// If there's already a selector, we need to AND the two together.
noValidSel := labels.SelectorFromValidatedSet(map[string]string(m))

View File

@ -42,7 +42,7 @@ import (
// Unless you are implementing your own EventHandler, you can ignore the functions on the EventHandler interface.
// Most users shouldn't need to implement their own EventHandler.
type EventHandler interface {
// Create is called in response to an create event - e.g. Pod Creation.
// Create is called in response to a create event - e.g. Pod Creation.
Create(context.Context, event.CreateEvent, workqueue.RateLimitingInterface)
// Update is called in response to an update event - e.g. Pod Updated.

View File

@ -22,14 +22,16 @@ import (
)
// RequiresExactMatch checks if the given field selector is of the form `k=v` or `k==v`.
func RequiresExactMatch(sel fields.Selector) (field, val string, required bool) {
func RequiresExactMatch(sel fields.Selector) bool {
reqs := sel.Requirements()
if len(reqs) != 1 {
return "", "", false
if len(reqs) == 0 {
return false
}
req := reqs[0]
if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
return "", "", false
for _, req := range reqs {
if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
return false
}
}
return req.Field, req.Value, true
return true
}

View File

@ -0,0 +1,38 @@
package syncs
import (
"context"
"reflect"
"sync"
)
// MergeChans returns a channel that is closed when any of the input channels are signaled.
// The caller must call the returned CancelFunc to ensure no resources are leaked.
func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) {
var once sync.Once
out := make(chan T)
cancel := make(chan T)
cancelFunc := func() {
once.Do(func() {
close(cancel)
})
<-out
}
cases := make([]reflect.SelectCase, len(chans)+1)
for i := range chans {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(chans[i]),
}
}
cases[len(cases)-1] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(cancel),
}
go func() {
defer close(out)
_, _, _ = reflect.Select(cases)
}()
return out, cancelFunc
}

View File

@ -34,7 +34,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/cache"
@ -409,10 +409,10 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, fmt.Errorf("failed to new pprof listener: %w", err)
}
errChan := make(chan error)
errChan := make(chan error, 1)
runnables := newRunnables(options.BaseContext, errChan)
return &controllerManager{
stopProcedureEngaged: pointer.Int64(0),
stopProcedureEngaged: ptr.To(int64(0)),
cluster: cluster,
runnables: runnables,
errChan: errChan,

View File

@ -54,14 +54,14 @@ var (
Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name"})
workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name"})
unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{

View File

@ -19,9 +19,11 @@ package reconcile
import (
"context"
"errors"
"reflect"
"time"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// Result contains the result of a Reconciler invocation.
@ -97,7 +99,7 @@ type Reconciler interface {
// If the error is nil and the returned Result has a non-zero result.RequeueAfter, the request
// will be requeued after the specified duration.
//
// If the error is nil and result.RequeueAfter is zero and result.Reque is true, the request
// If the error is nil and result.RequeueAfter is zero and result.Requeue is true, the request
// will be requeued using exponential backoff.
Reconcile(context.Context, Request) (Result, error)
}
@ -110,6 +112,36 @@ var _ Reconciler = Func(nil)
// Reconcile implements Reconciler.
func (r Func) Reconcile(ctx context.Context, o Request) (Result, error) { return r(ctx, o) }
// ObjectReconciler is a specialized version of Reconciler that acts on instances of client.Object. Each reconciliation
// event gets the associated object from Kubernetes before passing it to Reconcile. An ObjectReconciler can be used in
// Builder.Complete by calling AsReconciler. See Reconciler for more details.
type ObjectReconciler[T client.Object] interface {
Reconcile(context.Context, T) (Result, error)
}
// AsReconciler creates a Reconciler based on the given ObjectReconciler.
func AsReconciler[T client.Object](client client.Client, rec ObjectReconciler[T]) Reconciler {
return &objectReconcilerAdapter[T]{
objReconciler: rec,
client: client,
}
}
type objectReconcilerAdapter[T client.Object] struct {
objReconciler ObjectReconciler[T]
client client.Client
}
// Reconcile implements Reconciler.
func (a *objectReconcilerAdapter[T]) Reconcile(ctx context.Context, req Request) (Result, error) {
o := reflect.New(reflect.TypeOf(*new(T)).Elem()).Interface().(T)
if err := a.client.Get(ctx, req.NamespacedName, o); err != nil {
return Result{}, client.IgnoreNotFound(err)
}
return a.objReconciler.Reconcile(ctx, o)
}
// TerminalError is an error that will not be retried but still be logged
// and recorded in metrics.
func TerminalError(wrapped error) error {

View File

@ -27,12 +27,14 @@ import (
)
// Defaulter defines functions for setting defaults on resources.
// Deprecated: Ue CustomDefaulter instead.
type Defaulter interface {
runtime.Object
Default()
}
// DefaultingWebhookFor creates a new Webhook for Defaulting the provided type.
// Deprecated: Use WithCustomDefaulter instead.
func DefaultingWebhookFor(scheme *runtime.Scheme, defaulter Defaulter) *Webhook {
return &Webhook{
Handler: &mutatingHandler{defaulter: defaulter, decoder: NewDecoder(scheme)},

View File

@ -34,6 +34,26 @@ import (
var admissionScheme = runtime.NewScheme()
var admissionCodecs = serializer.NewCodecFactory(admissionScheme)
// adapted from https://github.com/kubernetes/kubernetes/blob/c28c2009181fcc44c5f6b47e10e62dacf53e4da0/staging/src/k8s.io/pod-security-admission/cmd/webhook/server/server.go
//
// From https://github.com/kubernetes/apiserver/blob/d6876a0600de06fef75968c4641c64d7da499f25/pkg/server/config.go#L433-L442C5:
//
// 1.5MB is the recommended client request size in byte
// the etcd server should accept. See
// https://github.com/etcd-io/etcd/blob/release-3.4/embed/config.go#L56.
// A request body might be encoded in json, and is converted to
// proto when persisted in etcd, so we allow 2x as the largest request
// body size to be accepted and decoded in a write request.
//
// For the admission request, we can infer that it contains at most two objects
// (the old and new versions of the object being admitted), each of which can
// be at most 3MB in size. For the rest of the request, we can assume that
// it will be less than 1MB in size. Therefore, we can set the max request
// size to 7MB.
// If your use case requires larger max request sizes, please
// open an issue (https://github.com/kubernetes-sigs/controller-runtime/issues/new).
const maxRequestSize = int64(7 * 1024 * 1024)
func init() {
utilruntime.Must(v1.AddToScheme(admissionScheme))
utilruntime.Must(v1beta1.AddToScheme(admissionScheme))
@ -42,27 +62,30 @@ func init() {
var _ http.Handler = &Webhook{}
func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var body []byte
var err error
ctx := r.Context()
if wh.WithContextFunc != nil {
ctx = wh.WithContextFunc(ctx, r)
}
var reviewResponse Response
if r.Body == nil {
err = errors.New("request body is empty")
if r.Body == nil || r.Body == http.NoBody {
err := errors.New("request body is empty")
wh.getLogger(nil).Error(err, "bad request")
reviewResponse = Errored(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
wh.writeResponse(w, Errored(http.StatusBadRequest, err))
return
}
defer r.Body.Close()
if body, err = io.ReadAll(r.Body); err != nil {
limitedReader := &io.LimitedReader{R: r.Body, N: maxRequestSize}
body, err := io.ReadAll(limitedReader)
if err != nil {
wh.getLogger(nil).Error(err, "unable to read the body from the incoming request")
reviewResponse = Errored(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
wh.writeResponse(w, Errored(http.StatusBadRequest, err))
return
}
if limitedReader.N <= 0 {
err := fmt.Errorf("request entity is too large; limit is %d bytes", maxRequestSize)
wh.getLogger(nil).Error(err, "unable to read the body from the incoming request; limit reached")
wh.writeResponse(w, Errored(http.StatusRequestEntityTooLarge, err))
return
}
@ -70,8 +93,7 @@ func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if contentType := r.Header.Get("Content-Type"); contentType != "application/json" {
err = fmt.Errorf("contentType=%s, expected application/json", contentType)
wh.getLogger(nil).Error(err, "unable to process a request with unknown content type")
reviewResponse = Errored(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
wh.writeResponse(w, Errored(http.StatusBadRequest, err))
return
}
@ -89,14 +111,12 @@ func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_, actualAdmRevGVK, err := admissionCodecs.UniversalDeserializer().Decode(body, nil, &ar)
if err != nil {
wh.getLogger(nil).Error(err, "unable to decode the request")
reviewResponse = Errored(http.StatusBadRequest, err)
wh.writeResponse(w, reviewResponse)
wh.writeResponse(w, Errored(http.StatusBadRequest, err))
return
}
wh.getLogger(&req).V(5).Info("received request")
reviewResponse = wh.Handle(ctx, req)
wh.writeResponseTyped(w, reviewResponse, actualAdmRevGVK)
wh.writeResponseTyped(w, wh.Handle(ctx, req), actualAdmRevGVK)
}
// writeResponse writes response to w generically, i.e. without encoding GVK information.

View File

@ -33,6 +33,7 @@ type Warnings []string
// Validator defines functions for validating an operation.
// The custom resource kind which implements this interface can validate itself.
// To validate the custom resource with another specific struct, use CustomValidator instead.
// Deprecated: Use CustomValidator instead.
type Validator interface {
runtime.Object
@ -53,6 +54,7 @@ type Validator interface {
}
// ValidatingWebhookFor creates a new Webhook for validating the provided type.
// Deprecated: Use WithCustomValidator instead.
func ValidatingWebhookFor(scheme *runtime.Scheme, validator Validator) *Webhook {
return &Webhook{
Handler: &validatingHandler{validator: validator, decoder: NewDecoder(scheme)},

View File

@ -30,7 +30,6 @@ import (
// CustomValidator defines functions for validating an operation.
// The object to be validated is passed into methods as a parameter.
type CustomValidator interface {
// ValidateCreate validates the object on creation.
// The optional warnings will be added to the response as warning messages.
// Return an error if the object is invalid.

View File

@ -24,9 +24,11 @@ import (
// define some aliases for common bits of the webhook functionality
// Defaulter defines functions for setting defaults on resources.
// Deprecated: Use CustomDefaulter instead.
type Defaulter = admission.Defaulter
// Validator defines functions for validating an operation.
// Deprecated: Use CustomValidator instead.
type Validator = admission.Validator
// CustomDefaulter defines functions for setting defaults on resources.