rebase: update kubernetes to v1.23.0

updating the Go dependency to the latest released
Kubernetes version, i.e. v1.23.0

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
Author: Madhu Rajanna
Date: 2021-12-08 19:20:47 +05:30
Committed by: mergify[bot]
Parent: 42403e2ba7
Commit: 5762da3e91
789 changed files with 49781 additions and 11501 deletions

View File

@ -86,8 +86,13 @@ type Informer interface {
HasSynced() bool
}
// ObjectSelector is an alias for internal.Selector.
type ObjectSelector internal.Selector
// SelectorsByObject associates a client.Object's GVK to a field/label selector.
type SelectorsByObject map[client.Object]internal.Selector
// There is also `DefaultSelector` to set a global default (which will be overridden by
// a more specific setting here, if any).
type SelectorsByObject map[client.Object]ObjectSelector
// Options are the optional arguments for creating a new InformersMap object.
type Options struct {
@ -114,6 +119,10 @@ type Options struct {
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
SelectorsByObject SelectorsByObject
// DefaultSelector will be used as the selector for all object types
// that do not have a selector defined in SelectorsByObject.
DefaultSelector ObjectSelector
// UnsafeDisableDeepCopyByObject indicates not to deep copy objects during get or
// list objects per GVK at the specified object.
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
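As an illustration (not part of the diff), a minimal sketch of combining SelectorsByObject with the new DefaultSelector fallback; the object type, node name, and label values are hypothetical.

package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/rest"

	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// newFilteredCache caches only the Pods scheduled to nodeName; every other
// object type falls back to DefaultSelector's label filter.
func newFilteredCache(cfg *rest.Config, nodeName string) (cache.Cache, error) {
	return cache.New(cfg, cache.Options{
		SelectorsByObject: cache.SelectorsByObject{
			&corev1.Pod{}: {
				Field: fields.OneTermEqualSelector("spec.nodeName", nodeName),
			},
		},
		// Applies to every GVK that has no explicit entry above.
		DefaultSelector: cache.ObjectSelector{
			Label: labels.SelectorFromSet(labels.Set{"app.kubernetes.io/managed-by": "example-operator"}),
		},
	})
}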
@ -129,7 +138,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.Scheme)
selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.DefaultSelector, opts.Scheme)
if err != nil {
return nil, err
}
@ -150,21 +159,23 @@ func New(config *rest.Config, opts Options) (Cache, error) {
// returned from cache get/list before mutating it.
func BuilderWithOptions(options Options) NewCacheFunc {
return func(config *rest.Config, opts Options) (Cache, error) {
if opts.Scheme == nil {
opts.Scheme = options.Scheme
if options.Scheme == nil {
options.Scheme = opts.Scheme
}
if opts.Mapper == nil {
opts.Mapper = options.Mapper
if options.Mapper == nil {
options.Mapper = opts.Mapper
}
if options.Resync == nil {
options.Resync = opts.Resync
}
if options.Namespace == "" {
options.Namespace = opts.Namespace
}
if opts.Resync == nil {
opts.Resync = options.Resync
}
if opts.Namespace == "" {
opts.Namespace = options.Namespace
}
opts.SelectorsByObject = options.SelectorsByObject
opts.UnsafeDisableDeepCopyByObject = options.UnsafeDisableDeepCopyByObject
return New(config, opts)
return New(config, options)
}
}
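A hedged sketch (not part of the diff) of wiring this builder into a manager; after this change, a field set in the builder's Options takes precedence over the opts the manager passes in, such as its Scheme or Namespace. The Secret filter below is illustrative.

package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/rest"

	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// newManagerWithFilteredCache replaces the manager's default cache
// constructor with one that only caches Secrets owned by this operator.
func newManagerWithFilteredCache(cfg *rest.Config) (manager.Manager, error) {
	return manager.New(cfg, manager.Options{
		NewCache: cache.BuilderWithOptions(cache.Options{
			SelectorsByObject: cache.SelectorsByObject{
				&corev1.Secret{}: {
					Label: labels.SelectorFromSet(labels.Set{"owned-by": "example-operator"}),
				},
			},
		}),
	})
}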
@ -191,15 +202,16 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
return opts, nil
}
func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) {
func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, defaultSelector ObjectSelector, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) {
selectorsByGVK := internal.SelectorsByGVK{}
for object, selector := range selectorsByObject {
gvk, err := apiutil.GVKForObject(object, scheme)
if err != nil {
return nil, err
}
selectorsByGVK[gvk] = selector
selectorsByGVK[gvk] = internal.Selector(selector)
}
selectorsByGVK[schema.GroupVersionKind{}] = internal.Selector(defaultSelector)
return selectorsByGVK, nil
}

View File

@ -66,7 +66,7 @@ func newSpecificInformersMap(config *rest.Config,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
selectors: selectors,
selectors: selectors.forGVK,
disableDeepCopy: disableDeepCopy,
}
return ip
@ -131,7 +131,7 @@ type specificInformersMap struct {
// selectors are the label or field selectors that will be added to the
// ListWatch ListOptions.
selectors SelectorsByGVK
selectors func(gvk schema.GroupVersionKind) Selector
// disableDeepCopy indicates not to deep copy objects during get or list objects.
disableDeepCopy DisableDeepCopyByGVK
@ -277,19 +277,19 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)
ip.selectors(gvk).ApplyToList(&opts)
res := listObj.DeepCopyObject()
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
ip.selectors(gvk).ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
},
@ -319,8 +319,8 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
ip.selectors(gvk).ApplyToList(&opts)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
}
@ -328,10 +328,10 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
ip.selectors(gvk).ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
}
@ -366,13 +366,13 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
// create the relevant listwatch
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)
ip.selectors(gvk).ApplyToList(&opts)
var (
list *metav1.PartialObjectMetadataList
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
} else {
@ -387,7 +387,7 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
ip.selectors(gvk).ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true
@ -395,7 +395,7 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
watcher watch.Interface
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
} else {

View File

@ -26,6 +26,17 @@ import (
// SelectorsByGVK associates a GroupVersionKind to a field/label selector.
type SelectorsByGVK map[schema.GroupVersionKind]Selector
func (s SelectorsByGVK) forGVK(gvk schema.GroupVersionKind) Selector {
if specific, found := s[gvk]; found {
return specific
}
if defaultSelector, found := s[schema.GroupVersionKind{}]; found {
return defaultSelector
}
return Selector{}
}
// Selector specifies the label/field selector to fill in ListOptions.
type Selector struct {
Label labels.Selector
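The lookup order of forGVK above is: an exact GVK entry first, then the default stored under the zero-value GroupVersionKind, then an empty Selector that matches everything. A hypothetical, package-internal sketch of that fallback (not part of the diff):

package internal

import (
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// exampleFallback is illustrative only; it is not part of this package.
func exampleFallback() (Selector, Selector) {
	selectors := SelectorsByGVK{
		{Version: "v1", Kind: "Pod"}: {Field: fields.OneTermEqualSelector("spec.nodeName", "node-1")},
		// The empty GroupVersionKind key acts as the default selector.
		{}: {Label: labels.SelectorFromSet(labels.Set{"team": "storage"})},
	}

	podSel := selectors.forGVK(schema.GroupVersionKind{Version: "v1", Kind: "Pod"})     // exact match
	svcSel := selectors.forGVK(schema.GroupVersionKind{Version: "v1", Kind: "Service"}) // default entry
	return podSel, svcSel
}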

View File

@ -21,7 +21,7 @@ import (
"fmt"
"os"
"os/user"
"path"
"path/filepath"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@ -125,7 +125,7 @@ func loadConfig(context string) (*rest.Config, error) {
if err != nil {
return nil, fmt.Errorf("could not get current user: %v", err)
}
loadingRules.Precedence = append(loadingRules.Precedence, path.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName))
loadingRules.Precedence = append(loadingRules.Precedence, filepath.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName))
}
return loadConfigWithContext("", loadingRules, context)

View File

@ -245,7 +245,7 @@ func setOptionsDefaults(options Options) Options {
}
}
if options.Logger == nil {
if options.Logger.GetSink() == nil {
options.Logger = logf.RuntimeLog.WithName("cluster")
}

View File

@ -1,3 +1,4 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
// Code generated by controller-gen. DO NOT EDIT.

View File

@ -104,7 +104,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
return nil, fmt.Errorf("must specify Name for Controller")
}
if options.Log == nil {
if options.Log.GetSink() == nil {
options.Log = mgr.GetLogger()
}

View File

@ -0,0 +1,16 @@
package httpserver
import (
"net/http"
"time"
)
// New returns a new server with sane defaults.
func New(handler http.Handler) *http.Server {
return &http.Server{
Handler: handler,
MaxHeaderBytes: 1 << 20,
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
ReadHeaderTimeout: 32 * time.Second,
}
}
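For context, a small sketch of the intended call pattern: build a mux, wrap it with New, and serve on a pre-bound listener. Since the package is internal to controller-runtime, this only compiles inside the module; the handler and listener are placeholders.

package main

import (
	"errors"
	"log"
	"net"
	"net/http"

	// Internal package: importable only from within the controller-runtime module.
	"sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
)

// serveHealth is a placeholder showing the wrap-and-serve pattern.
func serveHealth(ln net.Listener) {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	srv := httpserver.New(mux) // picks up the timeouts and header limit above
	if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
		log.Printf("server failed: %v", err)
	}
}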

View File

@ -25,16 +25,15 @@ import (
// loggerPromise knows how to populate a concrete logr.Logger
// with options, given an actual base logger later on down the line.
type loggerPromise struct {
logger *DelegatingLogger
logger *DelegatingLogSink
childPromises []*loggerPromise
promisesLock sync.Mutex
name *string
tags []interface{}
level int
name *string
tags []interface{}
}
func (p *loggerPromise) WithName(l *DelegatingLogger, name string) *loggerPromise {
func (p *loggerPromise) WithName(l *DelegatingLogSink, name string) *loggerPromise {
res := &loggerPromise{
logger: l,
name: &name,
@ -48,7 +47,7 @@ func (p *loggerPromise) WithName(l *DelegatingLogger, name string) *loggerPromis
}
// WithValues provides a new Logger with the tags appended.
func (p *loggerPromise) WithValues(l *DelegatingLogger, tags ...interface{}) *loggerPromise {
func (p *loggerPromise) WithValues(l *DelegatingLogSink, tags ...interface{}) *loggerPromise {
res := &loggerPromise{
logger: l,
tags: tags,
@ -61,61 +60,53 @@ func (p *loggerPromise) WithValues(l *DelegatingLogger, tags ...interface{}) *lo
return res
}
func (p *loggerPromise) V(l *DelegatingLogger, level int) *loggerPromise {
res := &loggerPromise{
logger: l,
level: level,
promisesLock: sync.Mutex{},
}
p.promisesLock.Lock()
defer p.promisesLock.Unlock()
p.childPromises = append(p.childPromises, res)
return res
}
// Fulfill instantiates the Logger with the provided logger.
func (p *loggerPromise) Fulfill(parentLogger logr.Logger) {
logger := logr.WithCallDepth(parentLogger, 1)
func (p *loggerPromise) Fulfill(parentLogSink logr.LogSink) {
sink := parentLogSink
if p.name != nil {
logger = logger.WithName(*p.name)
sink = sink.WithName(*p.name)
}
if p.tags != nil {
logger = logger.WithValues(p.tags...)
}
if p.level != 0 {
logger = logger.V(p.level)
sink = sink.WithValues(p.tags...)
}
p.logger.lock.Lock()
p.logger.logger = logger
p.logger.logger = sink
p.logger.promise = nil
p.logger.lock.Unlock()
for _, childPromise := range p.childPromises {
childPromise.Fulfill(logger)
childPromise.Fulfill(sink)
}
}
// DelegatingLogger is a logr.Logger that delegates to another logr.Logger.
// DelegatingLogSink is a logsink that delegates to another logr.LogSink.
// If the underlying promise is not nil, it registers calls to sub-loggers with
// the logging factory to be populated later, and returns a new delegating
// logger. It expects to have *some* logr.Logger set at all times (generally
// a no-op logger before the promises are fulfilled).
type DelegatingLogger struct {
type DelegatingLogSink struct {
lock sync.RWMutex
logger logr.Logger
logger logr.LogSink
promise *loggerPromise
info logr.RuntimeInfo
}
// Init implements logr.LogSink.
func (l *DelegatingLogSink) Init(info logr.RuntimeInfo) {
l.lock.Lock()
defer l.lock.Unlock()
l.info = info
}
// Enabled tests whether this Logger is enabled. For example, commandline
// flags might be used to set the logging verbosity and disable some info
// logs.
func (l *DelegatingLogger) Enabled() bool {
func (l *DelegatingLogSink) Enabled(level int) bool {
l.lock.RLock()
defer l.lock.RUnlock()
return l.logger.Enabled()
return l.logger.Enabled(level)
}
// Info logs a non-error message with the given key/value pairs as context.
@ -124,10 +115,10 @@ func (l *DelegatingLogger) Enabled() bool {
// the log line. The key/value pairs can then be used to add additional
// variable information. The key/value pairs should alternate string
// keys and arbitrary values.
func (l *DelegatingLogger) Info(msg string, keysAndValues ...interface{}) {
func (l *DelegatingLogSink) Info(level int, msg string, keysAndValues ...interface{}) {
l.lock.RLock()
defer l.lock.RUnlock()
l.logger.Info(msg, keysAndValues...)
l.logger.Info(level, msg, keysAndValues...)
}
// Error logs an error, with the given message and key/value pairs as context.
@ -138,33 +129,14 @@ func (l *DelegatingLogger) Info(msg string, keysAndValues ...interface{}) {
// The msg field should be used to add context to any underlying error,
// while the err field should be used to attach the actual error that
// triggered this log line, if present.
func (l *DelegatingLogger) Error(err error, msg string, keysAndValues ...interface{}) {
func (l *DelegatingLogSink) Error(err error, msg string, keysAndValues ...interface{}) {
l.lock.RLock()
defer l.lock.RUnlock()
l.logger.Error(err, msg, keysAndValues...)
}
// V returns an Logger value for a specific verbosity level, relative to
// this Logger. In other words, V values are additive. V higher verbosity
// level means a log message is less important. It's illegal to pass a log
// level less than zero.
func (l *DelegatingLogger) V(level int) logr.Logger {
l.lock.RLock()
defer l.lock.RUnlock()
if l.promise == nil {
return l.logger.V(level)
}
res := &DelegatingLogger{logger: l.logger}
promise := l.promise.V(res, level)
res.promise = promise
return res
}
// WithName provides a new Logger with the name appended.
func (l *DelegatingLogger) WithName(name string) logr.Logger {
func (l *DelegatingLogSink) WithName(name string) logr.LogSink {
l.lock.RLock()
defer l.lock.RUnlock()
@ -172,7 +144,7 @@ func (l *DelegatingLogger) WithName(name string) logr.Logger {
return l.logger.WithName(name)
}
res := &DelegatingLogger{logger: l.logger}
res := &DelegatingLogSink{logger: l.logger}
promise := l.promise.WithName(res, name)
res.promise = promise
@ -180,7 +152,7 @@ func (l *DelegatingLogger) WithName(name string) logr.Logger {
}
// WithValues provides a new Logger with the tags appended.
func (l *DelegatingLogger) WithValues(tags ...interface{}) logr.Logger {
func (l *DelegatingLogSink) WithValues(tags ...interface{}) logr.LogSink {
l.lock.RLock()
defer l.lock.RUnlock()
@ -188,7 +160,7 @@ func (l *DelegatingLogger) WithValues(tags ...interface{}) logr.Logger {
return l.logger.WithValues(tags...)
}
res := &DelegatingLogger{logger: l.logger}
res := &DelegatingLogSink{logger: l.logger}
promise := l.promise.WithValues(res, tags...)
res.promise = promise
@ -198,16 +170,16 @@ func (l *DelegatingLogger) WithValues(tags ...interface{}) logr.Logger {
// Fulfill switches the logger over to use the actual logger
// provided, instead of the temporary initial one, if this method
// has not been previously called.
func (l *DelegatingLogger) Fulfill(actual logr.Logger) {
func (l *DelegatingLogSink) Fulfill(actual logr.LogSink) {
if l.promise != nil {
l.promise.Fulfill(actual)
}
}
// NewDelegatingLogger constructs a new DelegatingLogger which uses
// NewDelegatingLogSink constructs a new DelegatingLogSink which uses
// the given logger before its promise is fulfilled.
func NewDelegatingLogger(initial logr.Logger) *DelegatingLogger {
l := &DelegatingLogger{
func NewDelegatingLogSink(initial logr.LogSink) *DelegatingLogSink {
l := &DelegatingLogSink{
logger: initial,
promise: &loggerPromise{promisesLock: sync.Mutex{}},
}
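To make the promise mechanism concrete: WithName/WithValues calls made before Fulfill are recorded as promises and replayed once a real sink arrives. A hedged sketch (not part of the diff); the zap-backed sink is just an arbitrary concrete backend.

package main

import (
	"github.com/go-logr/logr"

	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	// Starts out backed by a no-op sink.
	delegate := log.NewDelegatingLogSink(log.NullLogSink{})
	logger := logr.New(delegate).WithName("setup").WithValues("component", "example")

	// Discarded: the WithName/WithValues above were only recorded as promises.
	logger.Info("before fulfill")

	// Replays the recorded promises onto the real sink; later calls reach zap.
	delegate.Fulfill(zap.New().GetSink())
	logger.Info("after fulfill")
}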

View File

@ -47,14 +47,14 @@ func SetLogger(l logr.Logger) {
defer loggerWasSetLock.Unlock()
loggerWasSet = true
Log.Fulfill(l)
dlog.Fulfill(l.GetSink())
}
// It is safe to assume that if this wasn't set within the first 30 seconds of a binary's
// lifetime, it will never get set. The DelegatingLogger causes a high number of memory
// allocations when not given an actual Logger, so we set a NullLogger to avoid that.
// lifetime, it will never get set. The DelegatingLogSink causes a high number of memory
// allocations when not given an actual Logger, so we set a NullLogSink to avoid that.
//
// We need to keep the DelegatingLogger because we have various inits() that get a logger from
// We need to keep the DelegatingLogSink because we have various inits() that get a logger from
// here. They will always get executed before any code that imports controller-runtime
// has a chance to run and hence to set an actual logger.
func init() {
@ -64,7 +64,7 @@ func init() {
loggerWasSetLock.Lock()
defer loggerWasSetLock.Unlock()
if !loggerWasSet {
Log.Fulfill(NullLogger{})
dlog.Fulfill(NullLogSink{})
}
}()
}
@ -78,14 +78,17 @@ var (
// to another logr.Logger. You *must* call SetLogger to
// get any actual logging. If SetLogger is not called within
// the first 30 seconds of a binary's lifetime, it will get
// set to a NullLogger.
var Log = NewDelegatingLogger(NullLogger{})
// set to a NullLogSink.
var (
dlog = NewDelegatingLogSink(NullLogSink{})
Log = logr.New(dlog)
)
// FromContext returns a logger with predefined values from a context.Context.
func FromContext(ctx context.Context, keysAndValues ...interface{}) logr.Logger {
var log logr.Logger = Log
log := Log
if ctx != nil {
if logger := logr.FromContext(ctx); logger != nil {
if logger, err := logr.FromContext(ctx); err == nil {
log = logger
}
}
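In practice a binary fulfills the global delegating sink by calling SetLogger once, early, before the 30-second fallback to NullLogSink; request-scoped loggers are then retrieved with FromContext. A short hedged sketch (not part of the diff):

package main

import (
	"context"

	logf "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	// Fulfill the global delegating sink as early as possible.
	logf.SetLogger(zap.New(zap.UseDevMode(true)))

	// Stash a named logger in a context, then read it back; with logr v1,
	// logr.FromContext returns an error (not nil) when no logger is stored.
	ctx := logf.IntoContext(context.Background(), logf.Log.WithName("example"))
	logf.FromContext(ctx, "request", "demo").Info("hello")
}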

View File

@ -24,37 +24,36 @@ import (
// but avoids accidentally adding the testing flags to
// all binaries.
// NullLogger is a logr.Logger that does nothing.
type NullLogger struct{}
// NullLogSink is a logr.Logger that does nothing.
type NullLogSink struct{}
var _ logr.Logger = NullLogger{}
var _ logr.LogSink = NullLogSink{}
// Init implements logr.LogSink.
func (log NullLogSink) Init(logr.RuntimeInfo) {
}
// Info implements logr.InfoLogger.
func (NullLogger) Info(_ string, _ ...interface{}) {
func (NullLogSink) Info(_ int, _ string, _ ...interface{}) {
// Do nothing.
}
// Enabled implements logr.InfoLogger.
func (NullLogger) Enabled() bool {
func (NullLogSink) Enabled(level int) bool {
return false
}
// Error implements logr.Logger.
func (NullLogger) Error(_ error, _ string, _ ...interface{}) {
func (NullLogSink) Error(_ error, _ string, _ ...interface{}) {
// Do nothing.
}
// V implements logr.Logger.
func (log NullLogger) V(_ int) logr.Logger {
return log
}
// WithName implements logr.Logger.
func (log NullLogger) WithName(_ string) logr.Logger {
func (log NullLogSink) WithName(_ string) logr.LogSink {
return log
}
// WithValues implements logr.Logger.
func (log NullLogger) WithValues(_ ...interface{}) logr.Logger {
func (log NullLogSink) WithValues(_ ...interface{}) logr.LogSink {
return log
}

View File

@ -23,6 +23,7 @@ import (
"net"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/go-logr/logr"
@ -30,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
@ -40,6 +42,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
@ -61,17 +64,16 @@ const (
var _ Runnable = &controllerManager{}
type controllerManager struct {
sync.Mutex
started bool
stopProcedureEngaged *int64
errChan chan error
runnables *runnables
// cluster holds a variety of methods to interact with a cluster. Required.
cluster cluster.Cluster
// leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts.
// These Runnables are managed by lead election.
leaderElectionRunnables []Runnable
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
// These Runnables will not be blocked by lead election.
nonLeaderElectionRunnables []Runnable
// recorderProvider is used to generate event recorders that will be injected into Controllers
// (and EventHandlers, Sources and Predicates).
recorderProvider *intrec.Provider
@ -104,12 +106,6 @@ type controllerManager struct {
// Healthz probe handler
healthzHandler *healthz.Handler
mu sync.Mutex
started bool
startedLeader bool
healthzStarted bool
errChan chan error
// controllerOptions are the global controller options.
controllerOptions v1alpha1.ControllerConfigurationSpec
@ -117,25 +113,20 @@ type controllerManager struct {
// If none is set, it defaults to log.Log global logger.
logger logr.Logger
// leaderElectionStopped is an internal channel used to signal the stopping procedure that the
// LeaderElection.Run(...) function has returned and the shutdown can proceed.
leaderElectionStopped chan struct{}
// leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper,
// because for safety reasons we need to os.Exit() when we lose the leader election, meaning that
// it must be deferred until after gracefulShutdown is done.
leaderElectionCancel context.CancelFunc
// leaderElectionStopped is an internal channel used to signal the stopping procedure that the
// LeaderElection.Run(...) function has returned and the shutdown can proceed.
leaderElectionStopped chan struct{}
// stop procedure engaged. In other words, we should not add anything else to the manager
stopProcedureEngaged bool
// elected is closed when this manager becomes the leader of a group of
// managers, either because it won a leader election or because no leader
// election was configured.
elected chan struct{}
caches []hasCache
// port is the port that the webhook server serves at.
port int
// host is the hostname that the webhook server binds to.
@ -160,10 +151,6 @@ type controllerManager struct {
// between tries of actions.
retryPeriod time.Duration
// waitForRunnable is holding the number of runnables currently running so that
// we can wait for them to exit before quitting the manager
waitForRunnable sync.WaitGroup
// gracefulShutdownTimeout is the duration given to runnable to stop
// before the manager actually returns on stop.
gracefulShutdownTimeout time.Duration
@ -192,36 +179,17 @@ type hasCache interface {
// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.stopProcedureEngaged {
return errors.New("can't accept new runnable as stop procedure is already engaged")
}
cm.Lock()
defer cm.Unlock()
return cm.add(r)
}
func (cm *controllerManager) add(r Runnable) error {
// Set dependencies on the object
if err := cm.SetFields(r); err != nil {
return err
}
var shouldStart bool
// Add the runnable to the leader election or the non-leaderelection list
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
shouldStart = cm.started
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else if hasCache, ok := r.(hasCache); ok {
cm.caches = append(cm.caches, hasCache)
} else {
shouldStart = cm.startedLeader
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
}
if shouldStart {
// If already started, start the controller
cm.startRunnable(r)
}
return nil
return cm.runnables.Add(r)
}
// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
@ -244,13 +212,17 @@ func (cm *controllerManager) SetFields(i interface{}) error {
// AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics.
func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
cm.Lock()
defer cm.Unlock()
if cm.started {
return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created")
}
if path == defaultMetricsEndpoint {
return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
}
cm.mu.Lock()
defer cm.mu.Unlock()
if _, found := cm.metricsExtraHandlers[path]; found {
return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
}
@ -262,14 +234,10 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha
// AddHealthzCheck allows you to add Healthz checker.
func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.Lock()
defer cm.Unlock()
if cm.stopProcedureEngaged {
return errors.New("can't accept new healthCheck as stop procedure is already engaged")
}
if cm.healthzStarted {
if cm.started {
return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
}
@ -283,15 +251,11 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
// AddReadyzCheck allows you to add Readyz checker.
func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.Lock()
defer cm.Unlock()
if cm.stopProcedureEngaged {
return errors.New("can't accept new ready check as stop procedure is already engaged")
}
if cm.healthzStarted {
return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
if cm.started {
return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
}
if cm.readyzHandler == nil {
@ -344,7 +308,7 @@ func (cm *controllerManager) GetWebhookServer() *webhook.Server {
}
}
if err := cm.Add(cm.webhookServer); err != nil {
panic("unable to add webhook server to the controller manager")
panic(fmt.Sprintf("unable to add webhook server to the controller manager: %s", err))
}
})
return cm.webhookServer
@ -365,79 +329,89 @@ func (cm *controllerManager) serveMetrics() {
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
mux := http.NewServeMux()
mux.Handle(defaultMetricsEndpoint, handler)
func() {
cm.mu.Lock()
defer cm.mu.Unlock()
for path, extraHandler := range cm.metricsExtraHandlers {
mux.Handle(path, extraHandler)
}
}()
server := http.Server{
Handler: mux,
for path, extraHandler := range cm.metricsExtraHandlers {
mux.Handle(path, extraHandler)
}
// Run the server
cm.startRunnable(RunnableFunc(func(_ context.Context) error {
cm.logger.Info("Starting metrics server", "path", defaultMetricsEndpoint)
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
return err
}
return nil
}))
// Shutdown the server when stop is closed
<-cm.internalProceduresStop
if err := server.Shutdown(cm.shutdownCtx); err != nil {
cm.errChan <- err
}
server := httpserver.New(mux)
go cm.httpServe("metrics", cm.logger.WithValues("path", defaultMetricsEndpoint), server, cm.metricsListener)
}
func (cm *controllerManager) serveHealthProbes() {
mux := http.NewServeMux()
server := http.Server{
Handler: mux,
server := httpserver.New(mux)
if cm.readyzHandler != nil {
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
// Append '/' suffix to handle subpaths
mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
}
if cm.healthzHandler != nil {
mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
// Append '/' suffix to handle subpaths
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
}
func() {
cm.mu.Lock()
defer cm.mu.Unlock()
go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener)
}
if cm.readyzHandler != nil {
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
// Append '/' suffix to handle subpaths
mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
}
if cm.healthzHandler != nil {
mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
// Append '/' suffix to handle subpaths
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
}
// Run server
cm.startRunnable(RunnableFunc(func(_ context.Context) error {
if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
return err
}
return nil
}))
cm.healthzStarted = true
}()
func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
log = log.WithValues("kind", kind, "addr", ln.Addr())
go func() {
// Shutdown the server when stop is closed
<-cm.internalProceduresStop
if err := server.Shutdown(cm.shutdownCtx); err != nil {
log.Info("Starting server")
if err := server.Serve(ln); err != nil {
if errors.Is(err, http.ErrServerClosed) {
return
}
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
// There might be cases where connections are still open and we try to shut down,
// but there is not enough time to close the connection, which causes an error in Serve.
//
// In that case we want to avoid returning an error to the main error channel.
log.Error(err, "error on Serve after stop has been engaged")
return
}
cm.errChan <- err
}
}()
// Shutdown the server when stop is closed.
<-cm.internalProceduresStop
if err := server.Shutdown(cm.shutdownCtx); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Avoid logging context related errors.
return
}
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
cm.logger.Error(err, "error on Shutdown after stop has been engaged")
return
}
cm.errChan <- err
}
}
// Start starts the manager and waits indefinitely.
// There are only two ways for Start to return:
// an error occurred in one of the internal operations
// (such as leader election, cache start, or webhooks),
// or the context was cancelled.
func (cm *controllerManager) Start(ctx context.Context) (err error) {
if err := cm.Add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
cm.Lock()
if cm.started {
cm.Unlock()
return errors.New("manager already started")
}
var ready bool
defer func() {
// Only unlock the manager if we haven't reached
// the internal readiness condition.
if !ready {
cm.Unlock()
}
}()
// Initialize the internal context.
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
@ -459,45 +433,70 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
}
}()
// initialize this here so that we reset the signal channel state on every start
// Everything that might write into this channel must be started in a new goroutine,
// because otherwise we might block this routine trying to write into the full channel
// and will not be able to enter the deferred cm.engageStopProcedure() which drains
// it.
cm.errChan = make(chan error)
// Add the cluster runnable.
if err := cm.add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}
// Metrics should be served whether the controller is leader or not.
// (If we don't serve metrics for non-leaders, prometheus will still scrape
// the pod but will get a connection refused)
// the pod but will get a connection refused).
if cm.metricsListener != nil {
go cm.serveMetrics()
cm.serveMetrics()
}
// Serve health probes
// Serve health probes.
if cm.healthProbeListener != nil {
cm.serveHealthProbes()
}
// Webhooks MUST start before any cache is populated, otherwise there is a race condition
// First start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
//
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
// to never start because no cache can be populated.
cm.startWebhookRunnables()
go cm.startNonLeaderElectionRunnables()
go func() {
if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
cm.errChan <- err
}
} else {
// Treat not having leader election enabled the same as being elected.
cm.startLeaderElectionRunnables()
close(cm.elected)
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
if err != wait.ErrWaitTimeout {
return err
}
}()
}
// Start and wait for caches.
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
if err != wait.ErrWaitTimeout {
return err
}
}
// Start the non-leaderelection Runnables after the cache has synced.
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
if err != wait.ErrWaitTimeout {
return err
}
}
// Start the leader election and all required runnables.
{
ctx, cancel := context.WithCancel(context.Background())
cm.leaderElectionCancel = cancel
go func() {
if cm.resourceLock != nil {
if err := cm.startLeaderElection(ctx); err != nil {
cm.errChan <- err
}
} else {
// Treat not having leader election enabled the same as being elected.
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
}
close(cm.elected)
}
}()
}
ready = true
cm.Unlock()
select {
case <-ctx.Done():
// We are done
@ -511,24 +510,31 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
// engageStopProcedure signals all runnables to stop, reads potential errors
// from the errChan and waits for them to end. It must not be called more than once.
func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) error {
// Populate the shutdown context.
var shutdownCancel context.CancelFunc
if cm.gracefulShutdownTimeout > 0 {
cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout)
} else {
cm.shutdownCtx, shutdownCancel = context.WithCancel(context.Background())
if !atomic.CompareAndSwapInt64(cm.stopProcedureEngaged, 0, 1) {
return errors.New("stop procedure already engaged")
}
defer shutdownCancel()
// Cancel the internal stop channel and wait for the procedures to stop and complete.
close(cm.internalProceduresStop)
cm.internalCancel()
// Populate the shutdown context, this operation MUST be done before
// closing the internalProceduresStop channel.
//
// The shutdown context immediately expires if the gracefulShutdownTimeout is not set.
var shutdownCancel context.CancelFunc
cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout)
defer shutdownCancel()
// Start draining the errors before acquiring the lock to make sure we don't deadlock
// if something that has the lock is blocked on trying to write into the unbuffered
// channel after something else already wrote into it.
var closeOnce sync.Once
go func() {
for {
// Closing in the for loop is required to avoid race conditions between
// the closure of all internal procedures and making sure to have a reader off the error channel.
closeOnce.Do(func() {
// Cancel the internal stop channel and wait for the procedures to stop and complete.
close(cm.internalProceduresStop)
cm.internalCancel()
})
select {
case err, ok := <-cm.errChan:
if ok {
@ -539,26 +545,14 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
}
}
}()
if cm.gracefulShutdownTimeout == 0 {
return nil
}
cm.mu.Lock()
defer cm.mu.Unlock()
cm.stopProcedureEngaged = true
// we want to close this after the other runnables stop, because we don't
// We want to close this after the other runnables stop, because we don't
// want things like leader election to try and emit events on a closed
// channel
defer cm.recorderProvider.Stop(cm.shutdownCtx)
return cm.waitForRunnableToEnd(shutdownCancel)
}
// waitForRunnableToEnd blocks until all runnables ended or the
// tearDownTimeout was reached. In the latter case, an error is returned.
func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelFunc) (retErr error) {
// Cancel leader election only after we waited. It will os.Exit() the app for safety.
defer func() {
if retErr == nil && cm.leaderElectionCancel != nil {
// Cancel leader election only after we waited. It will os.Exit() the app for safety.
if cm.resourceLock != nil {
// After asking the context to be cancelled, make sure
// we wait for the leader stopped channel to be closed, otherwise
// we might encounter race conditions between this code
@ -569,104 +563,48 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
}()
go func() {
cm.waitForRunnable.Wait()
// First stop the non-leader election runnables.
cm.logger.Info("Stopping and waiting for non leader election runnables")
cm.runnables.Others.StopAndWait(cm.shutdownCtx)
// Stop all the leader election runnables, which includes reconcilers.
cm.logger.Info("Stopping and waiting for leader election runnables")
cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx)
// Stop the caches before the leader election runnables, this is an important
// step to make sure that we don't race with the reconcilers by receiving more events
// from the API servers and enqueueing them.
cm.logger.Info("Stopping and waiting for caches")
cm.runnables.Caches.StopAndWait(cm.shutdownCtx)
// Webhooks should come last, as they might be still serving some requests.
cm.logger.Info("Stopping and waiting for webhooks")
cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx)
// Proceed to close the manager and overall shutdown context.
cm.logger.Info("Wait completed, proceeding to shutdown the manager")
shutdownCancel()
}()
<-cm.shutdownCtx.Done()
if err := cm.shutdownCtx.Err(); err != nil && err != context.Canceled {
return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err)
if errors.Is(err, context.DeadlineExceeded) {
if cm.gracefulShutdownTimeout > 0 {
return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err)
}
return nil
}
// For any other error, return the error.
return err
}
return nil
}
func (cm *controllerManager) startWebhookRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
// to never start because no cache can be populated.
for _, c := range cm.nonLeaderElectionRunnables {
if _, ok := c.(*webhook.Server); ok {
cm.startRunnable(c)
}
}
func (cm *controllerManager) startLeaderElectionRunnables() error {
return cm.runnables.LeaderElection.Start(cm.internalCtx)
}
func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
// Start and wait for caches.
cm.waitForCache(cm.internalCtx)
// Start the non-leaderelection Runnables after the cache has synced
for _, c := range cm.nonLeaderElectionRunnables {
if _, ok := c.(*webhook.Server); ok {
continue
}
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
cm.startRunnable(c)
}
}
func (cm *controllerManager) startLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.waitForCache(cm.internalCtx)
// Start the leader election Runnables after the cache has synced
for _, c := range cm.leaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
cm.startRunnable(c)
}
cm.startedLeader = true
}
func (cm *controllerManager) waitForCache(ctx context.Context) {
if cm.started {
return
}
for _, cache := range cm.caches {
cm.startRunnable(cache)
}
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
for _, cache := range cm.caches {
cache.GetCache().WaitForCacheSync(ctx)
}
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
// cm.started as check if we already started the cache so it must always become true.
// Making sure that the cache doesn't get started twice is needed to not get a "close
// of closed channel" panic
cm.started = true
}
func (cm *controllerManager) startLeaderElection() (err error) {
ctx, cancel := context.WithCancel(context.Background())
cm.mu.Lock()
cm.leaderElectionCancel = cancel
cm.mu.Unlock()
if cm.onStoppedLeading == nil {
cm.onStoppedLeading = func() {
// Make sure graceful shutdown is skipped if we lost the leader lock without
// intending to.
cm.gracefulShutdownTimeout = time.Duration(0)
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errChan <- errors.New("leader election lost")
}
}
func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
LeaseDuration: cm.leaseDuration,
@ -674,10 +612,24 @@ func (cm *controllerManager) startLeaderElection() (err error) {
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
cm.startLeaderElectionRunnables()
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
return
}
close(cm.elected)
},
OnStoppedLeading: cm.onStoppedLeading,
OnStoppedLeading: func() {
if cm.onStoppedLeading != nil {
cm.onStoppedLeading()
}
// Make sure graceful shutdown is skipped if we lost the leader lock without
// intending to.
cm.gracefulShutdownTimeout = time.Duration(0)
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errChan <- errors.New("leader election lost")
},
},
ReleaseOnCancel: cm.leaderElectionReleaseOnCancel,
})
@ -697,13 +649,3 @@ func (cm *controllerManager) startLeaderElection() (err error) {
func (cm *controllerManager) Elected() <-chan struct{} {
return cm.elected
}
func (cm *controllerManager) startRunnable(r Runnable) {
cm.waitForRunnable.Add(1)
go func() {
defer cm.waitForRunnable.Done()
if err := r.Start(cm.internalCtx); err != nil {
cm.errChan <- err
}
}()
}
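At the call-site level, the runnable grouping above means that anything added through Add whose NeedLeaderElection() returns false starts with the "Others" group right after the caches sync, while everything else waits for election. A hedged sketch with a hypothetical runnable (not part of the diff); the leader-election settings are illustrative and may need more configuration in a real deployment.

package main

import (
	"context"

	"k8s.io/client-go/rest"

	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)

// readOnlyServer is a hypothetical runnable that must not wait for election.
type readOnlyServer struct{}

func (readOnlyServer) Start(ctx context.Context) error { <-ctx.Done(); return nil }
func (readOnlyServer) NeedLeaderElection() bool        { return false }

func run(cfg *rest.Config) error {
	mgr, err := manager.New(cfg, manager.Options{
		LeaderElection:   true,
		LeaderElectionID: "example-lock",
	})
	if err != nil {
		return err
	}
	// Lands in the "Others" group: started after the caches, without waiting to be elected.
	if err := mgr.Add(readOnlyServer{}); err != nil {
		return err
	}
	// Blocks until the context is cancelled or an internal component fails.
	return mgr.Start(signals.SetupSignalHandler())
}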

View File

@ -31,6 +31,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
@ -365,8 +366,14 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, err
}
errChan := make(chan error)
runnables := newRunnables(errChan)
return &controllerManager{
stopProcedureEngaged: pointer.Int64(0),
cluster: cluster,
runnables: runnables,
errChan: errChan,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
@ -571,7 +578,7 @@ func setOptionsDefaults(options Options) Options {
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
}
if options.Logger == nil {
if options.Logger.GetSink() == nil {
options.Logger = log.Log
}

View File

@ -0,0 +1,296 @@
package manager
import (
"context"
"errors"
"sync"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
var (
errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged")
)
// readyRunnable encapsulates a runnable with
// a ready check.
type readyRunnable struct {
Runnable
Check runnableCheck
signalReady bool
}
// runnableCheck can be passed to Add() to let the runnable group determine that a
// runnable is ready. A runnable check should block until a runnable is ready;
// if the returned result is false, the runnable is considered not ready and failed.
type runnableCheck func(ctx context.Context) bool
// runnables handles all the runnables for a manager by grouping them according to their
// type (webhooks, caches etc.).
type runnables struct {
Webhooks *runnableGroup
Caches *runnableGroup
LeaderElection *runnableGroup
Others *runnableGroup
}
// newRunnables creates a new runnables object.
func newRunnables(errChan chan error) *runnables {
return &runnables{
Webhooks: newRunnableGroup(errChan),
Caches: newRunnableGroup(errChan),
LeaderElection: newRunnableGroup(errChan),
Others: newRunnableGroup(errChan),
}
}
// Add adds a runnable to the closest group of runnables that it belongs to.
//
// Add should be able to be called before and after Start, but not after StopAndWait.
// Add should return an error when called during StopAndWait.
// The runnables added before Start are started when Start is called.
// The runnables added after Start are started directly.
func (r *runnables) Add(fn Runnable) error {
switch runnable := fn.(type) {
case hasCache:
return r.Caches.Add(fn, func(ctx context.Context) bool {
return runnable.GetCache().WaitForCacheSync(ctx)
})
case *webhook.Server:
return r.Webhooks.Add(fn, nil)
case LeaderElectionRunnable:
if !runnable.NeedLeaderElection() {
return r.Others.Add(fn, nil)
}
return r.LeaderElection.Add(fn, nil)
default:
return r.LeaderElection.Add(fn, nil)
}
}
// runnableGroup manages a group of runnables that are
// meant to be running together until StopAndWait is called.
//
// Runnables can be added to a group after the group has started
// but not after it's stopped or while shutting down.
type runnableGroup struct {
ctx context.Context
cancel context.CancelFunc
start sync.Mutex
startOnce sync.Once
started bool
startQueue []*readyRunnable
startReadyCh chan *readyRunnable
stop sync.RWMutex
stopOnce sync.Once
stopped bool
// errChan is the error channel passed by the caller
// when the group is created.
// All errors are forwarded to this channel once they occur.
errChan chan error
// ch is the internal channel where the runnables are read off from.
ch chan *readyRunnable
// wg is an internal sync.WaitGroup that allows us to properly stop
// and wait for all the runnables to finish before returning.
wg *sync.WaitGroup
}
func newRunnableGroup(errChan chan error) *runnableGroup {
r := &runnableGroup{
startReadyCh: make(chan *readyRunnable),
errChan: errChan,
ch: make(chan *readyRunnable),
wg: new(sync.WaitGroup),
}
r.ctx, r.cancel = context.WithCancel(context.Background())
return r
}
// Started returns true if the group has started.
func (r *runnableGroup) Started() bool {
r.start.Lock()
defer r.start.Unlock()
return r.started
}
// Start starts the group and waits for all
// initially registered runnables to start.
// It can only be called once; subsequent calls have no effect.
func (r *runnableGroup) Start(ctx context.Context) error {
var retErr error
r.startOnce.Do(func() {
defer close(r.startReadyCh)
// Start the internal reconciler.
go r.reconcile()
// Start the group and queue up all
// the runnables that were added prior.
r.start.Lock()
r.started = true
for _, rn := range r.startQueue {
rn.signalReady = true
r.ch <- rn
}
r.start.Unlock()
// If we don't have any queue, return.
if len(r.startQueue) == 0 {
return
}
// Wait for all runnables to signal.
for {
select {
case <-ctx.Done():
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
retErr = err
}
case rn := <-r.startReadyCh:
for i, existing := range r.startQueue {
if existing == rn {
// Remove the item from the start queue.
r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
break
}
}
// We're done waiting if the queue is empty, return.
if len(r.startQueue) == 0 {
return
}
}
}
})
return retErr
}
// reconcile is our main entrypoint for every runnable added
// to this group. Its primary job is to read off the internal channel
// and schedule runnables while tracking their state.
func (r *runnableGroup) reconcile() {
for runnable := range r.ch {
// Handle stop.
// If the shutdown has been called we want to avoid
// adding new goroutines to the WaitGroup because Wait()
// panics if Add() is called after it.
{
r.stop.RLock()
if r.stopped {
// Drop any runnables if we're stopped.
r.errChan <- errRunnableGroupStopped
r.stop.RUnlock()
continue
}
// Why is this here?
// When StopAndWait is called, if a runnable is in the process
// of being added, we could end up in a situation where
// the WaitGroup is incremented while StopAndWait has called Wait(),
// which would result in a panic.
r.wg.Add(1)
r.stop.RUnlock()
}
// Start the runnable.
go func(rn *readyRunnable) {
go func() {
if rn.Check(r.ctx) {
if rn.signalReady {
r.startReadyCh <- rn
}
}
}()
// If we return, the runnable ended cleanly
// or returned an error to the channel.
//
// We should always decrement the WaitGroup here.
defer r.wg.Done()
// Start the runnable.
if err := rn.Start(r.ctx); err != nil {
r.errChan <- err
}
}(runnable)
}
}
// Add should be able to be called before and after Start, but not after StopAndWait.
// Add should return an error when called during StopAndWait.
func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
r.stop.RLock()
if r.stopped {
r.stop.RUnlock()
return errRunnableGroupStopped
}
r.stop.RUnlock()
if ready == nil {
ready = func(_ context.Context) bool { return true }
}
readyRunnable := &readyRunnable{
Runnable: rn,
Check: ready,
}
// Handle start.
// If the overall runnable group isn't started yet
// we want to buffer the runnables and let Start()
// queue them up again later.
{
r.start.Lock()
// Check if we're already started.
if !r.started {
// Store the runnable in the internal start queue if not.
r.startQueue = append(r.startQueue, readyRunnable)
r.start.Unlock()
return nil
}
r.start.Unlock()
}
// Enqueue the runnable.
r.ch <- readyRunnable
return nil
}
// StopAndWait waits for all the runnables to finish before returning.
func (r *runnableGroup) StopAndWait(ctx context.Context) {
r.stopOnce.Do(func() {
// Close the reconciler channel once we're done.
defer close(r.ch)
_ = r.Start(ctx)
r.stop.Lock()
// Store the stopped variable so we don't accept any new
// runnables for the time being.
r.stopped = true
r.stop.Unlock()
// Cancel the internal context.
r.cancel()
done := make(chan struct{})
go func() {
defer close(done)
// Wait for all the runnables to finish.
r.wg.Wait()
}()
select {
case <-done:
// We're done, exit.
case <-ctx.Done():
// Calling context has expired, exit.
}
})
}
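A hypothetical, test-style walkthrough of the lifecycle described above, written as if it lived inside this package (the types are unexported, so it is not usable from the outside): Add before Start queues the runnable, Start blocks until every queued runnable signals ready, and StopAndWait cancels the group and waits.

package manager

import (
	"context"
	"time"
)

// exampleGroupLifecycle is illustrative only and not part of the real code.
func exampleGroupLifecycle() {
	errCh := make(chan error)
	group := newRunnableGroup(errCh)

	// Added before Start: buffered in the start queue.
	_ = group.Add(RunnableFunc(func(ctx context.Context) error {
		<-ctx.Done() // run until the group is stopped
		return nil
	}), func(ctx context.Context) bool { return true })

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	_ = group.Start(ctx)   // returns once the queued runnable has signalled ready
	group.StopAndWait(ctx) // cancels the group context and waits for the goroutines
}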

View File

@ -24,8 +24,8 @@ import (
var onlyOneSignalHandler = make(chan struct{})
// SetupSignalHandler registers for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// SetupSignalHandler registers for SIGTERM and SIGINT. A context is returned
// which is canceled on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() context.Context {
close(onlyOneSignalHandler) // panics when called twice
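A minimal sketch (not part of the diff) of consuming the returned context; a manager's Start would normally take it directly, here we just wait on it.

package main

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)

func main() {
	// Cancelled on the first SIGTERM/SIGINT; a second signal exits the
	// process immediately with code 1.
	ctx := signals.SetupSignalHandler()

	<-ctx.Done()
	fmt.Println("shutdown signal received, exiting")
}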

View File

@ -87,7 +87,7 @@ For example if responding to a Pod Delete Event, the Request won't contain that
instead the reconcile function observes this when reading the cluster state and seeing the Pod as missing.
*/
type Reconciler interface {
// Reconciler performs a full reconciliation for the object referred to by the Request.
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
Reconcile(context.Context, Request) (Result, error)
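For context, a minimal Reconciler that satisfies this contract; the type, client wiring, and Pod example are illustrative and not part of the diff.

package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
)

// podObserver is an illustrative reconciler for Pods.
type podObserver struct {
	client client.Client
}

func (r *podObserver) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	var pod corev1.Pod
	if err := r.client.Get(ctx, req.NamespacedName, &pod); err != nil {
		// The Pod may already be gone; a not-found error is not retried.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	logger.Info("observed pod", "phase", pod.Status.Phase)
	// Zero Result and nil error: the request is removed from the queue.
	return ctrl.Result{}, nil
}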

View File

@ -243,7 +243,7 @@ func StandaloneWebhook(hook *Webhook, opts StandaloneOptions) (http.Handler, err
return nil, err
}
if opts.Logger == nil {
if opts.Logger.GetSink() == nil {
opts.Logger = logf.RuntimeLog.WithName("webhook")
}
hook.log = opts.Logger

View File

@ -34,6 +34,7 @@ import (
kscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook/internal/metrics"
)
@ -261,9 +262,7 @@ func (s *Server) Start(ctx context.Context) error {
log.Info("Serving webhook server", "host", s.Host, "port", s.Port)
srv := &http.Server{
Handler: s.WebhookMux,
}
srv := httpserver.New(s.WebhookMux)
idleConnsClosed := make(chan struct{})
go func() {