rebase: update kubernetes to latest

updating the kubernetes release to the
latest in main go.mod

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2024-08-19 10:01:33 +02:00
committed by mergify[bot]
parent 63c4c05b35
commit 5a66991bb3
2173 changed files with 98906 additions and 61334 deletions

View File

@ -0,0 +1,146 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consistencydetector
import (
"context"
"fmt"
"sort"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
type RetrieveItemsFunc[U any] func() []U
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
// CheckDataConsistency exists solely for testing purposes.
// we cannot use checkWatchListDataConsistencyIfRequested because
// it is guarded by an environmental variable.
// we cannot manipulate the environmental variable because
// it will affect other tests in this package.
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) {
klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity)
return
}
klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
listOptions = prepareListCallOptions(lastSyncedResourceVersion, listOptions, len(retrievedItems))
var list runtime.Object
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
list, err = listFn(ctx, listOptions)
if err != nil {
// the consistency check will only be enabled in the CI
// and LIST calls in general will be retired by the client-go library
// if we fail simply log and retry
klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
return false, nil
}
return true, nil
})
if err != nil {
klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err)
return
}
rawListItems, err := meta.ExtractListWithAlloc(list)
if err != nil {
panic(err) // this should never happen
}
listItems := toMetaObjectSliceOrDie(rawListItems)
sort.Sort(byUID(listItems))
sort.Sort(byUID(retrievedItems))
if !cmp.Equal(listItems, retrievedItems) {
klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, cmp.Diff(listItems, retrievedItems))
msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity)
panic(msg)
}
}
// canFormAdditionalListCall ensures that we can form a valid LIST requests
// for checking data consistency.
func canFormAdditionalListCall(lastSyncedResourceVersion string, listOptions metav1.ListOptions) bool {
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
// we need to make sure that the continuation hasn't been set
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L38
if len(listOptions.Continue) > 0 {
return false
}
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
// we need to make sure that the RV is valid because the validation code forbids RV == "0"
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L44
if lastSyncedResourceVersion == "0" {
return false
}
return true
}
// prepareListCallOptions changes the input list options so that
// the list call goes directly to etcd
func prepareListCallOptions(lastSyncedResourceVersion string, listOptions metav1.ListOptions, retrievedItemsCount int) metav1.ListOptions {
// this is our legacy case:
//
// the watch cache skips the Limit if the ResourceVersion was set to "0"
// thus, to compare with data retrieved directly from etcd
// we need to skip the limit to for the list call as well.
//
// note that when the number of retrieved items is less than the request limit,
// it means either the watch cache is disabled, or there is not enough data.
// in both cases, we can use the limit because we will be able to compare
// the data with the items retrieved from etcd.
if listOptions.ResourceVersion == "0" && listOptions.Limit > 0 && int64(retrievedItemsCount) > listOptions.Limit {
listOptions.Limit = 0
}
// set the RV and RVM so that we get the snapshot of data
// directly from etcd.
listOptions.ResourceVersion = lastSyncedResourceVersion
listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
return listOptions
}
type byUID []metav1.Object
func (a byUID) Len() int { return len(a) }
func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
result := make([]metav1.Object, len(s))
for i, v := range s {
m, err := meta.Accessor(v)
if err != nil {
panic(err)
}
result[i] = m
}
return result
}

View File

