rebase: update sigs.k8s.io/controller-runtime to current version

There is no release for sigs.k8s.io/controller-runtime that supports
Kubernetes v1.27. The main branch has all the required modifications, so
we can use that for the time being.

Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
Niels de Vos
2023-06-01 19:01:19 +02:00
committed by mergify[bot]
parent 2551a0b05f
commit b1a4590967
74 changed files with 3412 additions and 3741 deletions

View File

@ -1,138 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"fmt"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
var log = logf.RuntimeLog.WithName("source").WithName("EventHandler")
var _ cache.ResourceEventHandler = EventHandler{}
// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
type EventHandler struct {
EventHandler handler.EventHandler
Queue workqueue.RateLimitingInterface
Predicates []predicate.Predicate
}
// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
for _, p := range e.Predicates {
if !p.Create(c) {
return
}
}
// Invoke create handler
e.EventHandler.Create(c, e.Queue)
}
// OnUpdate creates UpdateEvent and calls Update on EventHandler.
func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
u := event.UpdateEvent{}
if o, ok := oldObj.(client.Object); ok {
u.ObjectOld = o
} else {
log.Error(nil, "OnUpdate missing ObjectOld",
"object", oldObj, "type", fmt.Sprintf("%T", oldObj))
return
}
// Pull Object out of the object
if o, ok := newObj.(client.Object); ok {
u.ObjectNew = o
} else {
log.Error(nil, "OnUpdate missing ObjectNew",
"object", newObj, "type", fmt.Sprintf("%T", newObj))
return
}
for _, p := range e.Predicates {
if !p.Update(u) {
return
}
}
// Invoke update handler
e.EventHandler.Update(u, e.Queue)
}
// OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e EventHandler) OnDelete(obj interface{}) {
d := event.DeleteEvent{}
// Deal with tombstone events by pulling the object out. Tombstone events wrap the object in a
// DeleteFinalStateUnknown struct, so the object needs to be pulled out.
// Copied from sample-controller
// This should never happen if we aren't missing events, which we have concluded that we are not
// and made decisions off of this belief. Maybe this shouldn't be here?
var ok bool
if _, ok = obj.(client.Object); !ok {
// If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Error(nil, "Error decoding objects. Expected cache.DeletedFinalStateUnknown",
"type", fmt.Sprintf("%T", obj),
"object", obj)
return
}
// Set obj to the tombstone obj
obj = tombstone.Obj
}
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
d.Object = o
} else {
log.Error(nil, "OnDelete missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
for _, p := range e.Predicates {
if !p.Delete(d) {
return
}
}
// Invoke delete handler
e.EventHandler.Delete(d, e.Queue)
}

View File

@ -18,28 +18,19 @@ package source
import (
"context"
"errors"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source/internal"
internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
var log = logf.RuntimeLog.WithName("source")
const (
// defaultBufferSize is the default number of event notifications that can be buffered.
defaultBufferSize = 1024
@ -52,8 +43,7 @@ const (
//
// * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls).
//
// Users may build their own Source implementations. If their implementations implement any of the inject package
// interfaces, the dependencies will be injected by the Controller when Watch is called.
// Users may build their own Source implementations.
type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
@ -67,144 +57,9 @@ type SyncingSource interface {
WaitForSync(ctx context.Context) error
}
// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
// from that other cluster.
func NewKindWithCache(object client.Object, cache cache.Cache) SyncingSource {
return &kindWithCache{kind: Kind{Type: object, cache: cache}}
}
type kindWithCache struct {
kind Kind
}
func (ks *kindWithCache) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
return ks.kind.Start(ctx, handler, queue, prct...)
}
func (ks *kindWithCache) String() string {
return ks.kind.String()
}
func (ks *kindWithCache) WaitForSync(ctx context.Context) error {
return ks.kind.WaitForSync(ctx)
}
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type client.Object
// cache used to watch APIs
cache cache.Cache
// started may contain an error if one was encountered during startup. If its closed and does not
// contain an error, startup and syncing finished.
started chan error
startCancel func()
}
var _ SyncingSource = &Kind{}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Type should have been specified by the user.
if ks.Type == nil {
return fmt.Errorf("must specify Kind.Type")
}
// cache should have been injected before Start was called
if ks.cache == nil {
return fmt.Errorf("must call CacheInto on Kind before calling Start")
}
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
var (
i cache.Informer
lastErr error
)
// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
if lastErr != nil {
kindMatchErr := &meta.NoKindMatchError{}
switch {
case errors.As(lastErr, &kindMatchErr):
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
case runtime.IsNotRegisteredError(lastErr):
log.Error(lastErr, "kind must be registered to the Scheme")
default:
log.Error(lastErr, "failed to get informer from cache")
}
return false, nil // Retry.
}
return true, nil
}); err != nil {
if lastErr != nil {
ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
return
}
ks.started <- err
return
}
_, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if err != nil {
ks.started <- err
return
}
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()
return nil
}
func (ks *Kind) String() string {
if ks.Type != nil {
return fmt.Sprintf("kind source: %T", ks.Type)
}
return "kind source: unknown type"
}
// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
select {
case err := <-ks.started:
return err
case <-ctx.Done():
ks.startCancel()
if errors.Is(ctx.Err(), context.Canceled) {
return nil
}
return errors.New("timed out waiting for cache to be synced")
}
}
var _ inject.Cache = &Kind{}
// InjectCache is internal should be called only by the Controller. InjectCache is used to inject
// the Cache dependency initialized by the ControllerManager.
func (ks *Kind) InjectCache(c cache.Cache) error {
if ks.cache == nil {
ks.cache = c
}
return nil
// Kind creates a KindSource with the given cache provider.
func Kind(cache cache.Cache, object client.Object) SyncingSource {
return &internal.Kind{Type: object, Cache: cache}
}
var _ Source = &Channel{}
@ -219,9 +74,6 @@ type Channel struct {
// Source is the source channel to fetch GenericEvents
Source <-chan event.GenericEvent
// stop is to end ongoing goroutine, and close the channels
stop <-chan struct{}
// dest is the destination channels of the added event handlers
dest []chan event.GenericEvent
@ -237,18 +89,6 @@ func (cs *Channel) String() string {
return fmt.Sprintf("channel source: %p", cs)
}
var _ inject.Stoppable = &Channel{}
// InjectStopChannel is internal should be called only by the Controller.
// It is used to inject the stop channel initialized by the ControllerManager.
func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error {
if cs.stop == nil {
cs.stop = stop
}
return nil
}
// Start implements Source and should only be called by the Controller.
func (cs *Channel) Start(
ctx context.Context,
@ -260,11 +100,6 @@ func (cs *Channel) Start(
return fmt.Errorf("must specify Channel.Source")
}
// stop should have been injected before Start was called
if cs.stop == nil {
return fmt.Errorf("must call InjectStop on Channel before calling Start")
}
// use default value if DestBufferSize not specified
if cs.DestBufferSize == 0 {
cs.DestBufferSize = defaultBufferSize
@ -292,7 +127,11 @@ func (cs *Channel) Start(
}
if shouldHandle {
handler.Generic(evt, queue)
func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
handler.Generic(ctx, evt, queue)
}()
}
}
}()
@ -359,7 +198,7 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que
return fmt.Errorf("must specify Informer.Informer")
}
_, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
_, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
if err != nil {
return err
}