rebase: bump the k8s-dependencies group in /e2e with 3 updates

Bumps the k8s-dependencies group in /e2e with 3 updates: [k8s.io/apimachinery](https://github.com/kubernetes/apimachinery), [k8s.io/cloud-provider](https://github.com/kubernetes/cloud-provider) and [k8s.io/pod-security-admission](https://github.com/kubernetes/pod-security-admission).


Updates `k8s.io/apimachinery` from 0.32.3 to 0.33.0
- [Commits](https://github.com/kubernetes/apimachinery/compare/v0.32.3...v0.33.0)

Updates `k8s.io/cloud-provider` from 0.32.3 to 0.33.0
- [Commits](https://github.com/kubernetes/cloud-provider/compare/v0.32.3...v0.33.0)

Updates `k8s.io/pod-security-admission` from 0.32.3 to 0.33.0
- [Commits](https://github.com/kubernetes/pod-security-admission/compare/v0.32.3...v0.33.0)

---
updated-dependencies:
- dependency-name: k8s.io/apimachinery
  dependency-version: 0.33.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: k8s-dependencies
- dependency-name: k8s.io/cloud-provider
  dependency-version: 0.33.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: k8s-dependencies
- dependency-name: k8s.io/pod-security-admission
  dependency-version: 0.33.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: k8s-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
Author:    dependabot[bot]
Date:      2025-05-06 11:20:01 +00:00
Committer: mergify[bot]
Parent:    d52dc2c4ba
Commit:    dd77e72800

359 changed files with 11145 additions and 18557 deletions


@ -42,7 +42,9 @@ import (
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
"k8s.io/apiserver/pkg/storage/cacher/progress"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
@ -61,10 +63,16 @@ const (
// storageWatchListPageSize is the cacher's request chunk size of
// initial and resync watch lists to storage.
storageWatchListPageSize = int64(10000)
// DefaultEventFreshDuration is the default time duration of events
// we want to keep.
// We set it to defaultBookmarkFrequency plus epsilon to maximize the
// chances that the last bookmark was sent within the kept history while,
// at the same time, minimizing the needed memory usage.
DefaultEventFreshDuration = defaultBookmarkFrequency + 15*time.Second
// defaultBookmarkFrequency defines how frequently watch bookmarks should be sent
// in addition to sending a bookmark right before watch deadline.
//
// NOTE: Update `DefaultEventFreshDuration` when changing this value.
defaultBookmarkFrequency = time.Minute
)
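For reference, a minimal standalone sketch (assuming only the values shown in this hunk) of what these constants imply for the new Config.EventsHistoryWindow field below: with defaultBookmarkFrequency of one minute, DefaultEventFreshDuration works out to 75 seconds, and NewCacherFromConfig rejects any smaller history window.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Values as shown in the hunk above.
	defaultBookmarkFrequency := time.Minute
	defaultEventFreshDuration := defaultBookmarkFrequency + 15*time.Second
	fmt.Println(defaultEventFreshDuration) // 1m15s

	// A hypothetical EventsHistoryWindow below that threshold would be rejected.
	eventsHistoryWindow := time.Minute
	if eventsHistoryWindow < defaultEventFreshDuration {
		fmt.Printf("config.EventsHistoryWindow (%v) must not be below %v\n",
			eventsHistoryWindow, defaultEventFreshDuration)
	}
}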
@ -80,6 +88,10 @@ type Config struct {
// and metrics.
GroupResource schema.GroupResource
// EventsHistoryWindow specifies the minimum history duration that storage keeps.
// If lower than DefaultEventFreshDuration, the cache creation will fail.
EventsHistoryWindow time.Duration
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
ResourcePrefix string
@ -329,10 +341,6 @@ type Cacher struct {
expiredBookmarkWatchers []*cacheWatcher
}
func (c *Cacher) RequestWatchProgress(ctx context.Context) error {
return c.storage.RequestWatchProgress(ctx)
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
@ -368,7 +376,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
objType := reflect.TypeOf(obj)
cacher := &Cacher{
resourcePrefix: config.ResourcePrefix,
ready: newReady(),
ready: newReady(config.Clock),
storage: config.Storage,
objectType: objType,
groupResource: config.GroupResource,
@ -409,9 +417,15 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
contextMetadata = metadata.New(map[string]string{"source": "cache"})
}
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
eventFreshDuration := config.EventsHistoryWindow
if eventFreshDuration < DefaultEventFreshDuration {
return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration)
}
progressRequester := progress.NewConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers,
config.Clock, eventFreshDuration, config.GroupResource, progressRequester)
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@ -450,85 +464,30 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
// It is safe to use the cache after a successful list until a disconnection.
// We start with usable (write) locked. The below OnReplace function will
// unlock it after a successful list. The below defer will then re-lock
// it when this function exits (always due to disconnection), only if
// we actually got a successful list. This cycle will repeat as needed.
successfulList := false
c.watchCache.SetOnReplace(func() {
successfulList = true
c.ready.set(true)
c.ready.setReady()
klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
})
var err error
defer func() {
if successfulList {
c.ready.set(false)
}
c.ready.setError(err)
}()
c.terminateAllWatchers()
// Note that since onReplace may not be called due to errors, we explicitly
// need to retry it on errors under lock.
// Also note that startCaching is called in a loop, so there's no need
// to have another loop here.
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
err = c.reflector.ListAndWatch(stopChannel)
if err != nil {
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
}
}
// Versioner implements storage.Interface.
func (c *Cacher) Versioner() storage.Versioner {
return c.storage.Versioner()
}
// Create implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}
// Delete implements storage.Interface.
func (c *Cacher) Delete(
ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc, _ runtime.Object, opts storage.DeleteOptions) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
klog.Errorf("GetByKey returned error: %v", err)
} else if exists {
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem.(*storeElement).Object.DeepCopyObject()
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj, opts)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts)
}
type namespacedName struct {
namespace string
name string
}
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
pred := opts.Predicate
// if the watch-list feature wasn't set and the resourceVersion is unset
// ensure that the rv from which the watch is being served, is the latest
// one. "latest" is ensured by serving the watch from
// the underlying storage.
//
// it should never happen due to our validation but let's just be super-safe here
// and disable sendingInitialEvents when the feature wasn't enabled
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil
}
// TODO: we should eventually get rid of this legacy case
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
@ -536,10 +495,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
var readyGeneration int
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
var ok bool
readyGeneration, ok = c.ready.checkAndReadGeneration()
if !ok {
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
var err error
var downtime time.Duration
readyGeneration, downtime, err = c.ready.checkAndReadGeneration()
if err != nil {
return nil, errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
}
} else {
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
@ -660,7 +620,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.Lock()
defer c.Unlock()
if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
if generation, _, err := c.ready.checkAndReadGeneration(); generation != readyGeneration || err != nil {
// We went unready or are already on a different generation.
// Avoid registering and starting the watch as it will have to be
// terminated immediately anyway.
@ -693,58 +653,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return watcher, nil
}
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
ctx, span := tracing.Start(ctx, "cacher.Get",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("resource-version", opts.ResourceVersion))
defer span.End(500 * time.Millisecond)
if opts.ResourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
span.AddEvent("About to Get from underlying storage")
return c.storage.Get(ctx, key, opts, objPtr)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
// If Cache is not initialized, delegate Get requests to storage
// as described in https://kep.k8s.io/4568
span.AddEvent("About to Get from underlying storage - cache not initialized")
return c.storage.Get(ctx, key, opts, objPtr)
}
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least as
// fresh as the given resourceVersion.
getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
span.AddEvent("About to Get from underlying storage - cache not initialized and no resourceVersion set")
return c.storage.Get(ctx, key, opts, objPtr)
}
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
}
objVal, err := conversion.EnforcePtr(objPtr)
if err != nil {
return err
}
span.AddEvent("About to fetch object from cache")
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, getRV, key)
if err != nil {
return err
@ -765,28 +684,6 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return nil
}
// NOTICE: Keep in sync with shouldListFromStorage function in
//
// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go
func shouldDelegateList(opts storage.ListOptions) bool {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
match := opts.ResourceVersionMatch
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
// Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(pred.Continue) > 0
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
return consistentReadFromStorage || hasContinuation || unsupportedMatch
}
// computeListLimit determines whether the cacher should
// apply a limit to an incoming LIST request and returns its value.
//
@ -801,55 +698,27 @@ func computeListLimit(opts storage.ListOptions) int64 {
return opts.Predicate.Limit
}
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
pred := opts.Predicate
noLabelSelector := pred.Label == nil || pred.Label.Empty()
noFieldSelector := pred.Field == nil || pred.Field.Empty()
hasLimit := pred.Limit > 0
return noLabelSelector && noFieldSelector && hasLimit
}
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
if !recursive {
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, opts storage.ListOptions) (listResp, string, error) {
if !opts.Recursive {
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
if err != nil {
return nil, 0, "", err
return listResp{}, "", err
}
if exists {
return []interface{}{obj}, readResourceVersion, "", nil
return listResp{Items: []interface{}{obj}, ResourceVersion: readResourceVersion}, "", nil
}
return nil, readResourceVersion, "", nil
return listResp{ResourceVersion: readResourceVersion}, "", nil
}
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, opts)
}
type listResp struct {
Items []interface{}
ResourceVersion uint64
}
// GetList implements storage.Interface
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
if shouldDelegateList(opts) {
return c.storage.GetList(ctx, key, opts, listObj)
}
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
// If Cacher is not initialized, delegate List requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.GetList(ctx, key, opts, listObj)
}
} else {
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
}
// For recursive lists, we need to make sure the key ends with "/" so that we only
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
@ -858,13 +727,9 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if opts.Recursive && !strings.HasSuffix(key, "/") {
preparedKey += "/"
}
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
if consistentRead {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err
}
listRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
ctx, span := tracing.Start(ctx, "cacher.GetList",
@ -873,10 +738,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
defer span.End(500 * time.Millisecond)
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
if downtime, err := c.ready.check(); err != nil {
// If Cacher is not initialized, reject List requests
// as described in https://kep.k8s.io/4568
return errors.NewTooManyRequests("storage is (re)initializing", 1)
return errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
}
} else {
if err := c.ready.wait(ctx); err != nil {
@ -898,26 +763,11 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
success := "true"
fallback := "false"
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, opts)
if err != nil {
if consistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
success = "false"
}
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
return err
}
if consistentRead {
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
span.AddEvent("Listed items from cache", attribute.Int("count", len(resp.Items)))
// store pointers to eligible objects.
// Why not directly put the objects into the Items of listObj?
// The elements in ListObject are struct types; building such a slice would bring excessive memory consumption.
@ -926,17 +776,17 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
var lastSelectedObjectKey string
var hasMoreListItems bool
limit := computeListLimit(opts)
for i, obj := range objs {
for i, obj := range resp.Items {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
if opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object)
lastSelectedObjectKey = elem.Key
}
if limit > 0 && int64(len(selectedObjects)) >= limit {
hasMoreListItems = i < len(objs)-1
hasMoreListItems = i < len(resp.Items)-1
break
}
}
@ -953,47 +803,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
if c.versioner != nil {
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts)
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(resp.ResourceVersion), int64(len(resp.Items)), hasMoreListItems, opts)
if err != nil {
return err
}
if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil {
if err = c.versioner.UpdateList(listObj, resp.ResourceVersion, continueValue, remainingItemCount); err != nil {
return err
}
}
metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), listVal.Len())
return nil
}
// GuaranteedUpdate implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ runtime.Object) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
klog.Errorf("GetByKey returned error: %v", err)
} else if exists {
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem.(*storeElement).Object.DeepCopyObject()
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, currObj)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
}
// Count implements storage.Interface.
func (c *Cacher) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
// ReadinessCheck implements storage.Interface.
func (c *Cacher) ReadinessCheck() error {
if !c.ready.check() {
return storage.ErrStorageNotReady
}
metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(resp.Items), listVal.Len())
return nil
}
@ -1420,7 +1239,7 @@ func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchRe
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return 0, nil
}
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
rv, err := c.storage.GetCurrentResourceVersion(ctx)
return rv, err
}
@ -1473,6 +1292,11 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
}
}
func (c *Cacher) Ready() bool {
_, err := c.ready.check()
return err == nil
}
// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event
@ -1503,6 +1327,55 @@ func newErrWatcher(err error) *errWatcher {
return watcher
}
func (c *Cacher) ShouldDelegateExactRV(resourceVersion string, recursive bool) (delegator.Result, error) {
// Non-recursive requests are not supported until exact RV is implemented for WaitUntilFreshAndGet.
if !recursive || c.watchCache.snapshots == nil {
return delegator.Result{ShouldDelegate: true}, nil
}
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return delegator.Result{}, err
}
return c.shouldDelegateExactRV(listRV)
}
func (c *Cacher) ShouldDelegateContinue(continueToken string, recursive bool) (delegator.Result, error) {
// Non-recursive requests are not supported until exact RV is implemented for WaitUntilFreshAndGet.
if !recursive || c.watchCache.snapshots == nil {
return delegator.Result{ShouldDelegate: true}, nil
}
_, continueRV, err := storage.DecodeContinue(continueToken, c.resourcePrefix)
if err != nil {
return delegator.Result{}, err
}
if continueRV > 0 {
return c.shouldDelegateExactRV(uint64(continueRV))
} else {
// Continue with negative RV is a consistent read.
return c.ShouldDelegateConsistentRead()
}
}
func (c *Cacher) shouldDelegateExactRV(rv uint64) (delegator.Result, error) {
// Exact requests on future revision require support for consistent read, but are not a consistent read by themselves.
if c.watchCache.notFresh(rv) {
return delegator.Result{
ShouldDelegate: !delegator.ConsistentReadSupported(),
}, nil
}
_, canServe := c.watchCache.snapshots.GetLessOrEqual(rv)
return delegator.Result{
ShouldDelegate: !canServe,
}, nil
}
func (c *Cacher) ShouldDelegateConsistentRead() (delegator.Result, error) {
return delegator.Result{
ConsistentRead: true,
ShouldDelegate: !delegator.ConsistentReadSupported(),
}, nil
}
// Implements watch.Interface.
func (c *errWatcher) ResultChan() <-chan watch.Event {
return c.result


@ -0,0 +1,437 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
"fmt"
"hash"
"hash/fnv"
"os"
"strconv"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
)
var (
// ConsistencyCheckPeriod is the period of checking consistency between etcd and cache.
// 5 minutes were proposed to match the default compaction period. It's an order of magnitude
// higher than the List latency SLO (30 seconds) and timeout (1 minute).
ConsistencyCheckPeriod = 5 * time.Minute
// ConsistencyCheckerEnabled enables the consistency checking mechanism for cache.
// Based on KUBE_WATCHCACHE_CONSISTENCY_CHECKER environment variable.
ConsistencyCheckerEnabled = false
)
func init() {
ConsistencyCheckerEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHCACHE_CONSISTENCY_CHECKER"))
}
func NewCacheDelegator(cacher *Cacher, storage storage.Interface) *CacheDelegator {
d := &CacheDelegator{
cacher: cacher,
storage: storage,
stopCh: make(chan struct{}),
}
if ConsistencyCheckerEnabled {
d.checker = newConsistencyChecker(cacher.resourcePrefix, cacher.newListFunc, cacher, storage)
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.checker.startChecking(d.stopCh)
}()
}
return d
}
type CacheDelegator struct {
cacher *Cacher
storage storage.Interface
checker *consistencyChecker
wg sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
}
var _ storage.Interface = (*CacheDelegator)(nil)
func (c *CacheDelegator) Versioner() storage.Versioner {
return c.storage.Versioner()
}
func (c *CacheDelegator) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}
func (c *CacheDelegator) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
return c.storage.GetCurrentResourceVersion(ctx)
}
func (c *CacheDelegator) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.cacher.watchCache.GetByKey(key); err != nil {
klog.Errorf("GetByKey returned error: %v", err)
} else if exists {
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem.(*storeElement).Object.DeepCopyObject()
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj, opts)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts)
}
func (c *CacheDelegator) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
// if the watch-list feature wasn't set and the resourceVersion is unset
// ensure that the rv from which the watch is being served, is the latest
// one. "latest" is ensured by serving the watch from
// the underlying storage.
//
// it should never happen due to our validation but let's just be super-safe here
// and disable sendingInitialEvents when the feature wasn't enabled
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil
}
// TODO: we should eventually get rid of this legacy case
if utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
return c.cacher.Watch(ctx, key, opts)
}
func (c *CacheDelegator) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
ctx, span := tracing.Start(ctx, "cacher.Get",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("resource-version", opts.ResourceVersion))
defer span.End(500 * time.Millisecond)
if opts.ResourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
span.AddEvent("About to Get from underlying storage")
return c.storage.Get(ctx, key, opts, objPtr)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.cacher.Ready() {
// If Cache is not initialized, delegate Get requests to storage
// as described in https://kep.k8s.io/4568
span.AddEvent("About to Get from underlying storage - cache not initialized")
return c.storage.Get(ctx, key, opts, objPtr)
}
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least as
// fresh as the given resourceVersion.
getRV, err := c.cacher.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if getRV == 0 && !c.cacher.Ready() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.Get(ctx, key, opts, objPtr)
}
if err := c.cacher.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
}
span.AddEvent("About to fetch object from cache")
return c.cacher.Get(ctx, key, opts, objPtr)
}
func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
_, _, err := storage.ValidateListOptions(c.cacher.resourcePrefix, c.cacher.versioner, opts)
if err != nil {
return err
}
result, err := delegator.ShouldDelegateList(opts, c.cacher)
if err != nil {
return err
}
if result.ShouldDelegate {
return c.storage.GetList(ctx, key, opts, listObj)
}
listRV, err := c.cacher.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) {
// If Cacher is not initialized, delegate List requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.GetList(ctx, key, opts, listObj)
}
} else {
if listRV == 0 && !c.cacher.Ready() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
}
}
if result.ConsistentRead {
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
if err != nil {
return err
}
// Setting resource version for consistent read in cache based on current ResourceVersion in etcd.
opts.ResourceVersion = strconv.FormatInt(int64(listRV), 10)
}
err = c.cacher.GetList(ctx, key, opts, listObj)
success := "true"
fallback := "false"
if err != nil {
if errors.IsResourceExpired(err) {
return c.storage.GetList(ctx, key, opts, listObj)
}
if result.ConsistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
// Reset resourceVersion during fallback from consistent read.
opts.ResourceVersion = ""
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
success = "false"
}
metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1)
}
return err
}
if result.ConsistentRead {
metrics.ConsistentReadTotal.WithLabelValues(c.cacher.resourcePrefix, success, fallback).Add(1)
}
return nil
}
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
pred := opts.Predicate
noLabelSelector := pred.Label == nil || pred.Label.Empty()
noFieldSelector := pred.Field == nil || pred.Field.Empty()
hasLimit := pred.Limit > 0
return noLabelSelector && noFieldSelector && hasLimit
}
func (c *CacheDelegator) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.cacher.watchCache.GetByKey(key); err != nil {
klog.Errorf("GetByKey returned error: %v", err)
} else if exists {
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem.(*storeElement).Object.DeepCopyObject()
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, currObj)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
}
func (c *CacheDelegator) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
func (c *CacheDelegator) ReadinessCheck() error {
if !c.cacher.Ready() {
return storage.ErrStorageNotReady
}
return nil
}
func (c *CacheDelegator) RequestWatchProgress(ctx context.Context) error {
return c.storage.RequestWatchProgress(ctx)
}
func (c *CacheDelegator) Stop() {
c.stopOnce.Do(func() {
close(c.stopCh)
})
c.wg.Wait()
}
func newConsistencyChecker(resourcePrefix string, newListFunc func() runtime.Object, cacher getListerReady, etcd getLister) *consistencyChecker {
return &consistencyChecker{
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
cacher: cacher,
etcd: etcd,
}
}
type consistencyChecker struct {
resourcePrefix string
newListFunc func() runtime.Object
cacher getListerReady
etcd getLister
}
type getListerReady interface {
getLister
Ready() bool
}
type getLister interface {
GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error
}
func (c consistencyChecker) startChecking(stopCh <-chan struct{}) {
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
c.check(ctx)
return false, nil
})
if err != nil {
klog.InfoS("Cache consistency check exiting", "resource", c.resourcePrefix, "err", err)
}
}
func (c *consistencyChecker) check(ctx context.Context) {
digests, err := c.calculateDigests(ctx)
if err != nil {
klog.ErrorS(err, "Cache consistency check error", "resource", c.resourcePrefix)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "error").Inc()
return
}
if digests.CacheDigest == digests.EtcdDigest {
klog.V(3).InfoS("Cache consistency check passed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "digest", digests.CacheDigest)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "success").Inc()
} else {
klog.ErrorS(nil, "Cache consistency check failed", "resource", c.resourcePrefix, "resourceVersion", digests.ResourceVersion, "etcdDigest", digests.EtcdDigest, "cacheDigest", digests.CacheDigest)
metrics.StorageConsistencyCheckTotal.WithLabelValues(c.resourcePrefix, "failure").Inc()
// Panic on internal consistency check failure; enabled only by the environment variable.
panic(fmt.Sprintf("Cache consistency check failed, resource: %q, resourceVersion: %q, etcdDigest: %q, cacheDigest: %q", c.resourcePrefix, digests.ResourceVersion, digests.EtcdDigest, digests.CacheDigest))
}
}
func (c *consistencyChecker) calculateDigests(ctx context.Context) (*storageDigest, error) {
if !c.cacher.Ready() {
return nil, fmt.Errorf("cache is not ready")
}
cacheDigest, resourceVersion, err := c.calculateStoreDigest(ctx, c.cacher, storage.ListOptions{
Recursive: true,
ResourceVersion: "0",
Predicate: storage.Everything,
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
})
if err != nil {
return nil, fmt.Errorf("failed calculating cache digest: %w", err)
}
etcdDigest, _, err := c.calculateStoreDigest(ctx, c.etcd, storage.ListOptions{
Recursive: true,
ResourceVersion: resourceVersion,
Predicate: storage.Everything,
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
})
if err != nil {
return nil, fmt.Errorf("failed calculating etcd digest: %w", err)
}
return &storageDigest{
ResourceVersion: resourceVersion,
CacheDigest: cacheDigest,
EtcdDigest: etcdDigest,
}, nil
}
type storageDigest struct {
ResourceVersion string
CacheDigest string
EtcdDigest string
}
func (c *consistencyChecker) calculateStoreDigest(ctx context.Context, store getLister, opts storage.ListOptions) (digest, rv string, err error) {
// TODO: Implement pagination
resp := c.newListFunc()
err = store.GetList(ctx, c.resourcePrefix, opts, resp)
if err != nil {
return "", "", err
}
digest, err = listDigest(resp)
if err != nil {
return "", "", err
}
list, err := meta.ListAccessor(resp)
if err != nil {
return "", "", err
}
return digest, list.GetResourceVersion(), nil
}
func listDigest(list runtime.Object) (string, error) {
h := fnv.New64()
err := meta.EachListItem(list, func(obj runtime.Object) error {
objectMeta, err := meta.Accessor(obj)
if err != nil {
return err
}
err = addObjectToDigest(h, objectMeta)
if err != nil {
return err
}
return nil
})
if err != nil {
return "", err
}
return fmt.Sprintf("%x", h.Sum64()), nil
}
func addObjectToDigest(h hash.Hash64, objectMeta metav1.Object) error {
_, err := h.Write([]byte(objectMeta.GetNamespace()))
if err != nil {
return err
}
_, err = h.Write([]byte("/"))
if err != nil {
return err
}
_, err = h.Write([]byte(objectMeta.GetName()))
if err != nil {
return err
}
_, err = h.Write([]byte("/"))
if err != nil {
return err
}
_, err = h.Write([]byte(objectMeta.GetResourceVersion()))
if err != nil {
return err
}
return nil
}
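The digest above is an order-sensitive FNV-64 hash over the namespace, name, and resourceVersion of each listed object; if the cache and etcd return the same items in the same order, the digests match. A self-contained sketch of that computation, using a hypothetical objMeta type in place of real ObjectMeta:

package main

import (
	"fmt"
	"hash/fnv"
)

// objMeta is a stand-in for the metav1.Object fields hashed by addObjectToDigest.
type objMeta struct {
	namespace, name, resourceVersion string
}

func digestOf(items []objMeta) string {
	h := fnv.New64()
	for _, o := range items {
		// Same fields and "/" separators as addObjectToDigest above.
		h.Write([]byte(o.namespace))
		h.Write([]byte("/"))
		h.Write([]byte(o.name))
		h.Write([]byte("/"))
		h.Write([]byte(o.resourceVersion))
	}
	return fmt.Sprintf("%x", h.Sum64())
}

func main() {
	cacheList := []objMeta{{"default", "pod-a", "101"}, {"default", "pod-b", "105"}}
	etcdList := []objMeta{{"default", "pod-a", "101"}, {"default", "pod-b", "105"}}
	fmt.Println(digestOf(cacheList) == digestOf(etcdList)) // true
}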


@ -0,0 +1,113 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package delegator
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
func ShouldDelegateListMeta(opts *metav1.ListOptions, cache Helper) (Result, error) {
return ShouldDelegateList(
storage.ListOptions{
ResourceVersionMatch: opts.ResourceVersionMatch,
ResourceVersion: opts.ResourceVersion,
Predicate: storage.SelectionPredicate{
Continue: opts.Continue,
Limit: opts.Limit,
},
Recursive: true,
}, cache)
}
func ShouldDelegateList(opts storage.ListOptions, cache Helper) (Result, error) {
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchExact:
return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
case metav1.ResourceVersionMatchNotOlderThan:
return Result{ShouldDelegate: false}, nil
case "":
// Continue
if len(opts.Predicate.Continue) > 0 {
return cache.ShouldDelegateContinue(opts.Predicate.Continue, opts.Recursive)
}
// Legacy exact match
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return cache.ShouldDelegateExactRV(opts.ResourceVersion, opts.Recursive)
}
// Consistent Read
if opts.ResourceVersion == "" {
return cache.ShouldDelegateConsistentRead()
}
return Result{ShouldDelegate: false}, nil
default:
return Result{ShouldDelegate: true}, nil
}
}
type Helper interface {
ShouldDelegateExactRV(rv string, recursive bool) (Result, error)
ShouldDelegateContinue(continueToken string, recursive bool) (Result, error)
ShouldDelegateConsistentRead() (Result, error)
}
// Result of delegator decision.
type Result struct {
// Whether a request cannot be served by cache and should be delegated to etcd.
ShouldDelegate bool
// Whether a request is a consistent read, used by delegator to decide if it should call GetCurrentResourceVersion to get RV.
// Included in the interface because only the cacher has the keyPrefix needed to parse the continue token.
ConsistentRead bool
}
type CacheWithoutSnapshots struct{}
var _ Helper = CacheWithoutSnapshots{}
func (c CacheWithoutSnapshots) ShouldDelegateContinue(continueToken string, recursive bool) (Result, error) {
return Result{
ShouldDelegate: true,
// Continue with a negative RV is considered a consistent read; however, the token cannot be parsed without the keyPrefix, which is unavailable in staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go.
ConsistentRead: false,
}, nil
}
func (c CacheWithoutSnapshots) ShouldDelegateExactRV(rv string, recursive bool) (Result, error) {
return Result{
ShouldDelegate: true,
ConsistentRead: false,
}, nil
}
func (c CacheWithoutSnapshots) ShouldDelegateConsistentRead() (Result, error) {
return Result{
ShouldDelegate: !ConsistentReadSupported(),
ConsistentRead: true,
}, nil
}
// ConsistentReadSupported returns whether the cache can be used to serve reads with an RV not yet observed by the cache, including consistent reads.
// The function is located here to avoid import cycles between staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go and staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go.
func ConsistentReadSupported() bool {
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
return consistentListFromCacheEnabled && requestWatchProgressSupported
}
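A small sketch of how the decision logic above plays out, assuming the k8s.io/apiserver v0.33.0 module from this bump and using the snapshot-less helper: a plain resourceVersion="0" list stays in the cache, while an exact-RV list is delegated to etcd.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apiserver/pkg/storage"
	"k8s.io/apiserver/pkg/storage/cacher/delegator"
)

func main() {
	// resourceVersion="0", no match, no continue token: falls through to the
	// "serve from cache" default of the empty-match case.
	r, err := delegator.ShouldDelegateList(storage.ListOptions{
		ResourceVersion: "0",
		Recursive:       true,
		Predicate:       storage.Everything,
	}, delegator.CacheWithoutSnapshots{})
	fmt.Println(r.ShouldDelegate, r.ConsistentRead, err) // false false <nil>

	// An exact-RV list cannot be served by a cache without snapshots.
	r, err = delegator.ShouldDelegateList(storage.ListOptions{
		ResourceVersion:      "1234",
		ResourceVersionMatch: metav1.ResourceVersionMatchExact,
		Recursive:            true,
		Predicate:            storage.Everything,
	}, delegator.CacheWithoutSnapshots{})
	fmt.Println(r.ShouldDelegate, err) // true <nil>
}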


@ -176,6 +176,14 @@ var (
Help: "Counter for consistent reads from cache.",
StabilityLevel: compbasemetrics.ALPHA,
}, []string{"resource", "success", "fallback"})
StorageConsistencyCheckTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Name: "storage_consistency_checks_total",
Help: "Counter for status of consistency checks between etcd and watch cache",
StabilityLevel: compbasemetrics.INTERNAL,
}, []string{"resource", "status"})
)
var registerMetrics sync.Once
@ -198,6 +206,7 @@ func Register() {
legacyregistry.MustRegister(WatchCacheInitializations)
legacyregistry.MustRegister(WatchCacheReadWait)
legacyregistry.MustRegister(ConsistentReadTotal)
legacyregistry.MustRegister(StorageConsistencyCheckTotal)
})
}


@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
package progress
import (
"context"
@ -36,8 +36,8 @@ const (
progressRequestPeriod = 100 * time.Millisecond
)
func newConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *conditionalProgressRequester {
pr := &conditionalProgressRequester{
func NewConditionalProgressRequester(requestWatchProgress WatchProgressRequester, clock TickerFactory, contextMetadata metadata.MD) *ConditionalProgressRequester {
pr := &ConditionalProgressRequester{
clock: clock,
requestWatchProgress: requestWatchProgress,
contextMetadata: contextMetadata,
@ -52,9 +52,9 @@ type TickerFactory interface {
NewTimer(time.Duration) clock.Timer
}
// conditionalProgressRequester will request progress notification if there
// ConditionalProgressRequester will request progress notification if there
// is a request waiting for watch cache to be fresh.
type conditionalProgressRequester struct {
type ConditionalProgressRequester struct {
clock TickerFactory
requestWatchProgress WatchProgressRequester
contextMetadata metadata.MD
@ -65,7 +65,7 @@ type conditionalProgressRequester struct {
stopped bool
}
func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
func (pr *ConditionalProgressRequester) Run(stopCh <-chan struct{}) {
ctx := wait.ContextForChannel(stopCh)
if pr.contextMetadata != nil {
ctx = metadata.NewOutgoingContext(ctx, pr.contextMetadata)
@ -115,14 +115,14 @@ func (pr *conditionalProgressRequester) Run(stopCh <-chan struct{}) {
}
}
func (pr *conditionalProgressRequester) Add() {
func (pr *ConditionalProgressRequester) Add() {
pr.mux.Lock()
defer pr.mux.Unlock()
pr.waiting += 1
pr.cond.Signal()
}
func (pr *conditionalProgressRequester) Remove() {
func (pr *ConditionalProgressRequester) Remove() {
pr.mux.Lock()
defer pr.mux.Unlock()
pr.waiting -= 1


@ -20,6 +20,9 @@ import (
"context"
"fmt"
"sync"
"time"
"k8s.io/utils/clock"
)
type status int
@ -38,18 +41,26 @@ const (
// | ^
// └---------------------------┘
type ready struct {
state status // represent the state of the variable
state status // represent the state of the variable
lastErr error
generation int // represents the number of times we have transitioned to Ready
lock sync.RWMutex // protect the state and generation variables
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
waitCh chan struct{} // blocks until is ready or stopped
clock clock.Clock
lastStateChangeTime time.Time
}
func newReady() *ready {
return &ready{
func newReady(c clock.Clock) *ready {
r := &ready{
waitCh: make(chan struct{}),
state: Pending,
clock: c,
}
r.updateLastStateChangeTimeLocked()
return r
}
// done close the channel once the state is Ready or Stopped
@ -77,8 +88,7 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
}
r.lock.RLock()
switch r.state {
case Pending:
if r.state == Pending {
// since we allow to switch between the states Pending and Ready
// if there is a quick transition from Pending -> Ready -> Pending
// a process that was waiting can get unblocked and see a Pending
@ -86,43 +96,65 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
// avoid an inconsistent state on the system, with some processes not
// waiting despite the state moved back to Pending.
r.lock.RUnlock()
case Ready:
generation := r.generation
r.lock.RUnlock()
return generation, nil
case Stopped:
r.lock.RUnlock()
return 0, fmt.Errorf("apiserver cacher is stopped")
default:
r.lock.RUnlock()
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
continue
}
generation, err := r.readGenerationLocked()
r.lock.RUnlock()
return generation, err
}
}
// check returns true only if it is Ready.
func (r *ready) check() bool {
_, ok := r.checkAndReadGeneration()
return ok
// check returns the time elapsed since the last state change and an error if the cacher is not ready.
func (r *ready) check() (time.Duration, error) {
_, elapsed, err := r.checkAndReadGeneration()
return elapsed, err
}
// checkAndReadGeneration returns the current generation and whether it is Ready.
func (r *ready) checkAndReadGeneration() (int, bool) {
// checkAndReadGeneration returns the current generation, the time elapsed since the last state change, and an error if the cacher is not ready.
func (r *ready) checkAndReadGeneration() (int, time.Duration, error) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.generation, r.state == Ready
generation, err := r.readGenerationLocked()
return generation, r.clock.Since(r.lastStateChangeTime), err
}
func (r *ready) readGenerationLocked() (int, error) {
switch r.state {
case Pending:
if r.lastErr == nil {
return 0, fmt.Errorf("storage is (re)initializing")
} else {
return 0, fmt.Errorf("storage is (re)initializing: %w", r.lastErr)
}
case Ready:
return r.generation, nil
case Stopped:
return 0, fmt.Errorf("apiserver cacher is stopped")
default:
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
}
}
func (r *ready) setReady() {
r.set(true, nil)
}
func (r *ready) setError(err error) {
r.set(false, err)
}
// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
func (r *ready) set(ok bool) {
func (r *ready) set(ok bool, err error) {
r.lock.Lock()
defer r.lock.Unlock()
if r.state == Stopped {
return
}
r.lastErr = err
if ok && r.state == Pending {
r.state = Ready
r.generation++
r.updateLastStateChangeTimeLocked()
select {
case <-r.waitCh:
default:
@ -139,6 +171,7 @@ func (r *ready) set(ok bool) {
default:
}
r.state = Pending
r.updateLastStateChangeTimeLocked()
}
}
@ -148,6 +181,7 @@ func (r *ready) stop() {
defer r.lock.Unlock()
if r.state != Stopped {
r.state = Stopped
r.updateLastStateChangeTimeLocked()
}
select {
case <-r.waitCh:
@ -155,3 +189,7 @@ func (r *ready) stop() {
close(r.waitCh)
}
}
func (r *ready) updateLastStateChangeTimeLocked() {
r.lastStateChangeTime = r.clock.Now()
}


@ -19,8 +19,6 @@ package cacher
import (
"fmt"
"github.com/google/btree"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -75,7 +73,9 @@ type storeIndexer interface {
}
type orderedLister interface {
ListPrefix(prefix, continueKey string, limit int) (items []interface{}, hasMore bool)
ListPrefix(prefix, continueKey string) []interface{}
Count(prefix, continueKey string) (count int)
Clone() orderedLister
}
func newStoreIndexer(indexers *cache.Indexers) storeIndexer {
@ -97,12 +97,6 @@ type storeElement struct {
Fields fields.Set
}
func (t *storeElement) Less(than btree.Item) bool {
return t.Key < than.(*storeElement).Key
}
var _ btree.Item = (*storeElement)(nil)
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {


@ -18,7 +18,6 @@ package cacher
import (
"fmt"
"math"
"strings"
"sync"
@ -44,6 +43,20 @@ type threadedStoreIndexer struct {
indexer indexer
}
var _ orderedLister = (*threadedStoreIndexer)(nil)
func (si *threadedStoreIndexer) Count(prefix, continueKey string) (count int) {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.Count(prefix, continueKey)
}
func (si *threadedStoreIndexer) Clone() orderedLister {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.Clone()
}
func (si *threadedStoreIndexer) Add(obj interface{}) error {
return si.addOrUpdate(obj)
}
@ -73,11 +86,11 @@ func (si *threadedStoreIndexer) Delete(obj interface{}) error {
}
si.lock.Lock()
defer si.lock.Unlock()
oldObj := si.store.deleteElem(storeElem)
if oldObj == nil {
oldObj, existed := si.store.deleteElem(storeElem)
if !existed {
return nil
}
return si.indexer.updateElem(storeElem.Key, oldObj.(*storeElement), nil)
return si.indexer.updateElem(storeElem.Key, oldObj, nil)
}
func (si *threadedStoreIndexer) List() []interface{} {
@ -86,10 +99,10 @@ func (si *threadedStoreIndexer) List() []interface{} {
return si.store.List()
}
func (si *threadedStoreIndexer) ListPrefix(prefix, continueKey string, limit int) ([]interface{}, bool) {
func (si *threadedStoreIndexer) ListPrefix(prefix, continueKey string) []interface{} {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.ListPrefix(prefix, continueKey, limit)
return si.store.ListPrefix(prefix, continueKey)
}
func (si *threadedStoreIndexer) ListKeys() []string {
@ -128,12 +141,20 @@ func (si *threadedStoreIndexer) ByIndex(indexName, indexValue string) ([]interfa
func newBtreeStore(degree int) btreeStore {
return btreeStore{
tree: btree.New(degree),
tree: btree.NewG(degree, func(a, b *storeElement) bool {
return a.Key < b.Key
}),
}
}
type btreeStore struct {
tree *btree.BTree
tree *btree.BTreeG[*storeElement]
}
func (s *btreeStore) Clone() orderedLister {
return &btreeStore{
tree: s.tree.Clone(),
}
}
func (s *btreeStore) Add(obj interface{}) error {
@ -172,14 +193,14 @@ func (s *btreeStore) Delete(obj interface{}) error {
return nil
}
func (s *btreeStore) deleteElem(storeElem *storeElement) interface{} {
func (s *btreeStore) deleteElem(storeElem *storeElement) (*storeElement, bool) {
return s.tree.Delete(storeElem)
}
func (s *btreeStore) List() []interface{} {
items := make([]interface{}, 0, s.tree.Len())
s.tree.Ascend(func(i btree.Item) bool {
items = append(items, i.(interface{}))
s.tree.Ascend(func(item *storeElement) bool {
items = append(items, item)
return true
})
return items
@ -187,8 +208,8 @@ func (s *btreeStore) List() []interface{} {
func (s *btreeStore) ListKeys() []string {
items := make([]string, 0, s.tree.Len())
s.tree.Ascend(func(i btree.Item) bool {
items = append(items, i.(*storeElement).Key)
s.tree.Ascend(func(item *storeElement) bool {
items = append(items, item.Key)
return true
})
return items
@ -199,11 +220,8 @@ func (s *btreeStore) Get(obj interface{}) (item interface{}, exists bool, err er
if !ok {
return nil, false, fmt.Errorf("obj is not a storeElement")
}
item = s.tree.Get(storeElem)
if item == nil {
return nil, false, nil
}
return item, true, nil
item, exists = s.tree.Get(storeElem)
return item, exists, nil
}
func (s *btreeStore) GetByKey(key string) (item interface{}, exists bool, err error) {
@ -225,54 +243,37 @@ func (s *btreeStore) Replace(objs []interface{}, _ string) error {
// addOrUpdateLocked assumes a lock is held and is used for Add
// and Update operations.
func (s *btreeStore) addOrUpdateElem(storeElem *storeElement) *storeElement {
oldObj := s.tree.ReplaceOrInsert(storeElem)
if oldObj == nil {
return nil
}
return oldObj.(*storeElement)
oldObj, _ := s.tree.ReplaceOrInsert(storeElem)
return oldObj
}
func (s *btreeStore) getByKey(key string) (item interface{}, exists bool, err error) {
keyElement := &storeElement{Key: key}
item = s.tree.Get(keyElement)
return item, item != nil, nil
item, exists = s.tree.Get(keyElement)
return item, exists, nil
}
func (s *btreeStore) ListPrefix(prefix, continueKey string, limit int) ([]interface{}, bool) {
if limit < 0 {
return nil, false
}
func (s *btreeStore) ListPrefix(prefix, continueKey string) []interface{} {
if continueKey == "" {
continueKey = prefix
}
var result []interface{}
var hasMore bool
if limit == 0 {
limit = math.MaxInt
}
s.tree.AscendGreaterOrEqual(&storeElement{Key: continueKey}, func(i btree.Item) bool {
elementKey := i.(*storeElement).Key
if !strings.HasPrefix(elementKey, prefix) {
s.tree.AscendGreaterOrEqual(&storeElement{Key: continueKey}, func(item *storeElement) bool {
if !strings.HasPrefix(item.Key, prefix) {
return false
}
// TODO: Might be worth to lookup one more item to provide more accurate HasMore.
if len(result) >= limit {
hasMore = true
return false
}
result = append(result, i.(interface{}))
result = append(result, item)
return true
})
return result, hasMore
return result
}
func (s *btreeStore) Count(prefix, continueKey string) (count int) {
if continueKey == "" {
continueKey = prefix
}
s.tree.AscendGreaterOrEqual(&storeElement{Key: continueKey}, func(i btree.Item) bool {
elementKey := i.(*storeElement).Key
if !strings.HasPrefix(elementKey, prefix) {
s.tree.AscendGreaterOrEqual(&storeElement{Key: continueKey}, func(item *storeElement) bool {
if !strings.HasPrefix(item.Key, prefix) {
return false
}
count++
@ -391,3 +392,114 @@ func (i *indexer) delete(key, value string, index map[string]map[string]*storeEl
delete(index, value)
}
}
// newStoreSnapshotter returns a storeSnapshotter that stores snapshots for
// serving read requests with exact resource versions (RV) and pagination.
//
// Snapshots are created by calling the Clone method on the orderedLister, which is
// expected to be fast and efficient thanks to the usage of B-trees.
// B-trees can create a lazy copy of the tree structure, minimizing overhead.
//
// Assuming the watch cache observes all events and snapshots the cache after each of them,
// requests for a specific resource version can be served by retrieving
// the snapshot with the greatest RV less than or equal to the requested RV.
// To make snapshot retrieval efficient we need an ordered data structure, such as a tree.
//
// The initial implementation uses a B-tree to achieve the following performance characteristics (n - number of snapshots stored):
// - `Add`: Adds a new snapshot.
// Complexity: O(log n).
// Executed for each watch event observed by the cache.
// - `GetLessOrEqual`: Retrieves the snapshot with the greatest RV less than or equal to the requested RV.
// Complexity: O(log n).
// Executed for each LIST request with match=Exact or continuation.
// - `RemoveLess`: Cleans up snapshots outside the watch history window.
// Complexity: O(k log n), k - number of snapshots to remove, usually only one if watch capacity was not reduced.
// Executed per watch event observed when the cache is full.
// - `Reset`: Cleans up all snapshots.
// Complexity: O(1).
// Executed when the watch cache is reinitialized.
//
// Further optimization is possible by leveraging the property that adds always
// increase the maximum RV and deletes only increase the minimum RV.
// For example, a binary search on a cyclic buffer of (RV, snapshot)
// should reduce the number of allocations and improve removal complexity.
// However, this solution is more complex and is deferred for future implementation.
//
// TODO: Rewrite to use a cyclic buffer
func newStoreSnapshotter() *storeSnapshotter {
s := &storeSnapshotter{
snapshots: btree.NewG[rvSnapshot](btreeDegree, func(a, b rvSnapshot) bool {
return a.resourceVersion < b.resourceVersion
}),
}
return s
}
var _ Snapshotter = (*storeSnapshotter)(nil)
type Snapshotter interface {
Reset()
GetLessOrEqual(rv uint64) (orderedLister, bool)
Add(rv uint64, indexer orderedLister)
RemoveLess(rv uint64)
Len() int
}
type storeSnapshotter struct {
mux sync.RWMutex
snapshots *btree.BTreeG[rvSnapshot]
}
type rvSnapshot struct {
resourceVersion uint64
snapshot orderedLister
}
func (s *storeSnapshotter) Reset() {
s.mux.Lock()
defer s.mux.Unlock()
s.snapshots.Clear(false)
}
func (s *storeSnapshotter) GetLessOrEqual(rv uint64) (orderedLister, bool) {
s.mux.RLock()
defer s.mux.RUnlock()
var result *rvSnapshot
s.snapshots.DescendLessOrEqual(rvSnapshot{resourceVersion: rv}, func(rvs rvSnapshot) bool {
result = &rvs
return false
})
if result == nil {
return nil, false
}
return result.snapshot, true
}
func (s *storeSnapshotter) Add(rv uint64, indexer orderedLister) {
s.mux.Lock()
defer s.mux.Unlock()
s.snapshots.ReplaceOrInsert(rvSnapshot{resourceVersion: rv, snapshot: indexer.Clone()})
}
func (s *storeSnapshotter) RemoveLess(rv uint64) {
s.mux.Lock()
defer s.mux.Unlock()
for s.snapshots.Len() > 0 {
oldest, ok := s.snapshots.Min()
if !ok {
break
}
if rv <= oldest.resourceVersion {
break
}
s.snapshots.DeleteMin()
}
}
func (s *storeSnapshotter) Len() int {
s.mux.RLock()
defer s.mux.RUnlock()
return s.snapshots.Len()
}
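The retrieval above depends on finding the greatest stored RV that does not exceed the requested one. A minimal, self-contained sketch of that lookup using the same github.com/google/btree generics API; the snap type and its fields are illustrative stand-ins, not identifiers from this package:

package main

import (
	"fmt"

	"github.com/google/btree"
)

// snap stands in for rvSnapshot; data stands in for the cloned orderedLister.
type snap struct {
	rv   uint64
	data string
}

func main() {
	t := btree.NewG[snap](16, func(a, b snap) bool { return a.rv < b.rv })
	for _, rv := range []uint64{100, 110, 125} {
		t.ReplaceOrInsert(snap{rv: rv, data: fmt.Sprintf("state@%d", rv)})
	}

	// A request for RV=120 is served from the newest snapshot not newer
	// than 120, i.e. the one taken at RV=110.
	var found *snap
	t.DescendLessOrEqual(snap{rv: 120}, func(s snap) bool {
		found = &s
		return false // stop at the first (greatest) match
	})
	if found != nil {
		fmt.Println(found.data) // prints "state@110"
	}
}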

View File

@ -17,7 +17,9 @@ limitations under the License.
package cacher
import (
"math"
"strings"
"time"
)
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
@ -44,3 +46,11 @@ func hasPathPrefix(s, pathPrefix string) bool {
}
return false
}
// calculateRetryAfterForUnreadyCache returns the number of seconds a client should wait
// before retrying, growing exponentially with the cache downtime and clamped to the [1, 30] range.
func calculateRetryAfterForUnreadyCache(downtime time.Duration) int {
factor := 0.06
result := math.Exp(factor * downtime.Seconds())
result = math.Min(30, math.Max(1, result))
return int(result)
}
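To get a feel for the resulting backoff, a standalone sketch that reuses the same formula (factor 0.06, clamped to the [1, 30] second range); the sample downtimes are arbitrary:

package main

import (
	"fmt"
	"math"
	"time"
)

func retryAfter(downtime time.Duration) int {
	result := math.Exp(0.06 * downtime.Seconds())
	return int(math.Min(30, math.Max(1, result)))
}

func main() {
	for _, d := range []time.Duration{0, 15 * time.Second, 30 * time.Second, time.Minute, 2 * time.Minute} {
		fmt.Printf("downtime=%v retry-after=%ds\n", d, retryAfter(d))
	}
	// downtime=0s retry-after=1s
	// downtime=15s retry-after=2s
	// downtime=30s retry-after=6s
	// downtime=1m0s retry-after=30s
	// downtime=2m0s retry-after=30s
}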

View File

@ -25,6 +25,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -32,8 +33,9 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/delegator"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
"k8s.io/apiserver/pkg/storage/cacher/progress"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
@ -52,17 +54,11 @@ const (
// after receiving a 'too high resource version' error.
resourceVersionTooHighRetrySeconds = 1
// eventFreshDuration is time duration of events we want to keep.
// We set it to `defaultBookmarkFrequency` plus epsilon to maximize
// chances that last bookmark was sent within kept history, at the
// same time, minimizing the needed memory usage.
eventFreshDuration = 75 * time.Second
// defaultLowerBoundCapacity is a default value for event cache capacity's lower bound.
// TODO: Figure out to what value we can decrease it.
defaultLowerBoundCapacity = 100
// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
// defaultUpperBoundCapacity should be able to keep the required history.
defaultUpperBoundCapacity = 100 * 1024
)
@ -142,6 +138,9 @@ type watchCache struct {
// for testing timeouts.
clock clock.Clock
// eventFreshDuration defines the minimum watch history the watch cache will store.
eventFreshDuration time.Duration
// An underlying storage.Versioner.
versioner storage.Versioner
@ -153,7 +152,10 @@ type watchCache struct {
// Requests progress notification if there are requests waiting for watch
// to be fresh
waitingUntilFresh *conditionalProgressRequester
waitingUntilFresh *progress.ConditionalProgressRequester
// Stores previous snapshots of orderedLister to allow serving requests from previous revisions.
snapshots Snapshotter
}
func newWatchCache(
@ -163,15 +165,16 @@ func newWatchCache(
versioner storage.Versioner,
indexers *cache.Indexers,
clock clock.WithTicker,
eventFreshDuration time.Duration,
groupResource schema.GroupResource,
progressRequester *conditionalProgressRequester) *watchCache {
progressRequester *progress.ConditionalProgressRequester) *watchCache {
wc := &watchCache{
capacity: defaultLowerBoundCapacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]*watchCacheEvent, defaultLowerBoundCapacity),
lowerBoundCapacity: defaultLowerBoundCapacity,
upperBoundCapacity: defaultUpperBoundCapacity,
upperBoundCapacity: capacityUpperBound(eventFreshDuration),
startIndex: 0,
endIndex: 0,
store: newStoreIndexer(indexers),
@ -179,10 +182,14 @@ func newWatchCache(
listResourceVersion: 0,
eventHandler: eventHandler,
clock: clock,
eventFreshDuration: eventFreshDuration,
versioner: versioner,
groupResource: groupResource,
waitingUntilFresh: progressRequester,
}
if utilfeature.DefaultFeatureGate.Enabled(features.ListFromCacheSnapshot) {
wc.snapshots = newStoreSnapshotter()
}
metrics.WatchCacheCapacity.WithLabelValues(groupResource.String()).Set(float64(wc.capacity))
wc.cond = sync.NewCond(wc.RLocker())
wc.indexValidator = wc.isIndexValidLocked
@ -190,6 +197,30 @@ func newWatchCache(
return wc
}
// capacityUpperBound denotes the maximum possible capacity of the watch cache
// to which it can resize.
func capacityUpperBound(eventFreshDuration time.Duration) int {
if eventFreshDuration <= DefaultEventFreshDuration {
return defaultUpperBoundCapacity
}
// eventFreshDuration determines how long the watch events are supposed
// to be stored in the watch cache.
// In very high churn situations, there is a need to store more events
// in the watch cache, hence it would have to be upsized accordingly.
// Because of that, for larger values of eventFreshDuration, we set the
// upper bound of the watch cache's capacity proportionally to the ratio
// between eventFreshDuration and DefaultEventFreshDuration.
// Given that the watch cache size can only double, we round up that
// proportion to the next power of two.
exponent := int(math.Ceil((math.Log2(eventFreshDuration.Seconds() / DefaultEventFreshDuration.Seconds()))))
if maxExponent := int(math.Floor((math.Log2(math.MaxInt32 / defaultUpperBoundCapacity)))); exponent > maxExponent {
// Making sure that the capacity's upper bound fits in a 32-bit integer.
exponent = maxExponent
klog.Warningf("Capping watch cache capacity upper bound to %v", defaultUpperBoundCapacity<<exponent)
}
return defaultUpperBoundCapacity << exponent
}
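A worked example of the rounding above, as a standalone sketch that mirrors the same formula (the int32 overflow cap is omitted for brevity; 75s and 100*1024 mirror DefaultEventFreshDuration and defaultUpperBoundCapacity):

package main

import (
	"fmt"
	"math"
	"time"
)

func upperBound(eventFreshDuration time.Duration) int {
	const defaultFresh = 75 * time.Second
	const defaultUpper = 100 * 1024
	if eventFreshDuration <= defaultFresh {
		return defaultUpper
	}
	// Round the ratio up to the next power of two, since the cache can only double.
	exponent := int(math.Ceil(math.Log2(eventFreshDuration.Seconds() / defaultFresh.Seconds())))
	return defaultUpper << exponent
}

func main() {
	fmt.Println(upperBound(75 * time.Second)) // 102400 (default)
	fmt.Println(upperBound(90 * time.Second)) // 204800 (ratio 1.2 rounds up to 2x)
	fmt.Println(upperBound(5 * time.Minute))  // 409600 (ratio 4 -> 4x)
}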
// Add takes runtime.Object as an argument.
func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
@ -287,7 +318,20 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()
return updateFunc(elem)
err := updateFunc(elem)
if err != nil {
return err
}
if w.snapshots != nil {
if orderedLister, ordered := w.store.(orderedLister); ordered {
if w.isCacheFullLocked() {
oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion
w.snapshots.RemoveLess(oldestRV)
}
w.snapshots.Add(w.resourceVersion, orderedLister)
}
}
return err
}(); err != nil {
return err
}
@ -319,14 +363,14 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping).
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < w.eventFreshDuration {
capacity := min(w.capacity*2, w.upperBoundCapacity)
if capacity > w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > w.eventFreshDuration {
capacity := max(w.capacity/2, w.lowerBoundCapacity)
if capacity < w.capacity {
w.doCacheResizeLocked(capacity)
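A short walk-through of the two rules above, with illustrative numbers (a 75-second window and bounds of 100 and 100Ki events):

// - the cache is full at 100 events and the oldest one is only 40s old when a
//   new event arrives: the whole history still fits in the window, so the
//   capacity doubles to 200 (never past upperBoundCapacity).
// - later, with the cache full again, the event at the start of the most
//   recent quarter is already 90s old: that quarter alone spans more than the
//   window, so the capacity is halved (never below lowerBoundCapacity).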
@ -452,9 +496,8 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns a list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) {
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
@ -464,32 +507,84 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
defer w.RUnlock()
if err != nil {
return result, rv, index, err
return listResp{}, "", err
}
var prefixFilteredAndOrdered bool
result, rv, index, prefixFilteredAndOrdered, err = func() ([]interface{}, uint64, string, bool, error) {
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
return result, w.resourceVersion, matchValue.IndexName, false, nil
return w.list(ctx, resourceVersion, key, opts)
}
// NOTICE: Structure follows the shouldDelegateList function in
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
func (w *watchCache) list(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) {
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchExact:
return w.listExactRV(key, "", resourceVersion)
case metav1.ResourceVersionMatchNotOlderThan:
case "":
// Continue
if len(opts.Predicate.Continue) > 0 {
continueKey, continueRV, err := storage.DecodeContinue(opts.Predicate.Continue, key)
if err != nil {
return listResp{}, "", errors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if continueRV > 0 {
return w.listExactRV(key, continueKey, uint64(continueRV))
} else {
// Continue with negative RV is a consistent read - already handled via waitUntilFreshAndBlock.
// Don't pass matchValues as they don't support continueKey
return w.listLatestRV(key, continueKey, nil)
}
}
if store, ok := w.store.(orderedLister); ok {
result, _ := store.ListPrefix(key, "", 0)
return result, w.resourceVersion, "", true, nil
// Legacy exact match
if opts.Predicate.Limit > 0 && len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return w.listExactRV(key, "", resourceVersion)
}
return w.store.List(), w.resourceVersion, "", false, nil
}()
if !prefixFilteredAndOrdered {
result, err = filterPrefixAndOrder(key, result)
if err != nil {
return nil, 0, "", err
// Consistent Read - already handled via waitUntilFreshAndBlock
}
return w.listLatestRV(key, "", opts.Predicate.MatcherIndex(ctx))
}
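The continuation branch above relies on the continue-token helpers from k8s.io/apiserver/pkg/storage. A hedged round-trip sketch (the key names are made up; the sign of the decoded RV is what selects listExactRV versus listLatestRV):

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/storage"
)

func main() {
	const prefix = "/registry/pods/"
	// Encode a token that points just past ns1/pod-42 at resource version 1234.
	token, err := storage.EncodeContinue(prefix+"ns1/pod-42\x00", prefix, 1234)
	if err != nil {
		panic(err)
	}
	// A follow-up page request carries the token back.
	continueKey, continueRV, err := storage.DecodeContinue(token, prefix)
	if err != nil {
		panic(err)
	}
	fmt.Printf("continueKey=%q continueRV=%d\n", continueKey, continueRV)
	// continueRV > 0 -> serve the page from a snapshot at exactly that RV (listExactRV).
	// continueRV < 0 -> serve the latest state instead (listLatestRV).
}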
func (w *watchCache) listExactRV(key, continueKey string, resourceVersion uint64) (resp listResp, index string, err error) {
if w.snapshots == nil {
return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
}
store, ok := w.snapshots.GetLessOrEqual(resourceVersion)
if !ok {
return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
}
items := store.ListPrefix(key, continueKey)
return listResp{
Items: items,
ResourceVersion: resourceVersion,
}, "", nil
}
func (w *watchCache) listLatestRV(key, continueKey string, matchValues []storage.MatchValue) (resp listResp, index string, err error) {
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
// want - they will be filtered out later. The fact that we return fewer items is only a further performance improvement.
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
result, err = filterPrefixAndOrder(key, result)
return listResp{
Items: result,
ResourceVersion: w.resourceVersion,
}, matchValue.IndexName, err
}
}
return result, w.resourceVersion, index, nil
if store, ok := w.store.(orderedLister); ok {
result := store.ListPrefix(key, continueKey)
return listResp{
Items: result,
ResourceVersion: w.resourceVersion,
}, "", nil
}
result := w.store.List()
result, err = filterPrefixAndOrder(key, result)
return listResp{
Items: result,
ResourceVersion: w.resourceVersion,
}, "", err
}
func filterPrefixAndOrder(prefix string, items []interface{}) ([]interface{}, error) {
@ -517,7 +612,7 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
// WaitUntilFreshAndGet returns a pointer to a <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
var err error
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && w.notFresh(resourceVersion) {
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
@ -600,6 +695,12 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
if w.snapshots != nil {
w.snapshots.Reset()
if orderedLister, ordered := w.store.(orderedLister); ordered {
w.snapshots.Add(version, orderedLister)
}
}
w.listResourceVersion = version
w.resourceVersion = version
if w.onReplace != nil {
@ -660,7 +761,7 @@ func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) in
// We don't have exact data, but given we store updates from
// the last <eventFreshDuration>, we approximate it by dividing the
// capacity by the length of the history window.
chanSize := int(math.Ceil(float64(w.currentCapacity()) / eventFreshDuration.Seconds()))
chanSize := int(math.Ceil(float64(w.currentCapacity()) / w.eventFreshDuration.Seconds()))
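// For example, with the default 75-second window and a cache that has grown
// to 1024 events, this starts at ceil(1024/75) = 14 before the adjustment
// described below (illustrative numbers only).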
// Finally we adjust the size to avoid ending with too low or
// too large values.
@ -751,7 +852,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string,
// that covers the entire storage state.
// This function assumes the watchCache lock is held.
func (w *watchCache) getIntervalFromStoreLocked(key string, matchesSingle bool) (*watchCacheInterval, error) {
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc, key, matchesSingle)
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, key, matchesSingle)
if err != nil {
return nil, err
}

View File

@ -21,9 +21,6 @@ import (
"sort"
"sync"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)
@ -106,7 +103,6 @@ type watchCacheInterval struct {
initialEventsEndBookmark *watchCacheEvent
}
type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
type indexerFunc func(int) *watchCacheEvent
type indexValidator func(int) bool
@ -140,10 +136,9 @@ func (s sortableWatchCacheEvents) Swap(i, j int) {
// returned by Next() need to be events from a List() done on the underlying store of
// the watch cache.
// The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, key string, matchesSingle bool) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{}
var allItems []interface{}
if matchesSingle {
item, exists, err := store.GetByKey(key)
if err != nil {
@ -162,15 +157,11 @@ func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAt
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", elem)
}
objLabels, objFields, err := getAttrsFunc(elem.Object)
if err != nil {
return nil, err
}
buffer.buffer[i] = &watchCacheEvent{
Type: watch.Added,
Object: elem.Object,
ObjLabels: objLabels,
ObjFields: objFields,
ObjLabels: elem.Labels,
ObjFields: elem.Fields,
Key: elem.Key,
ResourceVersion: resourceVersion,
}

View File

@ -15,4 +15,4 @@ limitations under the License.
*/
// Interfaces for database-related operations.
package storage // import "k8s.io/apiserver/pkg/storage"
package storage

View File

@ -15,4 +15,4 @@ limitations under the License.
*/
// Package storage provides conversion of storage errors to API errors.
package errors // import "k8s.io/apiserver/pkg/storage/errors"
package errors

View File

@ -22,6 +22,7 @@ import (
"fmt"
"path"
"reflect"
"strconv"
"strings"
"time"
@ -32,9 +33,10 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
@ -84,6 +86,9 @@ type store struct {
leaseManager *leaseManager
decoder Decoder
listErrAggrFactory func() ListErrorAggregator
resourcePrefix string
newListFunc func() runtime.Object
}
func (s *store) RequestWatchProgress(ctx context.Context) error {
@ -185,10 +190,13 @@ func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc fu
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
decoder: decoder,
listErrAggrFactory: listErrAggrFactory,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
return s.GetCurrentResourceVersion(ctx)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress)
@ -636,45 +644,35 @@ func (s *store) ReadinessCheck() error {
return nil
}
// resolveGetListRev is used by GetList to resolve the rev to use in the client.KV.Get request.
func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts storage.ListOptions) (int64, error) {
var withRev int64
// Uses continueRV if this is a continuation request.
if len(continueKey) > 0 {
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return withRev, apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
withRev = continueRV
}
return withRev, nil
}
// Returns 0 if ResourceVersion is not specified.
if len(opts.ResourceVersion) == 0 {
return withRev, nil
}
parsedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return withRev, apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
func (s *store) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
emptyList := s.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1, // just in case we actually hit something
}
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(parsedRV)
case "": // legacy case
if opts.Recursive && opts.Predicate.Limit > 0 && parsedRV > 0 {
withRev = int64(parsedRV)
}
default:
return withRev, fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
err := s.GetList(ctx, s.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList)
if err != nil {
return 0, err
}
return withRev, nil
emptyListAccessor, err := meta.ListAccessor(emptyList)
if err != nil {
return 0, err
}
if emptyListAccessor == nil {
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
}
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
if err != nil {
return 0, err
}
if currentResourceVersion == 0 {
return 0, fmt.Errorf("the current resource version must be greater than 0")
}
return uint64(currentResourceVersion), nil
}
// GetList implements storage.Interface.
@ -713,15 +711,8 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
paging := opts.Predicate.Limit > 0
newItemFunc := getNewItemFunc(listObj, v)
var continueRV, withRev int64
var continueKey string
if opts.Recursive && len(opts.Predicate.Continue) > 0 {
continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
}
if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil {
withRev, continueKey, err := storage.ValidateListOptions(keyPrefix, s.versioner, opts)
if err != nil {
return err
}

View File

@ -438,7 +438,12 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
for {
select {
case e := <-wc.incomingEventChan:
res := wc.transform(e)
res, err := wc.transform(e)
if err != nil {
wc.sendError(err)
return
}
if res == nil {
continue
}
@ -461,10 +466,8 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
p := concurrentOrderedEventProcessing{
input: wc.incomingEventChan,
processFunc: wc.transform,
output: wc.resultChan,
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
wc: wc,
processingQueue: make(chan chan *processingResult, processEventConcurrency-1),
objectType: wc.watcher.objectType,
groupResource: wc.watcher.groupResource,
@ -481,12 +484,15 @@ func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
}()
}
type concurrentOrderedEventProcessing struct {
input chan *event
processFunc func(*event) *watch.Event
output chan watch.Event
type processingResult struct {
event *watch.Event
err error
}
processingQueue chan chan *watch.Event
type concurrentOrderedEventProcessing struct {
wc *watchChan
processingQueue chan chan *processingResult
// Metadata for logging
objectType string
groupResource schema.GroupResource
@ -498,28 +504,29 @@ func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.C
select {
case <-ctx.Done():
return
case e = <-p.input:
case e = <-p.wc.incomingEventChan:
}
processingResponse := make(chan *watch.Event, 1)
processingResponse := make(chan *processingResult, 1)
select {
case <-ctx.Done():
return
case p.processingQueue <- processingResponse:
}
wg.Add(1)
go func(e *event, response chan<- *watch.Event) {
go func(e *event, response chan<- *processingResult) {
defer wg.Done()
responseEvent, err := p.wc.transform(e)
select {
case <-ctx.Done():
case response <- p.processFunc(e):
case response <- &processingResult{event: responseEvent, err: err}:
}
}(e, processingResponse)
}
}
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
var processingResponse chan *watch.Event
var e *watch.Event
var processingResponse chan *processingResult
var r *processingResult
for {
select {
case <-ctx.Done():
@ -529,21 +536,25 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co
select {
case <-ctx.Done():
return
case e = <-processingResponse:
case r = <-processingResponse:
}
if e == nil {
if r.err != nil {
p.wc.sendError(r.err)
return
}
if r.event == nil {
continue
}
if len(p.output) == cap(p.output) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
if len(p.wc.resultChan) == cap(p.wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource)
}
// If the user can't receive results fast enough, we also block incoming events from the watcher,
// because buffering events locally would only increase memory usage.
// The worst case would be closing the fast watcher.
select {
case <-ctx.Done():
case p.wc.resultChan <- *r.event:
case <-p.wc.ctx.Done():
return
case p.output <- *e:
}
}
}
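The ordering trick used above (process events concurrently, but deliver results in submission order through a bounded queue of per-event response channels) in a standalone sketch; all names here are illustrative:

package main

import (
	"fmt"
	"strings"
	"sync"
)

func main() {
	inputs := []string{"a", "b", "c", "d"}
	queue := make(chan chan string, 2) // bounds the number of in-flight items
	var wg sync.WaitGroup

	go func() {
		for _, in := range inputs {
			resp := make(chan string, 1)
			queue <- resp // response channels are enqueued in submission order
			wg.Add(1)
			go func(in string, resp chan<- string) {
				defer wg.Done()
				resp <- strings.ToUpper(in) // workers may finish out of order
			}(in, resp)
		}
		wg.Wait()
		close(queue)
	}()

	for resp := range queue { // results are collected strictly in submission order
		fmt.Println(<-resp)
	}
}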
@ -561,12 +572,11 @@ func (wc *watchChan) acceptAll() bool {
}
// transform transforms an event into a result for user if not filtered.
func (wc *watchChan) transform(e *event) (res *watch.Event) {
func (wc *watchChan) transform(e *event) (res *watch.Event, err error) {
curObj, oldObj, err := wc.prepareObjs(e)
if err != nil {
klog.Errorf("failed to prepare current and previous objects: %v", err)
wc.sendError(err)
return nil
return nil, err
}
switch {
@ -574,12 +584,11 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
object := wc.watcher.newFunc()
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
klog.Errorf("failed to propagate object version: %v", err)
return nil
return nil, fmt.Errorf("failed to propagate object resource version: %w", err)
}
if e.isInitialEventsEndBookmark {
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
return nil
return nil, fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w", wc.watcher.groupResource, wc.watcher.objectType, object, err)
}
}
res = &watch.Event{
@ -588,7 +597,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
}
case e.isDeleted:
if !wc.filter(oldObj) {
return nil
return nil, nil
}
res = &watch.Event{
Type: watch.Deleted,
@ -596,7 +605,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
}
case e.isCreated:
if !wc.filter(curObj) {
return nil
return nil, nil
}
res = &watch.Event{
Type: watch.Added,
@ -608,7 +617,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
Type: watch.Modified,
Object: curObj,
}
return res
return res, nil
}
curObjPasses := wc.filter(curObj)
oldObjPasses := wc.filter(oldObj)
@ -630,7 +639,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
}
}
}
return res
return res, nil
}
func transformErrorToEvent(err error) *watch.Event {

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -262,6 +263,10 @@ type Interface interface {
// TODO: Remove when storage.Interface is separated from etcd3.store.
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
RequestWatchProgress(ctx context.Context) error
// GetCurrentResourceVersion gets the current resource version from etcd.
// This method issues an empty list request and reads only the ResourceVersion from the list metadata.
GetCurrentResourceVersion(ctx context.Context) (uint64, error)
}
// GetOptions provides the options that may be provided for storage get operations.
@ -325,3 +330,43 @@ type DeleteOptions struct {
// object which otherwise can not be deleted using the normal flow
IgnoreStoreReadError bool
}
func ValidateListOptions(keyPrefix string, versioner Versioner, opts ListOptions) (withRev int64, continueKey string, err error) {
if opts.Recursive && len(opts.Predicate.Continue) > 0 {
continueKey, continueRV, err := DecodeContinue(opts.Predicate.Continue, keyPrefix)
if err != nil {
return 0, "", apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return 0, "", apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
withRev = continueRV
}
return withRev, continueKey, nil
}
if len(opts.ResourceVersion) == 0 {
return withRev, "", nil
}
parsedRV, err := versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return withRev, "", apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(parsedRV)
case "": // legacy case
if opts.Recursive && opts.Predicate.Limit > 0 && parsedRV > 0 {
withRev = int64(parsedRV)
}
default:
return withRev, "", fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
}
return withRev, "", nil
}
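A few concrete mappings implied by the rules above, assuming the usual APIObjectVersioner so that "500" parses to revision 500 (values are illustrative):

//	ResourceVersion=""                                    -> withRev=0 (serve latest)
//	ResourceVersion="500", Match=Exact                    -> withRev=500
//	ResourceVersion="500", Match=NotOlderThan             -> withRev=0 (constraint checked against the response)
//	ResourceVersion="500", Match="", Recursive, Limit>0   -> withRev=500 (legacy paging case)
//	Continue token carrying RV=1234                       -> withRev=1234, continueKey taken from the token
//	Continue token plus ResourceVersion other than ""/"0" -> BadRequest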

View File

@ -37,6 +37,7 @@ const (
DefaultCompactInterval = 5 * time.Minute
DefaultDBMetricPollInterval = 30 * time.Second
DefaultEventsHistoryWindow = 75 * time.Second
DefaultHealthcheckTimeout = 2 * time.Second
DefaultReadinessTimeout = 2 * time.Second
)
@ -80,6 +81,8 @@ type Config struct {
CountMetricPollPeriod time.Duration
// DBMetricPollInterval specifies how often should storage backend metric be updated.
DBMetricPollInterval time.Duration
// EventsHistoryWindow specifies minimum history duration that storage is keeping.
EventsHistoryWindow time.Duration
// HealthcheckTimeout specifies the timeout used when checking health
HealthcheckTimeout time.Duration
// ReadycheckTimeout specifies the timeout used when checking readiness
@ -115,6 +118,7 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
Codec: codec,
CompactionInterval: DefaultCompactInterval,
DBMetricPollInterval: DefaultDBMetricPollInterval,
EventsHistoryWindow: DefaultEventsHistoryWindow,
HealthcheckTimeout: DefaultHealthcheckTimeout,
ReadycheckTimeout: DefaultReadinessTimeout,
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),

View File

@ -17,16 +17,12 @@ limitations under the License.
package storage
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation/path"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)
@ -81,45 +77,6 @@ func (hwm *HighWaterMark) Update(current int64) bool {
}
}
// GetCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
// This method issues an empty list request and reads only the ResourceVersion from the object metadata
func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) {
if storage == nil {
return 0, fmt.Errorf("storage wasn't provided for %s", objectType)
}
if newListFunc == nil {
return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType)
}
emptyList := newListFunc()
pred := SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1, // just in case we actually hit something
}
err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList)
if err != nil {
return 0, err
}
emptyListAccessor, err := meta.ListAccessor(emptyList)
if err != nil {
return 0, err
}
if emptyListAccessor == nil {
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
}
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
if err != nil {
return 0, err
}
if currentResourceVersion == 0 {
return 0, fmt.Errorf("the current resource version must be greater than 0")
}
return uint64(currentResourceVersion), nil
}
// AnnotateInitialEventsEndBookmark adds a special annotation to the given object
// which indicates that the initial events have been sent.
//