@ -0,0 +1,70 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consistencydetector
import (
"context"
"os"
"strconv"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
var dataConsistencyDetectionForListFromCacheEnabled = false
func init() {
dataConsistencyDetectionForListFromCacheEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR"))
}
// CheckListFromCacheDataConsistencyIfRequested performs a data consistency check only when
// the KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR environment variable was set during a binary startup
// for requests that have a high chance of being served from the watch-cache.
//
// The consistency check is meant to be enforced only in the CI, not in production.
// The check ensures that data retrieved by a list api call from the watch-cache
// is exactly the same as data received by the list api call from etcd.
//
// Note that this function will panic when data inconsistency is detected.
// This is intentional because we want to catch it in the CI.
//
// Note that this function doesn't examine the ListOptions to determine
// if the original request has hit the cache because it would be challenging
// to maintain consistency with the server-side implementation.
// For simplicity, we assume that the first request retrieved data from
// the cache (even though this might not be true for some requests)
// and issue the second call to get data from etcd for comparison.
func CheckListFromCacheDataConsistencyIfRequested[T runtime.Object](ctx context.Context, identity string, listItemsFn ListFunc[T], optionsUsedToReceiveList metav1.ListOptions, receivedList runtime.Object) {
if !dataConsistencyDetectionForListFromCacheEnabled {
return
}
checkListFromCacheDataConsistencyIfRequestedInternal(ctx, identity, listItemsFn, optionsUsedToReceiveList, receivedList)
}
func checkListFromCacheDataConsistencyIfRequestedInternal[T runtime.Object](ctx context.Context, identity string, listItemsFn ListFunc[T], optionsUsedToReceiveList metav1.ListOptions, receivedList runtime.Object) {
receivedListMeta, err := meta.ListAccessor(receivedList)
if err != nil {
panic(err)
}
rawListItems, err := meta.ExtractListWithAlloc(receivedList)
if err != nil {
panic(err) // this should never happen
}
lastSyncedResourceVersion := receivedListMeta.GetResourceVersion()
CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listItemsFn, optionsUsedToReceiveList, func() []runtime.Object { return rawListItems })
}

View File

@ -0,0 +1,54 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consistencydetector
import (
"context"
"os"
"strconv"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
var dataConsistencyDetectionForWatchListEnabled = false
func init() {
dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
}
// IsDataConsistencyDetectionForWatchListEnabled returns true when
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
func IsDataConsistencyDetectionForWatchListEnabled() bool {
return dataConsistencyDetectionForWatchListEnabled
}
// CheckWatchListFromCacheDataConsistencyIfRequested performs a data consistency check only when
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
//
// The consistency check is meant to be enforced only in the CI, not in production.
// The check ensures that data retrieved by the watch-list api call
// is exactly the same as data received by the standard list api call against etcd.
//
// Note that this function will panic when data inconsistency is detected.
// This is intentional because we want to catch it in the CI.
func CheckWatchListFromCacheDataConsistencyIfRequested[T runtime.Object](ctx context.Context, identity string, listItemsFn ListFunc[T], optionsUsedToReceiveList metav1.ListOptions, receivedList runtime.Object) {
if !IsDataConsistencyDetectionForWatchListEnabled() {
return
}
checkListFromCacheDataConsistencyIfRequestedInternal(ctx, identity, listItemsFn, optionsUsedToReceiveList, receivedList)
}

82
vendor/k8s.io/client-go/util/watchlist/watch_list.go generated vendored Normal file
View File

@ -0,0 +1,82 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watchlist
import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientfeatures "k8s.io/client-go/features"
"k8s.io/utils/ptr"
)
var scheme = runtime.NewScheme()
func init() {
utilruntime.Must(metainternalversion.AddToScheme(scheme))
}
// PrepareWatchListOptionsFromListOptions creates a new ListOptions
// that can be used for a watch-list request from the given listOptions.
//
// This function also determines if the given listOptions can be used to form a watch-list request,
// which would result in streaming semantically equivalent data from the server.
func PrepareWatchListOptionsFromListOptions(listOptions metav1.ListOptions) (metav1.ListOptions, bool, error) {
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
return metav1.ListOptions{}, false, nil
}
internalListOptions := &metainternalversion.ListOptions{}
if err := scheme.Convert(&listOptions, internalListOptions, nil); err != nil {
return metav1.ListOptions{}, false, err
}
if errs := metainternalversionvalidation.ValidateListOptions(internalListOptions, true); len(errs) > 0 {
return metav1.ListOptions{}, false, nil
}
watchListOptions := listOptions
// this is our legacy case, the cache ignores LIMIT for
// ResourceVersion == 0 and RVM=unset|NotOlderThan
if listOptions.Limit > 0 && listOptions.ResourceVersion != "0" {
return metav1.ListOptions{}, false, nil
}
watchListOptions.Limit = 0
// to ensure that we can create a watch-list request that returns
// semantically equivalent data for the given listOptions,
// we need to validate that the RVM for the list is supported by watch-list requests.
if listOptions.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
return metav1.ListOptions{}, false, nil
}
watchListOptions.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
watchListOptions.Watch = true
watchListOptions.AllowWatchBookmarks = true
watchListOptions.SendInitialEvents = ptr.To(true)
internalWatchListOptions := &metainternalversion.ListOptions{}
if err := scheme.Convert(&watchListOptions, internalWatchListOptions, nil); err != nil {
return metav1.ListOptions{}, false, err
}
if errs := metainternalversionvalidation.ValidateListOptions(internalWatchListOptions, true); len(errs) > 0 {
return metav1.ListOptions{}, false, nil
}
return watchListOptions, true, nil
}

