rebase: update kubernetes to 1.28.0 in main

Update Kubernetes to 1.28.0 in the main repo.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
Madhu Rajanna authored 2023-08-17 07:15:28 +02:00; committed by mergify[bot]
commit ff3e84ad67, parent b2fdc269c3
706 changed files with 45252 additions and 16346 deletions


@ -1,15 +1,15 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- lavalamp
- deads2k
- yue9944882
- MikeSpreitzer
reviewers:
- lavalamp
- deads2k
- yue9944882
- MikeSpreitzer
labels:
- sig/api-machinery
- area/apiserver
emeritus_approvers:
- lavalamp


@ -58,6 +58,11 @@ import (
const timeFmt = "2006-01-02T15:04:05.999"
const (
// priorityLevelMaxSeatsPercent is the percentage of the nominalCL used as max seats allocatable from work estimator
priorityLevelMaxSeatsPercent = float64(0.15)
)
// This file contains a simple local (to the apiserver) controller
// that digests API Priority and Fairness config objects (FlowSchema
// and PriorityLevelConfiguration) into the data structure that the
@ -151,6 +156,12 @@ type configController struct {
// watchTracker implements the necessary WatchTracker interface.
WatchTracker
// MaxSeatsTracker tracks the maximum seats that should be allocatable from the
// work estimator for a given priority level. This controller does not enforce
// any limits on max seats stored in this tracker; it is up to the work estimator
// to set lower/upper limits on max seats (currently min=1, max=10).
MaxSeatsTracker
// the most recent update attempts, ordered by increasing age.
// Consumer trims to keep only the last minute's worth of entries.
// The controller uses this to limit itself to at most six updates
@ -197,16 +208,15 @@ type priorityLevelState struct {
pl *flowcontrol.PriorityLevelConfiguration
// qsCompleter holds the QueueSetCompleter derived from `config`
// and `queues` if config is not exempt, nil otherwise.
// and `queues`.
qsCompleter fq.QueueSetCompleter
// The QueueSet for this priority level. This is nil if and only
// if the priority level is exempt.
// The QueueSet for this priority level.
// Never nil.
queues fq.QueueSet
// quiescing==true indicates that this priority level should be
// removed when its queues have all drained. May be true only if
// queues is non-nil.
// removed when its queues have all drained.
quiescing bool
// number of goroutines between Controller::Match and calling the
@ -275,6 +285,7 @@ func newTestableController(config TestableConfig) *configController {
flowcontrolClient: config.FlowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState),
WatchTracker: NewWatchTracker(),
MaxSeatsTracker: NewMaxSeatsTracker(),
}
klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
// Start with longish delay because conflicts will be between
@ -384,9 +395,6 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
items := make([]allocProblemItem, 0, len(plStates))
plNames := make([]string, 0, len(plStates))
for plName, plState := range plStates {
if plState.pl.Spec.Limited == nil {
continue
}
obs := plState.seatDemandIntegrator.Reset()
plState.seatDemandStats.update(obs)
// Lower bound on this priority level's adjusted concurrency limit is the lesser of:
@ -403,7 +411,7 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
})
}
if len(items) == 0 && cfgCtlr.nominalCLSum > 0 {
klog.ErrorS(nil, "Impossible: no non-exempt priority levels", "plStates", cfgCtlr.priorityLevelStates)
klog.ErrorS(nil, "Impossible: no priority levels", "plStates", cfgCtlr.priorityLevelStates)
return
}
allocs, fairFrac, err := computeConcurrencyAllocation(cfgCtlr.nominalCLSum, items)
@ -412,17 +420,11 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
allocs = make([]float64, len(items))
for idx, plName := range plNames {
plState := plStates[plName]
if plState.pl.Spec.Limited == nil {
continue
}
allocs[idx] = float64(plState.currentCL)
}
}
for idx, plName := range plNames {
plState := plStates[plName]
if plState.pl.Spec.Limited == nil {
continue
}
if setCompleters {
qsCompleter, err := queueSetCompleterForPL(cfgCtlr.queueSetFactory, plState.queues,
plState.pl, cfgCtlr.requestWaitLimit, plState.reqsGaugePair, plState.execSeatsObs,
@ -441,8 +443,15 @@ func (cfgCtlr *configController) updateBorrowingLocked(setCompleters bool, plSta
if relChange >= 0.05 {
logLevel = 2
}
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "backstop", err != nil)
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL})
var concurrencyDenominator int
if currentCL > 0 {
concurrencyDenominator = currentCL
} else {
concurrencyDenominator = int(math.Max(1, math.Round(float64(cfgCtlr.serverConcurrencyLimit)/10)))
}
plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyDenominator))
klog.V(logLevel).InfoS("Update CurrentCL", "plName", plName, "seatDemandHighWatermark", plState.seatDemandStats.highWatermark, "seatDemandAvg", plState.seatDemandStats.avg, "seatDemandStdev", plState.seatDemandStats.stdDev, "seatDemandSmoothed", plState.seatDemandStats.smoothed, "fairFrac", fairFrac, "currentCL", currentCL, "concurrencyDenominator", concurrencyDenominator, "backstop", err != nil)
plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: currentCL, ConcurrencyDenominator: concurrencyDenominator})
}
metrics.SetFairFrac(float64(fairFrac))
}
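
For illustration only (not part of the diff): a minimal standalone sketch of the denominator fallback added above in updateBorrowingLocked, so the ratioed seat-demand gauges never receive a zero denominator. The helper name and the example values are hypothetical.

package main

import (
    "fmt"
    "math"
)

// concurrencyDenominator mirrors the fallback added in updateBorrowingLocked:
// use currentCL when it is positive, otherwise fall back to roughly a tenth of
// the server-wide concurrency limit, never less than 1.
func concurrencyDenominator(currentCL, serverConcurrencyLimit int) int {
    if currentCL > 0 {
        return currentCL
    }
    return int(math.Max(1, math.Round(float64(serverConcurrencyLimit)/10)))
}

func main() {
    fmt.Println(concurrencyDenominator(25, 600)) // 25: a positive currentCL is used as-is
    fmt.Println(concurrencyDenominator(0, 600))  // 60: fallback to about serverConcurrencyLimit/10
    fmt.Println(concurrencyDenominator(0, 4))    // 1: the fallback is clamped to at least 1
}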
@ -690,9 +699,8 @@ func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfi
klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name)
state.quiescing = false
}
if state.pl.Spec.Limited != nil {
meal.shareSum += float64(state.pl.Spec.Limited.NominalConcurrencyShares)
}
nominalConcurrencyShares, _, _ := plSpecCommons(state.pl)
meal.shareSum += float64(nominalConcurrencyShares)
meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt
meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll
}
@ -765,15 +773,16 @@ func (meal *cfgMeal) processOldPLsLocked() {
continue
}
if plName == flowcontrol.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
// BTW, we know the Spec has not changed because the
// mandatory objects have immutable Specs
// BTW, we know the Spec has not changed what it says about queuing because the
// mandatory objects have immutable Specs as far as queuing is concerned.
klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName)
} else {
if plState.queues == nil || plState.numPending == 0 && plState.queues.IsIdle() {
// Either there are no queues or they are done
if plState.numPending == 0 && plState.queues.IsIdle() {
// The QueueSet is done
// draining and no use is coming from another
// goroutine
klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type)
klog.V(3).Infof("Removing undesired priority level %q, Type=%v", plName, plState.pl.Spec.Type)
meal.cfgCtlr.MaxSeatsTracker.ForgetPriorityLevel(plName)
continue
}
if !plState.quiescing {
@ -789,15 +798,14 @@ func (meal *cfgMeal) processOldPLsLocked() {
// This can not happen because queueSetCompleterForPL already approved this config
panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
}
if plState.pl.Spec.Limited != nil {
// We deliberately include the lingering priority levels
// here so that their queues get some concurrency and they
// continue to drain. During this interim a lingering
// priority level continues to get a concurrency
// allocation determined by all the share values in the
// regular way.
meal.shareSum += float64(plState.pl.Spec.Limited.NominalConcurrencyShares)
}
// We deliberately include the lingering priority levels
// here so that their queues get some concurrency and they
// continue to drain. During this interim a lingering
// priority level continues to get a concurrency
// allocation determined by all the share values in the
// regular way.
nominalConcurrencyShares, _, _ := plSpecCommons(plState.pl)
meal.shareSum += float64(nominalConcurrencyShares)
meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt
meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll
meal.newPLStates[plName] = plState
@ -809,41 +817,46 @@ func (meal *cfgMeal) processOldPLsLocked() {
// QueueSets.
func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
for plName, plState := range meal.newPLStates {
if plState.pl.Spec.Limited == nil {
klog.V(5).Infof("Using exempt priority level %q: quiescing=%v", plName, plState.quiescing)
continue
}
limited := plState.pl.Spec.Limited
nominalConcurrencyShares, lendablePercent, borrowingLimitPercent := plSpecCommons(plState.pl)
// The use of math.Ceil here means that the results might sum
// to a little more than serverConcurrencyLimit but the
// difference will be negligible.
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(limited.NominalConcurrencyShares) / meal.shareSum))
concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(nominalConcurrencyShares) / meal.shareSum))
var lendableCL, borrowingCL int
if limited.LendablePercent != nil {
lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.LendablePercent) / 100))
if lendablePercent != nil {
lendableCL = int(math.Round(float64(concurrencyLimit) * float64(*lendablePercent) / 100))
}
if limited.BorrowingLimitPercent != nil {
borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*limited.BorrowingLimitPercent) / 100))
if borrowingLimitPercent != nil {
borrowingCL = int(math.Round(float64(concurrencyLimit) * float64(*borrowingLimitPercent) / 100))
} else {
borrowingCL = meal.cfgCtlr.serverConcurrencyLimit
}
metrics.SetPriorityLevelConfiguration(plName, concurrencyLimit, concurrencyLimit-lendableCL, concurrencyLimit+borrowingCL)
plState.seatDemandRatioedGauge.SetDenominator(float64(concurrencyLimit))
cfgChanged := plState.nominalCL != concurrencyLimit || plState.minCL != concurrencyLimit-lendableCL || plState.maxCL != concurrencyLimit+borrowingCL
plState.nominalCL = concurrencyLimit
plState.minCL = concurrencyLimit - lendableCL
plState.maxCL = concurrencyLimit + borrowingCL
meal.maxExecutingRequests += concurrencyLimit
var waitLimit int
if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)
}
meal.maxWaitingRequests += waitLimit
if limited := plState.pl.Spec.Limited; limited != nil {
if qCfg := limited.LimitResponse.Queuing; qCfg != nil {
meal.maxWaitingRequests += int(qCfg.Queues * qCfg.QueueLengthLimit)
// Max seats allocatable from work estimator is calculated as MAX(1, MIN(0.15 * nominalCL, nominalCL/handSize)).
// This is to keep max seats relative to total available concurrency with a minimum value of 1.
// 15% of nominal concurrency was chosen since it preserved the previous max seats of 10 for default priority levels
// when using apiserver's default total server concurrency of 600 (--max-requests-inflight=400, --max-mutating-requests-inflight=200).
// This ensures that clusters with relatively high inflight requests will continue to use a max seats of 10
// while clusters with lower inflight requests will use max seats no greater than nominalCL/handSize.
// Calculated max seats can return arbitrarily high values but work estimator currently limits max seats at 10.
handSize := plState.pl.Spec.Limited.LimitResponse.Queuing.HandSize
maxSeats := uint64(math.Max(1, math.Min(math.Ceil(float64(concurrencyLimit)*priorityLevelMaxSeatsPercent), float64(int32(concurrencyLimit)/handSize))))
meal.cfgCtlr.MaxSeatsTracker.SetMaxSeats(plName, maxSeats)
}
}
if plState.queues == nil {
initialCL := concurrencyLimit - lendableCL/2
klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
klog.V(2).Infof("Introducing queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, initialCL, plState.quiescing, nominalConcurrencyShares, meal.shareSum)
plState.seatDemandStats = seatDemandStats{}
plState.currentCL = initialCL
} else {
@ -851,7 +864,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
if cfgChanged {
logLevel = 2
}
klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.NominalConcurrencyShares, meal.shareSum)
klog.V(logLevel).Infof("Retaining queues for priority level %q: config=%s, nominalCL=%d, lendableCL=%d, borrowingCL=%d, currentCL=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, lendableCL, borrowingCL, plState.currentCL, plState.quiescing, plState.numPending, nominalConcurrencyShares, meal.shareSum)
}
}
meal.cfgCtlr.nominalCLSum = meal.maxExecutingRequests
@ -859,32 +872,35 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
}
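
For illustration only (not part of the diff): a worked sketch of the arithmetic performed in finishQueueSetReconfigsLocked above, using hypothetical numbers. nominalCL is ceil(serverConcurrencyLimit * shares / shareSum); the lendable and borrowing bounds are percentages of it; and the work estimator's max seats is MAX(1, MIN(ceil(0.15*nominalCL), nominalCL/handSize)).

package main

import (
    "fmt"
    "math"
)

func main() {
    // Hypothetical inputs: a server-wide limit of 600 seats, a priority level
    // holding 30 of 300 total shares, lending 50% of its seats, no explicit
    // borrowing limit, and a shuffle-sharding hand size of 6.
    serverConcurrencyLimit := 600
    shares, shareSum := 30.0, 300.0
    lendablePercent := 50.0
    handSize := int32(6)

    nominalCL := int(math.Ceil(float64(serverConcurrencyLimit) * shares / shareSum)) // 60
    lendableCL := int(math.Round(float64(nominalCL) * lendablePercent / 100))        // 30
    borrowingCL := serverConcurrencyLimit // default when BorrowingLimitPercent is nil
    minCL, maxCL := nominalCL-lendableCL, nominalCL+borrowingCL                      // 30, 660

    // Max seats the work estimator may allocate for this level:
    // MAX(1, MIN(ceil(0.15*nominalCL), nominalCL/handSize)).
    maxSeats := uint64(math.Max(1, math.Min(
        math.Ceil(float64(nominalCL)*0.15),   // ceil(9.0) = 9
        float64(int32(nominalCL)/handSize)))) // 60/6 = 10
    fmt.Println(nominalCL, minCL, maxCL, maxSeats) // 60 30 660 9
}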
// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
// given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given
// given priority level configuration. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package.
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top")
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementLimited) != (pl.Spec.Limited != nil) {
return nil, errors.New("broken union structure at the top, for Limited")
}
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Exempt != nil) {
return nil, errors.New("broken union structure at the top, for Exempt")
}
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt) {
// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
return nil, errors.New("non-alignment between name and type")
}
if pl.Spec.Limited == nil {
return nil, nil
}
if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
return nil, errors.New("broken union structure for limit response")
}
qcAPI := pl.Spec.Limited.LimitResponse.Queuing
qcQS := fq.QueuingConfig{Name: pl.Name}
if qcAPI != nil {
qcQS = fq.QueuingConfig{Name: pl.Name,
DesiredNumQueues: int(qcAPI.Queues),
QueueLengthLimit: int(qcAPI.QueueLengthLimit),
HandSize: int(qcAPI.HandSize),
RequestWaitLimit: requestWaitLimit,
if pl.Spec.Limited != nil {
if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
return nil, errors.New("broken union structure for limit response")
}
qcAPI := pl.Spec.Limited.LimitResponse.Queuing
if qcAPI != nil {
qcQS = fq.QueuingConfig{Name: pl.Name,
DesiredNumQueues: int(qcAPI.Queues),
QueueLengthLimit: int(qcAPI.QueueLengthLimit),
HandSize: int(qcAPI.HandSize),
RequestWaitLimit: requestWaitLimit,
}
}
} else {
qcQS = fq.QueuingConfig{Name: pl.Name, DesiredNumQueues: -1}
}
var qsc fq.QueueSetCompleter
var err error
@ -894,7 +910,7 @@ func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flow
qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandGauge)
}
if err != nil {
err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcQS, err)
}
return qsc, err
}
@ -957,16 +973,8 @@ func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, re
seatDemandIntegrator: seatDemandIntegrator,
seatDemandRatioedGauge: seatDemandRatioedGauge,
}
if proto.Spec.Limited != nil {
meal.shareSum += float64(proto.Spec.Limited.NominalConcurrencyShares)
}
}
type immediateRequest struct{}
func (immediateRequest) Finish(execute func()) bool {
execute()
return false
nominalConcurrencyShares, _, _ := plSpecCommons(proto)
meal.shareSum += float64(nominalConcurrencyShares)
}
// startRequest classifies and, if appropriate, enqueues the request.
@ -1007,32 +1015,31 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
}
plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
plState := cfgCtlr.priorityLevelStates[plName]
if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
noteFn(selectedFlowSchema, plState.pl, "")
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
}
var numQueues int32
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
}
var flowDistinguisher string
var hashValue uint64
if numQueues > 1 {
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
var flowDistinguisher string
if plState.pl.Spec.Type != flowcontrol.PriorityLevelEnablementExempt {
if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
}
if numQueues > 1 {
flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
}
}
noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
workEstimate := workEstimator()
startWaitingTime = cfgCtlr.clock.Now()
if plState.pl.Spec.Type != flowcontrol.PriorityLevelEnablementExempt {
startWaitingTime = cfgCtlr.clock.Now()
}
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
if idle {
cfgCtlr.maybeReapReadLocked(plName, plState)
}
return selectedFlowSchema, plState.pl, false, req, startWaitingTime
return selectedFlowSchema, plState.pl, plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt, req, startWaitingTime
}
// maybeReap will remove the last internal traces of the named
@ -1046,10 +1053,6 @@ func (cfgCtlr *configController) maybeReap(plName string) {
klog.V(7).Infof("plName=%s, plState==nil", plName)
return
}
if plState.queues == nil {
klog.V(7).Infof("plName=%s, plState.queues==nil", plName)
return
}
useless := plState.quiescing && plState.numPending == 0 && plState.queues.IsIdle()
klog.V(7).Infof("plState.quiescing=%v, plState.numPending=%d, useless=%v", plState.quiescing, plState.numPending, useless)
if !useless {
@ -1107,3 +1110,16 @@ func relDiff(x, y float64) float64 {
}
return diff / den
}
// plSpecCommons returns the (NominalConcurrencyShares, LendablePercent, BorrowingLimitPercent) of the given priority level config
func plSpecCommons(pl *flowcontrol.PriorityLevelConfiguration) (int32, *int32, *int32) {
if limiter := pl.Spec.Limited; limiter != nil {
return limiter.NominalConcurrencyShares, limiter.LendablePercent, limiter.BorrowingLimitPercent
}
limiter := pl.Spec.Exempt
var nominalConcurrencyShares int32
if limiter.NominalConcurrencyShares != nil {
nominalConcurrencyShares = *limiter.NominalConcurrencyShares
}
return nominalConcurrencyShares, limiter.LendablePercent, nil
}


@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
)
const (
@ -75,22 +76,6 @@ func (cfgCtlr *configController) dumpPriorityLevels(w http.ResponseWriter, r *ht
continue
}
if plState.queues == nil {
tabPrint(tabWriter, row(
plState.pl.Name, // 1
"<none>", // 2
"<none>", // 3
"<none>", // 4
"<none>", // 5
"<none>", // 6
"<none>", // 7
"<none>", // 8
"<none>", // 9
"<none>", // 10
))
endLine(tabWriter)
continue
}
queueSetDigest := plState.queues.Dump(false)
activeQueueNum := 0
for _, q := range queueSetDigest.Queues {
@ -134,21 +119,6 @@ func (cfgCtlr *configController) dumpQueues(w http.ResponseWriter, r *http.Reque
tabPrint(tabWriter, rowForHeaders(columnHeaders))
endLine(tabWriter)
for _, plState := range cfgCtlr.priorityLevelStates {
if plState.queues == nil {
tabPrint(tabWriter, row(
plState.pl.Name, // 1
"<none>", // 2
"<none>", // 3
"<none>", // 4
"<none>", // 5
"<none>", // 6
"<none>", // 7
"<none>", // 8
"<none>", // 9
))
endLine(tabWriter)
continue
}
queueSetDigest := plState.queues.Dump(false)
for i, q := range queueSetDigest.Queues {
tabPrint(tabWriter, row(
@ -185,57 +155,65 @@ func (cfgCtlr *configController) dumpRequests(w http.ResponseWriter, r *http.Req
"InitialSeats", // 7
"FinalSeats", // 8
"AdditionalLatency", // 9
"StartTime", // 10
}))
if includeRequestDetails {
continueLine(tabWriter)
tabPrint(tabWriter, rowForHeaders([]string{
"UserName", // 10
"Verb", // 11
"APIPath", // 12
"Namespace", // 13
"Name", // 14
"APIVersion", // 15
"Resource", // 16
"SubResource", // 17
"UserName", // 11
"Verb", // 12
"APIPath", // 13
"Namespace", // 14
"Name", // 15
"APIVersion", // 16
"Resource", // 17
"SubResource", // 18
}))
}
endLine(tabWriter)
for _, plState := range cfgCtlr.priorityLevelStates {
if plState.queues == nil {
continue
}
queueSetDigest := plState.queues.Dump(includeRequestDetails)
dumpRequest := func(iq, ir int, r debug.RequestDump) {
tabPrint(tabWriter, row(
plState.pl.Name, // 1
r.MatchedFlowSchema, // 2
strconv.Itoa(iq), // 3
strconv.Itoa(ir), // 4
r.FlowDistinguisher, // 5
r.ArriveTime.UTC().Format(time.RFC3339Nano), // 6
strconv.Itoa(int(r.WorkEstimate.InitialSeats)), // 7
strconv.Itoa(int(r.WorkEstimate.FinalSeats)), // 8
r.WorkEstimate.AdditionalLatency.String(), // 9
r.StartTime.UTC().Format(time.RFC3339Nano), // 10
))
if includeRequestDetails {
continueLine(tabWriter)
tabPrint(tabWriter, rowForRequestDetails(
r.UserName, // 11
r.RequestInfo.Verb, // 12
r.RequestInfo.Path, // 13
r.RequestInfo.Namespace, // 14
r.RequestInfo.Name, // 15
schema.GroupVersion{
Group: r.RequestInfo.APIGroup,
Version: r.RequestInfo.APIVersion,
}.String(), // 16
r.RequestInfo.Resource, // 17
r.RequestInfo.Subresource, // 18
))
}
endLine(tabWriter)
}
for iq, q := range queueSetDigest.Queues {
for ir, r := range q.Requests {
tabPrint(tabWriter, row(
plState.pl.Name, // 1
r.MatchedFlowSchema, // 2
strconv.Itoa(iq), // 3
strconv.Itoa(ir), // 4
r.FlowDistinguisher, // 5
r.ArriveTime.UTC().Format(time.RFC3339Nano), // 6
strconv.Itoa(int(r.WorkEstimate.InitialSeats)), // 7
strconv.Itoa(int(r.WorkEstimate.FinalSeats)), // 8
r.WorkEstimate.AdditionalLatency.String(), // 9
))
if includeRequestDetails {
continueLine(tabWriter)
tabPrint(tabWriter, rowForRequestDetails(
r.UserName, // 10
r.RequestInfo.Verb, // 11
r.RequestInfo.Path, // 12
r.RequestInfo.Namespace, // 13
r.RequestInfo.Name, // 14
schema.GroupVersion{
Group: r.RequestInfo.APIGroup,
Version: r.RequestInfo.APIVersion,
}.String(), // 15
r.RequestInfo.Resource, // 16
r.RequestInfo.Subresource, // 17
))
}
endLine(tabWriter)
dumpRequest(iq, ir, r)
}
for _, r := range q.RequestsExecuting {
dumpRequest(iq, -1, r)
}
}
for _, r := range queueSetDigest.QueuelessExecutingRequests {
dumpRequest(-1, -1, r)
}
}
runtime.HandleError(tabWriter.Flush())


@ -77,6 +77,10 @@ type Interface interface {
// WatchTracker provides the WatchTracker interface.
WatchTracker
// MaxSeatsTracker is invoked from the work estimator to track max seats
// that can be occupied by a request for a priority level.
MaxSeatsTracker
}
// This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md


@ -25,21 +25,23 @@ import (
// QueueSetDump is an instant dump of queue-set.
type QueueSetDump struct {
Queues []QueueDump
Waiting int
Executing int
SeatsInUse int
SeatsWaiting int
Dispatched int
Rejected int
Timedout int
Cancelled int
Queues []QueueDump
QueuelessExecutingRequests []RequestDump
Waiting int
Executing int
SeatsInUse int
SeatsWaiting int
Dispatched int
Rejected int
Timedout int
Cancelled int
}
// QueueDump is an instant dump of one queue in a queue-set.
type QueueDump struct {
QueueSum QueueSum
Requests []RequestDump
Requests []RequestDump // just the waiting ones
RequestsExecuting []RequestDump
NextDispatchR string
ExecutingRequests int
SeatsInUse int


@ -0,0 +1,234 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
"sync/atomic"
"time"
"k8s.io/utils/clock"
)
const (
// maxRetryAfter represents the maximum possible retryAfter.
maxRetryAfter = int64(32)
)
// DroppedRequestsTracker is an interface that allows tracking
// a history of dropped requests in the system for the purpose
// of adjusting RetryAfter header to avoid system overload.
type DroppedRequestsTracker interface {
// RecordDroppedRequest records a request that was just
// dropped from processing.
RecordDroppedRequest(plName string)
// GetRetryAfter returns the current suggested value of
// RetryAfter value.
GetRetryAfter(plName string) int64
}
// unixStat keeps a statistic of how many requests were dropped within
// a single second.
type unixStat struct {
unixTime int64
requests int64
}
type droppedRequestsStats struct {
lock sync.RWMutex
// history stores the history of dropped requests.
history []unixStat
// To reduce lock-contention, we store the information about
// the current second here, which we can then access under
// reader lock.
currentUnix int64
currentCount atomic.Int64
retryAfter atomic.Int64
retryAfterUpdateUnix int64
}
func newDroppedRequestsStats(nowUnix int64) *droppedRequestsStats {
result := &droppedRequestsStats{
// We assume that we can bump at any time after the first dropped request.
retryAfterUpdateUnix: 0,
}
result.retryAfter.Store(1)
return result
}
func (s *droppedRequestsStats) recordDroppedRequest(unixTime int64) {
// Short path - if the current second matches passed time,
// just update the stats.
if done := func() bool {
s.lock.RLock()
defer s.lock.RUnlock()
if s.currentUnix == unixTime {
s.currentCount.Add(1)
return true
}
return false
}(); done {
return
}
// We trigger the change of <currentUnix>.
s.lock.Lock()
defer s.lock.Unlock()
if s.currentUnix == unixTime {
s.currentCount.Add(1)
return
}
s.updateHistory(s.currentUnix, s.currentCount.Load())
s.currentUnix = unixTime
s.currentCount.Store(1)
// We only consider updating retryAfter when bumping the current second.
// However, given that we didn't report anything for the current second,
// we recompute it based on statistics from the previous one.
s.updateRetryAfterIfNeededLocked(unixTime)
}
func (s *droppedRequestsStats) updateHistory(unixTime int64, count int64) {
s.history = append(s.history, unixStat{unixTime: unixTime, requests: count})
startIndex := 0
// Entries that exceed 2*retryAfter or maxRetryAfter are never going to be needed.
maxHistory := 2 * s.retryAfter.Load()
if maxHistory > maxRetryAfter {
maxHistory = maxRetryAfter
}
for ; startIndex < len(s.history) && unixTime-s.history[startIndex].unixTime > maxHistory; startIndex++ {
}
if startIndex > 0 {
s.history = s.history[startIndex:]
}
}
// updateRetryAfterIfNeededLocked updates the retryAfter based on the number of
// dropped requests in the last `retryAfter` seconds:
// - if there were fewer than `retryAfter` dropped requests, it decreases
// retryAfter
// - if there were at least 3*`retryAfter` dropped requests, it increases
// retryAfter
//
// The rationale behind these numbers being fairly low is that APF is queuing
// requests and rejecting (dropping) them is a last resort, which is not expected
// unless a given priority level is actually overloaded.
//
// Additionally, we rate-limit the increases of retryAfter to wait at least
// `retryAfter` seconds after the previous increase to avoid multiple bumps
// on a single spike.
//
// We're working with the interval [unixTime-retryAfter, unixTime).
func (s *droppedRequestsStats) updateRetryAfterIfNeededLocked(unixTime int64) {
retryAfter := s.retryAfter.Load()
droppedRequests := int64(0)
for i := len(s.history) - 1; i >= 0; i-- {
if unixTime-s.history[i].unixTime > retryAfter {
break
}
if s.history[i].unixTime < unixTime {
droppedRequests += s.history[i].requests
}
}
if unixTime-s.retryAfterUpdateUnix >= retryAfter && droppedRequests >= 3*retryAfter {
// We try to mimic the TCP algorithm and thus are doubling
// the retryAfter here.
retryAfter *= 2
if retryAfter >= maxRetryAfter {
retryAfter = maxRetryAfter
}
s.retryAfter.Store(retryAfter)
s.retryAfterUpdateUnix = unixTime
return
}
if droppedRequests < retryAfter && retryAfter > 1 {
// We try to mimic the TCP algorithm and thus are linearly
// scaling down the retryAfter here.
retryAfter--
s.retryAfter.Store(retryAfter)
return
}
}
// droppedRequestsTracker implements the DroppedRequestsTracker interface
// for the purpose of adjusting RetryAfter header for newly dropped
// requests to avoid system overload.
type droppedRequestsTracker struct {
now func() time.Time
lock sync.RWMutex
plStats map[string]*droppedRequestsStats
}
// NewDroppedRequestsTracker creates a new instance of
// DroppedRequestsTracker.
func NewDroppedRequestsTracker() DroppedRequestsTracker {
return newDroppedRequestsTracker(clock.RealClock{}.Now)
}
func newDroppedRequestsTracker(now func() time.Time) *droppedRequestsTracker {
return &droppedRequestsTracker{
now: now,
plStats: make(map[string]*droppedRequestsStats),
}
}
func (t *droppedRequestsTracker) RecordDroppedRequest(plName string) {
unixTime := t.now().Unix()
stats := func() *droppedRequestsStats {
// The list of priority levels should change very infrequently,
// so in almost all cases, the fast path should be enough.
t.lock.RLock()
if plStats, ok := t.plStats[plName]; ok {
t.lock.RUnlock()
return plStats
}
t.lock.RUnlock()
// Slow path taking writer lock to update the map.
t.lock.Lock()
defer t.lock.Unlock()
if plStats, ok := t.plStats[plName]; ok {
return plStats
}
stats := newDroppedRequestsStats(unixTime)
t.plStats[plName] = stats
return stats
}()
stats.recordDroppedRequest(unixTime)
}
func (t *droppedRequestsTracker) GetRetryAfter(plName string) int64 {
t.lock.RLock()
defer t.lock.RUnlock()
if plStats, ok := t.plStats[plName]; ok {
return plStats.retryAfter.Load()
}
return 1
}
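
For illustration only (not part of the diff): a minimal usage sketch of the new DroppedRequestsTracker, assuming the usual k8s.io/apiserver/pkg/util/flowcontrol import path; the priority-level name is hypothetical.

package main

import (
    "fmt"

    utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)

func main() {
    // Record a dropped request for a (hypothetical) priority level and read back
    // the suggested Retry-After value: it starts at 1 second, doubles (capped at
    // 32) only under sustained drops, and decays linearly once drops subside.
    tracker := utilflowcontrol.NewDroppedRequestsTracker()
    tracker.RecordDroppedRequest("hypothetical-pl")
    fmt.Println(tracker.GetRetryAfter("hypothetical-pl")) // 1
}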


@ -34,7 +34,10 @@ type QueueSetFactory interface {
// BeginConstruction does the first phase of creating a QueueSet.
// The RatioedGaugePair observes number of requests,
// execution covering just the regular phase.
// The denominator for the waiting phase is
// max(1, QueuingConfig.QueueLengthLimit) X max(1, QueuingConfig.DesiredNumQueues).
// The RatioedGauge observes number of seats occupied through all phases of execution.
// The denominator for all the ratioed concurrency gauges is supplied later in the DispatchingConfig.
// The Gauge observes the seat demand (executing + queued seats).
BeginConstruction(QueuingConfig, metrics.RatioedGaugePair, metrics.RatioedGauge, metrics.Gauge) (QueueSetCompleter, error)
}
@ -113,8 +116,11 @@ type QueuingConfig struct {
Name string
// DesiredNumQueues is the number of queues that the API says
// should exist now. This may be zero, in which case
// should exist now. This may be non-positive, in which case
// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
// A value of zero means to respect the ConcurrencyLimit of the DispatchingConfig.
// A negative value means to always dispatch immediately upon arrival
// (i.e., the requests are "exempt" from limitation).
DesiredNumQueues int
// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
@ -133,4 +139,8 @@ type QueuingConfig struct {
type DispatchingConfig struct {
// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
ConcurrencyLimit int
// ConcurrencyDenominator is used in relative metrics of concurrency.
// It equals ConcurrencyLimit except when that is zero.
ConcurrencyDenominator int
}
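
For illustration only (not part of the diff): a small sketch restating the extended DesiredNumQueues contract documented above; the helper describeQueuing is hypothetical.

package main

import "fmt"

// describeQueuing summarizes the DesiredNumQueues contract: negative means
// exempt (dispatch immediately on arrival), zero means no queuing (execute up
// to the ConcurrencyLimit and reject the rest), and a positive value means
// that many shuffle-sharded queues.
func describeQueuing(desiredNumQueues int) string {
    switch {
    case desiredNumQueues < 0:
        return "exempt: dispatch immediately on arrival"
    case desiredNumQueues == 0:
        return "no queues: execute up to ConcurrencyLimit, reject the rest"
    default:
        return fmt.Sprintf("%d shuffle-sharded queues", desiredNumQueues)
    }
}

func main() {
    for _, n := range []int{-1, 0, 6} {
        fmt.Printf("DesiredNumQueues=%d -> %s\n", n, describeQueuing(n))
    }
}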


@ -24,6 +24,7 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/eventclock"
@ -138,6 +139,10 @@ type queueSet struct {
// from that queue.
totRequestsExecuting int
// requestsExecutingSet is the set of requests executing in the real world IF
// there are no queues; otherwise the requests are tracked in the queues.
requestsExecutingSet sets.Set[*request]
// totSeatsInUse is the number of total "seats" in use by all the
// request(s) that are currently executing in this queueset.
totSeatsInUse int
@ -197,7 +202,7 @@ func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePa
// calls for one, and returns a non-nil error if the given config is
// invalid.
func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
if qCfg.DesiredNumQueues == 0 {
if qCfg.DesiredNumQueues <= 0 {
return nil, nil
}
dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
@ -219,6 +224,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
qCfg: qsc.qCfg,
currentR: 0,
lastRealTime: qsc.factory.clock.Now(),
requestsExecutingSet: sets.New[*request](),
}
qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs)
}
@ -230,7 +236,7 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
func createQueues(n, baseIndex int) []*queue {
fqqueues := make([]*queue, n)
for i := 0; i < n; i++ {
fqqueues[i] = &queue{index: baseIndex + i, requests: newRequestFIFO()}
fqqueues[i] = &queue{index: baseIndex + i, requestsWaiting: newRequestFIFO(), requestsExecuting: sets.New[*request]()}
}
return fqqueues
}
@ -280,8 +286,8 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
qll *= qCfg.DesiredNumQueues
}
qs.reqsGaugePair.RequestsWaiting.SetDenominator(float64(qll))
qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.reqsGaugePair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyDenominator))
qs.execSeatsGauge.SetDenominator(float64(dCfg.ConcurrencyDenominator))
qs.dispatchAsMuchAsPossibleLocked()
}
@ -504,7 +510,7 @@ func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqreq
klog.InfoS("Advancing epoch", "QS", qs.qCfg.Name, "when", now.Format(nsTimeFmt), "oldR", oldR, "newR", qs.currentR, "incrR", incrR)
success := true
for qIdx, queue := range qs.queues {
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
if queue.requestsWaiting.Length() == 0 && queue.requestsExecuting.Len() == 0 {
// Do not just decrement, the value could be quite outdated.
// It is safe to reset to zero in this case, because the next request
// will overwrite the zero with `qs.currentR`.
@ -517,7 +523,7 @@ func (qs *queueSet) advanceEpoch(ctx context.Context, now time.Time, incrR fqreq
klog.ErrorS(errors.New("queue::nextDispatchR underflow"), "Underflow", "QS", qs.qCfg.Name, "queue", qIdx, "oldNextDispatchR", oldNextDispatchR, "newNextDispatchR", queue.nextDispatchR, "incrR", incrR)
success = false
}
queue.requests.Walk(func(req *request) bool {
queue.requestsWaiting.Walk(func(req *request) bool {
oldArrivalR := req.arrivalR
req.arrivalR -= rDecrement
if req.arrivalR > oldArrivalR {
@ -538,8 +544,8 @@ func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
for _, queue := range qs.queues {
// here we want the sum of the maximum width of the requests in this queue since our
// goal is to find the maximum rate at which the queue could work.
seatsRequested += (queue.seatsInUse + queue.requests.QueueSum().MaxSeatsSum)
if queue.requests.Length() > 0 || queue.requestsExecuting > 0 {
seatsRequested += (queue.seatsInUse + queue.requestsWaiting.QueueSum().MaxSeatsSum)
if queue.requestsWaiting.Length() > 0 || queue.requestsExecuting.Len() > 0 {
activeQueues++
}
}
@ -589,7 +595,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
return nil
}
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requestsWaiting.Length())
return req
}
@ -608,7 +614,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
for i := 0; i < handSize; i++ {
queueIdx := hand[(offset+i)%handSize]
queue := qs.queues[queueIdx]
queueSum := queue.requests.QueueSum()
queueSum := queue.requestsWaiting.QueueSum()
// this is the total amount of work in seat-seconds for requests
// waiting in this queue, we will select the queue with the minimum.
@ -621,7 +627,7 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
}
if klogV := klog.V(6); klogV.Enabled() {
chosenQueue := qs.queues[bestQueueIdx]
klogV.Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, with sum: %#v & %d seats in use & nextDispatchR=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.QueueSum(), chosenQueue.seatsInUse, chosenQueue.nextDispatchR)
klogV.Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, with sum: %#v & %d seats in use & nextDispatchR=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requestsWaiting.QueueSum(), chosenQueue.seatsInUse, chosenQueue.nextDispatchR)
}
return bestQueueIdx
}
@ -632,7 +638,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
timeoutCount := 0
disqueueSeats := 0
now := qs.clock.Now()
reqs := queue.requests
reqs := queue.requestsWaiting
// reqs are sorted oldest -> newest
// can short circuit loop (break) if oldest requests are not timing out
// as newer requests also will not have timed out
@ -669,7 +675,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueToBoundLocked(queue *queue, f
// Otherwise enqueues and returns true.
func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
queue := request.queue
curQueueLength := queue.requests.Length()
curQueueLength := queue.requestsWaiting.Length()
// rejects the newly arrived request if resource criteria not met
if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
curQueueLength >= qs.qCfg.QueueLengthLimit {
@ -684,7 +690,7 @@ func (qs *queueSet) rejectOrEnqueueToBoundLocked(request *request) bool {
func (qs *queueSet) enqueueToBoundLocked(request *request) {
queue := request.queue
now := qs.clock.Now()
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
if queue.requestsWaiting.Length() == 0 && queue.requestsExecuting.Len() == 0 {
// the queue's start R is set to the virtual time.
queue.nextDispatchR = qs.currentR
klogV := klog.V(6)
@ -692,7 +698,7 @@ func (qs *queueSet) enqueueToBoundLocked(request *request) {
klogV.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2)
}
}
request.removeFromQueueLocked = queue.requests.Enqueue(request)
request.removeFromQueueLocked = queue.requestsWaiting.Enqueue(request)
qs.totRequestsWaiting++
qs.totSeatsWaiting += request.MaxSeats()
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
@ -725,8 +731,9 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
}
qs.totRequestsExecuting++
qs.totSeatsInUse += req.MaxSeats()
qs.requestsExecutingSet = qs.requestsExecutingSet.Insert(req)
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
metrics.AddSeatConcurrencyInUse(qs.qCfg.Name, fsName, req.MaxSeats())
qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsGauge.Add(float64(req.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
@ -768,10 +775,10 @@ func (qs *queueSet) dispatchLocked() bool {
// problem because other overhead is also included.
qs.totRequestsExecuting++
qs.totSeatsInUse += request.MaxSeats()
queue.requestsExecuting++
queue.requestsExecuting = queue.requestsExecuting.Insert(request)
queue.seatsInUse += request.MaxSeats()
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
metrics.AddSeatConcurrencyInUse(qs.qCfg.Name, request.fsName, request.MaxSeats())
qs.reqsGaugePair.RequestsExecuting.Add(1)
qs.execSeatsGauge.Add(float64(request.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
@ -779,7 +786,7 @@ func (qs *queueSet) dispatchLocked() bool {
if klogV.Enabled() {
klogV.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
request.workEstimate, queue.index, queue.nextDispatchR, queue.requestsWaiting.Length(), queue.requestsExecuting.Len(), queue.seatsInUse, qs.totSeatsInUse)
}
// When a request is dequeued for service -> qs.virtualStart += G * width
if request.totalWork() > rDecrement/100 { // A single increment should never be so big
@ -796,6 +803,9 @@ func (qs *queueSet) dispatchLocked() bool {
// otherwise it returns false.
func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
switch {
case qs.qCfg.DesiredNumQueues < 0:
// This is code for exemption from limitation
return true
case seats > qs.dCfg.ConcurrencyLimit:
// we have picked the queue with the minimum virtual finish time, but
// the number of seats this request asks for exceeds the concurrency limit.
@ -831,7 +841,7 @@ func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
for range qs.queues {
qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex]
oldestWaiting, _ := queue.requests.Peek()
oldestWaiting, _ := queue.requestsWaiting.Peek()
if oldestWaiting != nil {
sMin = ssMin(sMin, queue.nextDispatchR)
sMax = ssMax(sMax, queue.nextDispatchR)
@ -848,7 +858,7 @@ func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
}
}
oldestReqFromMinQueue, _ := minQueue.requests.Peek()
oldestReqFromMinQueue, _ := minQueue.requestsWaiting.Peek()
if oldestReqFromMinQueue == nil {
// This cannot happen
klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
@ -935,7 +945,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
defer qs.removeQueueIfEmptyLocked(r)
qs.totSeatsInUse -= r.MaxSeats()
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
metrics.AddSeatConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.MaxSeats())
qs.execSeatsGauge.Add(-float64(r.MaxSeats()))
qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
if r.queue != nil {
@ -952,7 +962,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
} else if r.queue != nil {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests with %#v waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.MaxSeats(), r.queue.index,
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
} else {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
}
@ -964,7 +974,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
} else if r.queue != nil {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use of %d seats but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests with %#v waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.workEstimate.FinalSeats, additionalLatency.Seconds(), r.queue.index,
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
} else {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use of %d seats but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.workEstimate.FinalSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
}
@ -981,7 +991,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
} else if r.queue != nil {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests with %#v waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.FinalSeats, r.queue.index,
r.queue.requests.Length(), r.queue.requests.QueueSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
r.queue.requestsWaiting.Length(), r.queue.requestsWaiting.QueueSum(), r.queue.requestsExecuting.Len(), r.queue.seatsInUse)
} else {
klogV.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.FinalSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
}
@ -991,12 +1001,14 @@ func (qs *queueSet) finishRequestLocked(r *request) {
if r.queue != nil {
// request has finished, remove from requests executing
r.queue.requestsExecuting--
r.queue.requestsExecuting = r.queue.requestsExecuting.Delete(r)
// When a request finishes being served, and the actual service time was S,
// the queue's start R is decremented by (G - S)*width.
r.queue.nextDispatchR -= fqrequest.SeatsTimesDuration(float64(r.InitialSeats()), qs.estimatedServiceDuration-actualServiceDuration)
qs.boundNextDispatchLocked(r.queue)
} else {
qs.requestsExecutingSet = qs.requestsExecutingSet.Delete(r)
}
}
@ -1008,7 +1020,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
// The following hack addresses the first side of that inequity,
// by insisting that dispatch in the virtual world not precede arrival.
func (qs *queueSet) boundNextDispatchLocked(queue *queue) {
oldestReqFromMinQueue, _ := queue.requests.Peek()
oldestReqFromMinQueue, _ := queue.requestsWaiting.Peek()
if oldestReqFromMinQueue == nil {
return
}
@ -1029,8 +1041,8 @@ func (qs *queueSet) removeQueueIfEmptyLocked(r *request) {
// If there are more queues than desired and this one has no
// requests then remove it
if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
r.queue.requests.Length() == 0 &&
r.queue.requestsExecuting == 0 {
r.queue.requestsWaiting.Length() == 0 &&
r.queue.requestsExecuting.Len() == 0 {
qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
@ -1055,15 +1067,16 @@ func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
qs.lock.Lock()
defer qs.lock.Unlock()
d := debug.QueueSetDump{
Queues: make([]debug.QueueDump, len(qs.queues)),
Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse,
SeatsWaiting: qs.totSeatsWaiting,
Dispatched: qs.totRequestsDispatched,
Rejected: qs.totRequestsRejected,
Timedout: qs.totRequestsTimedout,
Cancelled: qs.totRequestsCancelled,
Queues: make([]debug.QueueDump, len(qs.queues)),
QueuelessExecutingRequests: SetMapReduce(dumpRequest(includeRequestDetails), append1[debug.RequestDump])(qs.requestsExecutingSet),
Waiting: qs.totRequestsWaiting,
Executing: qs.totRequestsExecuting,
SeatsInUse: qs.totSeatsInUse,
SeatsWaiting: qs.totSeatsWaiting,
Dispatched: qs.totRequestsDispatched,
Rejected: qs.totRequestsRejected,
Timedout: qs.totRequestsTimedout,
Cancelled: qs.totRequestsCancelled,
}
for i, q := range qs.queues {
d.Queues[i] = q.dumpLocked(includeRequestDetails)


@ -20,6 +20,7 @@ import (
"context"
"time"
"k8s.io/apimachinery/pkg/util/sets"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
@ -90,15 +91,15 @@ type completedWorkEstimate struct {
// queue is a sequence of requests that have arrived but not yet finished
// execution in both the real and virtual worlds.
type queue struct {
// The requests not yet executing in the real world are stored in a FIFO list.
requests fifo
// requestsWaiting holds the requests not yet executing in the real world, in a FIFO list.
requestsWaiting fifo
// nextDispatchR is the R progress meter reading at
// which the next request will be dispatched in the virtual world.
nextDispatchR fcrequest.SeatSeconds
// requestsExecuting is the count in the real world.
requestsExecuting int
// requestsExecuting is the set of requests executing in the real world.
requestsExecuting sets.Set[*request]
// index is the position of this queue among those in its queueSet.
index int
@ -145,28 +146,14 @@ func (qs *queueSet) computeFinalWork(we *fcrequest.WorkEstimate) fcrequest.SeatS
}
func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
digest := make([]debug.RequestDump, q.requests.Length())
i := 0
q.requests.Walk(func(r *request) bool {
// dump requests.
digest[i].MatchedFlowSchema = r.fsName
digest[i].FlowDistinguisher = r.flowDistinguisher
digest[i].ArriveTime = r.arrivalTime
digest[i].StartTime = r.startTime
digest[i].WorkEstimate = r.workEstimate.WorkEstimate
if includeDetails {
userInfo, _ := genericrequest.UserFrom(r.ctx)
digest[i].UserName = userInfo.GetName()
requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx)
if ok {
digest[i].RequestInfo = *requestInfo
}
}
i++
waitingDigest := make([]debug.RequestDump, 0, q.requestsWaiting.Length())
q.requestsWaiting.Walk(func(r *request) bool {
waitingDigest = append(waitingDigest, dumpRequest(includeDetails)(r))
return true
})
executingDigest := SetMapReduce(dumpRequest(includeDetails), append1[debug.RequestDump])(q.requestsExecuting)
sum := q.requests.QueueSum()
sum := q.requestsWaiting.QueueSum()
queueSum := debug.QueueSum{
InitialSeatsSum: sum.InitialSeatsSum,
MaxSeatsSum: sum.MaxSeatsSum,
@ -175,9 +162,57 @@ func (q *queue) dumpLocked(includeDetails bool) debug.QueueDump {
return debug.QueueDump{
NextDispatchR: q.nextDispatchR.String(),
Requests: digest,
ExecutingRequests: q.requestsExecuting,
Requests: waitingDigest,
RequestsExecuting: executingDigest,
ExecutingRequests: q.requestsExecuting.Len(),
SeatsInUse: q.seatsInUse,
QueueSum: queueSum,
}
}
func dumpRequest(includeDetails bool) func(*request) debug.RequestDump {
return func(r *request) debug.RequestDump {
ans := debug.RequestDump{
MatchedFlowSchema: r.fsName,
FlowDistinguisher: r.flowDistinguisher,
ArriveTime: r.arrivalTime,
StartTime: r.startTime,
WorkEstimate: r.workEstimate.WorkEstimate,
}
if includeDetails {
userInfo, _ := genericrequest.UserFrom(r.ctx)
ans.UserName = userInfo.GetName()
requestInfo, ok := genericrequest.RequestInfoFrom(r.ctx)
if ok {
ans.RequestInfo = *requestInfo
}
}
return ans
}
}
// SetMapReduce is map-reduce starting from a set type in the sets package.
func SetMapReduce[Elt comparable, Result, Accumulator any](mapFn func(Elt) Result, reduceFn func(Accumulator, Result) Accumulator) func(map[Elt]sets.Empty) Accumulator {
return func(set map[Elt]sets.Empty) Accumulator {
var ans Accumulator
for elt := range set {
ans = reduceFn(ans, mapFn(elt))
}
return ans
}
}
// SliceMapReduce is map-reduce starting from a slice.
func SliceMapReduce[Elt, Result, Accumulator any](mapFn func(Elt) Result, reduceFn func(Accumulator, Result) Accumulator) func([]Elt) Accumulator {
return func(slice []Elt) Accumulator {
var ans Accumulator
for _, elt := range slice {
ans = reduceFn(ans, mapFn(elt))
}
return ans
}
}
func or(x, y bool) bool { return x || y }
func append1[Elt any](slice []Elt, next Elt) []Elt { return append(slice, next) }
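
For illustration only (not part of the diff): a small sketch of the SetMapReduce helper above with hypothetical data. appendOne mirrors the queueset package's unexported append1 reducer, and the import paths are the usual vendored ones.

package main

import (
    "fmt"
    "strings"

    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
)

// appendOne mirrors the queueset package's unexported append1 reducer.
func appendOne[Elt any](slice []Elt, next Elt) []Elt { return append(slice, next) }

func main() {
    // Map every element of a set through strings.ToUpper and reduce the results
    // into a slice, the same pattern the Dump code above uses with dumpRequest
    // and append1.
    names := sets.New[string]("alpha", "beta")
    upper := queueset.SetMapReduce(strings.ToUpper, appendOne[string])(names)
    fmt.Println(upper) // e.g. [ALPHA BETA]; set iteration order is not defined
}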


@ -0,0 +1,66 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package flowcontrol
import (
"sync"
)
// MaxSeatsTracker is used to track max seats allocatable per priority level from the work estimator
type MaxSeatsTracker interface {
// GetMaxSeats returns the maximum seats a request should occupy for a given priority level.
GetMaxSeats(priorityLevelName string) uint64
// SetMaxSeats configures max seats for a priority level.
SetMaxSeats(priorityLevelName string, maxSeats uint64)
// ForgetPriorityLevel removes max seats tracking for a priority level.
ForgetPriorityLevel(priorityLevelName string)
}
type maxSeatsTracker struct {
sync.RWMutex
maxSeats map[string]uint64
}
func NewMaxSeatsTracker() MaxSeatsTracker {
return &maxSeatsTracker{
maxSeats: make(map[string]uint64),
}
}
func (m *maxSeatsTracker) GetMaxSeats(plName string) uint64 {
m.RLock()
defer m.RUnlock()
return m.maxSeats[plName]
}
func (m *maxSeatsTracker) SetMaxSeats(plName string, maxSeats uint64) {
m.Lock()
defer m.Unlock()
m.maxSeats[plName] = maxSeats
}
func (m *maxSeatsTracker) ForgetPriorityLevel(plName string) {
m.Lock()
defer m.Unlock()
delete(m.maxSeats, plName)
}

View File

@ -47,7 +47,7 @@ const (
var (
queueLengthBuckets = []float64{0, 10, 25, 50, 100, 250, 500, 1000}
requestDurationSecondsBuckets = []float64{0, 0.005, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30}
requestDurationSecondsBuckets = []float64{0, 0.005, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 15, 30}
)
var registerMetrics sync.Once
@ -94,7 +94,7 @@ var (
Subsystem: subsystem,
Name: "rejected_requests_total",
Help: "Number of requests rejected by API Priority and Fairness subsystem",
StabilityLevel: compbasemetrics.ALPHA,
StabilityLevel: compbasemetrics.BETA,
},
[]string{priorityLevel, flowSchema, "reason"},
)
@ -104,7 +104,7 @@ var (
Subsystem: subsystem,
Name: "dispatched_requests_total",
Help: "Number of requests executed by API Priority and Fairness subsystem",
StabilityLevel: compbasemetrics.ALPHA,
StabilityLevel: compbasemetrics.BETA,
},
[]string{priorityLevel, flowSchema},
)
@ -206,7 +206,7 @@ var (
Subsystem: subsystem,
Name: "current_inqueue_requests",
Help: "Number of requests currently pending in queues of the API Priority and Fairness subsystem",
StabilityLevel: compbasemetrics.ALPHA,
StabilityLevel: compbasemetrics.BETA,
},
[]string{priorityLevel, flowSchema},
)
@ -223,11 +223,13 @@ var (
)
apiserverRequestConcurrencyLimit = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_concurrency_limit",
Help: "Shared concurrency limit in the API Priority and Fairness subsystem",
StabilityLevel: compbasemetrics.ALPHA,
Namespace: namespace,
Subsystem: subsystem,
Name: "request_concurrency_limit",
Help: "Nominal number of execution seats configured for each priority level",
// Remove this metric once all supported releases have the equivalent nominal_limit_seats metric
DeprecatedVersion: "1.30.0",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{priorityLevel},
)
@ -237,17 +239,29 @@ var (
Subsystem: subsystem,
Name: "current_executing_requests",
Help: "Number of requests in initial (for a WATCH) or any (for a non-WATCH) execution stage in the API Priority and Fairness subsystem",
StabilityLevel: compbasemetrics.ALPHA,
StabilityLevel: compbasemetrics.BETA,
},
[]string{priorityLevel, flowSchema},
)
apiserverCurrentExecutingSeats = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "current_executing_seats",
Help: "Concurrency (number of seats) occupied by the currently executing (initial stage for a WATCH, any stage otherwise) requests in the API Priority and Fairness subsystem",
StabilityLevel: compbasemetrics.BETA,
},
[]string{priorityLevel, flowSchema},
)
apiserverRequestConcurrencyInUse = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_concurrency_in_use",
Help: "Concurrency (number of seats) occupied by the currently executing (initial stage for a WATCH, any stage otherwise) requests in the API Priority and Fairness subsystem",
StabilityLevel: compbasemetrics.ALPHA,
Namespace: namespace,
Subsystem: subsystem,
Name: "request_concurrency_in_use",
Help: "Concurrency (number of seats) occupied by the currently executing (initial stage for a WATCH, any stage otherwise) requests in the API Priority and Fairness subsystem",
// Remove this metric once all supported releases have the equivalent current_executing_seats metric
DeprecatedVersion: "1.31.0",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{priorityLevel, flowSchema},
)
@ -258,7 +272,7 @@ var (
Name: "request_wait_duration_seconds",
Help: "Length of time a request spent waiting in its queue",
Buckets: requestDurationSecondsBuckets,
StabilityLevel: compbasemetrics.ALPHA,
StabilityLevel: compbasemetrics.BETA,
},
[]string{priorityLevel, flowSchema, "execute"},
)
@ -323,7 +337,7 @@ var (
Subsystem: subsystem,
Name: "nominal_limit_seats",
Help: "Nominal number of execution seats configured for each priority level",
StabilityLevel: compbasemetrics.ALPHA,
StabilityLevel: compbasemetrics.BETA,
},
[]string{priorityLevel},
)
@ -444,6 +458,7 @@ var (
apiserverRequestQueueLength,
apiserverRequestConcurrencyLimit,
apiserverRequestConcurrencyInUse,
apiserverCurrentExecutingSeats,
apiserverCurrentExecutingRequests,
apiserverRequestWaitingSeconds,
apiserverRequestExecutionSeconds,
@ -523,9 +538,10 @@ func SetDispatchMetrics(priorityLevel string, r, s, sMin, sMax, discountedSMin,
apiserverNextDiscountedSBounds.WithLabelValues(priorityLevel, "max").Set(discountedSMax)
}
// AddRequestConcurrencyInUse adds the given delta to the gauge of concurrency in use by
// AddSeatConcurrencyInUse adds the given delta to the gauge of seats in use by
// the currently executing requests of the given flowSchema and priorityLevel
func AddRequestConcurrencyInUse(priorityLevel, flowSchema string, delta int) {
func AddSeatConcurrencyInUse(priorityLevel, flowSchema string, delta int) {
apiserverCurrentExecutingSeats.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
apiserverRequestConcurrencyInUse.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
}

View File

@ -24,7 +24,7 @@ import (
const (
minimumSeats = 1
maximumSeats = 10
maximumSeatsLimit = 10
objectsPerSeat = 100.0
watchesPerSeat = 10.0
enableMutatingWorkEstimator = true
@ -39,12 +39,13 @@ type WorkEstimatorConfig struct {
// MinimumSeats is the minimum number of seats a request must occupy.
MinimumSeats uint64 `json:"minimumSeats,omitempty"`
// MaximumSeats is the maximum number of seats a request can occupy
// MaximumSeatsLimit is an upper limit on the max seats a request can occupy.
//
// NOTE: work_estimate_seats_samples metric uses the value of maximumSeats
// as the upper bound, so when we change maximumSeats we should also
// update the buckets of the metric.
MaximumSeats uint64 `json:"maximumSeats,omitempty"`
MaximumSeatsLimit uint64 `json:"maximumSeatsLimit,omitempty"`
}
// ListWorkEstimatorConfig holds work estimator parameters related to list requests.
@ -66,7 +67,7 @@ type MutatingWorkEstimatorConfig struct {
func DefaultWorkEstimatorConfig() *WorkEstimatorConfig {
return &WorkEstimatorConfig{
MinimumSeats: minimumSeats,
MaximumSeats: maximumSeats,
MaximumSeatsLimit: maximumSeatsLimit,
ListWorkEstimatorConfig: defaultListWorkEstimatorConfig(),
MutatingWorkEstimatorConfig: defaultMutatingWorkEstimatorConfig(),
}
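The list and mutating estimators further down combine this limit with the per-priority-level value coming from the MaxSeatsTracker; a standalone sketch of that clamp, with an illustrative helper name (the vendored code inlines this logic):

package main

import "fmt"

// effectiveMaxSeats mirrors the clamp used by the estimators: a zero
// (untracked) or over-limit per-priority-level value falls back to
// MaximumSeatsLimit. Illustrative helper, not the vendored function.
func effectiveMaxSeats(trackedMaxSeats, maximumSeatsLimit uint64) uint64 {
	if trackedMaxSeats == 0 || trackedMaxSeats > maximumSeatsLimit {
		return maximumSeatsLimit
	}
	return trackedMaxSeats
}

func main() {
	fmt.Println(effectiveMaxSeats(0, 10))  // 10: nothing tracked yet
	fmt.Println(effectiveMaxSeats(4, 10))  // 4: tracked value wins
	fmt.Println(effectiveMaxSeats(25, 10)) // 10: capped at the limit
}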

View File

@ -29,10 +29,11 @@ import (
"k8s.io/klog/v2"
)
func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc {
func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &listWorkEstimator{
config: config,
countGetterFn: countFn,
maxSeatsFn: maxSeatsFn,
}
return estimator.estimate
}
@ -40,14 +41,21 @@ func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorCo
type listWorkEstimator struct {
config *WorkEstimatorConfig
countGetterFn objectCountGetterFunc
maxSeatsFn maxSeatsFunc
}
func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
minSeats := e.config.MinimumSeats
maxSeats := e.maxSeatsFn(priorityLevelName)
if maxSeats == 0 || maxSeats > e.config.MaximumSeatsLimit {
maxSeats = e.config.MaximumSeatsLimit
}
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok {
// a missing RequestInfo should never happen, but to be on the safe side
// let's return maxSeats
return WorkEstimate{InitialSeats: e.config.MaximumSeats}
return WorkEstimate{InitialSeats: maxSeats}
}
if requestInfo.Name != "" {
@ -56,7 +64,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// Example of such list requests:
// /apis/certificates.k8s.io/v1/certificatesigningrequests?fieldSelector=metadata.name%3Dcsr-xxs4m
// /api/v1/namespaces/test/configmaps?fieldSelector=metadata.name%3Dbig-deployment-1&limit=500&resourceVersion=0
return WorkEstimate{InitialSeats: e.config.MinimumSeats}
return WorkEstimate{InitialSeats: minSeats}
}
query := r.URL.Query()
@ -66,9 +74,18 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// This request is destined to fail in the validation layer,
// return maxSeats for this request to be consistent.
return WorkEstimate{InitialSeats: e.config.MaximumSeats}
return WorkEstimate{InitialSeats: maxSeats}
}
isListFromCache := !shouldListFromStorage(query, &listOptions)
// For watch requests, we want to adjust the cost only if they explicitly request
// sending initial events.
if requestInfo.Verb == "watch" {
if listOptions.SendInitialEvents == nil || !*listOptions.SendInitialEvents {
return WorkEstimate{InitialSeats: e.config.MinimumSeats}
}
}
isListFromCache := requestInfo.Verb == "watch" || !shouldListFromStorage(query, &listOptions)
numStored, err := e.countGetterFn(key(requestInfo))
switch {
@ -77,7 +94,7 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// be conservative here and allocate maximum seats to this list request.
// NOTE: if a CRD is removed, its count will go stale first and then the
// pruner will eventually remove the CRD from the cache.
return WorkEstimate{InitialSeats: e.config.MaximumSeats}
return WorkEstimate{InitialSeats: maxSeats}
case err == ObjectCountNotFoundErr:
// there are multiple scenarios in which we can see this error:
// a. the type is truly unknown, a typo on the caller's part.
@ -91,12 +108,12 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
// when aggregated API calls are overestimated, we allocate the minimum
// possible seats (see #109106 as an example when being more conservative
// led to problems).
return WorkEstimate{InitialSeats: e.config.MinimumSeats}
return WorkEstimate{InitialSeats: minSeats}
case err != nil:
// we should never be here since Get returns either ObjectCountStaleErr or
// ObjectCountNotFoundErr, return maximumSeats to be on the safe side.
klog.ErrorS(err, "Unexpected error from object count tracker")
return WorkEstimate{InitialSeats: e.config.MaximumSeats}
return WorkEstimate{InitialSeats: maxSeats}
}
limit := numStored
@ -125,11 +142,11 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
seats := uint64(math.Ceil(float64(estimatedObjectsToBeProcessed) / e.config.ObjectsPerSeat))
// make sure we never return a seat of zero
if seats < e.config.MinimumSeats {
seats = e.config.MinimumSeats
if seats < minSeats {
seats = minSeats
}
if seats > e.config.MaximumSeats {
seats = e.config.MaximumSeats
if seats > maxSeats {
seats = maxSeats
}
return WorkEstimate{InitialSeats: seats}
}
@ -149,9 +166,16 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
// Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := pagingEnabled && len(opts.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
hasLimit := pagingEnabled && opts.Limit > 0 && resourceVersion != "0"
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
return resourceVersion == "" || hasContinuation || hasLimit || unsupportedMatch
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
}
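To see which list requests the updated predicate sends to etcd versus the watch cache, a standalone sketch with the two feature gates passed in explicitly (the helper below restates the logic above and is illustrative, not the vendored function):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// listFromStorage mirrors shouldListFromStorage with the APIListChunking and
// ConsistentListFromCache feature gates as plain booleans.
func listFromStorage(opts metav1.ListOptions, chunking, consistentListFromCache bool) bool {
	consistentReadFromStorage := opts.ResourceVersion == "" && !consistentListFromCache
	hasContinuation := chunking && len(opts.Continue) > 0
	hasLimit := chunking && opts.Limit > 0 && opts.ResourceVersion != "0"
	unsupportedMatch := opts.ResourceVersionMatch != "" &&
		opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan
	return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
}

func main() {
	// A consistent read (rv="") goes to etcd unless ConsistentListFromCache is enabled.
	fmt.Println(listFromStorage(metav1.ListOptions{}, true, false)) // true
	fmt.Println(listFromStorage(metav1.ListOptions{}, true, true))  // false
	// A paginated list pinned at rv="0" stays on the watch cache.
	fmt.Println(listFromStorage(metav1.ListOptions{ResourceVersion: "0", Limit: 500}, true, true)) // false
}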

View File

@ -25,25 +25,33 @@ import (
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
)
func newMutatingWorkEstimator(countFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc {
func newMutatingWorkEstimator(countFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &mutatingWorkEstimator{
config: config,
countFn: countFn,
config: config,
countFn: countFn,
maxSeatsFn: maxSeatsFn,
}
return estimator.estimate
}
type mutatingWorkEstimator struct {
config *WorkEstimatorConfig
countFn watchCountGetterFunc
config *WorkEstimatorConfig
countFn watchCountGetterFunc
maxSeatsFn maxSeatsFunc
}
func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
minSeats := e.config.MinimumSeats
maxSeats := e.maxSeatsFn(priorityLevelName)
if maxSeats == 0 || maxSeats > e.config.MaximumSeatsLimit {
maxSeats = e.config.MaximumSeatsLimit
}
// TODO(wojtekt): Remove once we tune the algorithm to not fail
// scalability tests.
if !e.config.Enabled {
return WorkEstimate{
InitialSeats: 1,
InitialSeats: minSeats,
}
}
@ -52,15 +60,15 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori
// a missing RequestInfo should never happen, but to be on the safe side
// let's return a large value.
return WorkEstimate{
InitialSeats: 1,
FinalSeats: e.config.MaximumSeats,
InitialSeats: minSeats,
FinalSeats: maxSeats,
AdditionalLatency: e.config.eventAdditionalDuration(),
}
}
if isRequestExemptFromWatchEvents(requestInfo) {
return WorkEstimate{
InitialSeats: e.config.MinimumSeats,
InitialSeats: minSeats,
FinalSeats: 0,
AdditionalLatency: time.Duration(0),
}
@ -126,8 +134,8 @@ func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priori
//
// TODO: Confirm that the current cap of maxSeats allows us to
// achieve the above.
if finalSeats > e.config.MaximumSeats {
finalSeats = e.config.MaximumSeats
if finalSeats > maxSeats {
finalSeats = maxSeats
}
additionalLatency = finalWork.DurationPerSeat(float64(finalSeats))
}

View File

@ -38,7 +38,7 @@ const MinSeatSeconds = SeatSeconds(0)
// This is intended only to produce small values, increments in work
// rather than amount of work done since process start.
func SeatsTimesDuration(seats float64, duration time.Duration) SeatSeconds {
return SeatSeconds(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale)))
return SeatSeconds(int64(math.Round(seats * float64(duration/time.Nanosecond) / (1e9 / ssScale))))
}
// ToFloat converts to a floating-point representation.
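For orientation, a worked instance of the formula above (with ssScale as the package's fixed-point scale): 2.5 seats held for 3s gives round(2.5 * 3e9 / (1e9/ssScale)) = 7.5 * ssScale, i.e. 7.5 seat-seconds expressed in SeatSeconds units. The added int64 conversion only changes how the rounded float64 is converted to the unsigned SeatSeconds type; for in-range, non-negative inputs the resulting value is the same.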

View File

@ -22,6 +22,9 @@ import (
"time"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
)
@ -61,15 +64,19 @@ type objectCountGetterFunc func(string) (int64, error)
// number of watchers potentially interested in a given request.
type watchCountGetterFunc func(*apirequest.RequestInfo) int
// maxSeatsFunc represents a function that returns the maximum seats
// allowed for the work estimator for a given priority level.
type maxSeatsFunc func(priorityLevelName string) uint64
// NewWorkEstimator estimates the work that will be done by a given request;
// if no WorkEstimatorFunc matches the given request then the default
// work estimate of 1 seat is allocated to the request.
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig) WorkEstimatorFunc {
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &workEstimator{
minimumSeats: config.MinimumSeats,
maximumSeats: config.MaximumSeats,
listWorkEstimator: newListWorkEstimator(objectCountFn, config),
mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn, config),
maximumSeatsLimit: config.MaximumSeatsLimit,
listWorkEstimator: newListWorkEstimator(objectCountFn, config, maxSeatsFn),
mutatingWorkEstimator: newMutatingWorkEstimator(watchCountFn, config, maxSeatsFn),
}
return estimator.estimate
}
@ -86,8 +93,8 @@ func (e WorkEstimatorFunc) EstimateWork(r *http.Request, flowSchemaName, priorit
type workEstimator struct {
// the minimum number of seats a request must occupy
minimumSeats uint64
// the maximum number of seats a request can occupy
maximumSeats uint64
// the default maximum number of seats a request can occupy
maximumSeatsLimit uint64
// listWorkEstimator estimates work for list request(s)
listWorkEstimator WorkEstimatorFunc
// mutatingWorkEstimator calculates the width of mutating request(s)
@ -99,12 +106,21 @@ func (e *workEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelN
if !ok {
klog.ErrorS(fmt.Errorf("no RequestInfo found in context"), "Failed to estimate work for the request", "URI", r.RequestURI)
// a missing RequestInfo should never happen, but to be on the safe side let's return maximumSeatsLimit
return WorkEstimate{InitialSeats: e.maximumSeats}
return WorkEstimate{InitialSeats: e.maximumSeatsLimit}
}
switch requestInfo.Verb {
case "list":
return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
case "watch":
// WATCH supports `SendInitialEvents` option, which effectively means
// that it starts by sending the contents of a corresponding LIST call.
// From that perspective, given that the watch only consumes APF seats
// during its initialization (sending init events), its cost should then
// be computed the same way as for a regular list.
if utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
return e.listWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
}
case "create", "update", "patch", "delete":
return e.mutatingWorkEstimator.EstimateWork(r, flowSchemaName, priorityLevelName)
}
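To make the new watch branch concrete, a hedged sketch of the kind of request it targets; the host is made up, the query parameters follow the streaming-list API, and the comments restate the routing above:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// A streaming LIST: a watch that asks for initial events. With the
	// WatchList feature gate enabled, the estimator above hands this to the
	// list work estimator, so its initial seats scale with the object count.
	req, _ := http.NewRequest(http.MethodGet,
		"https://apiserver.example/api/v1/pods?watch=true&sendInitialEvents=true&resourceVersionMatch=NotOlderThan&allowWatchBookmarks=true",
		nil)
	fmt.Println(req.URL.Query().Get("sendInitialEvents")) // "true"

	// A plain watch without sendInitialEvents is charged the minimum seats
	// by the list estimator's early return for the "watch" verb.
}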

View File

@ -0,0 +1,56 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
subsystem = "apiserver"
statuscode = "code"
)
var registerMetricsOnce sync.Once
var (
// peerProxiedRequestsTotal counts the number of requests that were proxied to a peer kube-apiserver.
peerProxiedRequestsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: subsystem,
Name: "rerouted_request_total",
Help: "Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it",
StabilityLevel: metrics.ALPHA,
},
[]string{statuscode},
)
)
func Register() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(peerProxiedRequestsTotal)
})
}
// IncPeerProxiedRequest increments the count of requests proxied to a peer kube-apiserver
func IncPeerProxiedRequest(ctx context.Context, status string) {
peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1)
}
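A minimal usage sketch for the new counter (not part of this commit); the import path is assumed from where this file lands in the vendored tree, and the status code is made up:

package main

import (
	"context"

	peerproxymetrics "k8s.io/apiserver/pkg/util/peerproxy/metrics"
)

func main() {
	// Register the counter once with the legacy registry, then bump it
	// whenever a request is proxied to a peer kube-apiserver.
	peerproxymetrics.Register()
	peerproxymetrics.IncPeerProxiedRequest(context.TODO(), "503")
}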

View File

@ -243,6 +243,7 @@ func restConfigFromKubeconfig(configAuthInfo *clientcmdapi.AuthInfo) (*rest.Conf
if len(configAuthInfo.Impersonate) > 0 {
config.Impersonate = rest.ImpersonationConfig{
UserName: configAuthInfo.Impersonate,
UID: configAuthInfo.ImpersonateUID,
Groups: configAuthInfo.ImpersonateGroups,
Extra: configAuthInfo.ImpersonateUserExtra,
}

View File

@ -62,7 +62,7 @@ type GenericWebhook struct {
// Otherwise it returns false for an immediate fail.
func DefaultShouldRetry(err error) bool {
// these errors indicate a transient error that should be retried.
if utilnet.IsConnectionReset(err) || apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err) {
if utilnet.IsConnectionReset(err) || utilnet.IsHTTP2ConnectionLost(err) || apierrors.IsInternalError(err) || apierrors.IsTimeout(err) || apierrors.IsTooManyRequests(err) {
return true
}
// if the error sends the Retry-After header, we respect it as an explicit confirmation we should retry.

View File

@ -1,350 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wsstream
import (
"encoding/base64"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"time"
"golang.org/x/net/websocket"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/runtime"
)
// The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating
// the channel number (zero indexed) the message was sent on. Messages in both directions should
// prefix their messages with this channel byte. When used for remote execution, the channel numbers
// are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR
// (0, 1, and 2). No other conversion is performed on the raw subprotocol - writes are sent as they
// are received by the server.
//
// Example client session:
//
// CONNECT http://server.com with subprotocol "channel.k8s.io"
// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT)
// CLOSE
const ChannelWebSocketProtocol = "channel.k8s.io"
// The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character
// indicating the channel number (zero indexed) the message was sent on. Messages in both directions
// should prefix their messages with this channel char. When used for remote execution, the channel
// numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT,
// and STDERR ('0', '1', and '2'). The data received on the server is base64 decoded (and must be
valid) and data written by the server to the client is base64 encoded.
//
// Example client session:
//
// CONNECT http://server.com with subprotocol "base64.channel.k8s.io"
// WRITE []byte{48, 90, 109, 57, 118, 67, 103, 111, 61} # send "foo\n" (base64: "Zm9vCgo=") on channel '0' (STDIN)
// READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT)
// CLOSE
const Base64ChannelWebSocketProtocol = "base64.channel.k8s.io"
type codecType int
const (
rawCodec codecType = iota
base64Codec
)
type ChannelType int
const (
IgnoreChannel ChannelType = iota
ReadChannel
WriteChannel
ReadWriteChannel
)
var (
// connectionUpgradeRegex matches any Connection header value that includes upgrade
connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
)
// IsWebSocketRequest returns true if the incoming request contains connection upgrade headers
// for WebSockets.
func IsWebSocketRequest(req *http.Request) bool {
if !strings.EqualFold(req.Header.Get("Upgrade"), "websocket") {
return false
}
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection")))
}
// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
// read and write deadlines are pushed every time a new message is received.
func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
defer runtime.HandleCrash()
var data []byte
for {
resetTimeout(ws, timeout)
if err := websocket.Message.Receive(ws, &data); err != nil {
return
}
}
}
// handshake ensures the provided user protocol matches one of the allowed protocols. It returns
// no error if no protocol is specified.
func handshake(config *websocket.Config, req *http.Request, allowed []string) error {
protocols := config.Protocol
if len(protocols) == 0 {
protocols = []string{""}
}
for _, protocol := range protocols {
for _, allow := range allowed {
if allow == protocol {
config.Protocol = []string{protocol}
return nil
}
}
}
return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed)
}
// ChannelProtocolConfig describes a websocket subprotocol with channels.
type ChannelProtocolConfig struct {
Binary bool
Channels []ChannelType
}
// NewDefaultChannelProtocols returns a channel protocol map with the
// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io" and the given
// channels.
func NewDefaultChannelProtocols(channels []ChannelType) map[string]ChannelProtocolConfig {
return map[string]ChannelProtocolConfig{
"": {Binary: true, Channels: channels},
ChannelWebSocketProtocol: {Binary: true, Channels: channels},
Base64ChannelWebSocketProtocol: {Binary: false, Channels: channels},
}
}
// Conn supports sending multiple binary channels over a websocket connection.
type Conn struct {
protocols map[string]ChannelProtocolConfig
selectedProtocol string
channels []*websocketChannel
codec codecType
ready chan struct{}
ws *websocket.Conn
timeout time.Duration
}
// NewConn creates a WebSocket connection that supports a set of channels. Channels begin each
// web socket message with a single byte indicating the channel number (0-N). 255 is reserved for
// future use. The channel types for each channel are passed as an array, supporting the different
// duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer.
//
// The protocols parameter maps subprotocol names to ChannelProtocols. The empty string subprotocol
// name is used if websocket.Config.Protocol is empty.
func NewConn(protocols map[string]ChannelProtocolConfig) *Conn {
return &Conn{
ready: make(chan struct{}),
protocols: protocols,
}
}
// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
// there is no timeout on the connection.
func (conn *Conn) SetIdleTimeout(duration time.Duration) {
conn.timeout = duration
}
// Open the connection and create channels for reading and writing. It returns
// the selected subprotocol, a slice of channels and an error.
func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) {
go func() {
defer runtime.HandleCrash()
defer conn.Close()
websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req)
}()
<-conn.ready
rwc := make([]io.ReadWriteCloser, len(conn.channels))
for i := range conn.channels {
rwc[i] = conn.channels[i]
}
return conn.selectedProtocol, rwc, nil
}
func (conn *Conn) initialize(ws *websocket.Conn) {
negotiated := ws.Config().Protocol
conn.selectedProtocol = negotiated[0]
p := conn.protocols[conn.selectedProtocol]
if p.Binary {
conn.codec = rawCodec
} else {
conn.codec = base64Codec
}
conn.ws = ws
conn.channels = make([]*websocketChannel, len(p.Channels))
for i, t := range p.Channels {
switch t {
case ReadChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false)
case WriteChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true)
case ReadWriteChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true)
case IgnoreChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false)
}
}
close(conn.ready)
}
func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error {
supportedProtocols := make([]string, 0, len(conn.protocols))
for p := range conn.protocols {
supportedProtocols = append(supportedProtocols, p)
}
return handshake(config, req, supportedProtocols)
}
func (conn *Conn) resetTimeout() {
if conn.timeout > 0 {
conn.ws.SetDeadline(time.Now().Add(conn.timeout))
}
}
// Close is only valid after Open has been called
func (conn *Conn) Close() error {
<-conn.ready
for _, s := range conn.channels {
s.Close()
}
conn.ws.Close()
return nil
}
// handle implements a websocket handler.
func (conn *Conn) handle(ws *websocket.Conn) {
defer conn.Close()
conn.initialize(ws)
for {
conn.resetTimeout()
var data []byte
if err := websocket.Message.Receive(ws, &data); err != nil {
if err != io.EOF {
klog.Errorf("Error on socket receive: %v", err)
}
break
}
if len(data) == 0 {
continue
}
channel := data[0]
if conn.codec == base64Codec {
channel = channel - '0'
}
data = data[1:]
if int(channel) >= len(conn.channels) {
klog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel)
continue
}
if _, err := conn.channels[channel].DataFromSocket(data); err != nil {
klog.Errorf("Unable to write frame to %d: %v\n%s", channel, err, string(data))
continue
}
}
}
// write multiplexes the specified channel onto the websocket
func (conn *Conn) write(num byte, data []byte) (int, error) {
conn.resetTimeout()
switch conn.codec {
case rawCodec:
frame := make([]byte, len(data)+1)
frame[0] = num
copy(frame[1:], data)
if err := websocket.Message.Send(conn.ws, frame); err != nil {
return 0, err
}
case base64Codec:
frame := string('0'+num) + base64.StdEncoding.EncodeToString(data)
if err := websocket.Message.Send(conn.ws, frame); err != nil {
return 0, err
}
}
return len(data), nil
}
// websocketChannel represents a channel in a connection
type websocketChannel struct {
conn *Conn
num byte
r io.Reader
w io.WriteCloser
read, write bool
}
// newWebsocketChannel creates a pipe for writing to a websocket. Do not write to this pipe
// prior to the connection being opened. It may be no, half, or full duplex depending on
// read and write.
func newWebsocketChannel(conn *Conn, num byte, read, write bool) *websocketChannel {
r, w := io.Pipe()
return &websocketChannel{conn, num, r, w, read, write}
}
func (p *websocketChannel) Write(data []byte) (int, error) {
if !p.write {
return len(data), nil
}
return p.conn.write(p.num, data)
}
// DataFromSocket is invoked by the connection receiver to move data from the connection
// into a specific channel.
func (p *websocketChannel) DataFromSocket(data []byte) (int, error) {
if !p.read {
return len(data), nil
}
switch p.conn.codec {
case rawCodec:
return p.w.Write(data)
case base64Codec:
dst := make([]byte, len(data))
n, err := base64.StdEncoding.Decode(dst, data)
if err != nil {
return 0, err
}
return p.w.Write(dst[:n])
}
return 0, nil
}
func (p *websocketChannel) Read(data []byte) (int, error) {
if !p.read {
return 0, io.EOF
}
return p.r.Read(data)
}
func (p *websocketChannel) Close() error {
return p.w.Close()
}

View File

@ -1,21 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package wsstream contains utilities for streaming content over WebSockets.
// The Conn type allows callers to multiplex multiple read/write channels over
// a single websocket. The Reader type allows an io.Reader to be copied over
// a websocket channel as binary content.
package wsstream // import "k8s.io/apiserver/pkg/util/wsstream"

View File

@ -1,177 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wsstream
import (
"encoding/base64"
"io"
"net/http"
"sync"
"time"
"golang.org/x/net/websocket"
"k8s.io/apimachinery/pkg/util/runtime"
)
// The WebSocket subprotocol "binary.k8s.io" will only send messages to the
// client and ignore messages sent to the server. The received messages are
// the exact bytes written to the stream. Zero byte messages are possible.
const binaryWebSocketProtocol = "binary.k8s.io"
// The WebSocket subprotocol "base64.binary.k8s.io" will only send messages to the
// client and ignore messages sent to the server. The received messages are
// a base64 version of the bytes written to the stream. Zero byte messages are
// possible.
const base64BinaryWebSocketProtocol = "base64.binary.k8s.io"
// ReaderProtocolConfig describes a websocket subprotocol with one stream.
type ReaderProtocolConfig struct {
Binary bool
}
// NewDefaultReaderProtocols returns a stream protocol map with the
// subprotocols "", "binary.k8s.io", "base64.binary.k8s.io".
func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig {
return map[string]ReaderProtocolConfig{
"": {Binary: true},
binaryWebSocketProtocol: {Binary: true},
base64BinaryWebSocketProtocol: {Binary: false},
}
}
// Reader supports returning an arbitrary byte stream over a websocket channel.
type Reader struct {
err chan error
r io.Reader
ping bool
timeout time.Duration
protocols map[string]ReaderProtocolConfig
selectedProtocol string
handleCrash func(additionalHandlers ...func(interface{})) // overridable for testing
}
// NewReader creates a WebSocket pipe that will copy the contents of r to a provided
// WebSocket connection. If ping is true, a zero length message will be sent to the client
// before the stream begins reading.
//
// The protocols parameter maps subprotocol names to StreamProtocols. The empty string
// subprotocol name is used if websocket.Config.Protocol is empty.
func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader {
return &Reader{
r: r,
err: make(chan error),
ping: ping,
protocols: protocols,
handleCrash: runtime.HandleCrash,
}
}
// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
// there is no timeout on the reader.
func (r *Reader) SetIdleTimeout(duration time.Duration) {
r.timeout = duration
}
func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
supportedProtocols := make([]string, 0, len(r.protocols))
for p := range r.protocols {
supportedProtocols = append(supportedProtocols, p)
}
return handshake(config, req, supportedProtocols)
}
// Copy the reader to the response. The created WebSocket is closed after this
// method completes.
func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
go func() {
defer r.handleCrash()
websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
}()
return <-r.err
}
// handle implements a WebSocket handler.
func (r *Reader) handle(ws *websocket.Conn) {
// Close the connection when the client requests it, or when we finish streaming, whichever happens first
closeConnOnce := &sync.Once{}
closeConn := func() {
closeConnOnce.Do(func() {
ws.Close()
})
}
negotiated := ws.Config().Protocol
r.selectedProtocol = negotiated[0]
defer close(r.err)
defer closeConn()
go func() {
defer runtime.HandleCrash()
// This blocks until the connection is closed.
// Client should not send anything.
IgnoreReceives(ws, r.timeout)
// Once the client closes, we should also close
closeConn()
}()
r.err <- messageCopy(ws, r.r, !r.protocols[r.selectedProtocol].Binary, r.ping, r.timeout)
}
func resetTimeout(ws *websocket.Conn, timeout time.Duration) {
if timeout > 0 {
ws.SetDeadline(time.Now().Add(timeout))
}
}
func messageCopy(ws *websocket.Conn, r io.Reader, base64Encode, ping bool, timeout time.Duration) error {
buf := make([]byte, 2048)
if ping {
resetTimeout(ws, timeout)
if base64Encode {
if err := websocket.Message.Send(ws, ""); err != nil {
return err
}
} else {
if err := websocket.Message.Send(ws, []byte{}); err != nil {
return err
}
}
}
for {
resetTimeout(ws, timeout)
n, err := r.Read(buf)
if err != nil {
if err == io.EOF {
return nil
}
return err
}
if n > 0 {
if base64Encode {
if err := websocket.Message.Send(ws, base64.StdEncoding.EncodeToString(buf[:n])); err != nil {
return err
}
} else {
if err := websocket.Message.Send(ws, buf[:n]); err != nil {
return err
}
}
}
}
}