mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-06 20:09:31 +00:00
9eaa55506f
This commit updates controller-runtime to v0.9.2 and makes changes in persistentvolume.go to add context to various functions and function calls made here instead of context.TODO(). Signed-off-by: Rakshith R <rar@redhat.com>
701 lines
22 KiB
Go
701 lines
22 KiB
Go
/*
|
|
Copyright 2018 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package manager
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
kerrors "k8s.io/apimachinery/pkg/util/errors"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/leaderelection"
|
|
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
|
"k8s.io/client-go/tools/record"
|
|
|
|
"sigs.k8s.io/controller-runtime/pkg/cache"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/cluster"
|
|
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
|
|
"sigs.k8s.io/controller-runtime/pkg/healthz"
|
|
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
|
|
"sigs.k8s.io/controller-runtime/pkg/metrics"
|
|
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
|
|
"sigs.k8s.io/controller-runtime/pkg/webhook"
|
|
)
|
|
|
|
const (
|
|
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
|
|
defaultLeaseDuration = 15 * time.Second
|
|
defaultRenewDeadline = 10 * time.Second
|
|
defaultRetryPeriod = 2 * time.Second
|
|
defaultGracefulShutdownPeriod = 30 * time.Second
|
|
|
|
defaultReadinessEndpoint = "/readyz"
|
|
defaultLivenessEndpoint = "/healthz"
|
|
defaultMetricsEndpoint = "/metrics"
|
|
)
|
|
|
|
var _ Runnable = &controllerManager{}
|
|
|
|
type controllerManager struct {
|
|
// cluster holds a variety of methods to interact with a cluster. Required.
|
|
cluster cluster.Cluster
|
|
|
|
// leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts.
|
|
// These Runnables are managed by lead election.
|
|
leaderElectionRunnables []Runnable
|
|
|
|
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
|
|
// These Runnables will not be blocked by lead election.
|
|
nonLeaderElectionRunnables []Runnable
|
|
|
|
// recorderProvider is used to generate event recorders that will be injected into Controllers
|
|
// (and EventHandlers, Sources and Predicates).
|
|
recorderProvider *intrec.Provider
|
|
|
|
// resourceLock forms the basis for leader election
|
|
resourceLock resourcelock.Interface
|
|
|
|
// leaderElectionReleaseOnCancel defines if the manager should step back from the leader lease
|
|
// on shutdown
|
|
leaderElectionReleaseOnCancel bool
|
|
|
|
// metricsListener is used to serve prometheus metrics
|
|
metricsListener net.Listener
|
|
|
|
// metricsExtraHandlers contains extra handlers to register on http server that serves metrics.
|
|
metricsExtraHandlers map[string]http.Handler
|
|
|
|
// healthProbeListener is used to serve liveness probe
|
|
healthProbeListener net.Listener
|
|
|
|
// Readiness probe endpoint name
|
|
readinessEndpointName string
|
|
|
|
// Liveness probe endpoint name
|
|
livenessEndpointName string
|
|
|
|
// Readyz probe handler
|
|
readyzHandler *healthz.Handler
|
|
|
|
// Healthz probe handler
|
|
healthzHandler *healthz.Handler
|
|
|
|
mu sync.Mutex
|
|
started bool
|
|
startedLeader bool
|
|
healthzStarted bool
|
|
errChan chan error
|
|
|
|
// controllerOptions are the global controller options.
|
|
controllerOptions v1alpha1.ControllerConfigurationSpec
|
|
|
|
// Logger is the logger that should be used by this manager.
|
|
// If none is set, it defaults to log.Log global logger.
|
|
logger logr.Logger
|
|
|
|
// leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper,
|
|
// because for safety reasons we need to os.Exit() when we lose the leader election, meaning that
|
|
// it must be deferred until after gracefulShutdown is done.
|
|
leaderElectionCancel context.CancelFunc
|
|
|
|
// leaderElectionStopped is an internal channel used to signal the stopping procedure that the
|
|
// LeaderElection.Run(...) function has returned and the shutdown can proceed.
|
|
leaderElectionStopped chan struct{}
|
|
|
|
// stop procedure engaged. In other words, we should not add anything else to the manager
|
|
stopProcedureEngaged bool
|
|
|
|
// elected is closed when this manager becomes the leader of a group of
|
|
// managers, either because it won a leader election or because no leader
|
|
// election was configured.
|
|
elected chan struct{}
|
|
|
|
caches []hasCache
|
|
|
|
// port is the port that the webhook server serves at.
|
|
port int
|
|
// host is the hostname that the webhook server binds to.
|
|
host string
|
|
// CertDir is the directory that contains the server key and certificate.
|
|
// if not set, webhook server would look up the server key and certificate in
|
|
// {TempDir}/k8s-webhook-server/serving-certs
|
|
certDir string
|
|
|
|
webhookServer *webhook.Server
|
|
// webhookServerOnce will be called in GetWebhookServer() to optionally initialize
|
|
// webhookServer if unset, and Add() it to controllerManager.
|
|
webhookServerOnce sync.Once
|
|
|
|
// leaseDuration is the duration that non-leader candidates will
|
|
// wait to force acquire leadership.
|
|
leaseDuration time.Duration
|
|
// renewDeadline is the duration that the acting controlplane will retry
|
|
// refreshing leadership before giving up.
|
|
renewDeadline time.Duration
|
|
// retryPeriod is the duration the LeaderElector clients should wait
|
|
// between tries of actions.
|
|
retryPeriod time.Duration
|
|
|
|
// waitForRunnable is holding the number of runnables currently running so that
|
|
// we can wait for them to exit before quitting the manager
|
|
waitForRunnable sync.WaitGroup
|
|
|
|
// gracefulShutdownTimeout is the duration given to runnable to stop
|
|
// before the manager actually returns on stop.
|
|
gracefulShutdownTimeout time.Duration
|
|
|
|
// onStoppedLeading is callled when the leader election lease is lost.
|
|
// It can be overridden for tests.
|
|
onStoppedLeading func()
|
|
|
|
// shutdownCtx is the context that can be used during shutdown. It will be cancelled
|
|
// after the gracefulShutdownTimeout ended. It must not be accessed before internalStop
|
|
// is closed because it will be nil.
|
|
shutdownCtx context.Context
|
|
|
|
internalCtx context.Context
|
|
internalCancel context.CancelFunc
|
|
|
|
// internalProceduresStop channel is used internally to the manager when coordinating
|
|
// the proper shutdown of servers. This channel is also used for dependency injection.
|
|
internalProceduresStop chan struct{}
|
|
}
|
|
|
|
type hasCache interface {
|
|
Runnable
|
|
GetCache() cache.Cache
|
|
}
|
|
|
|
// Add sets dependencies on i, and adds it to the list of Runnables to start.
|
|
func (cm *controllerManager) Add(r Runnable) error {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
if cm.stopProcedureEngaged {
|
|
return errors.New("can't accept new runnable as stop procedure is already engaged")
|
|
}
|
|
|
|
// Set dependencies on the object
|
|
if err := cm.SetFields(r); err != nil {
|
|
return err
|
|
}
|
|
|
|
var shouldStart bool
|
|
|
|
// Add the runnable to the leader election or the non-leaderelection list
|
|
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
|
|
shouldStart = cm.started
|
|
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
|
|
} else if hasCache, ok := r.(hasCache); ok {
|
|
cm.caches = append(cm.caches, hasCache)
|
|
} else {
|
|
shouldStart = cm.startedLeader
|
|
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
|
|
}
|
|
|
|
if shouldStart {
|
|
// If already started, start the controller
|
|
cm.startRunnable(r)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
|
|
func (cm *controllerManager) SetFields(i interface{}) error {
|
|
if err := cm.cluster.SetFields(i); err != nil {
|
|
return err
|
|
}
|
|
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
|
|
return err
|
|
}
|
|
if _, err := inject.StopChannelInto(cm.internalProceduresStop, i); err != nil {
|
|
return err
|
|
}
|
|
if _, err := inject.LoggerInto(cm.logger, i); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics.
|
|
func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
|
|
if path == defaultMetricsEndpoint {
|
|
return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
|
|
}
|
|
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if _, found := cm.metricsExtraHandlers[path]; found {
|
|
return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
|
|
}
|
|
|
|
cm.metricsExtraHandlers[path] = handler
|
|
cm.logger.V(2).Info("Registering metrics http server extra handler", "path", path)
|
|
return nil
|
|
}
|
|
|
|
// AddHealthzCheck allows you to add Healthz checker.
|
|
func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if cm.stopProcedureEngaged {
|
|
return errors.New("can't accept new healthCheck as stop procedure is already engaged")
|
|
}
|
|
|
|
if cm.healthzStarted {
|
|
return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
|
|
}
|
|
|
|
if cm.healthzHandler == nil {
|
|
cm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}}
|
|
}
|
|
|
|
cm.healthzHandler.Checks[name] = check
|
|
return nil
|
|
}
|
|
|
|
// AddReadyzCheck allows you to add Readyz checker.
|
|
func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if cm.stopProcedureEngaged {
|
|
return errors.New("can't accept new ready check as stop procedure is already engaged")
|
|
}
|
|
|
|
if cm.healthzStarted {
|
|
return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
|
|
}
|
|
|
|
if cm.readyzHandler == nil {
|
|
cm.readyzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}}
|
|
}
|
|
|
|
cm.readyzHandler.Checks[name] = check
|
|
return nil
|
|
}
|
|
|
|
func (cm *controllerManager) GetConfig() *rest.Config {
|
|
return cm.cluster.GetConfig()
|
|
}
|
|
|
|
func (cm *controllerManager) GetClient() client.Client {
|
|
return cm.cluster.GetClient()
|
|
}
|
|
|
|
func (cm *controllerManager) GetScheme() *runtime.Scheme {
|
|
return cm.cluster.GetScheme()
|
|
}
|
|
|
|
func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer {
|
|
return cm.cluster.GetFieldIndexer()
|
|
}
|
|
|
|
func (cm *controllerManager) GetCache() cache.Cache {
|
|
return cm.cluster.GetCache()
|
|
}
|
|
|
|
func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder {
|
|
return cm.cluster.GetEventRecorderFor(name)
|
|
}
|
|
|
|
func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
|
|
return cm.cluster.GetRESTMapper()
|
|
}
|
|
|
|
func (cm *controllerManager) GetAPIReader() client.Reader {
|
|
return cm.cluster.GetAPIReader()
|
|
}
|
|
|
|
func (cm *controllerManager) GetWebhookServer() *webhook.Server {
|
|
cm.webhookServerOnce.Do(func() {
|
|
if cm.webhookServer == nil {
|
|
cm.webhookServer = &webhook.Server{
|
|
Port: cm.port,
|
|
Host: cm.host,
|
|
CertDir: cm.certDir,
|
|
}
|
|
}
|
|
if err := cm.Add(cm.webhookServer); err != nil {
|
|
panic("unable to add webhook server to the controller manager")
|
|
}
|
|
})
|
|
return cm.webhookServer
|
|
}
|
|
|
|
func (cm *controllerManager) GetLogger() logr.Logger {
|
|
return cm.logger
|
|
}
|
|
|
|
func (cm *controllerManager) GetControllerOptions() v1alpha1.ControllerConfigurationSpec {
|
|
return cm.controllerOptions
|
|
}
|
|
|
|
func (cm *controllerManager) serveMetrics() {
|
|
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
|
|
ErrorHandling: promhttp.HTTPErrorOnError,
|
|
})
|
|
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
|
|
mux := http.NewServeMux()
|
|
mux.Handle(defaultMetricsEndpoint, handler)
|
|
|
|
func() {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
for path, extraHandler := range cm.metricsExtraHandlers {
|
|
mux.Handle(path, extraHandler)
|
|
}
|
|
}()
|
|
|
|
server := http.Server{
|
|
Handler: mux,
|
|
}
|
|
// Run the server
|
|
cm.startRunnable(RunnableFunc(func(_ context.Context) error {
|
|
cm.logger.Info("starting metrics server", "path", defaultMetricsEndpoint)
|
|
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
|
|
return err
|
|
}
|
|
return nil
|
|
}))
|
|
|
|
// Shutdown the server when stop is closed
|
|
<-cm.internalProceduresStop
|
|
if err := server.Shutdown(cm.shutdownCtx); err != nil {
|
|
cm.errChan <- err
|
|
}
|
|
}
|
|
|
|
func (cm *controllerManager) serveHealthProbes() {
|
|
mux := http.NewServeMux()
|
|
server := http.Server{
|
|
Handler: mux,
|
|
}
|
|
|
|
func() {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
if cm.readyzHandler != nil {
|
|
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
|
|
// Append '/' suffix to handle subpaths
|
|
mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
|
|
}
|
|
if cm.healthzHandler != nil {
|
|
mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
|
|
// Append '/' suffix to handle subpaths
|
|
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
|
|
}
|
|
|
|
// Run server
|
|
cm.startRunnable(RunnableFunc(func(_ context.Context) error {
|
|
if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
|
|
return err
|
|
}
|
|
return nil
|
|
}))
|
|
cm.healthzStarted = true
|
|
}()
|
|
|
|
// Shutdown the server when stop is closed
|
|
<-cm.internalProceduresStop
|
|
if err := server.Shutdown(cm.shutdownCtx); err != nil {
|
|
cm.errChan <- err
|
|
}
|
|
}
|
|
|
|
func (cm *controllerManager) Start(ctx context.Context) (err error) {
|
|
if err := cm.Add(cm.cluster); err != nil {
|
|
return fmt.Errorf("failed to add cluster to runnables: %w", err)
|
|
}
|
|
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
|
|
|
|
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
|
|
stopComplete := make(chan struct{})
|
|
defer close(stopComplete)
|
|
// This must be deferred after closing stopComplete, otherwise we deadlock.
|
|
defer func() {
|
|
// https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg
|
|
stopErr := cm.engageStopProcedure(stopComplete)
|
|
if stopErr != nil {
|
|
if err != nil {
|
|
// Utilerrors.Aggregate allows to use errors.Is for all contained errors
|
|
// whereas fmt.Errorf allows wrapping at most one error which means the
|
|
// other one can not be found anymore.
|
|
err = kerrors.NewAggregate([]error{err, stopErr})
|
|
} else {
|
|
err = stopErr
|
|
}
|
|
}
|
|
}()
|
|
|
|
// initialize this here so that we reset the signal channel state on every start
|
|
// Everything that might write into this channel must be started in a new goroutine,
|
|
// because otherwise we might block this routine trying to write into the full channel
|
|
// and will not be able to enter the deferred cm.engageStopProcedure() which drains
|
|
// it.
|
|
cm.errChan = make(chan error)
|
|
|
|
// Metrics should be served whether the controller is leader or not.
|
|
// (If we don't serve metrics for non-leaders, prometheus will still scrape
|
|
// the pod but will get a connection refused)
|
|
if cm.metricsListener != nil {
|
|
go cm.serveMetrics()
|
|
}
|
|
|
|
// Serve health probes
|
|
if cm.healthProbeListener != nil {
|
|
go cm.serveHealthProbes()
|
|
}
|
|
|
|
go cm.startNonLeaderElectionRunnables()
|
|
|
|
go func() {
|
|
if cm.resourceLock != nil {
|
|
err := cm.startLeaderElection()
|
|
if err != nil {
|
|
cm.errChan <- err
|
|
}
|
|
} else {
|
|
// Treat not having leader election enabled the same as being elected.
|
|
cm.startLeaderElectionRunnables()
|
|
close(cm.elected)
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
// We are done
|
|
return nil
|
|
case err := <-cm.errChan:
|
|
// Error starting or running a runnable
|
|
return err
|
|
}
|
|
}
|
|
|
|
// engageStopProcedure signals all runnables to stop, reads potential errors
|
|
// from the errChan and waits for them to end. It must not be called more than once.
|
|
func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) error {
|
|
// Populate the shutdown context.
|
|
var shutdownCancel context.CancelFunc
|
|
if cm.gracefulShutdownTimeout > 0 {
|
|
cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout)
|
|
} else {
|
|
cm.shutdownCtx, shutdownCancel = context.WithCancel(context.Background())
|
|
}
|
|
defer shutdownCancel()
|
|
|
|
// Cancel the internal stop channel and wait for the procedures to stop and complete.
|
|
close(cm.internalProceduresStop)
|
|
cm.internalCancel()
|
|
|
|
// Start draining the errors before acquiring the lock to make sure we don't deadlock
|
|
// if something that has the lock is blocked on trying to write into the unbuffered
|
|
// channel after something else already wrote into it.
|
|
go func() {
|
|
for {
|
|
select {
|
|
case err, ok := <-cm.errChan:
|
|
if ok {
|
|
cm.logger.Error(err, "error received after stop sequence was engaged")
|
|
}
|
|
case <-stopComplete:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
if cm.gracefulShutdownTimeout == 0 {
|
|
return nil
|
|
}
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
cm.stopProcedureEngaged = true
|
|
|
|
// we want to close this after the other runnables stop, because we don't
|
|
// want things like leader election to try and emit events on a closed
|
|
// channel
|
|
defer cm.recorderProvider.Stop(cm.shutdownCtx)
|
|
return cm.waitForRunnableToEnd(shutdownCancel)
|
|
}
|
|
|
|
// waitForRunnableToEnd blocks until all runnables ended or the
|
|
// tearDownTimeout was reached. In the latter case, an error is returned.
|
|
func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelFunc) (retErr error) {
|
|
// Cancel leader election only after we waited. It will os.Exit() the app for safety.
|
|
defer func() {
|
|
if retErr == nil && cm.leaderElectionCancel != nil {
|
|
// After asking the context to be cancelled, make sure
|
|
// we wait for the leader stopped channel to be closed, otherwise
|
|
// we might encounter race conditions between this code
|
|
// and the event recorder, which is used within leader election code.
|
|
cm.leaderElectionCancel()
|
|
<-cm.leaderElectionStopped
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
cm.waitForRunnable.Wait()
|
|
shutdownCancel()
|
|
}()
|
|
|
|
<-cm.shutdownCtx.Done()
|
|
if err := cm.shutdownCtx.Err(); err != nil && err != context.Canceled {
|
|
return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cm *controllerManager) startNonLeaderElectionRunnables() {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
// First start any webhook servers, which includes conversion, validation, and defaulting
|
|
// webhooks that are registered.
|
|
//
|
|
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
|
|
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
|
|
// to never start because no cache can be populated.
|
|
for _, c := range cm.nonLeaderElectionRunnables {
|
|
if _, ok := c.(*webhook.Server); ok {
|
|
cm.startRunnable(c)
|
|
}
|
|
}
|
|
|
|
// Start and wait for caches.
|
|
cm.waitForCache(cm.internalCtx)
|
|
|
|
// Start the non-leaderelection Runnables after the cache has synced
|
|
for _, c := range cm.nonLeaderElectionRunnables {
|
|
if _, ok := c.(*webhook.Server); ok {
|
|
continue
|
|
}
|
|
|
|
// Controllers block, but we want to return an error if any have an error starting.
|
|
// Write any Start errors to a channel so we can return them
|
|
cm.startRunnable(c)
|
|
}
|
|
}
|
|
|
|
func (cm *controllerManager) startLeaderElectionRunnables() {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
cm.waitForCache(cm.internalCtx)
|
|
|
|
// Start the leader election Runnables after the cache has synced
|
|
for _, c := range cm.leaderElectionRunnables {
|
|
// Controllers block, but we want to return an error if any have an error starting.
|
|
// Write any Start errors to a channel so we can return them
|
|
cm.startRunnable(c)
|
|
}
|
|
|
|
cm.startedLeader = true
|
|
}
|
|
|
|
func (cm *controllerManager) waitForCache(ctx context.Context) {
|
|
if cm.started {
|
|
return
|
|
}
|
|
|
|
for _, cache := range cm.caches {
|
|
cm.startRunnable(cache)
|
|
}
|
|
|
|
// Wait for the caches to sync.
|
|
// TODO(community): Check the return value and write a test
|
|
for _, cache := range cm.caches {
|
|
cache.GetCache().WaitForCacheSync(ctx)
|
|
}
|
|
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
|
|
// cm.started as check if we already started the cache so it must always become true.
|
|
// Making sure that the cache doesn't get started twice is needed to not get a "close
|
|
// of closed channel" panic
|
|
cm.started = true
|
|
}
|
|
|
|
func (cm *controllerManager) startLeaderElection() (err error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cm.mu.Lock()
|
|
cm.leaderElectionCancel = cancel
|
|
cm.mu.Unlock()
|
|
|
|
if cm.onStoppedLeading == nil {
|
|
cm.onStoppedLeading = func() {
|
|
// Make sure graceful shutdown is skipped if we lost the leader lock without
|
|
// intending to.
|
|
cm.gracefulShutdownTimeout = time.Duration(0)
|
|
// Most implementations of leader election log.Fatal() here.
|
|
// Since Start is wrapped in log.Fatal when called, we can just return
|
|
// an error here which will cause the program to exit.
|
|
cm.errChan <- errors.New("leader election lost")
|
|
}
|
|
}
|
|
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
|
|
Lock: cm.resourceLock,
|
|
LeaseDuration: cm.leaseDuration,
|
|
RenewDeadline: cm.renewDeadline,
|
|
RetryPeriod: cm.retryPeriod,
|
|
Callbacks: leaderelection.LeaderCallbacks{
|
|
OnStartedLeading: func(_ context.Context) {
|
|
cm.startLeaderElectionRunnables()
|
|
close(cm.elected)
|
|
},
|
|
OnStoppedLeading: cm.onStoppedLeading,
|
|
},
|
|
ReleaseOnCancel: cm.leaderElectionReleaseOnCancel,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Start the leader elector process
|
|
go func() {
|
|
l.Run(ctx)
|
|
<-ctx.Done()
|
|
close(cm.leaderElectionStopped)
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (cm *controllerManager) Elected() <-chan struct{} {
|
|
return cm.elected
|
|
}
|
|
|
|
func (cm *controllerManager) startRunnable(r Runnable) {
|
|
cm.waitForRunnable.Add(1)
|
|
go func() {
|
|
defer cm.waitForRunnable.Done()
|
|
if err := r.Start(cm.internalCtx); err != nil {
|
|
cm.errChan <- err
|
|
}
|
|
}()
|
|
}
|