mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-23 14:50:24 +00:00
34fc1d847e
to v1.18.0 Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
407 lines
13 KiB
Go
407 lines
13 KiB
Go
/*
|
|
Copyright 2015 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package cache
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/util/clock"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
)
|
|
|
|
// This file implements a low-level controller that is used in
|
|
// sharedIndexInformer, which is an implementation of
|
|
// SharedIndexInformer. Such informers, in turn, are key components
|
|
// in the high level controllers that form the backbone of the
|
|
// Kubernetes control plane. Look at those for examples, or the
|
|
// example in
|
|
// https://github.com/kubernetes/client-go/tree/master/examples/workqueue
|
|
// .
|
|
|
|
// Config contains all the settings for one of these low-level controllers.
|
|
type Config struct {
|
|
// The queue for your objects - has to be a DeltaFIFO due to
|
|
// assumptions in the implementation. Your Process() function
|
|
// should accept the output of this Queue's Pop() method.
|
|
Queue
|
|
|
|
// Something that can list and watch your objects.
|
|
ListerWatcher
|
|
|
|
// Something that can process a popped Deltas.
|
|
Process ProcessFunc
|
|
|
|
// ObjectType is an example object of the type this controller is
|
|
// expected to handle. Only the type needs to be right, except
|
|
// that when that is `unstructured.Unstructured` the object's
|
|
// `"apiVersion"` and `"kind"` must also be right.
|
|
ObjectType runtime.Object
|
|
|
|
// FullResyncPeriod is the period at which ShouldResync is considered.
|
|
FullResyncPeriod time.Duration
|
|
|
|
// ShouldResync is periodically used by the reflector to determine
|
|
// whether to Resync the Queue. If ShouldResync is `nil` or
|
|
// returns true, it means the reflector should proceed with the
|
|
// resync.
|
|
ShouldResync ShouldResyncFunc
|
|
|
|
// If true, when Process() returns an error, re-enqueue the object.
|
|
// TODO: add interface to let you inject a delay/backoff or drop
|
|
// the object completely if desired. Pass the object in
|
|
// question to this interface as a parameter. This is probably moot
|
|
// now that this functionality appears at a higher level.
|
|
RetryOnError bool
|
|
}
|
|
|
|
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
|
|
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
|
|
// resync periods.
|
|
type ShouldResyncFunc func() bool
|
|
|
|
// ProcessFunc processes a single object.
|
|
type ProcessFunc func(obj interface{}) error
|
|
|
|
// `*controller` implements Controller
|
|
type controller struct {
|
|
config Config
|
|
reflector *Reflector
|
|
reflectorMutex sync.RWMutex
|
|
clock clock.Clock
|
|
}
|
|
|
|
// Controller is a low-level controller that is parameterized by a
|
|
// Config and used in sharedIndexInformer.
|
|
type Controller interface {
|
|
// Run does two things. One is to construct and run a Reflector
|
|
// to pump objects/notifications from the Config's ListerWatcher
|
|
// to the Config's Queue and possibly invoke the occasional Resync
|
|
// on that Queue. The other is to repeatedly Pop from the Queue
|
|
// and process with the Config's ProcessFunc. Both of these
|
|
// continue until `stopCh` is closed.
|
|
Run(stopCh <-chan struct{})
|
|
|
|
// HasSynced delegates to the Config's Queue
|
|
HasSynced() bool
|
|
|
|
// LastSyncResourceVersion delegates to the Reflector when there
|
|
// is one, otherwise returns the empty string
|
|
LastSyncResourceVersion() string
|
|
}
|
|
|
|
// New makes a new Controller from the given Config.
|
|
func New(c *Config) Controller {
|
|
ctlr := &controller{
|
|
config: *c,
|
|
clock: &clock.RealClock{},
|
|
}
|
|
return ctlr
|
|
}
|
|
|
|
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
|
|
// It's an error to call Run more than once.
|
|
// Run blocks; call via go.
|
|
func (c *controller) Run(stopCh <-chan struct{}) {
|
|
defer utilruntime.HandleCrash()
|
|
go func() {
|
|
<-stopCh
|
|
c.config.Queue.Close()
|
|
}()
|
|
r := NewReflector(
|
|
c.config.ListerWatcher,
|
|
c.config.ObjectType,
|
|
c.config.Queue,
|
|
c.config.FullResyncPeriod,
|
|
)
|
|
r.ShouldResync = c.config.ShouldResync
|
|
r.clock = c.clock
|
|
|
|
c.reflectorMutex.Lock()
|
|
c.reflector = r
|
|
c.reflectorMutex.Unlock()
|
|
|
|
var wg wait.Group
|
|
defer wg.Wait()
|
|
|
|
wg.StartWithChannel(stopCh, r.Run)
|
|
|
|
wait.Until(c.processLoop, time.Second, stopCh)
|
|
}
|
|
|
|
// Returns true once this controller has completed an initial resource listing
|
|
func (c *controller) HasSynced() bool {
|
|
return c.config.Queue.HasSynced()
|
|
}
|
|
|
|
func (c *controller) LastSyncResourceVersion() string {
|
|
c.reflectorMutex.RLock()
|
|
defer c.reflectorMutex.RUnlock()
|
|
if c.reflector == nil {
|
|
return ""
|
|
}
|
|
return c.reflector.LastSyncResourceVersion()
|
|
}
|
|
|
|
// processLoop drains the work queue.
|
|
// TODO: Consider doing the processing in parallel. This will require a little thought
|
|
// to make sure that we don't end up processing the same object multiple times
|
|
// concurrently.
|
|
//
|
|
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
|
|
// actually exit when the controller is stopped. Or just give up on this stuff
|
|
// ever being stoppable. Converting this whole package to use Context would
|
|
// also be helpful.
|
|
func (c *controller) processLoop() {
|
|
for {
|
|
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
|
|
if err != nil {
|
|
if err == ErrFIFOClosed {
|
|
return
|
|
}
|
|
if c.config.RetryOnError {
|
|
// This is the safe way to re-enqueue.
|
|
c.config.Queue.AddIfNotPresent(obj)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ResourceEventHandler can handle notifications for events that happen to a
|
|
// resource. The events are informational only, so you can't return an
|
|
// error.
|
|
// * OnAdd is called when an object is added.
|
|
// * OnUpdate is called when an object is modified. Note that oldObj is the
|
|
// last known state of the object-- it is possible that several changes
|
|
// were combined together, so you can't use this to see every single
|
|
// change. OnUpdate is also called when a re-list happens, and it will
|
|
// get called even if nothing changed. This is useful for periodically
|
|
// evaluating or syncing something.
|
|
// * OnDelete will get the final state of the item if it is known, otherwise
|
|
// it will get an object of type DeletedFinalStateUnknown. This can
|
|
// happen if the watch is closed and misses the delete event and we don't
|
|
// notice the deletion until the subsequent re-list.
|
|
type ResourceEventHandler interface {
|
|
OnAdd(obj interface{})
|
|
OnUpdate(oldObj, newObj interface{})
|
|
OnDelete(obj interface{})
|
|
}
|
|
|
|
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
|
|
// as few of the notification functions as you want while still implementing
|
|
// ResourceEventHandler.
|
|
type ResourceEventHandlerFuncs struct {
|
|
AddFunc func(obj interface{})
|
|
UpdateFunc func(oldObj, newObj interface{})
|
|
DeleteFunc func(obj interface{})
|
|
}
|
|
|
|
// OnAdd calls AddFunc if it's not nil.
|
|
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
|
|
if r.AddFunc != nil {
|
|
r.AddFunc(obj)
|
|
}
|
|
}
|
|
|
|
// OnUpdate calls UpdateFunc if it's not nil.
|
|
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
|
|
if r.UpdateFunc != nil {
|
|
r.UpdateFunc(oldObj, newObj)
|
|
}
|
|
}
|
|
|
|
// OnDelete calls DeleteFunc if it's not nil.
|
|
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
|
|
if r.DeleteFunc != nil {
|
|
r.DeleteFunc(obj)
|
|
}
|
|
}
|
|
|
|
// FilteringResourceEventHandler applies the provided filter to all events coming
|
|
// in, ensuring the appropriate nested handler method is invoked. An object
|
|
// that starts passing the filter after an update is considered an add, and an
|
|
// object that stops passing the filter after an update is considered a delete.
|
|
type FilteringResourceEventHandler struct {
|
|
FilterFunc func(obj interface{}) bool
|
|
Handler ResourceEventHandler
|
|
}
|
|
|
|
// OnAdd calls the nested handler only if the filter succeeds
|
|
func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
|
|
if !r.FilterFunc(obj) {
|
|
return
|
|
}
|
|
r.Handler.OnAdd(obj)
|
|
}
|
|
|
|
// OnUpdate ensures the proper handler is called depending on whether the filter matches
|
|
func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
|
|
newer := r.FilterFunc(newObj)
|
|
older := r.FilterFunc(oldObj)
|
|
switch {
|
|
case newer && older:
|
|
r.Handler.OnUpdate(oldObj, newObj)
|
|
case newer && !older:
|
|
r.Handler.OnAdd(newObj)
|
|
case !newer && older:
|
|
r.Handler.OnDelete(oldObj)
|
|
default:
|
|
// do nothing
|
|
}
|
|
}
|
|
|
|
// OnDelete calls the nested handler only if the filter succeeds
|
|
func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
|
|
if !r.FilterFunc(obj) {
|
|
return
|
|
}
|
|
r.Handler.OnDelete(obj)
|
|
}
|
|
|
|
// DeletionHandlingMetaNamespaceKeyFunc checks for
|
|
// DeletedFinalStateUnknown objects before calling
|
|
// MetaNamespaceKeyFunc.
|
|
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
|
|
if d, ok := obj.(DeletedFinalStateUnknown); ok {
|
|
return d.Key, nil
|
|
}
|
|
return MetaNamespaceKeyFunc(obj)
|
|
}
|
|
|
|
// NewInformer returns a Store and a controller for populating the store
|
|
// while also providing event notifications. You should only used the returned
|
|
// Store for Get/List operations; Add/Modify/Deletes will cause the event
|
|
// notifications to be faulty.
|
|
//
|
|
// Parameters:
|
|
// * lw is list and watch functions for the source of the resource you want to
|
|
// be informed of.
|
|
// * objType is an object of the type that you expect to receive.
|
|
// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
|
|
// calls, even if nothing changed). Otherwise, re-list will be delayed as
|
|
// long as possible (until the upstream source closes the watch or times out,
|
|
// or you stop the controller).
|
|
// * h is the object you want notifications sent to.
|
|
//
|
|
func NewInformer(
|
|
lw ListerWatcher,
|
|
objType runtime.Object,
|
|
resyncPeriod time.Duration,
|
|
h ResourceEventHandler,
|
|
) (Store, Controller) {
|
|
// This will hold the client state, as we know it.
|
|
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
|
|
|
|
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
|
|
}
|
|
|
|
// NewIndexerInformer returns a Indexer and a controller for populating the index
|
|
// while also providing event notifications. You should only used the returned
|
|
// Index for Get/List operations; Add/Modify/Deletes will cause the event
|
|
// notifications to be faulty.
|
|
//
|
|
// Parameters:
|
|
// * lw is list and watch functions for the source of the resource you want to
|
|
// be informed of.
|
|
// * objType is an object of the type that you expect to receive.
|
|
// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
|
|
// calls, even if nothing changed). Otherwise, re-list will be delayed as
|
|
// long as possible (until the upstream source closes the watch or times out,
|
|
// or you stop the controller).
|
|
// * h is the object you want notifications sent to.
|
|
// * indexers is the indexer for the received object type.
|
|
//
|
|
func NewIndexerInformer(
|
|
lw ListerWatcher,
|
|
objType runtime.Object,
|
|
resyncPeriod time.Duration,
|
|
h ResourceEventHandler,
|
|
indexers Indexers,
|
|
) (Indexer, Controller) {
|
|
// This will hold the client state, as we know it.
|
|
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
|
|
|
|
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
|
|
}
|
|
|
|
// newInformer returns a controller for populating the store while also
|
|
// providing event notifications.
|
|
//
|
|
// Parameters
|
|
// * lw is list and watch functions for the source of the resource you want to
|
|
// be informed of.
|
|
// * objType is an object of the type that you expect to receive.
|
|
// * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
|
|
// calls, even if nothing changed). Otherwise, re-list will be delayed as
|
|
// long as possible (until the upstream source closes the watch or times out,
|
|
// or you stop the controller).
|
|
// * h is the object you want notifications sent to.
|
|
// * clientState is the store you want to populate
|
|
//
|
|
func newInformer(
|
|
lw ListerWatcher,
|
|
objType runtime.Object,
|
|
resyncPeriod time.Duration,
|
|
h ResourceEventHandler,
|
|
clientState Store,
|
|
) Controller {
|
|
// This will hold incoming changes. Note how we pass clientState in as a
|
|
// KeyLister, that way resync operations will result in the correct set
|
|
// of update/delete deltas.
|
|
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
|
KnownObjects: clientState,
|
|
EmitDeltaTypeReplaced: true,
|
|
})
|
|
|
|
cfg := &Config{
|
|
Queue: fifo,
|
|
ListerWatcher: lw,
|
|
ObjectType: objType,
|
|
FullResyncPeriod: resyncPeriod,
|
|
RetryOnError: false,
|
|
|
|
Process: func(obj interface{}) error {
|
|
// from oldest to newest
|
|
for _, d := range obj.(Deltas) {
|
|
switch d.Type {
|
|
case Sync, Replaced, Added, Updated:
|
|
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
|
|
if err := clientState.Update(d.Object); err != nil {
|
|
return err
|
|
}
|
|
h.OnUpdate(old, d.Object)
|
|
} else {
|
|
if err := clientState.Add(d.Object); err != nil {
|
|
return err
|
|
}
|
|
h.OnAdd(d.Object)
|
|
}
|
|
case Deleted:
|
|
if err := clientState.Delete(d.Object); err != nil {
|
|
return err
|
|
}
|
|
h.OnDelete(d.Object)
|
|
}
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
return New(cfg)
|
|
}
|