mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-26 00:00:23 +00:00
02b8cd0b4b
Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
607 lines
20 KiB
Go
607 lines
20 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package wait
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"math"
|
|
"math/rand"
|
|
"sync"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/util/clock"
|
|
"k8s.io/apimachinery/pkg/util/runtime"
|
|
)
|
|
|
|
// For any test of the style:
|
|
// ...
|
|
// <- time.After(timeout):
|
|
// t.Errorf("Timed out")
|
|
// The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s
|
|
// is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine
|
|
// (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test.
|
|
var ForeverTestTimeout = time.Second * 30
|
|
|
|
// NeverStop may be passed to Until to make it never stop.
|
|
var NeverStop <-chan struct{} = make(chan struct{})
|
|
|
|
// Group allows to start a group of goroutines and wait for their completion.
|
|
type Group struct {
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func (g *Group) Wait() {
|
|
g.wg.Wait()
|
|
}
|
|
|
|
// StartWithChannel starts f in a new goroutine in the group.
|
|
// stopCh is passed to f as an argument. f should stop when stopCh is available.
|
|
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
|
|
g.Start(func() {
|
|
f(stopCh)
|
|
})
|
|
}
|
|
|
|
// StartWithContext starts f in a new goroutine in the group.
|
|
// ctx is passed to f as an argument. f should stop when ctx.Done() is available.
|
|
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
|
|
g.Start(func() {
|
|
f(ctx)
|
|
})
|
|
}
|
|
|
|
// Start starts f in a new goroutine in the group.
|
|
func (g *Group) Start(f func()) {
|
|
g.wg.Add(1)
|
|
go func() {
|
|
defer g.wg.Done()
|
|
f()
|
|
}()
|
|
}
|
|
|
|
// Forever calls f every period for ever.
|
|
//
|
|
// Forever is syntactic sugar on top of Until.
|
|
func Forever(f func(), period time.Duration) {
|
|
Until(f, period, NeverStop)
|
|
}
|
|
|
|
// Until loops until stop channel is closed, running f every period.
|
|
//
|
|
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
|
|
// with sliding = true (which means the timer for period starts after the f
|
|
// completes).
|
|
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
|
|
JitterUntil(f, period, 0.0, true, stopCh)
|
|
}
|
|
|
|
// UntilWithContext loops until context is done, running f every period.
|
|
//
|
|
// UntilWithContext is syntactic sugar on top of JitterUntilWithContext
|
|
// with zero jitter factor and with sliding = true (which means the timer
|
|
// for period starts after the f completes).
|
|
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
|
|
JitterUntilWithContext(ctx, f, period, 0.0, true)
|
|
}
|
|
|
|
// NonSlidingUntil loops until stop channel is closed, running f every
|
|
// period.
|
|
//
|
|
// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
|
|
// factor, with sliding = false (meaning the timer for period starts at the same
|
|
// time as the function starts).
|
|
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
|
|
JitterUntil(f, period, 0.0, false, stopCh)
|
|
}
|
|
|
|
// NonSlidingUntilWithContext loops until context is done, running f every
|
|
// period.
|
|
//
|
|
// NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext
|
|
// with zero jitter factor, with sliding = false (meaning the timer for period
|
|
// starts at the same time as the function starts).
|
|
func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
|
|
JitterUntilWithContext(ctx, f, period, 0.0, false)
|
|
}
|
|
|
|
// JitterUntil loops until stop channel is closed, running f every period.
|
|
//
|
|
// If jitterFactor is positive, the period is jittered before every run of f.
|
|
// If jitterFactor is not positive, the period is unchanged and not jittered.
|
|
//
|
|
// If sliding is true, the period is computed after f runs. If it is false then
|
|
// period includes the runtime for f.
|
|
//
|
|
// Close stopCh to stop. f may not be invoked if stop channel is already
|
|
// closed. Pass NeverStop to if you don't want it stop.
|
|
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
|
|
BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
|
|
}
|
|
|
|
// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
|
|
//
|
|
// If sliding is true, the period is computed after f runs. If it is false then
|
|
// period includes the runtime for f.
|
|
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
|
|
var t clock.Timer
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
if !sliding {
|
|
t = backoff.Backoff()
|
|
}
|
|
|
|
func() {
|
|
defer runtime.HandleCrash()
|
|
f()
|
|
}()
|
|
|
|
if sliding {
|
|
t = backoff.Backoff()
|
|
}
|
|
|
|
// NOTE: b/c there is no priority selection in golang
|
|
// it is possible for this to race, meaning we could
|
|
// trigger t.C and stopCh, and t.C select falls through.
|
|
// In order to mitigate we re-check stopCh at the beginning
|
|
// of every loop to prevent extra executions of f().
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case <-t.C():
|
|
}
|
|
}
|
|
}
|
|
|
|
// JitterUntilWithContext loops until context is done, running f every period.
|
|
//
|
|
// If jitterFactor is positive, the period is jittered before every run of f.
|
|
// If jitterFactor is not positive, the period is unchanged and not jittered.
|
|
//
|
|
// If sliding is true, the period is computed after f runs. If it is false then
|
|
// period includes the runtime for f.
|
|
//
|
|
// Cancel context to stop. f may not be invoked if context is already expired.
|
|
func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
|
|
JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
|
|
}
|
|
|
|
// Jitter returns a time.Duration between duration and duration + maxFactor *
|
|
// duration.
|
|
//
|
|
// This allows clients to avoid converging on periodic behavior. If maxFactor
|
|
// is 0.0, a suggested default value will be chosen.
|
|
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
|
|
if maxFactor <= 0.0 {
|
|
maxFactor = 1.0
|
|
}
|
|
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
|
|
return wait
|
|
}
|
|
|
|
// ErrWaitTimeout is returned when the condition exited without success.
|
|
var ErrWaitTimeout = errors.New("timed out waiting for the condition")
|
|
|
|
// ConditionFunc returns true if the condition is satisfied, or an error
|
|
// if the loop should be aborted.
|
|
type ConditionFunc func() (done bool, err error)
|
|
|
|
// runConditionWithCrashProtection runs a ConditionFunc with crash protection
|
|
func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) {
|
|
defer runtime.HandleCrash()
|
|
return condition()
|
|
}
|
|
|
|
// Backoff holds parameters applied to a Backoff function.
|
|
type Backoff struct {
|
|
// The initial duration.
|
|
Duration time.Duration
|
|
// Duration is multiplied by factor each iteration, if factor is not zero
|
|
// and the limits imposed by Steps and Cap have not been reached.
|
|
// Should not be negative.
|
|
// The jitter does not contribute to the updates to the duration parameter.
|
|
Factor float64
|
|
// The sleep at each iteration is the duration plus an additional
|
|
// amount chosen uniformly at random from the interval between
|
|
// zero and `jitter*duration`.
|
|
Jitter float64
|
|
// The remaining number of iterations in which the duration
|
|
// parameter may change (but progress can be stopped earlier by
|
|
// hitting the cap). If not positive, the duration is not
|
|
// changed. Used for exponential backoff in combination with
|
|
// Factor and Cap.
|
|
Steps int
|
|
// A limit on revised values of the duration parameter. If a
|
|
// multiplication by the factor parameter would make the duration
|
|
// exceed the cap then the duration is set to the cap and the
|
|
// steps parameter is set to zero.
|
|
Cap time.Duration
|
|
}
|
|
|
|
// Step (1) returns an amount of time to sleep determined by the
|
|
// original Duration and Jitter and (2) mutates the provided Backoff
|
|
// to update its Steps and Duration.
|
|
func (b *Backoff) Step() time.Duration {
|
|
if b.Steps < 1 {
|
|
if b.Jitter > 0 {
|
|
return Jitter(b.Duration, b.Jitter)
|
|
}
|
|
return b.Duration
|
|
}
|
|
b.Steps--
|
|
|
|
duration := b.Duration
|
|
|
|
// calculate the next step
|
|
if b.Factor != 0 {
|
|
b.Duration = time.Duration(float64(b.Duration) * b.Factor)
|
|
if b.Cap > 0 && b.Duration > b.Cap {
|
|
b.Duration = b.Cap
|
|
b.Steps = 0
|
|
}
|
|
}
|
|
|
|
if b.Jitter > 0 {
|
|
duration = Jitter(duration, b.Jitter)
|
|
}
|
|
return duration
|
|
}
|
|
|
|
// contextForChannel derives a child context from a parent channel.
|
|
//
|
|
// The derived context's Done channel is closed when the returned cancel function
|
|
// is called or when the parent channel is closed, whichever happens first.
|
|
//
|
|
// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
|
|
func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
go func() {
|
|
select {
|
|
case <-parentCh:
|
|
cancel()
|
|
case <-ctx.Done():
|
|
}
|
|
}()
|
|
return ctx, cancel
|
|
}
|
|
|
|
// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides
|
|
// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff()
|
|
// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in
|
|
// undetermined behavior.
|
|
// The BackoffManager is supposed to be called in a single-threaded environment.
|
|
type BackoffManager interface {
|
|
Backoff() clock.Timer
|
|
}
|
|
|
|
type exponentialBackoffManagerImpl struct {
|
|
backoff *Backoff
|
|
backoffTimer clock.Timer
|
|
lastBackoffStart time.Time
|
|
initialBackoff time.Duration
|
|
backoffResetDuration time.Duration
|
|
clock clock.Clock
|
|
}
|
|
|
|
// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
|
|
// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
|
|
// This backoff manager is used to reduce load during upstream unhealthiness.
|
|
func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
|
|
return &exponentialBackoffManagerImpl{
|
|
backoff: &Backoff{
|
|
Duration: initBackoff,
|
|
Factor: backoffFactor,
|
|
Jitter: jitter,
|
|
|
|
// the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
|
|
// what we ideally need here, we set it to max int and assume we will never use up the steps
|
|
Steps: math.MaxInt32,
|
|
Cap: maxBackoff,
|
|
},
|
|
backoffTimer: nil,
|
|
initialBackoff: initBackoff,
|
|
lastBackoffStart: c.Now(),
|
|
backoffResetDuration: resetDuration,
|
|
clock: c,
|
|
}
|
|
}
|
|
|
|
func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
|
|
if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
|
|
b.backoff.Steps = math.MaxInt32
|
|
b.backoff.Duration = b.initialBackoff
|
|
}
|
|
b.lastBackoffStart = b.clock.Now()
|
|
return b.backoff.Step()
|
|
}
|
|
|
|
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
|
|
// The returned timer must be drained before calling Backoff() the second time
|
|
func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
|
|
if b.backoffTimer == nil {
|
|
b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
|
|
} else {
|
|
b.backoffTimer.Reset(b.getNextBackoff())
|
|
}
|
|
return b.backoffTimer
|
|
}
|
|
|
|
type jitteredBackoffManagerImpl struct {
|
|
clock clock.Clock
|
|
duration time.Duration
|
|
jitter float64
|
|
backoffTimer clock.Timer
|
|
}
|
|
|
|
// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
|
|
// is negative, backoff will not be jittered.
|
|
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
|
|
return &jitteredBackoffManagerImpl{
|
|
clock: c,
|
|
duration: duration,
|
|
jitter: jitter,
|
|
backoffTimer: nil,
|
|
}
|
|
}
|
|
|
|
func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
|
|
jitteredPeriod := j.duration
|
|
if j.jitter > 0.0 {
|
|
jitteredPeriod = Jitter(j.duration, j.jitter)
|
|
}
|
|
return jitteredPeriod
|
|
}
|
|
|
|
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
|
|
// The returned timer must be drained before calling Backoff() the second time
|
|
func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
|
|
backoff := j.getNextBackoff()
|
|
if j.backoffTimer == nil {
|
|
j.backoffTimer = j.clock.NewTimer(backoff)
|
|
} else {
|
|
j.backoffTimer.Reset(backoff)
|
|
}
|
|
return j.backoffTimer
|
|
}
|
|
|
|
// ExponentialBackoff repeats a condition check with exponential backoff.
|
|
//
|
|
// It repeatedly checks the condition and then sleeps, using `backoff.Step()`
|
|
// to determine the length of the sleep and adjust Duration and Steps.
|
|
// Stops and returns as soon as:
|
|
// 1. the condition check returns true or an error,
|
|
// 2. `backoff.Steps` checks of the condition have been done, or
|
|
// 3. a sleep truncated by the cap on duration has been completed.
|
|
// In case (1) the returned error is what the condition function returned.
|
|
// In all other cases, ErrWaitTimeout is returned.
|
|
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
|
|
for backoff.Steps > 0 {
|
|
if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
|
|
return err
|
|
}
|
|
if backoff.Steps == 1 {
|
|
break
|
|
}
|
|
time.Sleep(backoff.Step())
|
|
}
|
|
return ErrWaitTimeout
|
|
}
|
|
|
|
// Poll tries a condition func until it returns true, an error, or the timeout
|
|
// is reached.
|
|
//
|
|
// Poll always waits the interval before the run of 'condition'.
|
|
// 'condition' will always be invoked at least once.
|
|
//
|
|
// Some intervals may be missed if the condition takes too long or the time
|
|
// window is too short.
|
|
//
|
|
// If you want to Poll something forever, see PollInfinite.
|
|
func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
|
|
return pollInternal(poller(interval, timeout), condition)
|
|
}
|
|
|
|
func pollInternal(wait WaitFunc, condition ConditionFunc) error {
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
return WaitFor(wait, condition, done)
|
|
}
|
|
|
|
// PollImmediate tries a condition func until it returns true, an error, or the timeout
|
|
// is reached.
|
|
//
|
|
// PollImmediate always checks 'condition' before waiting for the interval. 'condition'
|
|
// will always be invoked at least once.
|
|
//
|
|
// Some intervals may be missed if the condition takes too long or the time
|
|
// window is too short.
|
|
//
|
|
// If you want to immediately Poll something forever, see PollImmediateInfinite.
|
|
func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {
|
|
return pollImmediateInternal(poller(interval, timeout), condition)
|
|
}
|
|
|
|
func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error {
|
|
done, err := runConditionWithCrashProtection(condition)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if done {
|
|
return nil
|
|
}
|
|
return pollInternal(wait, condition)
|
|
}
|
|
|
|
// PollInfinite tries a condition func until it returns true or an error
|
|
//
|
|
// PollInfinite always waits the interval before the run of 'condition'.
|
|
//
|
|
// Some intervals may be missed if the condition takes too long or the time
|
|
// window is too short.
|
|
func PollInfinite(interval time.Duration, condition ConditionFunc) error {
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
return PollUntil(interval, condition, done)
|
|
}
|
|
|
|
// PollImmediateInfinite tries a condition func until it returns true or an error
|
|
//
|
|
// PollImmediateInfinite runs the 'condition' before waiting for the interval.
|
|
//
|
|
// Some intervals may be missed if the condition takes too long or the time
|
|
// window is too short.
|
|
func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error {
|
|
done, err := runConditionWithCrashProtection(condition)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if done {
|
|
return nil
|
|
}
|
|
return PollInfinite(interval, condition)
|
|
}
|
|
|
|
// PollUntil tries a condition func until it returns true, an error or stopCh is
|
|
// closed.
|
|
//
|
|
// PollUntil always waits interval before the first run of 'condition'.
|
|
// 'condition' will always be invoked at least once.
|
|
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
|
|
ctx, cancel := contextForChannel(stopCh)
|
|
defer cancel()
|
|
return WaitFor(poller(interval, 0), condition, ctx.Done())
|
|
}
|
|
|
|
// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed.
|
|
//
|
|
// PollImmediateUntil runs the 'condition' before waiting for the interval.
|
|
// 'condition' will always be invoked at least once.
|
|
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
|
|
done, err := condition()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if done {
|
|
return nil
|
|
}
|
|
select {
|
|
case <-stopCh:
|
|
return ErrWaitTimeout
|
|
default:
|
|
return PollUntil(interval, condition, stopCh)
|
|
}
|
|
}
|
|
|
|
// WaitFunc creates a channel that receives an item every time a test
|
|
// should be executed and is closed when the last test should be invoked.
|
|
type WaitFunc func(done <-chan struct{}) <-chan struct{}
|
|
|
|
// WaitFor continually checks 'fn' as driven by 'wait'.
|
|
//
|
|
// WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value
|
|
// placed on the channel and once more when the channel is closed. If the channel is closed
|
|
// and 'fn' returns false without error, WaitFor returns ErrWaitTimeout.
|
|
//
|
|
// If 'fn' returns an error the loop ends and that error is returned. If
|
|
// 'fn' returns true the loop ends and nil is returned.
|
|
//
|
|
// ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever
|
|
// returning true.
|
|
//
|
|
// When the done channel is closed, because the golang `select` statement is
|
|
// "uniform pseudo-random", the `fn` might still run one or multiple time,
|
|
// though eventually `WaitFor` will return.
|
|
func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
|
|
stopCh := make(chan struct{})
|
|
defer close(stopCh)
|
|
c := wait(stopCh)
|
|
for {
|
|
select {
|
|
case _, open := <-c:
|
|
ok, err := runConditionWithCrashProtection(fn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ok {
|
|
return nil
|
|
}
|
|
if !open {
|
|
return ErrWaitTimeout
|
|
}
|
|
case <-done:
|
|
return ErrWaitTimeout
|
|
}
|
|
}
|
|
}
|
|
|
|
// poller returns a WaitFunc that will send to the channel every interval until
|
|
// timeout has elapsed and then closes the channel.
|
|
//
|
|
// Over very short intervals you may receive no ticks before the channel is
|
|
// closed. A timeout of 0 is interpreted as an infinity, and in such a case
|
|
// it would be the caller's responsibility to close the done channel.
|
|
// Failure to do so would result in a leaked goroutine.
|
|
//
|
|
// Output ticks are not buffered. If the channel is not ready to receive an
|
|
// item, the tick is skipped.
|
|
func poller(interval, timeout time.Duration) WaitFunc {
|
|
return WaitFunc(func(done <-chan struct{}) <-chan struct{} {
|
|
ch := make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(ch)
|
|
|
|
tick := time.NewTicker(interval)
|
|
defer tick.Stop()
|
|
|
|
var after <-chan time.Time
|
|
if timeout != 0 {
|
|
// time.After is more convenient, but it
|
|
// potentially leaves timers around much longer
|
|
// than necessary if we exit early.
|
|
timer := time.NewTimer(timeout)
|
|
after = timer.C
|
|
defer timer.Stop()
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-tick.C:
|
|
// If the consumer isn't ready for this signal drop it and
|
|
// check the other channels.
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
}
|
|
case <-after:
|
|
return
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
})
|
|
}
|