2020-10-21 05:49:41 +00:00
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
2021-06-25 05:02:01 +00:00
"context"
"errors"
2020-10-21 05:49:41 +00:00
"fmt"
"sync"
"time"
2021-06-25 05:02:01 +00:00
"github.com/go-logr/logr"
2023-02-01 17:06:36 +00:00
"k8s.io/apimachinery/pkg/types"
2020-10-21 05:49:41 +00:00
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2023-02-01 17:06:36 +00:00
"k8s.io/apimachinery/pkg/util/uuid"
2020-10-21 05:49:41 +00:00
"k8s.io/client-go/util/workqueue"
2023-08-28 20:44:55 +00:00
2020-10-21 05:49:41 +00:00
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
2021-06-25 05:02:01 +00:00
logf "sigs.k8s.io/controller-runtime/pkg/log"
2020-10-21 05:49:41 +00:00
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
2021-06-25 05:02:01 +00:00
// Controller implements controller.Controller.
2020-10-21 05:49:41 +00:00
type Controller struct {
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
Name string
// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
MaxConcurrentReconciles int
// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
// ensures that the state of the system matches the state specified in the object.
// Defaults to the DefaultReconcileFunc.
Do reconcile . Reconciler
// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
MakeQueue func ( ) workqueue . RateLimitingInterface
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
// the Queue for processing
Queue workqueue . RateLimitingInterface
// mu is used to synchronize Controller setup
mu sync . Mutex
// Started is true if the Controller has been Started
Started bool
2021-06-25 05:02:01 +00:00
// ctx is the context that was passed to Start() and used when starting watches.
//
// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
// while we usually always strive to follow best practices, we consider this a legacy case and it should
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
ctx context . Context
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
// Defaults to 2 minutes if not set.
CacheSyncTimeout time . Duration
2020-10-21 05:49:41 +00:00
2021-06-25 05:02:01 +00:00
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches [ ] watchDescription
2020-10-21 05:49:41 +00:00
2023-02-01 17:06:36 +00:00
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
// or for example when a watch is started.
// Note: LogConstructor has to be able to handle nil requests as we are also using it
// outside the context of a reconciliation.
LogConstructor func ( request * reconcile . Request ) logr . Logger
2021-09-02 12:01:06 +00:00
// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
2023-02-01 17:06:36 +00:00
RecoverPanic * bool
2023-06-01 17:01:19 +00:00
// LeaderElected indicates whether the controller is leader elected or always running.
LeaderElected * bool
2020-10-21 05:49:41 +00:00
}
// watchDescription captures the arguments of a single Watch call made before
// the Controller was started, so Start can launch the watch later.
type watchDescription struct {
	// src is the event source to start.
	src source.Source
	// handler is passed to src.Start together with the controller's queue.
	handler handler.EventHandler
	// predicates are forwarded to src.Start as-is.
	predicates []predicate.Predicate
}
2021-06-25 05:02:01 +00:00
// Reconcile implements reconcile.Reconciler.
2021-09-02 12:01:06 +00:00
func ( c * Controller ) Reconcile ( ctx context . Context , req reconcile . Request ) ( _ reconcile . Result , err error ) {
2023-02-01 17:06:36 +00:00
defer func ( ) {
if r := recover ( ) ; r != nil {
if c . RecoverPanic != nil && * c . RecoverPanic {
2021-09-02 12:01:06 +00:00
for _ , fn := range utilruntime . PanicHandlers {
fn ( r )
}
err = fmt . Errorf ( "panic: %v [recovered]" , r )
2023-02-01 17:06:36 +00:00
return
2021-09-02 12:01:06 +00:00
}
2023-02-01 17:06:36 +00:00
log := logf . FromContext ( ctx )
log . Info ( fmt . Sprintf ( "Observed a panic in reconciler: %v" , r ) )
panic ( r )
}
} ( )
2021-06-25 05:02:01 +00:00
return c . Do . Reconcile ( ctx , req )
2020-10-21 05:49:41 +00:00
}
2021-06-25 05:02:01 +00:00
// Watch implements controller.Controller.
2020-10-21 05:49:41 +00:00
func ( c * Controller ) Watch ( src source . Source , evthdler handler . EventHandler , prct ... predicate . Predicate ) error {
c . mu . Lock ( )
defer c . mu . Unlock ( )
2021-06-25 05:02:01 +00:00
// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
if ! c . Started {
c . startWatches = append ( c . startWatches , watchDescription { src : src , handler : evthdler , predicates : prct } )
return nil
2020-10-21 05:49:41 +00:00
}
2023-02-01 17:06:36 +00:00
c . LogConstructor ( nil ) . Info ( "Starting EventSource" , "source" , src )
2021-06-25 05:02:01 +00:00
return src . Start ( c . ctx , evthdler , c . Queue , prct ... )
2020-10-21 05:49:41 +00:00
}
2023-06-01 17:01:19 +00:00
// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
// It returns true unless LeaderElected is explicitly set to false.
func (c *Controller) NeedLeaderElection() bool {
	return c.LeaderElected == nil || *c.LeaderElected
}
2021-06-25 05:02:01 +00:00
// Start implements controller.Controller.
func ( c * Controller ) Start ( ctx context . Context ) error {
2020-10-21 05:49:41 +00:00
// use an IIFE to get proper lock handling
// but lock outside to get proper handling of the queue shutdown
c . mu . Lock ( )
2021-06-25 05:02:01 +00:00
if c . Started {
return errors . New ( "controller was started more than once. This is likely to be caused by being added to a manager multiple times" )
}
c . initMetrics ( )
// Set the internal context.
c . ctx = ctx
2020-10-21 05:49:41 +00:00
c . Queue = c . MakeQueue ( )
2021-06-25 05:02:01 +00:00
go func ( ) {
<- ctx . Done ( )
c . Queue . ShutDown ( )
} ( )
2020-10-21 05:49:41 +00:00
2021-06-25 05:02:01 +00:00
wg := & sync . WaitGroup { }
2020-10-21 05:49:41 +00:00
err := func ( ) error {
defer c . mu . Unlock ( )
// TODO(pwittrock): Reconsider HandleCrash
defer utilruntime . HandleCrash ( )
// NB(directxman12): launch the sources *before* trying to wait for the
// caches to sync so that they have a chance to register their intendeded
// caches.
2021-06-25 05:02:01 +00:00
for _ , watch := range c . startWatches {
2023-02-01 17:06:36 +00:00
c . LogConstructor ( nil ) . Info ( "Starting EventSource" , "source" , fmt . Sprintf ( "%s" , watch . src ) )
2021-06-25 05:02:01 +00:00
if err := watch . src . Start ( ctx , watch . handler , c . Queue , watch . predicates ... ) ; err != nil {
2020-10-21 05:49:41 +00:00
return err
}
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
2023-02-01 17:06:36 +00:00
c . LogConstructor ( nil ) . Info ( "Starting Controller" )
2020-10-21 05:49:41 +00:00
2021-06-25 05:02:01 +00:00
for _ , watch := range c . startWatches {
syncingSource , ok := watch . src . ( source . SyncingSource )
if ! ok {
continue
}
2020-10-21 05:49:41 +00:00
2021-06-25 05:02:01 +00:00
if err := func ( ) error {
// use a context with timeout for launching sources and syncing caches.
sourceStartCtx , cancel := context . WithTimeout ( ctx , c . CacheSyncTimeout )
defer cancel ( )
// WaitForSync waits for a definitive timeout, and returns if there
// is an error or a timeout
if err := syncingSource . WaitForSync ( sourceStartCtx ) ; err != nil {
err := fmt . Errorf ( "failed to wait for %s caches to sync: %w" , c . Name , err )
2023-02-01 17:06:36 +00:00
c . LogConstructor ( nil ) . Error ( err , "Could not wait for Cache to sync" )
2021-06-25 05:02:01 +00:00
return err
}
return nil
} ( ) ; err != nil {
return err
}
2020-10-21 05:49:41 +00:00
}
2021-06-25 05:02:01 +00:00
// All the watches have been started, we can reset the local slice.
//
// We should never hold watches more than necessary, each watch source can hold a backing cache,
// which won't be garbage collected if we hold a reference to it.
c . startWatches = nil
2020-10-21 05:49:41 +00:00
// Launch workers to process resources
2023-02-01 17:06:36 +00:00
c . LogConstructor ( nil ) . Info ( "Starting workers" , "worker count" , c . MaxConcurrentReconciles )
2021-06-25 05:02:01 +00:00
wg . Add ( c . MaxConcurrentReconciles )
2020-10-21 05:49:41 +00:00
for i := 0 ; i < c . MaxConcurrentReconciles ; i ++ {
2021-06-25 05:02:01 +00:00
go func ( ) {
defer wg . Done ( )
// Run a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
for c . processNextWorkItem ( ctx ) {
}
} ( )
2020-10-21 05:49:41 +00:00
}
c . Started = true
return nil
} ( )
if err != nil {
return err
}
2021-06-25 05:02:01 +00:00
<- ctx . Done ( )
2023-02-01 17:06:36 +00:00
c . LogConstructor ( nil ) . Info ( "Shutdown signal received, waiting for all workers to finish" )
2021-06-25 05:02:01 +00:00
wg . Wait ( )
2023-02-01 17:06:36 +00:00
c . LogConstructor ( nil ) . Info ( "All workers finished" )
2020-10-21 05:49:41 +00:00
return nil
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
2021-06-25 05:02:01 +00:00
func ( c * Controller ) processNextWorkItem ( ctx context . Context ) bool {
2020-10-21 05:49:41 +00:00
obj , shutdown := c . Queue . Get ( )
if shutdown {
// Stop working
return false
}
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c . Queue . Done ( obj )
2021-06-25 05:02:01 +00:00
ctrlmetrics . ActiveWorkers . WithLabelValues ( c . Name ) . Add ( 1 )
defer ctrlmetrics . ActiveWorkers . WithLabelValues ( c . Name ) . Add ( - 1 )
c . reconcileHandler ( ctx , obj )
return true
}
// Outcome label values used with ctrlmetrics.ReconcileTotal: one series per
// possible reconcile result.
const (
	labelSuccess      = "success"
	labelRequeue      = "requeue"
	labelRequeueAfter = "requeue_after"
	labelError        = "error"
)
func ( c * Controller ) initMetrics ( ) {
ctrlmetrics . ActiveWorkers . WithLabelValues ( c . Name ) . Set ( 0 )
ctrlmetrics . ReconcileErrors . WithLabelValues ( c . Name ) . Add ( 0 )
ctrlmetrics . ReconcileTotal . WithLabelValues ( c . Name , labelError ) . Add ( 0 )
ctrlmetrics . ReconcileTotal . WithLabelValues ( c . Name , labelRequeueAfter ) . Add ( 0 )
ctrlmetrics . ReconcileTotal . WithLabelValues ( c . Name , labelRequeue ) . Add ( 0 )
ctrlmetrics . ReconcileTotal . WithLabelValues ( c . Name , labelSuccess ) . Add ( 0 )
ctrlmetrics . WorkerCount . WithLabelValues ( c . Name ) . Set ( float64 ( c . MaxConcurrentReconciles ) )
2020-10-21 05:49:41 +00:00
}
2021-06-25 05:02:01 +00:00
func ( c * Controller ) reconcileHandler ( ctx context . Context , obj interface { } ) {
2020-10-21 05:49:41 +00:00
// Update metrics after processing each item
reconcileStartTS := time . Now ( )
defer func ( ) {
c . updateMetrics ( time . Since ( reconcileStartTS ) )
} ( )
2023-02-01 17:06:36 +00:00
// Make sure that the object is a valid request.
2021-06-25 05:02:01 +00:00
req , ok := obj . ( reconcile . Request )
if ! ok {
2020-10-21 05:49:41 +00:00
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c . Queue . Forget ( obj )
2023-02-01 17:06:36 +00:00
c . LogConstructor ( nil ) . Error ( nil , "Queue item was not a Request" , "type" , fmt . Sprintf ( "%T" , obj ) , "value" , obj )
2020-10-21 05:49:41 +00:00
// Return true, don't take a break
2021-06-25 05:02:01 +00:00
return
2020-10-21 05:49:41 +00:00
}
2021-06-25 05:02:01 +00:00
2023-02-01 17:06:36 +00:00
log := c . LogConstructor ( & req )
reconcileID := uuid . NewUUID ( )
log = log . WithValues ( "reconcileID" , reconcileID )
2021-06-25 05:02:01 +00:00
ctx = logf . IntoContext ( ctx , log )
2023-02-01 17:06:36 +00:00
ctx = addReconcileID ( ctx , reconcileID )
2021-06-25 05:02:01 +00:00
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
2020-10-21 05:49:41 +00:00
// resource to be synced.
2023-08-28 20:44:55 +00:00
log . V ( 5 ) . Info ( "Reconciling" )
2021-09-02 12:01:06 +00:00
result , err := c . Reconcile ( ctx , req )
2021-06-25 05:02:01 +00:00
switch {
case err != nil :
2023-06-01 17:01:19 +00:00
if errors . Is ( err , reconcile . TerminalError ( nil ) ) {
ctrlmetrics . TerminalReconcileErrors . WithLabelValues ( c . Name ) . Inc ( )
} else {
c . Queue . AddRateLimited ( req )
}
2020-10-21 05:49:41 +00:00
ctrlmetrics . ReconcileErrors . WithLabelValues ( c . Name ) . Inc ( )
2021-06-25 05:02:01 +00:00
ctrlmetrics . ReconcileTotal . WithLabelValues ( c . Name , labelError ) . Inc ( )
2023-08-28 20:44:55 +00:00
if ! result . IsZero ( ) {
log . Info ( "Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler" )
}
2021-06-25 05:02:01 +00:00
log . Error ( err , "Reconciler error" )
case result . RequeueAfter > 0 :
2023-08-28 20:44:55 +00:00
log . V ( 5 ) . Info ( fmt . Sprintf ( "Reconcile done, requeueing after %s" , result . RequeueAfter ) )
2020-10-21 05:49:41 +00:00
// The result.RequeueAfter request will be lost, if it is returned
// along with a non-nil error. But this is intended as
// We need to drive to stable reconcile loops before queuing due
// to result.RequestAfter
c . Queue . Forget ( obj )
c . Queue . AddAfter ( req , result . RequeueAfter )
2021-06-25 05:02:01 +00:00
ctrlmetrics . ReconcileTotal . WithLabelValues ( c . Name , labelRequeueAfter ) . Inc ( )
case result . Requeue :
2023-08-28 20:44:55 +00:00
log . V ( 5 ) . Info ( "Reconcile done, requeueing" )
2020-10-21 05:49:41 +00:00
c . Queue . AddRateLimited ( req )
2021-06-25 05:02:01 +00:00
ctrlmetrics . ReconcileTotal . WithLabelValues ( c . Name , labelRequeue ) . Inc ( )
default :
2023-08-28 20:44:55 +00:00
log . V ( 5 ) . Info ( "Reconcile successful" )
2021-06-25 05:02:01 +00:00
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c . Queue . Forget ( obj )
ctrlmetrics . ReconcileTotal . WithLabelValues ( c . Name , labelSuccess ) . Inc ( )
2020-10-21 05:49:41 +00:00
}
2021-06-25 05:02:01 +00:00
}
2020-10-21 05:49:41 +00:00
2021-06-25 05:02:01 +00:00
// GetLogger returns this controller's logger.
func ( c * Controller ) GetLogger ( ) logr . Logger {
2023-02-01 17:06:36 +00:00
return c . LogConstructor ( nil )
2020-10-21 05:49:41 +00:00
}
2021-06-25 05:02:01 +00:00
// updateMetrics updates prometheus metrics within the controller.
2020-10-21 05:49:41 +00:00
func ( c * Controller ) updateMetrics ( reconcileTime time . Duration ) {
ctrlmetrics . ReconcileTime . WithLabelValues ( c . Name ) . Observe ( reconcileTime . Seconds ( ) )
}
2023-02-01 17:06:36 +00:00
// ReconcileIDFromContext returns the reconcile ID stored in ctx by the
// controller, or the empty UID if ctx carries none.
func ReconcileIDFromContext(ctx context.Context) types.UID {
	if id, found := ctx.Value(reconcileIDKey{}).(types.UID); found {
		return id
	}
	return ""
}
// reconcileIDKey is the unexported context.Context value key under which the
// reconcile ID (a types.UID) is stored. It is an empty struct type, so it
// cannot collide with keys defined by other packages.
type reconcileIDKey struct{}
// addReconcileID returns a child of ctx carrying reconcileID under
// reconcileIDKey, where ReconcileIDFromContext can read it back.
func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context {
	key := reconcileIDKey{}
	return context.WithValue(ctx, key, reconcileID)
}