rebase: bump sigs.k8s.io/controller-runtime

Bumps the k8s-dependencies group with 1 update in the / directory: [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime).

Updates `sigs.k8s.io/controller-runtime` from 0.17.3 to 0.18.2
- [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases)
- [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md)
- [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.17.3...v0.18.2)

---
updated-dependencies:
- dependency-name: sigs.k8s.io/controller-runtime
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: k8s-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2024-05-13 20:57:03 +00:00
committed by mergify[bot]
parent c8af2b638a
commit c1ee11261e
60 changed files with 1559 additions and 1257 deletions

View File

@ -18,10 +18,12 @@ package source
import (
"context"
"errors"
"fmt"
"sync"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
@ -31,23 +33,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
const (
// defaultBufferSize is the default number of event notifications that can be buffered.
defaultBufferSize = 1024
)
// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
// Source is a source of events (e.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
//
// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
//
// * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls).
// * Use Channel for events originating outside the cluster (e.g. GitHub Webhook callback, Polling external urls).
//
// Users may build their own Source implementations.
type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
Start(context.Context, workqueue.RateLimitingInterface) error
}
// SyncingSource is a source that needs syncing prior to being usable. The controller
@ -58,54 +55,92 @@ type SyncingSource interface {
}
// Kind creates a KindSource with the given cache provider.
func Kind(cache cache.Cache, object client.Object) SyncingSource {
return &internal.Kind{Type: object, Cache: cache}
func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEventHandler[T], predicates ...predicate.TypedPredicate[T]) SyncingSource {
return &internal.Kind[T]{
Type: object,
Cache: cache,
Handler: handler,
Predicates: predicates,
}
}
var _ Source = &Channel{}
var _ Source = &channel[string]{}
// ChannelOpt allows to configure a source.Channel.
type ChannelOpt[T any] func(*channel[T])
// WithPredicates adds the configured predicates to a source.Channel.
func WithPredicates[T any](p ...predicate.TypedPredicate[T]) ChannelOpt[T] {
return func(c *channel[T]) {
c.predicates = append(c.predicates, p...)
}
}
// WithBufferSize configures the buffer size for a source.Channel. By
// default, the buffer size is 1024.
func WithBufferSize[T any](bufferSize int) ChannelOpt[T] {
return func(c *channel[T]) {
c.bufferSize = &bufferSize
}
}
// Channel is used to provide a source of events originating outside the cluster
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
type Channel struct {
// source (e.g. http handler) to write GenericEvents to the underlying channel.
func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], opts ...ChannelOpt[T]) Source {
c := &channel[T]{
source: source,
handler: handler,
}
for _, opt := range opts {
opt(c)
}
return c
}
type channel[T any] struct {
// once ensures the event distribution goroutine will be performed only once
once sync.Once
// Source is the source channel to fetch GenericEvents
Source <-chan event.GenericEvent
// source is the source channel to fetch GenericEvents
source <-chan event.TypedGenericEvent[T]
handler handler.TypedEventHandler[T]
predicates []predicate.TypedPredicate[T]
bufferSize *int
// dest is the destination channels of the added event handlers
dest []chan event.GenericEvent
// DestBufferSize is the specified buffer size of dest channels.
// Default to 1024 if not specified.
DestBufferSize int
dest []chan event.TypedGenericEvent[T]
// destLock is to ensure the destination channels are safely added/removed
destLock sync.Mutex
}
func (cs *Channel) String() string {
func (cs *channel[T]) String() string {
return fmt.Sprintf("channel source: %p", cs)
}
// Start implements Source and should only be called by the Controller.
func (cs *Channel) Start(
func (cs *channel[T]) Start(
ctx context.Context,
handler handler.EventHandler,
queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
) error {
// Source should have been specified by the user.
if cs.Source == nil {
if cs.source == nil {
return fmt.Errorf("must specify Channel.Source")
}
// use default value if DestBufferSize not specified
if cs.DestBufferSize == 0 {
cs.DestBufferSize = defaultBufferSize
if cs.handler == nil {
return errors.New("must specify Channel.Handler")
}
dst := make(chan event.GenericEvent, cs.DestBufferSize)
if cs.bufferSize == nil {
cs.bufferSize = ptr.To(1024)
}
dst := make(chan event.TypedGenericEvent[T], *cs.bufferSize)
cs.destLock.Lock()
cs.dest = append(cs.dest, dst)
@ -119,7 +154,7 @@ func (cs *Channel) Start(
go func() {
for evt := range dst {
shouldHandle := true
for _, p := range prct {
for _, p := range cs.predicates {
if !p.Generic(evt) {
shouldHandle = false
break
@ -130,7 +165,7 @@ func (cs *Channel) Start(
func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
handler.Generic(ctx, evt, queue)
cs.handler.Generic(ctx, evt, queue)
}()
}
}
@ -139,7 +174,7 @@ func (cs *Channel) Start(
return nil
}
func (cs *Channel) doStop() {
func (cs *channel[T]) doStop() {
cs.destLock.Lock()
defer cs.destLock.Unlock()
@ -148,7 +183,7 @@ func (cs *Channel) doStop() {
}
}
func (cs *Channel) distribute(evt event.GenericEvent) {
func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) {
cs.destLock.Lock()
defer cs.destLock.Unlock()
@ -162,14 +197,14 @@ func (cs *Channel) distribute(evt event.GenericEvent) {
}
}
func (cs *Channel) syncLoop(ctx context.Context) {
func (cs *channel[T]) syncLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
// Close destination channels
cs.doStop()
return
case evt, stillOpen := <-cs.Source:
case evt, stillOpen := <-cs.source:
if !stillOpen {
// if the source channel is closed, we're never gonna get
// anything more on it, so stop & bail
@ -184,21 +219,25 @@ func (cs *Channel) syncLoop(ctx context.Context) {
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Informer struct {
// Informer is the controller-runtime Informer
Informer cache.Informer
Informer cache.Informer
Handler handler.EventHandler
Predicates []predicate.Predicate
}
var _ Source = &Informer{}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
// Informer should have been specified by the user.
if is.Informer == nil {
return fmt.Errorf("must specify Informer.Informer")
}
if is.Handler == nil {
return errors.New("must specify Informer.Handler")
}
_, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
_, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs())
if err != nil {
return err
}
@ -212,12 +251,11 @@ func (is *Informer) String() string {
var _ Source = Func(nil)
// Func is a function that implements Source.
type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
type Func func(context.Context, workqueue.RateLimitingInterface) error
// Start implements Source.
func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface,
pr ...predicate.Predicate) error {
return f(ctx, evt, queue, pr...)
func (f Func) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
return f(ctx, queue)
}
func (f Func) String() string {