View File

@ -24,49 +24,66 @@ import (
"golang.org/x/time/rate"
)
type RateLimiter interface {
// Deprecated: RateLimiter is deprecated, use TypedRateLimiter instead.
type RateLimiter TypedRateLimiter[any]
type TypedRateLimiter[T comparable] interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
When(item T) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
// or for success, we'll stop tracking it
Forget(item interface{})
Forget(item T)
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
NumRequeues(item T) int
}
// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
//
// Deprecated: Use DefaultTypedControllerRateLimiter instead.
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
return DefaultTypedControllerRateLimiter[any]()
}
// DefaultTypedControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultTypedControllerRateLimiter[T comparable]() TypedRateLimiter[T] {
return NewTypedMaxOfRateLimiter(
NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
&TypedBucketRateLimiter[T]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type BucketRateLimiter struct {
// Deprecated: BucketRateLimiter is deprecated, use TypedBucketRateLimiter instead.
type BucketRateLimiter = TypedBucketRateLimiter[any]
// TypedBucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type TypedBucketRateLimiter[T comparable] struct {
*rate.Limiter
}
var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
func (r *TypedBucketRateLimiter[T]) When(item T) time.Duration {
return r.Limiter.Reserve().Delay()
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
func (r *TypedBucketRateLimiter[T]) NumRequeues(item T) int {
return 0
}
func (r *BucketRateLimiter) Forget(item interface{}) {
func (r *TypedBucketRateLimiter[T]) Forget(item T) {
}
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// Deprecated: ItemExponentialFailureRateLimiter is deprecated, use TypedItemExponentialFailureRateLimiter instead.
type ItemExponentialFailureRateLimiter = TypedItemExponentialFailureRateLimiter[any]
// TypedItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
type TypedItemExponentialFailureRateLimiter[T comparable] struct {
failuresLock sync.Mutex
failures map[interface{}]int
failures map[T]int
baseDelay time.Duration
maxDelay time.Duration
@ -74,19 +91,29 @@ type ItemExponentialFailureRateLimiter struct {
var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
// Deprecated: NewItemExponentialFailureRateLimiter is deprecated, use NewTypedItemExponentialFailureRateLimiter instead.
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
return NewTypedItemExponentialFailureRateLimiter[any](baseDelay, maxDelay)
}
func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T] {
return &TypedItemExponentialFailureRateLimiter[T]{
failures: map[T]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}
// Deprecated: DefaultItemBasedRateLimiter is deprecated, use DefaultTypedItemBasedRateLimiter instead.
func DefaultItemBasedRateLimiter() RateLimiter {
return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
return DefaultTypedItemBasedRateLimiter[any]()
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
func DefaultTypedItemBasedRateLimiter[T comparable]() TypedRateLimiter[T] {
return NewTypedItemExponentialFailureRateLimiter[T](time.Millisecond, 1000*time.Second)
}
func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
@ -107,14 +134,14 @@ func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
func (r *TypedItemExponentialFailureRateLimiter[T]) NumRequeues(item T) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
func (r *TypedItemExponentialFailureRateLimiter[T]) Forget(item T) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
@ -122,9 +149,13 @@ func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
}
// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type ItemFastSlowRateLimiter struct {
// Deprecated: Use TypedItemFastSlowRateLimiter instead.
type ItemFastSlowRateLimiter = TypedItemFastSlowRateLimiter[any]
// TypedItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
type TypedItemFastSlowRateLimiter[T comparable] struct {
failuresLock sync.Mutex
failures map[interface{}]int
failures map[T]int
maxFastAttempts int
fastDelay time.Duration
@ -133,16 +164,21 @@ type ItemFastSlowRateLimiter struct {
var _ RateLimiter = &ItemFastSlowRateLimiter{}
// Deprecated: NewItemFastSlowRateLimiter is deprecated, use NewTypedItemFastSlowRateLimiter instead.
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
return &ItemFastSlowRateLimiter{
failures: map[interface{}]int{},
return NewTypedItemFastSlowRateLimiter[any](fastDelay, slowDelay, maxFastAttempts)
}
func NewTypedItemFastSlowRateLimiter[T comparable](fastDelay, slowDelay time.Duration, maxFastAttempts int) TypedRateLimiter[T] {
return &TypedItemFastSlowRateLimiter[T]{
failures: map[T]int{},
fastDelay: fastDelay,
slowDelay: slowDelay,
maxFastAttempts: maxFastAttempts,
}
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
func (r *TypedItemFastSlowRateLimiter[T]) When(item T) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
@ -155,14 +191,14 @@ func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
func (r *TypedItemFastSlowRateLimiter[T]) NumRequeues(item T) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
func (r *TypedItemFastSlowRateLimiter[T]) Forget(item T) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
@ -172,11 +208,18 @@ func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
// MaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type MaxOfRateLimiter struct {
limiters []RateLimiter
//
// Deprecated: Use TypedMaxOfRateLimiter instead.
type MaxOfRateLimiter = TypedMaxOfRateLimiter[any]
// TypedMaxOfRateLimiter calls every RateLimiter and returns the worst case response
// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
// were separately delayed a longer time.
type TypedMaxOfRateLimiter[T comparable] struct {
limiters []TypedRateLimiter[T]
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
func (r *TypedMaxOfRateLimiter[T]) When(item T) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
@ -188,11 +231,16 @@ func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
return ret
}
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
// Deprecated: NewMaxOfRateLimiter is deprecated, use NewTypedMaxOfRateLimiter instead.
func NewMaxOfRateLimiter(limiters ...TypedRateLimiter[any]) RateLimiter {
return NewTypedMaxOfRateLimiter(limiters...)
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
func NewTypedMaxOfRateLimiter[T comparable](limiters ...TypedRateLimiter[T]) TypedRateLimiter[T] {
return &TypedMaxOfRateLimiter[T]{limiters: limiters}
}
func (r *TypedMaxOfRateLimiter[T]) NumRequeues(item T) int {
ret := 0
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
@ -204,23 +252,32 @@ func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
func (r *TypedMaxOfRateLimiter[T]) Forget(item T) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
// WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
type WithMaxWaitRateLimiter struct {
limiter RateLimiter
// Deprecated: Use TypedWithMaxWaitRateLimiter instead.
type WithMaxWaitRateLimiter = TypedWithMaxWaitRateLimiter[any]
// TypedWithMaxWaitRateLimiter have maxDelay which avoids waiting too long
type TypedWithMaxWaitRateLimiter[T comparable] struct {
limiter TypedRateLimiter[T]
maxDelay time.Duration
}
// Deprecated: NewWithMaxWaitRateLimiter is deprecated, use NewTypedWithMaxWaitRateLimiter instead.
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
return NewTypedWithMaxWaitRateLimiter[any](limiter, maxDelay)
}
func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T] {
return &TypedWithMaxWaitRateLimiter[T]{limiter: limiter, maxDelay: maxDelay}
}
func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration {
delay := w.limiter.When(item)
if delay > w.maxDelay {
return w.maxDelay
@ -229,10 +286,10 @@ func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
return delay
}
func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T) {
w.limiter.Forget(item)
}
func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int {
return w.limiter.NumRequeues(item)
}

View File

@ -27,14 +27,25 @@ import (
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
//
// Deprecated: use TypedDelayingInterface instead.
type DelayingInterface TypedDelayingInterface[any]
// TypedDelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type TypedDelayingInterface[T comparable] interface {
TypedInterface[T]
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
AddAfter(item T, duration time.Duration)
}
// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
type DelayingQueueConfig struct {
//
// Deprecated: use TypedDelayingQueueConfig instead.
type DelayingQueueConfig = TypedDelayingQueueConfig[any]
// TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
type TypedDelayingQueueConfig[T comparable] struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
@ -46,25 +57,42 @@ type DelayingQueueConfig struct {
Clock clock.WithTicker
// Queue optionally allows injecting custom queue Interface instead of the default one.
Queue Interface
Queue TypedInterface[T]
}
// NewDelayingQueue constructs a new workqueue with delayed queuing ability.
// NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewDelayingQueueWithConfig instead and specify a name.
//
// Deprecated: use TypedNewDelayingQueue instead.
func NewDelayingQueue() DelayingInterface {
return NewDelayingQueueWithConfig(DelayingQueueConfig{})
}
// TypedNewDelayingQueue constructs a new workqueue with delayed queuing ability.
// TypedNewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
// TypedNewDelayingQueueWithConfig instead and specify a name.
func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T] {
return NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[T]{})
}
// NewDelayingQueueWithConfig constructs a new workqueue with options to
// customize different properties.
//
// Deprecated: use TypedNewDelayingQueueWithConfig instead.
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
return NewTypedDelayingQueueWithConfig[any](config)
}
// NewTypedDelayingQueueWithConfig constructs a new workqueue with options to
// customize different properties.
func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] {
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
if config.Queue == nil {
config.Queue = NewWithConfig(QueueConfig{
config.Queue = NewTypedWithConfig[T](TypedQueueConfig[T]{
Name: config.Name,
MetricsProvider: config.MetricsProvider,
Clock: config.Clock,
@ -100,9 +128,9 @@ func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) Delayi
})
}
func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
ret := &delayingType{
Interface: q,
func newDelayingQueue[T comparable](clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] {
ret := &delayingType[T]{
TypedInterface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
@ -115,8 +143,8 @@ func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider
}
// delayingType wraps an Interface and provides delayed re-enquing
type delayingType struct {
Interface
type delayingType[T comparable] struct {
TypedInterface[T]
// clock tracks time for delayed firing
clock clock.Clock
@ -193,16 +221,16 @@ func (pq waitForPriorityQueue) Peek() interface{} {
// ShutDown stops the queue. After the queue drains, the returned shutdown bool
// on Get() will be true. This method may be invoked more than once.
func (q *delayingType) ShutDown() {
func (q *delayingType[T]) ShutDown() {
q.stopOnce.Do(func() {
q.Interface.ShutDown()
q.TypedInterface.ShutDown()
close(q.stopCh)
q.heartbeat.Stop()
})
}
// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
@ -229,7 +257,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
const maxWait = 10 * time.Second
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func (q *delayingType) waitingLoop() {
func (q *delayingType[T]) waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list
@ -244,7 +272,7 @@ func (q *delayingType) waitingLoop() {
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
if q.TypedInterface.ShuttingDown() {
return
}
@ -258,7 +286,7 @@ func (q *delayingType) waitingLoop() {
}
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
q.Add(entry.data.(T))
delete(waitingEntryByData, entry.data)
}
@ -287,7 +315,7 @@ func (q *delayingType) waitingLoop() {
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
q.Add(waitEntry.data.(T))
}
drained := false
@ -297,7 +325,7 @@ func (q *delayingType) waitingLoop() {
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
q.Add(waitEntry.data.(T))
}
default:
drained = true

View File

@ -23,18 +23,66 @@ import (
"k8s.io/utils/clock"
)
type Interface interface {
Add(item interface{})
// Deprecated: Interface is deprecated, use TypedInterface instead.
type Interface TypedInterface[any]
type TypedInterface[T comparable] interface {
Add(item T)
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
Get() (item T, shutdown bool)
Done(item T)
ShutDown()
ShutDownWithDrain()
ShuttingDown() bool
}
// Queue is the underlying storage for items. The functions below are always
// called from the same goroutine.
type Queue[T comparable] interface {
// Touch can be hooked when an existing item is added again. This may be
// useful if the implementation allows priority change for the given item.
Touch(item T)
// Push adds a new item.
Push(item T)
// Len tells the total number of items.
Len() int
// Pop retrieves an item.
Pop() (item T)
}
// DefaultQueue is a slice based FIFO queue.
func DefaultQueue[T comparable]() Queue[T] {
return new(queue[T])
}
// queue is a slice which implements Queue.
type queue[T comparable] []T
func (q *queue[T]) Touch(item T) {}
func (q *queue[T]) Push(item T) {
*q = append(*q, item)
}
func (q *queue[T]) Len() int {
return len(*q)
}
func (q *queue[T]) Pop() (item T) {
item = (*q)[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
(*q)[0] = *new(T)
*q = (*q)[1:]
return item
}
// QueueConfig specifies optional configurations to customize an Interface.
type QueueConfig struct {
// Deprecated: use TypedQueueConfig instead.
type QueueConfig = TypedQueueConfig[any]
type TypedQueueConfig[T comparable] struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
@ -44,18 +92,38 @@ type QueueConfig struct {
// Clock ability to inject real or fake clock for testing purposes.
Clock clock.WithTicker
// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
Queue Queue[T]
}
// New constructs a new work queue (see the package comment).
//
// Deprecated: use NewTyped instead.
func New() *Type {
return NewWithConfig(QueueConfig{
Name: "",
})
}
// NewTyped constructs a new work queue (see the package comment).
func NewTyped[T comparable]() *Typed[T] {
return NewTypedWithConfig(TypedQueueConfig[T]{
Name: "",
})
}
// NewWithConfig constructs a new workqueue with ability to
// customize different properties.
//
// Deprecated: use NewTypedWithConfig instead.
func NewWithConfig(config QueueConfig) *Type {
return NewTypedWithConfig(config)
}
// NewTypedWithConfig constructs a new workqueue with ability to
// customize different properties.
func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T] {
return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
}
@ -69,7 +137,7 @@ func NewNamed(name string) *Type {
// newQueueWithConfig constructs a new named workqueue
// with the ability to customize different properties for testing purposes
func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {
var metricsFactory *queueMetricsFactory
if config.MetricsProvider != nil {
metricsFactory = &queueMetricsFactory{
@ -83,18 +151,24 @@ func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
config.Clock = clock.RealClock{}
}
if config.Queue == nil {
config.Queue = DefaultQueue[T]()
}
return newQueue(
config.Clock,
config.Queue,
metricsFactory.newQueueMetrics(config.Name, config.Clock),
updatePeriod,
)
}
func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type {
t := &Type{
func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics, updatePeriod time.Duration) *Typed[T] {
t := &Typed[T]{
clock: c,
dirty: set{},
processing: set{},
queue: queue,
dirty: set[T]{},
processing: set[T]{},
cond: sync.NewCond(&sync.Mutex{}),
metrics: metrics,
unfinishedWorkUpdatePeriod: updatePeriod,
@ -112,20 +186,23 @@ func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Durati
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
// Type is a work queue (see the package comment).
type Type struct {
// Deprecated: Use Typed instead.
type Type = Typed[any]
type Typed[t comparable] struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
queue Queue[t]
// dirty defines all of the items that need to be processed.
dirty set
dirty set[t]
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
processing set[t]
cond *sync.Cond
@ -140,33 +217,38 @@ type Type struct {
type empty struct{}
type t interface{}
type set map[t]empty
type set[t comparable] map[t]empty
func (s set) has(item t) bool {
func (s set[t]) has(item t) bool {
_, exists := s[item]
return exists
}
func (s set) insert(item t) {
func (s set[t]) insert(item t) {
s[item] = empty{}
}
func (s set) delete(item t) {
func (s set[t]) delete(item t) {
delete(s, item)
}
func (s set) len() int {
func (s set[t]) len() int {
return len(s)
}
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
func (q *Typed[T]) Add(item T) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
// the same item is added again before it is processed, call the Touch
// function if the queue cares about it (for e.g, reset its priority)
if !q.processing.has(item) {
q.queue.Touch(item)
}
return
}
@ -177,37 +259,34 @@ func (q *Type) Add(item interface{}) {
return
}
q.queue = append(q.queue, item)
q.queue.Push(item)
q.cond.Signal()
}
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
func (q *Type) Len() int {
func (q *Typed[T]) Len() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue)
return q.queue.Len()
}
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
func (q *Typed[T]) Get() (item T, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
for q.queue.Len() == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
if q.queue.Len() == 0 {
// We must be shutting down.
return nil, true
return *new(T), true
}
item = q.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]
item = q.queue.Pop()
q.metrics.get(item)
@ -220,7 +299,7 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
func (q *Typed[T]) Done(item T) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
@ -228,7 +307,7 @@ func (q *Type) Done(item interface{}) {
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.queue.Push(item)
q.cond.Signal()
} else if q.processing.len() == 0 {
q.cond.Signal()
@ -237,7 +316,7 @@ func (q *Type) Done(item interface{}) {
// ShutDown will cause q to ignore all new items added to it and
// immediately instruct the worker goroutines to exit.
func (q *Type) ShutDown() {
func (q *Typed[T]) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
@ -255,7 +334,7 @@ func (q *Type) ShutDown() {
// indefinitely. It is, however, safe to call ShutDown after having called
// ShutDownWithDrain, as to force the queue shut down to terminate immediately
// without waiting for the drainage.
func (q *Type) ShutDownWithDrain() {
func (q *Typed[T]) ShutDownWithDrain() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
@ -268,14 +347,14 @@ func (q *Type) ShutDownWithDrain() {
}
}
func (q *Type) ShuttingDown() bool {
func (q *Typed[T]) ShuttingDown() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return q.shuttingDown
}
func (q *Type) updateUnfinishedWorkLoop() {
func (q *Typed[T]) updateUnfinishedWorkLoop() {
t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
defer t.Stop()
for range t.C() {

View File

@ -19,24 +19,33 @@ package workqueue
import "k8s.io/utils/clock"
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
//
// Deprecated: Use TypedRateLimitingInterface instead.
type RateLimitingInterface TypedRateLimitingInterface[any]
// TypedRateLimitingInterface is an interface that rate limits items being added to the queue.
type TypedRateLimitingInterface[T comparable] interface {
TypedDelayingInterface[T]
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{})
AddRateLimited(item T)
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
Forget(item T)
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
NumRequeues(item T) int
}
// RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface.
//
// Deprecated: Use TypedRateLimitingQueueConfig instead.
type RateLimitingQueueConfig = TypedRateLimitingQueueConfig[any]
type RateLimitingQueueConfig struct {
// TypedRateLimitingQueueConfig specifies optional configurations to customize a TypedRateLimitingInterface.
type TypedRateLimitingQueueConfig[T comparable] struct {
// Name for the queue. If unnamed, the metrics will not be registered.
Name string
@ -48,36 +57,55 @@ type RateLimitingQueueConfig struct {
Clock clock.WithTicker
// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
DelayingQueue DelayingInterface
DelayingQueue TypedDelayingInterface[T]
}
// NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
// Remember to call Forget! If you don't, you may end up tracking failures forever.
// NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewRateLimitingQueueWithConfig instead and specify a name.
//
// Deprecated: Use NewTypedRateLimitingQueue instead.
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
return NewRateLimitingQueueWithConfig(rateLimiter, RateLimitingQueueConfig{})
}
// NewTypedRateLimitingQueue constructs a new workqueue with rateLimited queuing ability
// Remember to call Forget! If you don't, you may end up tracking failures forever.
// NewTypedRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use
// NewTypedRateLimitingQueueWithConfig instead and specify a name.
func NewTypedRateLimitingQueue[T comparable](rateLimiter TypedRateLimiter[T]) TypedRateLimitingInterface[T] {
return NewTypedRateLimitingQueueWithConfig(rateLimiter, TypedRateLimitingQueueConfig[T]{})
}
// NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
// with options to customize different properties.
// Remember to call Forget! If you don't, you may end up tracking failures forever.
//
// Deprecated: Use NewTypedRateLimitingQueueWithConfig instead.
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface {
return NewTypedRateLimitingQueueWithConfig(rateLimiter, config)
}
// NewTypedRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability
// with options to customize different properties.
// Remember to call Forget! If you don't, you may end up tracking failures forever.
func NewTypedRateLimitingQueueWithConfig[T comparable](rateLimiter TypedRateLimiter[T], config TypedRateLimitingQueueConfig[T]) TypedRateLimitingInterface[T] {
if config.Clock == nil {
config.Clock = clock.RealClock{}
}
if config.DelayingQueue == nil {
config.DelayingQueue = NewDelayingQueueWithConfig(DelayingQueueConfig{
config.DelayingQueue = NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[T]{
Name: config.Name,
MetricsProvider: config.MetricsProvider,
Clock: config.Clock,
})
}
return &rateLimitingType{
DelayingInterface: config.DelayingQueue,
rateLimiter: rateLimiter,
return &rateLimitingType[T]{
TypedDelayingInterface: config.DelayingQueue,
rateLimiter: rateLimiter,
}
}
@ -99,21 +127,21 @@ func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter
}
// rateLimitingType wraps an Interface and provides rateLimited re-enquing
type rateLimitingType struct {
DelayingInterface
type rateLimitingType[T comparable] struct {
TypedDelayingInterface[T]
rateLimiter RateLimiter
rateLimiter TypedRateLimiter[T]
}
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
func (q *rateLimitingType[T]) AddRateLimited(item T) {
q.TypedDelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
func (q *rateLimitingType[T]) NumRequeues(item T) int {
return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
func (q *rateLimitingType[T]) Forget(item T) {
q.rateLimiter.Forget(item)
}