mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-03-10 09:29:30 +00:00
Several packages are only used while running the e2e suite. These packages are less important to update, as the they can not influence the final executable that is part of the Ceph-CSI container-image. By moving these dependencies out of the main Ceph-CSI go.mod, it is easier to identify if a reported CVE affects Ceph-CSI, or only the testing (like most of the Kubernetes CVEs). Signed-off-by: Niels de Vos <ndevos@ibm.com>
268 lines
8.0 KiB
Go
268 lines
8.0 KiB
Go
//
|
|
// Written by Maxim Khitrov (November 2012)
|
|
//
|
|
|
|
// Package flowrate provides the tools for monitoring and limiting the flow rate
|
|
// of an arbitrary data stream.
|
|
package flowrate
|
|
|
|
import (
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Monitor monitors and limits the transfer rate of a data stream.
|
|
type Monitor struct {
|
|
mu sync.Mutex // Mutex guarding access to all internal fields
|
|
active bool // Flag indicating an active transfer
|
|
start time.Duration // Transfer start time (clock() value)
|
|
bytes int64 // Total number of bytes transferred
|
|
samples int64 // Total number of samples taken
|
|
|
|
rSample float64 // Most recent transfer rate sample (bytes per second)
|
|
rEMA float64 // Exponential moving average of rSample
|
|
rPeak float64 // Peak transfer rate (max of all rSamples)
|
|
rWindow float64 // rEMA window (seconds)
|
|
|
|
sBytes int64 // Number of bytes transferred since sLast
|
|
sLast time.Duration // Most recent sample time (stop time when inactive)
|
|
sRate time.Duration // Sampling rate
|
|
|
|
tBytes int64 // Number of bytes expected in the current transfer
|
|
tLast time.Duration // Time of the most recent transfer of at least 1 byte
|
|
}
|
|
|
|
// New creates a new flow control monitor. Instantaneous transfer rate is
|
|
// measured and updated for each sampleRate interval. windowSize determines the
|
|
// weight of each sample in the exponential moving average (EMA) calculation.
|
|
// The exact formulas are:
|
|
//
|
|
// sampleTime = currentTime - prevSampleTime
|
|
// sampleRate = byteCount / sampleTime
|
|
// weight = 1 - exp(-sampleTime/windowSize)
|
|
// newRate = weight*sampleRate + (1-weight)*oldRate
|
|
//
|
|
// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
|
|
// respectively.
|
|
func New(sampleRate, windowSize time.Duration) *Monitor {
|
|
if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
|
|
sampleRate = 5 * clockRate
|
|
}
|
|
if windowSize <= 0 {
|
|
windowSize = 1 * time.Second
|
|
}
|
|
now := clock()
|
|
return &Monitor{
|
|
active: true,
|
|
start: now,
|
|
rWindow: windowSize.Seconds(),
|
|
sLast: now,
|
|
sRate: sampleRate,
|
|
tLast: now,
|
|
}
|
|
}
|
|
|
|
// Update records the transfer of n bytes and returns n. It should be called
|
|
// after each Read/Write operation, even if n is 0.
|
|
func (m *Monitor) Update(n int) int {
|
|
m.mu.Lock()
|
|
m.update(n)
|
|
m.mu.Unlock()
|
|
return n
|
|
}
|
|
|
|
// IO is a convenience method intended to wrap io.Reader and io.Writer method
|
|
// execution. It calls m.Update(n) and then returns (n, err) unmodified.
|
|
func (m *Monitor) IO(n int, err error) (int, error) {
|
|
return m.Update(n), err
|
|
}
|
|
|
|
// Done marks the transfer as finished and prevents any further updates or
|
|
// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
|
|
// Limit methods become NOOPs. It returns the total number of bytes transferred.
|
|
func (m *Monitor) Done() int64 {
|
|
m.mu.Lock()
|
|
if now := m.update(0); m.sBytes > 0 {
|
|
m.reset(now)
|
|
}
|
|
m.active = false
|
|
m.tLast = 0
|
|
n := m.bytes
|
|
m.mu.Unlock()
|
|
return n
|
|
}
|
|
|
|
// timeRemLimit is the maximum Status.TimeRem value.
|
|
const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
|
|
|
|
// Status represents the current Monitor status. All transfer rates are in bytes
|
|
// per second rounded to the nearest byte.
|
|
type Status struct {
|
|
Active bool // Flag indicating an active transfer
|
|
Start time.Time // Transfer start time
|
|
Duration time.Duration // Time period covered by the statistics
|
|
Idle time.Duration // Time since the last transfer of at least 1 byte
|
|
Bytes int64 // Total number of bytes transferred
|
|
Samples int64 // Total number of samples taken
|
|
InstRate int64 // Instantaneous transfer rate
|
|
CurRate int64 // Current transfer rate (EMA of InstRate)
|
|
AvgRate int64 // Average transfer rate (Bytes / Duration)
|
|
PeakRate int64 // Maximum instantaneous transfer rate
|
|
BytesRem int64 // Number of bytes remaining in the transfer
|
|
TimeRem time.Duration // Estimated time to completion
|
|
Progress Percent // Overall transfer progress
|
|
}
|
|
|
|
// Status returns current transfer status information. The returned value
|
|
// becomes static after a call to Done.
|
|
func (m *Monitor) Status() Status {
|
|
m.mu.Lock()
|
|
now := m.update(0)
|
|
s := Status{
|
|
Active: m.active,
|
|
Start: clockToTime(m.start),
|
|
Duration: m.sLast - m.start,
|
|
Idle: now - m.tLast,
|
|
Bytes: m.bytes,
|
|
Samples: m.samples,
|
|
PeakRate: round(m.rPeak),
|
|
BytesRem: m.tBytes - m.bytes,
|
|
Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
|
|
}
|
|
if s.BytesRem < 0 {
|
|
s.BytesRem = 0
|
|
}
|
|
if s.Duration > 0 {
|
|
rAvg := float64(s.Bytes) / s.Duration.Seconds()
|
|
s.AvgRate = round(rAvg)
|
|
if s.Active {
|
|
s.InstRate = round(m.rSample)
|
|
s.CurRate = round(m.rEMA)
|
|
if s.BytesRem > 0 {
|
|
if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
|
|
ns := float64(s.BytesRem) / tRate * 1e9
|
|
if ns > float64(timeRemLimit) {
|
|
ns = float64(timeRemLimit)
|
|
}
|
|
s.TimeRem = clockRound(time.Duration(ns))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
m.mu.Unlock()
|
|
return s
|
|
}
|
|
|
|
// Limit restricts the instantaneous (per-sample) data flow to rate bytes per
|
|
// second. It returns the maximum number of bytes (0 <= n <= want) that may be
|
|
// transferred immediately without exceeding the limit. If block == true, the
|
|
// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
|
|
// or the transfer is inactive (after a call to Done).
|
|
//
|
|
// At least one byte is always allowed to be transferred in any given sampling
|
|
// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
|
|
// is 10 bytes per second.
|
|
//
|
|
// For usage examples, see the implementation of Reader and Writer in io.go.
|
|
func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
|
|
if want < 1 || rate < 1 {
|
|
return want
|
|
}
|
|
m.mu.Lock()
|
|
|
|
// Determine the maximum number of bytes that can be sent in one sample
|
|
limit := round(float64(rate) * m.sRate.Seconds())
|
|
if limit <= 0 {
|
|
limit = 1
|
|
}
|
|
|
|
// If block == true, wait until m.sBytes < limit
|
|
if now := m.update(0); block {
|
|
for m.sBytes >= limit && m.active {
|
|
now = m.waitNextSample(now)
|
|
}
|
|
}
|
|
|
|
// Make limit <= want (unlimited if the transfer is no longer active)
|
|
if limit -= m.sBytes; limit > int64(want) || !m.active {
|
|
limit = int64(want)
|
|
}
|
|
m.mu.Unlock()
|
|
|
|
if limit < 0 {
|
|
limit = 0
|
|
}
|
|
return int(limit)
|
|
}
|
|
|
|
// SetTransferSize specifies the total size of the data transfer, which allows
|
|
// the Monitor to calculate the overall progress and time to completion.
|
|
func (m *Monitor) SetTransferSize(bytes int64) {
|
|
if bytes < 0 {
|
|
bytes = 0
|
|
}
|
|
m.mu.Lock()
|
|
m.tBytes = bytes
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
// update accumulates the transferred byte count for the current sample until
|
|
// clock() - m.sLast >= m.sRate. The monitor status is updated once the current
|
|
// sample is done.
|
|
func (m *Monitor) update(n int) (now time.Duration) {
|
|
if !m.active {
|
|
return
|
|
}
|
|
if now = clock(); n > 0 {
|
|
m.tLast = now
|
|
}
|
|
m.sBytes += int64(n)
|
|
if sTime := now - m.sLast; sTime >= m.sRate {
|
|
t := sTime.Seconds()
|
|
if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
|
|
m.rPeak = m.rSample
|
|
}
|
|
|
|
// Exponential moving average using a method similar to *nix load
|
|
// average calculation. Longer sampling periods carry greater weight.
|
|
if m.samples > 0 {
|
|
w := math.Exp(-t / m.rWindow)
|
|
m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
|
|
} else {
|
|
m.rEMA = m.rSample
|
|
}
|
|
m.reset(now)
|
|
}
|
|
return
|
|
}
|
|
|
|
// reset clears the current sample state in preparation for the next sample.
|
|
func (m *Monitor) reset(sampleTime time.Duration) {
|
|
m.bytes += m.sBytes
|
|
m.samples++
|
|
m.sBytes = 0
|
|
m.sLast = sampleTime
|
|
}
|
|
|
|
// waitNextSample sleeps for the remainder of the current sample. The lock is
|
|
// released and reacquired during the actual sleep period, so it's possible for
|
|
// the transfer to be inactive when this method returns.
|
|
func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
|
|
const minWait = 5 * time.Millisecond
|
|
current := m.sLast
|
|
|
|
// sleep until the last sample time changes (ideally, just one iteration)
|
|
for m.sLast == current && m.active {
|
|
d := current + m.sRate - now
|
|
m.mu.Unlock()
|
|
if d < minWait {
|
|
d = minWait
|
|
}
|
|
time.Sleep(d)
|
|
m.mu.Lock()
|
|
now = m.update(0)
|
|
}
|
|
return now
|
|
}
|