mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-07 12:29:31 +00:00
297 lines
7.4 KiB
Go
297 lines
7.4 KiB
Go
|
package manager
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"sync"
|
||
|
|
||
|
"sigs.k8s.io/controller-runtime/pkg/webhook"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged")
|
||
|
)
|
||
|
|
||
|
// readyRunnable encapsulates a runnable with
|
||
|
// a ready check.
|
||
|
type readyRunnable struct {
|
||
|
Runnable
|
||
|
Check runnableCheck
|
||
|
signalReady bool
|
||
|
}
|
||
|
|
||
|
// runnableCheck can be passed to Add() to let the runnable group determine that a
|
||
|
// runnable is ready. A runnable check should block until a runnable is ready,
|
||
|
// if the returned result is false, the runnable is considered not ready and failed.
|
||
|
type runnableCheck func(ctx context.Context) bool
|
||
|
|
||
|
// runnables handles all the runnables for a manager by grouping them accordingly to their
|
||
|
// type (webhooks, caches etc.).
|
||
|
type runnables struct {
|
||
|
Webhooks *runnableGroup
|
||
|
Caches *runnableGroup
|
||
|
LeaderElection *runnableGroup
|
||
|
Others *runnableGroup
|
||
|
}
|
||
|
|
||
|
// newRunnables creates a new runnables object.
|
||
|
func newRunnables(errChan chan error) *runnables {
|
||
|
return &runnables{
|
||
|
Webhooks: newRunnableGroup(errChan),
|
||
|
Caches: newRunnableGroup(errChan),
|
||
|
LeaderElection: newRunnableGroup(errChan),
|
||
|
Others: newRunnableGroup(errChan),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Add adds a runnable to closest group of runnable that they belong to.
|
||
|
//
|
||
|
// Add should be able to be called before and after Start, but not after StopAndWait.
|
||
|
// Add should return an error when called during StopAndWait.
|
||
|
// The runnables added before Start are started when Start is called.
|
||
|
// The runnables added after Start are started directly.
|
||
|
func (r *runnables) Add(fn Runnable) error {
|
||
|
switch runnable := fn.(type) {
|
||
|
case hasCache:
|
||
|
return r.Caches.Add(fn, func(ctx context.Context) bool {
|
||
|
return runnable.GetCache().WaitForCacheSync(ctx)
|
||
|
})
|
||
|
case *webhook.Server:
|
||
|
return r.Webhooks.Add(fn, nil)
|
||
|
case LeaderElectionRunnable:
|
||
|
if !runnable.NeedLeaderElection() {
|
||
|
return r.Others.Add(fn, nil)
|
||
|
}
|
||
|
return r.LeaderElection.Add(fn, nil)
|
||
|
default:
|
||
|
return r.LeaderElection.Add(fn, nil)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// runnableGroup manages a group of runnables that are
|
||
|
// meant to be running together until StopAndWait is called.
|
||
|
//
|
||
|
// Runnables can be added to a group after the group has started
|
||
|
// but not after it's stopped or while shutting down.
|
||
|
type runnableGroup struct {
|
||
|
ctx context.Context
|
||
|
cancel context.CancelFunc
|
||
|
|
||
|
start sync.Mutex
|
||
|
startOnce sync.Once
|
||
|
started bool
|
||
|
startQueue []*readyRunnable
|
||
|
startReadyCh chan *readyRunnable
|
||
|
|
||
|
stop sync.RWMutex
|
||
|
stopOnce sync.Once
|
||
|
stopped bool
|
||
|
|
||
|
// errChan is the error channel passed by the caller
|
||
|
// when the group is created.
|
||
|
// All errors are forwarded to this channel once they occur.
|
||
|
errChan chan error
|
||
|
|
||
|
// ch is the internal channel where the runnables are read off from.
|
||
|
ch chan *readyRunnable
|
||
|
|
||
|
// wg is an internal sync.WaitGroup that allows us to properly stop
|
||
|
// and wait for all the runnables to finish before returning.
|
||
|
wg *sync.WaitGroup
|
||
|
}
|
||
|
|
||
|
func newRunnableGroup(errChan chan error) *runnableGroup {
|
||
|
r := &runnableGroup{
|
||
|
startReadyCh: make(chan *readyRunnable),
|
||
|
errChan: errChan,
|
||
|
ch: make(chan *readyRunnable),
|
||
|
wg: new(sync.WaitGroup),
|
||
|
}
|
||
|
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
// Started returns true if the group has started.
|
||
|
func (r *runnableGroup) Started() bool {
|
||
|
r.start.Lock()
|
||
|
defer r.start.Unlock()
|
||
|
return r.started
|
||
|
}
|
||
|
|
||
|
// Start starts the group and waits for all
|
||
|
// initially registered runnables to start.
|
||
|
// It can only be called once, subsequent calls have no effect.
|
||
|
func (r *runnableGroup) Start(ctx context.Context) error {
|
||
|
var retErr error
|
||
|
|
||
|
r.startOnce.Do(func() {
|
||
|
defer close(r.startReadyCh)
|
||
|
|
||
|
// Start the internal reconciler.
|
||
|
go r.reconcile()
|
||
|
|
||
|
// Start the group and queue up all
|
||
|
// the runnables that were added prior.
|
||
|
r.start.Lock()
|
||
|
r.started = true
|
||
|
for _, rn := range r.startQueue {
|
||
|
rn.signalReady = true
|
||
|
r.ch <- rn
|
||
|
}
|
||
|
r.start.Unlock()
|
||
|
|
||
|
// If we don't have any queue, return.
|
||
|
if len(r.startQueue) == 0 {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Wait for all runnables to signal.
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
|
||
|
retErr = err
|
||
|
}
|
||
|
case rn := <-r.startReadyCh:
|
||
|
for i, existing := range r.startQueue {
|
||
|
if existing == rn {
|
||
|
// Remove the item from the start queue.
|
||
|
r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
// We're done waiting if the queue is empty, return.
|
||
|
if len(r.startQueue) == 0 {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
|
||
|
return retErr
|
||
|
}
|
||
|
|
||
|
// reconcile is our main entrypoint for every runnable added
|
||
|
// to this group. Its primary job is to read off the internal channel
|
||
|
// and schedule runnables while tracking their state.
|
||
|
func (r *runnableGroup) reconcile() {
|
||
|
for runnable := range r.ch {
|
||
|
// Handle stop.
|
||
|
// If the shutdown has been called we want to avoid
|
||
|
// adding new goroutines to the WaitGroup because Wait()
|
||
|
// panics if Add() is called after it.
|
||
|
{
|
||
|
r.stop.RLock()
|
||
|
if r.stopped {
|
||
|
// Drop any runnables if we're stopped.
|
||
|
r.errChan <- errRunnableGroupStopped
|
||
|
r.stop.RUnlock()
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// Why is this here?
|
||
|
// When StopAndWait is called, if a runnable is in the process
|
||
|
// of being added, we could end up in a situation where
|
||
|
// the WaitGroup is incremented while StopAndWait has called Wait(),
|
||
|
// which would result in a panic.
|
||
|
r.wg.Add(1)
|
||
|
r.stop.RUnlock()
|
||
|
}
|
||
|
|
||
|
// Start the runnable.
|
||
|
go func(rn *readyRunnable) {
|
||
|
go func() {
|
||
|
if rn.Check(r.ctx) {
|
||
|
if rn.signalReady {
|
||
|
r.startReadyCh <- rn
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// If we return, the runnable ended cleanly
|
||
|
// or returned an error to the channel.
|
||
|
//
|
||
|
// We should always decrement the WaitGroup here.
|
||
|
defer r.wg.Done()
|
||
|
|
||
|
// Start the runnable.
|
||
|
if err := rn.Start(r.ctx); err != nil {
|
||
|
r.errChan <- err
|
||
|
}
|
||
|
}(runnable)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Add should be able to be called before and after Start, but not after StopAndWait.
|
||
|
// Add should return an error when called during StopAndWait.
|
||
|
func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
|
||
|
r.stop.RLock()
|
||
|
if r.stopped {
|
||
|
r.stop.RUnlock()
|
||
|
return errRunnableGroupStopped
|
||
|
}
|
||
|
r.stop.RUnlock()
|
||
|
|
||
|
if ready == nil {
|
||
|
ready = func(_ context.Context) bool { return true }
|
||
|
}
|
||
|
|
||
|
readyRunnable := &readyRunnable{
|
||
|
Runnable: rn,
|
||
|
Check: ready,
|
||
|
}
|
||
|
|
||
|
// Handle start.
|
||
|
// If the overall runnable group isn't started yet
|
||
|
// we want to buffer the runnables and let Start()
|
||
|
// queue them up again later.
|
||
|
{
|
||
|
r.start.Lock()
|
||
|
|
||
|
// Check if we're already started.
|
||
|
if !r.started {
|
||
|
// Store the runnable in the internal if not.
|
||
|
r.startQueue = append(r.startQueue, readyRunnable)
|
||
|
r.start.Unlock()
|
||
|
return nil
|
||
|
}
|
||
|
r.start.Unlock()
|
||
|
}
|
||
|
|
||
|
// Enqueue the runnable.
|
||
|
r.ch <- readyRunnable
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// StopAndWait waits for all the runnables to finish before returning.
|
||
|
func (r *runnableGroup) StopAndWait(ctx context.Context) {
|
||
|
r.stopOnce.Do(func() {
|
||
|
// Close the reconciler channel once we're done.
|
||
|
defer close(r.ch)
|
||
|
|
||
|
_ = r.Start(ctx)
|
||
|
r.stop.Lock()
|
||
|
// Store the stopped variable so we don't accept any new
|
||
|
// runnables for the time being.
|
||
|
r.stopped = true
|
||
|
r.stop.Unlock()
|
||
|
|
||
|
// Cancel the internal channel.
|
||
|
r.cancel()
|
||
|
|
||
|
done := make(chan struct{})
|
||
|
go func() {
|
||
|
defer close(done)
|
||
|
// Wait for all the runnables to finish.
|
||
|
r.wg.Wait()
|
||
|
}()
|
||
|
|
||
|
select {
|
||
|
case <-done:
|
||
|
// We're done, exit.
|
||
|
case <-ctx.Done():
|
||
|
// Calling context has expired, exit.
|
||
|
}
|
||
|
})
|
||
|
}
|