rebase: update controller-runtime package to v0.9.2

This commit updates controller-runtime to v0.9.2 and
makes changes in persistentvolume.go to add context to
various functions and function calls made here instead of
context.TODO().

Signed-off-by: Rakshith R <rar@redhat.com>
This commit is contained in:
Rakshith R
2021-06-25 10:32:01 +05:30
committed by mergify[bot]
parent 1b23d78113
commit 9eaa55506f
238 changed files with 19614 additions and 10805 deletions

View File

@ -19,15 +19,13 @@ package internal
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
@ -35,31 +33,22 @@ var log = logf.RuntimeLog.WithName("source").WithName("EventHandler")
var _ cache.ResourceEventHandler = EventHandler{}
// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface
// EventHandler adapts a handler.EventHandler interface to a cache.ResourceEventHandler interface.
type EventHandler struct {
EventHandler handler.EventHandler
Queue workqueue.RateLimitingInterface
Predicates []predicate.Predicate
}
// OnAdd creates CreateEvent and calls Create on EventHandler
// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}
// Pull metav1.Object out of the object
if o, err := meta.Accessor(obj); err == nil {
c.Meta = o
} else {
log.Error(err, "OnAdd missing Meta",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
// Pull the runtime.Object out of the object
if o, ok := obj.(runtime.Object); ok {
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing runtime.Object",
log.Error(nil, "OnAdd missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
@ -74,21 +63,11 @@ func (e EventHandler) OnAdd(obj interface{}) {
e.EventHandler.Create(c, e.Queue)
}
// OnUpdate creates UpdateEvent and calls Update on EventHandler
// OnUpdate creates UpdateEvent and calls Update on EventHandler.
func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
u := event.UpdateEvent{}
// Pull metav1.Object out of the object
if o, err := meta.Accessor(oldObj); err == nil {
u.MetaOld = o
} else {
log.Error(err, "OnUpdate missing MetaOld",
"object", oldObj, "type", fmt.Sprintf("%T", oldObj))
return
}
// Pull the runtime.Object out of the object
if o, ok := oldObj.(runtime.Object); ok {
if o, ok := oldObj.(client.Object); ok {
u.ObjectOld = o
} else {
log.Error(nil, "OnUpdate missing ObjectOld",
@ -96,21 +75,12 @@ func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
return
}
// Pull metav1.Object out of the object
if o, err := meta.Accessor(newObj); err == nil {
u.MetaNew = o
} else {
log.Error(err, "OnUpdate missing MetaNew",
"object", newObj, "type", fmt.Sprintf("%T", newObj))
return
}
// Pull the runtime.Object out of the object
if o, ok := newObj.(runtime.Object); ok {
// Pull Object out of the object
if o, ok := newObj.(client.Object); ok {
u.ObjectNew = o
} else {
log.Error(nil, "OnUpdate missing ObjectNew",
"object", oldObj, "type", fmt.Sprintf("%T", oldObj))
"object", newObj, "type", fmt.Sprintf("%T", newObj))
return
}
@ -124,7 +94,7 @@ func (e EventHandler) OnUpdate(oldObj, newObj interface{}) {
e.EventHandler.Update(u, e.Queue)
}
// OnDelete creates DeleteEvent and calls Delete on EventHandler
// OnDelete creates DeleteEvent and calls Delete on EventHandler.
func (e EventHandler) OnDelete(obj interface{}) {
d := event.DeleteEvent{}
@ -134,7 +104,7 @@ func (e EventHandler) OnDelete(obj interface{}) {
// This should never happen if we aren't missing events, which we have concluded that we are not
// and made decisions off of this belief. Maybe this shouldn't be here?
var ok bool
if _, ok = obj.(metav1.Object); !ok {
if _, ok = obj.(client.Object); !ok {
// If the object doesn't have Metadata, assume it is a tombstone object of type DeletedFinalStateUnknown
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
@ -148,20 +118,11 @@ func (e EventHandler) OnDelete(obj interface{}) {
obj = tombstone.Obj
}
// Pull metav1.Object out of the object
if o, err := meta.Accessor(obj); err == nil {
d.Meta = o
} else {
log.Error(err, "OnDelete missing Meta",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
// Pull the runtime.Object out of the object
if o, ok := obj.(runtime.Object); ok {
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
d.Object = o
} else {
log.Error(nil, "OnDelete missing runtime.Object",
log.Error(nil, "OnDelete missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}

View File

@ -18,12 +18,13 @@ package source
import (
"context"
"errors"
"fmt"
"sync"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
@ -53,13 +54,20 @@ const (
type Source interface {
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
// SyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers.
type SyncingSource interface {
Source
WaitForSync(ctx context.Context) error
}
// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
// from that other cluster
func NewKindWithCache(object runtime.Object, cache cache.Cache) Source {
// from that other cluster.
func NewKindWithCache(object client.Object, cache cache.Cache) SyncingSource {
return &kindWithCache{kind: Kind{Type: object, cache: cache}}
}
@ -67,27 +75,35 @@ type kindWithCache struct {
kind Kind
}
func (ks *kindWithCache) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
func (ks *kindWithCache) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
return ks.kind.Start(handler, queue, prct...)
return ks.kind.Start(ctx, handler, queue, prct...)
}
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
func (ks *kindWithCache) WaitForSync(ctx context.Context) error {
return ks.kind.WaitForSync(ctx)
}
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type runtime.Object
Type client.Object
// cache used to watch APIs
cache cache.Cache
// started may contain an error if one was encountered during startup. If its closed and does not
// contain an error, startup and syncing finished.
started chan error
startCancel func()
}
var _ Source = &Kind{}
var _ SyncingSource = &Kind{}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Type should have been specified by the user.
if ks.Type == nil {
return fmt.Errorf("must specify Kind.Type")
@ -98,16 +114,30 @@ func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimiting
return fmt.Errorf("must call CacheInto on Kind before calling Start")
}
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(context.TODO(), ks.Type)
if err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(err, &kindMatchErr) {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
}
ks.started <- err
return
}
return err
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()
return nil
}
@ -115,7 +145,19 @@ func (ks *Kind) String() string {
if ks.Type != nil && ks.Type.GetObjectKind() != nil {
return fmt.Sprintf("kind source: %v", ks.Type.GetObjectKind().GroupVersionKind().String())
}
return fmt.Sprintf("kind source: unknown GVK")
return "kind source: unknown GVK"
}
// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
select {
case err := <-ks.started:
return err
case <-ctx.Done():
ks.startCancel()
return errors.New("timed out waiting for cache to be synced")
}
}
var _ inject.Cache = &Kind{}
@ -173,6 +215,7 @@ func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error {
// Start implements Source and should only be called by the Controller.
func (cs *Channel) Start(
ctx context.Context,
handler handler.EventHandler,
queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
@ -191,12 +234,17 @@ func (cs *Channel) Start(
cs.DestBufferSize = defaultBufferSize
}
dst := make(chan event.GenericEvent, cs.DestBufferSize)
cs.destLock.Lock()
cs.dest = append(cs.dest, dst)
cs.destLock.Unlock()
cs.once.Do(func() {
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
go cs.syncLoop()
go cs.syncLoop(ctx)
})
dst := make(chan event.GenericEvent, cs.DestBufferSize)
go func() {
for evt := range dst {
shouldHandle := true
@ -213,11 +261,6 @@ func (cs *Channel) Start(
}
}()
cs.destLock.Lock()
defer cs.destLock.Unlock()
cs.dest = append(cs.dest, dst)
return nil
}
@ -244,20 +287,26 @@ func (cs *Channel) distribute(evt event.GenericEvent) {
}
}
func (cs *Channel) syncLoop() {
func (cs *Channel) syncLoop(ctx context.Context) {
for {
select {
case <-cs.stop:
case <-ctx.Done():
// Close destination channels
cs.doStop()
return
case evt := <-cs.Source:
case evt, stillOpen := <-cs.Source:
if !stillOpen {
// if the source channel is closed, we're never gonna get
// anything more on it, so stop & bail
cs.doStop()
return
}
cs.distribute(evt)
}
}
}
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Informer struct {
// Informer is the controller-runtime Informer
Informer cache.Informer
@ -267,9 +316,8 @@ var _ Source = &Informer{}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Informer should have been specified by the user.
if is.Informer == nil {
return fmt.Errorf("must specify Informer.Informer")
@ -283,13 +331,15 @@ func (is *Informer) String() string {
return fmt.Sprintf("informer source: %p", is.Informer)
}
// Func is a function that implements Source
type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
var _ Source = Func(nil)
// Start implements Source
func (f Func) Start(evt handler.EventHandler, queue workqueue.RateLimitingInterface,
// Func is a function that implements Source.
type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
// Start implements Source.
func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface,
pr ...predicate.Predicate) error {
return f(evt, queue, pr...)
return f(ctx, evt, queue, pr...)
}
func (f Func) String() string {