rebase: update K8s packages to v0.32.1

Update K8s packages in go.mod to v0.32.1

Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
Praveen M
2025-01-16 09:41:46 +05:30
committed by mergify[bot]
parent 5aef21ea4e
commit 7eb99fc6c9
2442 changed files with 273386 additions and 47788 deletions

View File

@ -10,7 +10,6 @@ reviewers:
- caesarxuchao
- mikedanese
- liggitt
- ncdc
- ingvagabund
- enj
- stevekuznetsov
@ -19,3 +18,5 @@ emeritus_approvers:
- xiang90
- timothysc
- lavalamp
emeritus_reviewers:
- ncdc

View File

@ -454,6 +454,13 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now()
// cacheInterval may be created from a version being more fresh than requested
// (e.g. for NotOlderThan semantic). In such a case, we need to prevent watch event
// with lower resourceVersion from being delivered to ensure watch contract.
if cacheInterval.resourceVersion > resourceVersion {
resourceVersion = cacheInterval.resourceVersion
}
initEventCount := 0
for {
event, err := cacheInterval.Next()
@ -503,6 +510,10 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
}
// send bookmark after sending all events in cacheInterval for watchlist request
if cacheInterval.initialEventsEndBookmark != nil {
c.sendWatchCacheEvent(cacheInterval.initialEventsEndBookmark)
}
c.process(ctx, resourceVersion)
}

View File

@ -492,7 +492,7 @@ func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object
// Delete implements storage.Interface.
func (c *Cacher) Delete(
ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc, _ runtime.Object) error {
validateDeletion storage.ValidateObjectFunc, _ runtime.Object, opts storage.DeleteOptions) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
@ -501,10 +501,10 @@ func (c *Cacher) Delete(
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem.(*storeElement).Object.DeepCopyObject()
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj)
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, currObj, opts)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil)
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil, opts)
}
type namespacedName struct {
@ -653,6 +653,8 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newErrWatcher(err), nil
}
c.setInitialEventsEndBookmarkIfRequested(cacheInterval, opts, c.watchCache.resourceVersion)
addedWatcher := false
func() {
c.Lock()
@ -693,9 +695,15 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
ctx, span := tracing.Start(ctx, "cacher.Get",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("resource-version", opts.ResourceVersion))
defer span.End(500 * time.Millisecond)
if opts.ResourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
span.AddEvent("About to Get from underlying storage")
return c.storage.Get(ctx, key, opts, objPtr)
}
@ -703,6 +711,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
if !c.ready.check() {
// If Cache is not initialized, delegate Get requests to storage
// as described in https://kep.k8s.io/4568
span.AddEvent("About to Get from underlying storage - cache not initialized")
return c.storage.Get(ctx, key, opts, objPtr)
}
}
@ -722,6 +731,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
span.AddEvent("About to Get from underlying storage - cache not initialized and no resourceVersion set")
return c.storage.Get(ctx, key, opts, objPtr)
}
if err := c.ready.wait(ctx); err != nil {
@ -734,6 +744,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
return err
}
span.AddEvent("About to fetch object from cache")
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, getRV, key)
if err != nil {
return err
@ -856,7 +867,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}
}
ctx, span := tracing.Start(ctx, "cacher list",
ctx, span := tracing.Start(ctx, "cacher.GetList",
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.Stringer("type", c.groupResource))
defer span.End(500 * time.Millisecond)
@ -1119,6 +1130,9 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
// Since add() can block, we explicitly add when cacher is unlocked.
// Dispatching event in nonblocking way first, which make faster watchers
// not be blocked by slower ones.
//
// Note: if we ever decide to cache the serialization of bookmark events,
// we will also need to modify the watchEncoder encoder
if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)
@ -1439,6 +1453,26 @@ func (c *Cacher) Wait(ctx context.Context) error {
return c.ready.wait(ctx)
}
// setInitialEventsEndBookmarkIfRequested sets initialEventsEndBookmark field in watchCacheInterval for watchlist request
func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCacheInterval, opts storage.ListOptions, currentResourceVersion uint64) {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks {
// We don't need to set the InitialEventsAnnotation for this bookmark event,
// because this will be automatically set during event conversion in cacheWatcher.convertToWatchEvent method
initialEventsEndBookmark := &watchCacheEvent{
Type: watch.Bookmark,
Object: c.newFunc(),
ResourceVersion: currentResourceVersion,
}
if err := c.versioner.UpdateObject(initialEventsEndBookmark.Object, initialEventsEndBookmark.ResourceVersion); err != nil {
klog.Errorf("failure to set resourceVersion to %d on initialEventsEndBookmark event %+v for watchlist request and wait for bookmark trigger to send", initialEventsEndBookmark.ResourceVersion, initialEventsEndBookmark.Object)
initialEventsEndBookmark = nil
}
cacheInterval.initialEventsEndBookmark = initialEventsEndBookmark
}
}
// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event

141
vendor/k8s.io/apiserver/pkg/storage/cacher/store.go generated vendored Normal file
View File

@ -0,0 +1,141 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"fmt"
"github.com/google/btree"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
)
const (
// btreeDegree defines the degree of btree storage.
// Decided based on the benchmark results (below).
// Selected the lowest degree from three options with best runtime (16,32,128).
// │ 2 │ 4 │ 8 │ 16 │ 32 │ 64 │ 128 │
// │ sec/op │ sec/op vs base │ sec/op vs base │ sec/op vs base │ sec/op vs base │ sec/op vs base │ sec/op vs base │
// StoreCreateList/RV=NotOlderThan-24 473.0µ ± 11% 430.1µ ± 9% -9.08% (p=0.005 n=10) 427.9µ ± 6% -9.54% (p=0.002 n=10) 403.9µ ± 8% -14.62% (p=0.000 n=10) 401.0µ ± 4% -15.22% (p=0.000 n=10) 408.0µ ± 4% -13.75% (p=0.000 n=10) 385.9µ ± 4% -18.42% (p=0.000 n=10)
// StoreCreateList/RV=ExactMatch-24 604.7µ ± 4% 596.7µ ± 8% ~ (p=0.529 n=10) 604.6µ ± 4% ~ (p=0.971 n=10) 601.1µ ± 4% ~ (p=0.853 n=10) 611.0µ ± 6% ~ (p=0.105 n=10) 598.2µ ± 5% ~ (p=0.579 n=10) 608.2µ ± 3% ~ (p=0.796 n=10)
// StoreList/List=All/Paginate=False/RV=Empty-24 729.1µ ± 5% 692.9µ ± 3% -4.96% (p=0.002 n=10) 693.7µ ± 3% -4.86% (p=0.000 n=10) 688.3µ ± 1% -5.59% (p=0.000 n=10) 690.4µ ± 5% -5.31% (p=0.002 n=10) 689.7µ ± 2% -5.40% (p=0.000 n=10) 687.8µ ± 3% -5.67% (p=0.000 n=10)
// StoreList/List=All/Paginate=True/RV=Empty-24 19.51m ± 2% 19.84m ± 2% ~ (p=0.105 n=10) 19.89m ± 3% ~ (p=0.190 n=10) 19.64m ± 4% ~ (p=0.853 n=10) 19.34m ± 4% ~ (p=0.481 n=10) 20.22m ± 4% +3.66% (p=0.007 n=10) 19.58m ± 4% ~ (p=0.912 n=10)
// StoreList/List=Namespace/Paginate=False/RV=Empty-24 1.672m ± 4% 1.635m ± 2% ~ (p=0.247 n=10) 1.673m ± 5% ~ (p=0.631 n=10) 1.657m ± 2% ~ (p=0.971 n=10) 1.656m ± 4% ~ (p=0.739 n=10) 1.678m ± 2% ~ (p=0.631 n=10) 1.718m ± 8% ~ (p=0.105 n=10)
// geomean 1.467m 1.420m -3.24% 1.430m -2.58% 1.403m -4.38% 1.402m -4.46% 1.417m -3.44% 1.403m -4.41%
//
// │ 2 │ 4 │ 8 │ 16 │ 32 │ 64 │ 128 │
// │ B/op │ B/op vs base │ B/op vs base │ B/op vs base │ B/op vs base │ B/op vs base │ B/op vs base │
// StoreCreateList/RV=NotOlderThan-24 98.58Ki ± 11% 101.33Ki ± 13% ~ (p=0.280 n=10) 99.80Ki ± 26% ~ (p=0.353 n=10) 109.63Ki ± 9% ~ (p=0.075 n=10) 112.56Ki ± 6% +14.18% (p=0.007 n=10) 114.41Ki ± 10% +16.05% (p=0.003 n=10) 115.06Ki ± 12% +16.72% (p=0.011 n=10)
// StoreCreateList/RV=ExactMatch-24 117.1Ki ± 0% 117.5Ki ± 0% ~ (p=0.218 n=10) 116.9Ki ± 0% ~ (p=0.052 n=10) 117.3Ki ± 0% ~ (p=0.353 n=10) 116.9Ki ± 0% ~ (p=0.075 n=10) 117.0Ki ± 0% ~ (p=0.436 n=10) 117.0Ki ± 0% ~ (p=0.280 n=10)
// StoreList/List=All/Paginate=False/RV=Empty-24 6.023Mi ± 0% 6.024Mi ± 0% +0.01% (p=0.037 n=10) 6.024Mi ± 0% ~ (p=0.493 n=10) 6.024Mi ± 0% +0.01% (p=0.035 n=10) 6.024Mi ± 0% ~ (p=0.247 n=10) 6.024Mi ± 0% ~ (p=0.247 n=10) 6.024Mi ± 0% ~ (p=0.315 n=10)
// StoreList/List=All/Paginate=True/RV=Empty-24 64.22Mi ± 0% 64.21Mi ± 0% ~ (p=0.075 n=10) 64.23Mi ± 0% ~ (p=0.280 n=10) 64.21Mi ± 0% -0.02% (p=0.002 n=10) 64.22Mi ± 0% ~ (p=0.579 n=10) 64.22Mi ± 0% ~ (p=0.971 n=10) 64.22Mi ± 0% ~ (p=1.000 n=10)
// StoreList/List=Namespace/Paginate=False/RV=Empty-24 8.177Mi ± 0% 8.178Mi ± 0% ~ (p=0.579 n=10) 8.177Mi ± 0% ~ (p=0.971 n=10) 8.179Mi ± 0% ~ (p=0.579 n=10) 8.178Mi ± 0% ~ (p=0.739 n=10) 8.179Mi ± 0% ~ (p=0.315 n=10) 8.176Mi ± 0% ~ (p=0.247 n=10)
// geomean 2.034Mi 2.047Mi +0.61% 2.039Mi +0.22% 2.079Mi +2.19% 2.088Mi +2.66% 2.095Mi +3.01% 2.098Mi +3.12%
//
// │ 2 │ 4 │ 8 │ 16 │ 32 │ 64 │ 128 │
// │ allocs/op │ allocs/op vs base │ allocs/op vs base │ allocs/op vs base │ allocs/op vs base │ allocs/op vs base │ allocs/op vs base │
// StoreCreateList/RV=NotOlderThan-24 560.0 ± 0% 558.0 ± 0% -0.36% (p=0.000 n=10) 557.0 ± 0% -0.54% (p=0.000 n=10) 558.0 ± 0% -0.36% (p=0.000 n=10) 557.0 ± 0% -0.54% (p=0.000 n=10) 557.0 ± 0% -0.54% (p=0.000 n=10) 557.0 ± 0% -0.54% (p=0.000 n=10)
// StoreCreateList/RV=ExactMatch-24 871.0 ± 0% 870.0 ± 0% -0.11% (p=0.038 n=10) 870.0 ± 0% -0.11% (p=0.004 n=10) 870.0 ± 0% -0.11% (p=0.005 n=10) 869.0 ± 0% -0.23% (p=0.000 n=10) 870.0 ± 0% -0.11% (p=0.001 n=10) 870.0 ± 0% -0.11% (p=0.000 n=10)
// StoreList/List=All/Paginate=False/RV=Empty-24 351.0 ± 3% 358.0 ± 1% +1.99% (p=0.034 n=10) 352.5 ± 3% ~ (p=0.589 n=10) 358.5 ± 1% +2.14% (p=0.022 n=10) 356.5 ± 3% ~ (p=0.208 n=10) 355.0 ± 3% ~ (p=0.224 n=10) 355.0 ± 3% ~ (p=0.183 n=10)
// StoreList/List=All/Paginate=True/RV=Empty-24 494.4k ± 0% 494.4k ± 0% ~ (p=0.424 n=10) 494.6k ± 0% +0.06% (p=0.000 n=10) 492.7k ± 0% -0.34% (p=0.000 n=10) 494.5k ± 0% +0.02% (p=0.009 n=10) 493.0k ± 0% -0.28% (p=0.000 n=10) 494.4k ± 0% ~ (p=0.424 n=10)
// StoreList/List=Namespace/Paginate=False/RV=Empty-24 32.43k ± 0% 32.44k ± 0% ~ (p=0.579 n=10) 32.43k ± 0% ~ (p=0.971 n=10) 32.45k ± 0% ~ (p=0.517 n=10) 32.44k ± 0% ~ (p=0.670 n=10) 32.46k ± 0% ~ (p=0.256 n=10) 32.41k ± 0% ~ (p=0.247 n=10)
// geomean 4.872k 4.887k +0.31% 4.870k -0.03% 4.885k +0.28% 4.880k +0.17% 4.875k +0.06% 4.876k +0.08%
btreeDegree = 16
)
type storeIndexer interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
ByIndex(indexName, indexedValue string) ([]interface{}, error)
}
type orderedLister interface {
ListPrefix(prefix, continueKey string, limit int) (items []interface{}, hasMore bool)
}
func newStoreIndexer(indexers *cache.Indexers) storeIndexer {
if utilfeature.DefaultFeatureGate.Enabled(features.BtreeWatchCache) {
return newThreadedBtreeStoreIndexer(storeElementIndexers(indexers), btreeDegree)
}
return cache.NewIndexer(storeElementKey, storeElementIndexers(indexers))
}
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). Similarly computing object fields and
// labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), in the underlying store we are
// keeping structs (key, object, labels, fields).
type storeElement struct {
Key string
Object runtime.Object
Labels labels.Set
Fields fields.Set
}
func (t *storeElement) Less(than btree.Item) bool {
return t.Key < than.(*storeElement).Key
}
var _ btree.Item = (*storeElement)(nil)
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}
func storeElementObject(obj interface{}) (runtime.Object, error) {
elem, ok := obj.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Object, nil
}
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
return func(obj interface{}) (strings []string, e error) {
seo, err := storeElementObject(obj)
if err != nil {
return nil, err
}
return objIndexFunc(seo)
}
}
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
if indexers == nil {
return cache.Indexers{}
}
ret := cache.Indexers{}
for indexName, indexFunc := range *indexers {
ret[indexName] = storeElementIndexFunc(indexFunc)
}
return ret
}

View File

@ -0,0 +1,393 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"fmt"
"math"
"strings"
"sync"
"github.com/google/btree"
"k8s.io/client-go/tools/cache"
)
// newThreadedBtreeStoreIndexer returns a storage for cacher by adding locking over the two 2 data structures:
// * btree based storage for efficient LIST operation on prefix
// * map based indexer for retrieving values by index.
// This separation is used to allow independent snapshotting those two storages in the future.
// Intention is to utilize btree for its cheap snapshots that don't require locking if don't mutate data.
func newThreadedBtreeStoreIndexer(indexers cache.Indexers, degree int) *threadedStoreIndexer {
return &threadedStoreIndexer{
store: newBtreeStore(degree),
indexer: newIndexer(indexers),
}
}
type threadedStoreIndexer struct {
lock sync.RWMutex
store btreeStore
indexer indexer
}
func (si *threadedStoreIndexer) Add(obj interface{}) error {
return si.addOrUpdate(obj)
}
func (si *threadedStoreIndexer) Update(obj interface{}) error {
return si.addOrUpdate(obj)
}
func (si *threadedStoreIndexer) addOrUpdate(obj interface{}) error {
if obj == nil {
return fmt.Errorf("obj cannot be nil")
}
newElem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("obj not a storeElement: %#v", obj)
}
si.lock.Lock()
defer si.lock.Unlock()
oldElem := si.store.addOrUpdateElem(newElem)
return si.indexer.updateElem(newElem.Key, oldElem, newElem)
}
func (si *threadedStoreIndexer) Delete(obj interface{}) error {
storeElem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("obj not a storeElement: %#v", obj)
}
si.lock.Lock()
defer si.lock.Unlock()
oldObj := si.store.deleteElem(storeElem)
if oldObj == nil {
return nil
}
return si.indexer.updateElem(storeElem.Key, oldObj.(*storeElement), nil)
}
func (si *threadedStoreIndexer) List() []interface{} {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.List()
}
func (si *threadedStoreIndexer) ListPrefix(prefix, continueKey string, limit int) ([]interface{}, bool) {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.ListPrefix(prefix, continueKey, limit)
}
func (si *threadedStoreIndexer) ListKeys() []string {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.ListKeys()
}
func (si *threadedStoreIndexer) Get(obj interface{}) (item interface{}, exists bool, err error) {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.Get(obj)
}
func (si *threadedStoreIndexer) GetByKey(key string) (item interface{}, exists bool, err error) {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.GetByKey(key)
}
func (si *threadedStoreIndexer) Replace(objs []interface{}, resourceVersion string) error {
si.lock.Lock()
defer si.lock.Unlock()
err := si.store.Replace(objs, resourceVersion)
if err != nil {
return err
}
return si.indexer.Replace(objs, resourceVersion)
}
func (si *threadedStoreIndexer) ByIndex(indexName, indexValue string) ([]interface{}, error) {
si.lock.RLock()
defer si.lock.RUnlock()
return si.indexer.ByIndex(indexName, indexValue)
}
func newBtreeStore(degree int) btreeStore {
return btreeStore{
tree: btree.New(degree),
}
}
type btreeStore struct {
tree *btree.BTree
}
func (s *btreeStore) Add(obj interface{}) error {
if obj == nil {
return fmt.Errorf("obj cannot be nil")
}
storeElem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("obj not a storeElement: %#v", obj)
}
s.addOrUpdateElem(storeElem)
return nil
}
func (s *btreeStore) Update(obj interface{}) error {
if obj == nil {
return fmt.Errorf("obj cannot be nil")
}
storeElem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("obj not a storeElement: %#v", obj)
}
s.addOrUpdateElem(storeElem)
return nil
}
func (s *btreeStore) Delete(obj interface{}) error {
if obj == nil {
return fmt.Errorf("obj cannot be nil")
}
storeElem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("obj not a storeElement: %#v", obj)
}
s.deleteElem(storeElem)
return nil
}
func (s *btreeStore) deleteElem(storeElem *storeElement) interface{} {
return s.tree.Delete(storeElem)
}
func (s *btreeStore) List() []interface{} {
items := make([]interface{}, 0, s.tree.Len())
s.tree.Ascend(func(i btree.Item) bool {
items = append(items, i.(interface{}))
return true
})
return items
}
func (s *btreeStore) ListKeys() []string {
items := make([]string, 0, s.tree.Len())
s.tree.Ascend(func(i btree.Item) bool {
items = append(items, i.(*storeElement).Key)
return true
})
return items
}
func (s *btreeStore) Get(obj interface{}) (item interface{}, exists bool, err error) {
storeElem, ok := obj.(*storeElement)
if !ok {
return nil, false, fmt.Errorf("obj is not a storeElement")
}
item = s.tree.Get(storeElem)
if item == nil {
return nil, false, nil
}
return item, true, nil
}
func (s *btreeStore) GetByKey(key string) (item interface{}, exists bool, err error) {
return s.getByKey(key)
}
func (s *btreeStore) Replace(objs []interface{}, _ string) error {
s.tree.Clear(false)
for _, obj := range objs {
storeElem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("obj not a storeElement: %#v", obj)
}
s.addOrUpdateElem(storeElem)
}
return nil
}
// addOrUpdateLocked assumes a lock is held and is used for Add
// and Update operations.
func (s *btreeStore) addOrUpdateElem(storeElem *storeElement) *storeElement {
oldObj := s.tree.ReplaceOrInsert(storeElem)
if oldObj == nil {
return nil
}
return oldObj.(*storeElement)
}
func (s *btreeStore) getByKey(key string) (item interface{}, exists bool, err error) {
keyElement := &storeElement{Key: key}
item = s.tree.Get(keyElement)
return item, item != nil, nil
}
func (s *btreeStore) ListPrefix(prefix, continueKey string, limit int) ([]interface{}, bool) {
if limit < 0 {
return nil, false
}
if continueKey == "" {
continueKey = prefix
}
var result []interface{}
var hasMore bool
if limit == 0 {
limit = math.MaxInt
}
s.tree.AscendGreaterOrEqual(&storeElement{Key: continueKey}, func(i btree.Item) bool {
elementKey := i.(*storeElement).Key
if !strings.HasPrefix(elementKey, prefix) {
return false
}
// TODO: Might be worth to lookup one more item to provide more accurate HasMore.
if len(result) >= limit {
hasMore = true
return false
}
result = append(result, i.(interface{}))
return true
})
return result, hasMore
}
func (s *btreeStore) Count(prefix, continueKey string) (count int) {
if continueKey == "" {
continueKey = prefix
}
s.tree.AscendGreaterOrEqual(&storeElement{Key: continueKey}, func(i btree.Item) bool {
elementKey := i.(*storeElement).Key
if !strings.HasPrefix(elementKey, prefix) {
return false
}
count++
return true
})
return count
}
// newIndexer returns a indexer similar to storeIndex from client-go/tools/cache.
// TODO: Unify the indexer code with client-go/cache package.
// Major differences is type of values stored and their mutability:
// * Indexer in client-go stores object keys, that are not mutable.
// * Indexer in cacher stores whole objects, which is mutable.
// Indexer in client-go uses keys as it is used in conjunction with map[key]value
// allowing for fast value retrieval, while btree used in cacher would provide additional overhead.
// Difference in mutability of stored values is used for optimizing some operations in client-go Indexer.
func newIndexer(indexers cache.Indexers) indexer {
return indexer{
indices: map[string]map[string]map[string]*storeElement{},
indexers: indexers,
}
}
type indexer struct {
indices map[string]map[string]map[string]*storeElement
indexers cache.Indexers
}
func (i *indexer) ByIndex(indexName, indexValue string) ([]interface{}, error) {
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("index with name %s does not exist", indexName)
}
index := i.indices[indexName]
set := index[indexValue]
list := make([]interface{}, 0, len(set))
for _, obj := range set {
list = append(list, obj)
}
return list, nil
}
func (i *indexer) Replace(objs []interface{}, resourceVersion string) error {
i.indices = map[string]map[string]map[string]*storeElement{}
for _, obj := range objs {
storeElem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("obj not a storeElement: %#v", obj)
}
err := i.updateElem(storeElem.Key, nil, storeElem)
if err != nil {
return err
}
}
return nil
}
func (i *indexer) updateElem(key string, oldObj, newObj *storeElement) (err error) {
var oldIndexValues, indexValues []string
for name, indexFunc := range i.indexers {
if oldObj != nil {
oldIndexValues, err = indexFunc(oldObj)
} else {
oldIndexValues = oldIndexValues[:0]
}
if err != nil {
return fmt.Errorf("unable to calculate an index entry for key %q on index %q: %w", key, name, err)
}
if newObj != nil {
indexValues, err = indexFunc(newObj)
} else {
indexValues = indexValues[:0]
}
if err != nil {
return fmt.Errorf("unable to calculate an index entry for key %q on index %q: %w", key, name, err)
}
index := i.indices[name]
if index == nil {
index = map[string]map[string]*storeElement{}
i.indices[name] = index
}
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
// We optimize for the most common case where indexFunc returns a single value which has not been changed
i.add(key, indexValues[0], newObj, index)
continue
}
for _, value := range oldIndexValues {
i.delete(key, value, index)
}
for _, value := range indexValues {
i.add(key, value, newObj, index)
}
}
return nil
}
func (i *indexer) add(key, value string, obj *storeElement, index map[string]map[string]*storeElement) {
set := index[value]
if set == nil {
set = map[string]*storeElement{}
index[value] = set
}
set[key] = obj
}
func (i *indexer) delete(key, value string, index map[string]map[string]*storeElement) {
set := index[value]
if set == nil {
return
}
delete(set, key)
// If we don's delete the set when zero, indices with high cardinality
// short lived resources can cause memory to increase over time from
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
if len(set) == 0 {
delete(index, value)
}
}

View File

@ -83,55 +83,6 @@ type watchCacheEvent struct {
RecordTime time.Time
}
// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). Similarly computing object fields and
// labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), in the underlying store we are
// keeping structs (key, object, labels, fields).
type storeElement struct {
Key string
Object runtime.Object
Labels labels.Set
Fields fields.Set
}
func storeElementKey(obj interface{}) (string, error) {
elem, ok := obj.(*storeElement)
if !ok {
return "", fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Key, nil
}
func storeElementObject(obj interface{}) (runtime.Object, error) {
elem, ok := obj.(*storeElement)
if !ok {
return nil, fmt.Errorf("not a storeElement: %v", obj)
}
return elem.Object, nil
}
func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
return func(obj interface{}) (strings []string, e error) {
seo, err := storeElementObject(obj)
if err != nil {
return nil, err
}
return objIndexFunc(seo)
}
}
func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
if indexers == nil {
return cache.Indexers{}
}
ret := cache.Indexers{}
for indexName, indexFunc := range *indexers {
ret[indexName] = storeElementIndexFunc(indexFunc)
}
return ret
}
// watchCache implements a Store interface.
// However, it depends on the elements implementing runtime.Object interface.
//
@ -173,7 +124,7 @@ type watchCache struct {
// history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now.
// NOTE: We assume that <store> is thread-safe.
store cache.Indexer
store storeIndexer
// ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64
@ -223,7 +174,7 @@ func newWatchCache(
upperBoundCapacity: defaultUpperBoundCapacity,
startIndex: 0,
endIndex: 0,
store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
store: newStoreIndexer(indexers),
resourceVersion: 0,
listResourceVersion: 0,
eventHandler: eventHandler,
@ -501,29 +452,7 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
items, rv, index, err := w.waitUntilFreshAndListItems(ctx, resourceVersion, key, matchValues)
if err != nil {
return nil, 0, "", err
}
var result []interface{}
for _, item := range items {
elem, ok := item.(*storeElement)
if !ok {
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
}
if !hasPathPrefix(elem.Key, key) {
continue
}
result = append(result, item)
}
sort.Sort(sortableStoreElements(result))
return result, rv, index, nil
}
func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
@ -537,21 +466,46 @@ func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVer
if err != nil {
return result, rv, index, err
}
result, rv, index, err = func() ([]interface{}, uint64, string, error) {
var prefixFilteredAndOrdered bool
result, rv, index, prefixFilteredAndOrdered, err = func() ([]interface{}, uint64, string, bool, error) {
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
return result, w.resourceVersion, matchValue.IndexName, nil
return result, w.resourceVersion, matchValue.IndexName, false, nil
}
}
return w.store.List(), w.resourceVersion, "", nil
if store, ok := w.store.(orderedLister); ok {
result, _ := store.ListPrefix(key, "", 0)
return result, w.resourceVersion, "", true, nil
}
return w.store.List(), w.resourceVersion, "", false, nil
}()
if !prefixFilteredAndOrdered {
result, err = filterPrefixAndOrder(key, result)
if err != nil {
return nil, 0, "", err
}
}
return result, w.resourceVersion, index, nil
}
return result, rv, index, err
func filterPrefixAndOrder(prefix string, items []interface{}) ([]interface{}, error) {
var result []interface{}
for _, item := range items {
elem, ok := item.(*storeElement)
if !ok {
return nil, fmt.Errorf("non *storeElement returned from storage: %v", item)
}
if !hasPathPrefix(elem.Key, prefix) {
continue
}
result = append(result, item)
}
sort.Sort(sortableStoreElements(result))
return result, nil
}
func (w *watchCache) notFresh(resourceVersion uint64) bool {
@ -739,6 +693,7 @@ func (w *watchCache) isIndexValidLocked(index int) bool {
// be called under the watchCache lock.
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string, opts storage.ListOptions) (*watchCacheInterval, error) {
_, matchesSingle := opts.Predicate.MatchesSingle()
matchesSingle = matchesSingle && !opts.Recursive
if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
return w.getIntervalFromStoreLocked(key, matchesSingle)
}
@ -788,7 +743,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64, key string,
indexerFunc := func(i int) *watchCacheEvent {
return w.cache[i%w.capacity]
}
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, w.RWMutex.RLocker())
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, resourceVersion, w.RWMutex.RLocker())
return ci, nil
}

View File

@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
// watchCacheInterval serves as an abstraction over a source
@ -92,26 +91,34 @@ type watchCacheInterval struct {
// lock on each invocation of Next().
buffer *watchCacheIntervalBuffer
// resourceVersion is the resourceVersion from which
// the interval was constructed.
resourceVersion uint64
// lock effectively protects access to the underlying source
// of events through - indexer and indexValidator.
//
// Given that indexer and indexValidator only read state, if
// possible, Locker obtained through RLocker() is provided.
lock sync.Locker
// initialEventsEndBookmark will be sent after sending all events in cacheInterval
initialEventsEndBookmark *watchCacheEvent
}
type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
type indexerFunc func(int) *watchCacheEvent
type indexValidator func(int) bool
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval {
func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, resourceVersion uint64, locker sync.Locker) *watchCacheInterval {
return &watchCacheInterval{
startIndex: startIndex,
endIndex: endIndex,
indexer: indexer,
indexValidator: indexValidator,
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
lock: locker,
startIndex: startIndex,
endIndex: endIndex,
indexer: indexer,
indexValidator: indexValidator,
buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
resourceVersion: resourceVersion,
lock: locker,
}
}
@ -133,7 +140,7 @@ func (s sortableWatchCacheEvents) Swap(i, j int) {
// returned by Next() need to be events from a List() done on the underlying store of
// the watch cache.
// The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
func newCacheIntervalFromStore(resourceVersion uint64, store storeIndexer, getAttrsFunc attrFunc, key string, matchesSingle bool) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{}
var allItems []interface{}
@ -173,8 +180,9 @@ func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getA
ci := &watchCacheInterval{
startIndex: 0,
// Simulate that we already have all the events we're looking for.
endIndex: 0,
buffer: buffer,
endIndex: 0,
buffer: buffer,
resourceVersion: resourceVersion,
}
return ci, nil

View File

@ -37,6 +37,7 @@ const (
ErrCodeInvalidObj
ErrCodeUnreachable
ErrCodeTimeout
ErrCodeCorruptObj
)
var errCodeToMessage = map[int]string{
@ -46,6 +47,7 @@ var errCodeToMessage = map[int]string{
ErrCodeInvalidObj: "invalid object",
ErrCodeUnreachable: "server unreachable",
ErrCodeTimeout: "request timeout",
ErrCodeCorruptObj: "corrupt object",
}
func NewKeyNotFoundError(key string, rv int64) *StorageError {
@ -82,30 +84,45 @@ func NewUnreachableError(key string, rv int64) *StorageError {
func NewTimeoutError(key, msg string) *StorageError {
return &StorageError{
Code: ErrCodeTimeout,
Key: key,
AdditionalErrorMsg: msg,
Code: ErrCodeTimeout,
Key: key,
err: errors.New(msg),
}
}
func NewInvalidObjError(key, msg string) *StorageError {
return &StorageError{
Code: ErrCodeInvalidObj,
Key: key,
AdditionalErrorMsg: msg,
Code: ErrCodeInvalidObj,
Key: key,
err: errors.New(msg),
}
}
// NewCorruptObjError returns a new StorageError, it represents a corrupt object:
// a) object data retrieved from the storage failed to transform with the given err.
// b) the given object failed to decode with the given err
func NewCorruptObjError(key string, err error) *StorageError {
return &StorageError{
Code: ErrCodeCorruptObj,
Key: key,
err: err,
}
}
type StorageError struct {
Code int
Key string
ResourceVersion int64
AdditionalErrorMsg string
Code int
Key string
ResourceVersion int64
// inner error
err error
}
func (e *StorageError) Unwrap() error { return e.err }
func (e *StorageError) Error() string {
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %s",
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.AdditionalErrorMsg)
return fmt.Sprintf("StorageError: %s, Code: %d, Key: %s, ResourceVersion: %d, AdditionalErrorMsg: %v",
errCodeToMessage[e.Code], e.Code, e.Key, e.ResourceVersion, e.err)
}
// IsNotFound returns true if and only if err is "key" not found error.
@ -138,6 +155,21 @@ func IsInvalidObj(err error) bool {
return isErrCode(err, ErrCodeInvalidObj)
}
// IsCorruptObject returns true if and only if:
// a) the given object data retrieved from the storage is not transformable, or
// b) the given object failed to decode properly
func IsCorruptObject(err error) bool {
if err == nil {
return false
}
var storageErr *StorageError
if !errors.As(err, &storageErr) {
return false
}
return storageErr.Code == ErrCodeCorruptObj
}
func isErrCode(err error, code int) bool {
if err == nil {
return false
@ -172,24 +204,26 @@ func NewInvalidError(errors field.ErrorList) InvalidError {
// not from the underlying storage backend (e.g., etcd).
type InternalError struct {
Reason string
// retain the inner error to maintain the error tree, so as to enable us
// to do proper error checking, but we also need to be backward compatible.
err error
}
func (e InternalError) Error() string {
return e.Reason
}
func (e InternalError) Unwrap() error { return e.err }
// IsInternalError returns true if and only if err is an InternalError.
func IsInternalError(err error) bool {
_, ok := err.(InternalError)
return ok
}
func NewInternalError(reason string) InternalError {
return InternalError{reason}
}
func NewInternalErrorf(format string, a ...interface{}) InternalError {
return InternalError{fmt.Sprintf(format, a...)}
func NewInternalError(err error) InternalError {
return InternalError{Reason: err.Error(), err: err}
}
var tooLargeResourceVersionCauseMsg = "Too large resource version"

View File

@ -15,4 +15,4 @@ limitations under the License.
*/
// Package storage provides conversion of storage errors to API errors.
package storage // import "k8s.io/apiserver/pkg/storage/errors"
package errors // import "k8s.io/apiserver/pkg/storage/errors"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package errors
import (
"k8s.io/apimachinery/pkg/api/errors"
@ -32,6 +32,8 @@ func InterpretListError(err error, qualifiedResource schema.GroupResource) error
return errors.NewServerTimeout(qualifiedResource, "list", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
case storage.IsCorruptObject(err):
return errors.NewInternalError(err)
default:
return err
}
@ -47,6 +49,8 @@ func InterpretGetError(err error, qualifiedResource schema.GroupResource, name s
return errors.NewServerTimeout(qualifiedResource, "get", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
case storage.IsCorruptObject(err):
return errors.NewInternalError(err)
default:
return err
}
@ -62,6 +66,8 @@ func InterpretCreateError(err error, qualifiedResource schema.GroupResource, nam
return errors.NewServerTimeout(qualifiedResource, "create", 2) // TODO: make configurable or handled at a higher level
case storage.IsInternalError(err):
return errors.NewInternalError(err)
case storage.IsCorruptObject(err):
return errors.NewInternalError(err)
default:
return err
}
@ -79,6 +85,8 @@ func InterpretUpdateError(err error, qualifiedResource schema.GroupResource, nam
return errors.NewNotFound(qualifiedResource, name)
case storage.IsInternalError(err):
return errors.NewInternalError(err)
case storage.IsCorruptObject(err):
return errors.NewInternalError(err)
default:
return err
}
@ -96,6 +104,8 @@ func InterpretDeleteError(err error, qualifiedResource schema.GroupResource, nam
return errors.NewConflict(qualifiedResource, name, err)
case storage.IsInternalError(err):
return errors.NewInternalError(err)
case storage.IsCorruptObject(err):
return errors.NewInternalError(err)
default:
return err
}
@ -110,6 +120,8 @@ func InterpretWatchError(err error, resource schema.GroupResource, name string)
return errors.NewInvalid(schema.GroupKind{Group: resource.Group, Kind: resource.Resource}, name, invalidError.Errs)
case storage.IsInternalError(err):
return errors.NewInternalError(err)
case storage.IsCorruptObject(err):
return errors.NewInternalError(err)
default:
return err
}

View File

@ -0,0 +1,270 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/klog/v2"
)
// NewStoreWithUnsafeCorruptObjectDeletion wraps the given store implementation
// and adds support for unsafe deletion of corrupt objects
func NewStoreWithUnsafeCorruptObjectDeletion(delegate storage.Interface, gr schema.GroupResource) storage.Interface {
return &corruptObjectDeleter{
Interface: delegate,
groupResource: gr,
}
}
// WithCorruptObjErrorHandlingDecoder decorates the given decoder, it determines
// if the error returned by the given decoder represents a corrupt object (the
// object is undecodable), and then it wraps the error appropriately so the
// unsafe deleter can determine if the object is a candidate for unsafe deletion
func WithCorruptObjErrorHandlingDecoder(decoder Decoder) Decoder {
return &corruptObjErrorInterpretingDecoder{Decoder: decoder}
}
// WithCorruptObjErrorHandlingTransformer decorates the given decoder, it
// determines if the error returned by the given transformer represents a
// corrupt object (the data from the storage is untransformable), and then it
// wraps the error appropriately so the unsafe deleter can determine
// if the object is a candidate for unsafe deletion
func WithCorruptObjErrorHandlingTransformer(transformer value.Transformer) value.Transformer {
return &corruptObjErrorInterpretingTransformer{Transformer: transformer}
}
// corruptObjErrAggregatorFactory returns an error aggregator that aggregates
// corrupt object error(s) that the list operation encounters while
// retrieving objects from the storage.
// maxCount: it is the maximum number of error that will be aggregated
func corruptObjErrAggregatorFactory(maxCount int) func() ListErrorAggregator {
if maxCount <= 0 {
return defaultListErrorAggregatorFactory
}
return func() ListErrorAggregator {
return &corruptObjErrAggregator{maxCount: maxCount}
}
}
var errTooMany = errors.New("too many errors, the list is truncated")
// aggregate corrupt object errors from the LIST operation
type corruptObjErrAggregator struct {
errs []error
abortErr error
maxCount int
}
func (a *corruptObjErrAggregator) Aggregate(key string, err error) bool {
if len(a.errs) >= a.maxCount {
// add a sentinel error to indicate there are more
a.errs = append(a.errs, errTooMany)
return true
}
var corruptObjErr *corruptObjectError
if errors.As(err, &corruptObjErr) {
a.errs = append(a.errs, storage.NewCorruptObjError(key, corruptObjErr))
return false
}
// not a corrupt object error, the list operation should abort
a.abortErr = err
return true
}
func (a *corruptObjErrAggregator) Err() error {
switch {
case len(a.errs) == 0 && a.abortErr != nil:
return a.abortErr
case len(a.errs) > 0:
err := utilerrors.NewAggregate(a.errs)
return &aggregatedStorageError{errs: err, resourcePrefix: "list"}
default:
return nil
}
}
// corruptObjectDeleter facilitates unsafe deletion of corrupt objects for etcd
type corruptObjectDeleter struct {
storage.Interface
groupResource schema.GroupResource
}
func (s *corruptObjectDeleter) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
if err := s.Interface.Get(ctx, key, opts, out); err != nil {
var corruptObjErr *corruptObjectError
if !errors.As(err, &corruptObjErr) {
// this error does not represent a corrupt object
return err
}
// the unsafe deleter at the registry layer will check whether
// the given err represents a corrupt object in order to
// initiate the unsafe deletion flow.
return storage.NewCorruptObjError(key, corruptObjErr)
}
return nil
}
func (s *corruptObjectDeleter) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
err := s.Interface.GetList(ctx, key, opts, listObj)
if err == nil {
return nil
}
var aggregatedErr *aggregatedStorageError
if errors.As(err, &aggregatedErr) {
// we have aggregated a list of corrupt objects
klog.V(5).ErrorS(aggregatedErr, "corrupt objects")
return aggregatedErr.NewAPIStatusError(s.groupResource)
}
return err
}
// corruptObjErrorInterpretingDecoder wraps the error returned by the decorated decoder
type corruptObjErrorInterpretingDecoder struct {
Decoder
}
func (d *corruptObjErrorInterpretingDecoder) Decode(value []byte, objPtr runtime.Object, rev int64) error {
// TODO: right now any error is deemed as undecodable, in
// the future, we can apply some filter, if need be.
if err := d.Decoder.Decode(value, objPtr, rev); err != nil {
return &corruptObjectError{err: err, errType: undecodable, revision: rev}
}
return nil
}
// decodeListItem decodes bytes value in array into object.
func (d *corruptObjErrorInterpretingDecoder) DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) {
// TODO: right now any error is deemed as undecodable, in
// the future, we can apply some filter, if need be.
obj, err := d.Decoder.DecodeListItem(ctx, data, rev, newItemFunc)
if err != nil {
err = &corruptObjectError{err: err, errType: undecodable, revision: int64(rev)}
}
return obj, err
}
// corruptObjErrorInterpretingTransformer wraps the error returned by the transformer
type corruptObjErrorInterpretingTransformer struct {
value.Transformer
}
func (t *corruptObjErrorInterpretingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
// TODO: right now any error is deemed as undecodable, in the future, we
// can apply some filter, if need be. For example, any network error
out, stale, err := t.Transformer.TransformFromStorage(ctx, data, dataCtx)
if err != nil {
err = &corruptObjectError{err: err, errType: untransformable}
}
return out, stale, err
}
// corruptObjectError is used internally, only by the corrupt object
// deleter, this error represents a corrup object:
// a) the data from the storage failed to transform, or
// b) the data failed to decode into an object
// NOTE: this error does not have any information to identify the object
// that is corrupt, for example the storage key associated with the object
type corruptObjectError struct {
err error
errType int
revision int64
}
const (
untransformable int = iota + 1
undecodable
)
var typeToMessage = map[int]string{
untransformable: "data from the storage is not transformable",
undecodable: "object not decodable",
}
func (e *corruptObjectError) Unwrap() error { return e.err }
func (e *corruptObjectError) Error() string {
return fmt.Sprintf("%s revision=%d: %v", typeToMessage[e.errType], e.revision, e.err)
}
// aggregatedStorageError holds an aggregated list of storage.StorageError
type aggregatedStorageError struct {
resourcePrefix string
errs utilerrors.Aggregate
}
func (e *aggregatedStorageError) Error() string {
errs := e.errs.Errors()
var b strings.Builder
fmt.Fprintf(&b, "unable to transform or decode %d objects: {\n", len(errs))
for _, err := range errs {
fmt.Fprintf(&b, "\t%s\n", err.Error())
}
b.WriteString("}")
return b.String()
}
// NewAPIStatusError creates a new APIStatus object from the
// aggregated list of StorageError
func (e *aggregatedStorageError) NewAPIStatusError(qualifiedResource schema.GroupResource) *apierrors.StatusError {
var causes []metav1.StatusCause
for _, err := range e.errs.Errors() {
var storageErr *storage.StorageError
if errors.As(err, &storageErr) {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeUnexpectedServerResponse,
Field: storageErr.Key,
// TODO: do we need to expose the internal error message here?
Message: err.Error(),
})
continue
}
if errors.Is(err, errTooMany) {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeTooMany,
Message: errTooMany.Error(),
})
}
}
return &apierrors.StatusError{
ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusInternalServerError,
Reason: metav1.StatusReasonStoreReadError,
Details: &metav1.StatusDetails{
Group: qualifiedResource.Group,
Kind: qualifiedResource.Resource,
Name: e.resourcePrefix,
Causes: causes,
},
Message: fmt.Sprintf("failed to read one or more %s from the storage", qualifiedResource.String()),
},
}
}

94
vendor/k8s.io/apiserver/pkg/storage/etcd3/decoder.go generated vendored Normal file
View File

@ -0,0 +1,94 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
)
// NewDefaultDecoder returns the default decoder for etcd3 store
func NewDefaultDecoder(codec runtime.Codec, versioner storage.Versioner) Decoder {
return &defaultDecoder{
codec: codec,
versioner: versioner,
}
}
// Decoder is used by the etcd storage implementation to decode
// transformed data from the storage into an object
type Decoder interface {
// Decode decodes value of bytes into object. It will also
// set the object resource version to rev.
// On success, objPtr would be set to the object.
Decode(value []byte, objPtr runtime.Object, rev int64) error
// DecodeListItem decodes bytes value in array into object.
DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error)
}
var _ Decoder = &defaultDecoder{}
type defaultDecoder struct {
codec runtime.Codec
versioner storage.Versioner
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func (d *defaultDecoder) Decode(value []byte, objPtr runtime.Object, rev int64) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
// nolint:errorlint // this code was moved from store.go as is
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
_, _, err := d.codec.Decode(value, nil, objPtr)
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
if err := d.versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return nil
}
// decodeListItem decodes bytes value in array into object.
func (d *defaultDecoder) DecodeListItem(ctx context.Context, data []byte, rev uint64, newItemFunc func() runtime.Object) (runtime.Object, error) {
startedAt := time.Now()
defer func() {
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
}()
obj, _, err := d.codec.Decode(data, nil, newItemFunc())
if err != nil {
return nil, err
}
if err := d.versioner.UpdateObject(obj, rev); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return obj, nil
}

View File

@ -17,7 +17,11 @@ limitations under the License.
package etcd3
import (
goerrors "errors"
"net/http"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/storage"
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
@ -29,6 +33,19 @@ func interpretWatchError(err error) error {
case err == etcdrpc.ErrCompacted:
return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
}
var corruptobjDeletedErr *corruptObjectDeletedError
if goerrors.As(err, &corruptobjDeletedErr) {
return &errors.StatusError{
ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusInternalServerError,
Reason: metav1.StatusReasonStoreReadError,
Message: corruptobjDeletedErr.Error(),
},
}
}
return err
}

View File

@ -19,14 +19,15 @@ package etcd3
import (
"bytes"
"context"
"errors"
"fmt"
"path"
"reflect"
"strings"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.opentelemetry.io/otel/attribute"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -38,7 +39,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
@ -73,7 +73,7 @@ func (d authenticatedDataString) AuthenticatedData() []byte {
var _ value.Context = authenticatedDataString("")
type store struct {
client *clientv3.Client
client *kubernetes.Client
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
@ -82,6 +82,8 @@ type store struct {
groupResourceString string
watcher *watcher
leaseManager *leaseManager
decoder Decoder
listErrAggrFactory func() ListErrorAggregator
}
func (s *store) RequestWatchProgress(ctx context.Context) error {
@ -98,13 +100,52 @@ type objState struct {
stale bool
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig)
// ListErrorAggregator aggregates the error(s) that the LIST operation
// encounters while retrieving object(s) from the storage
type ListErrorAggregator interface {
// Aggregate aggregates the given error from list operation
// key: it identifies the given object in the storage.
// err: it represents the error the list operation encountered while
// retrieving the given object from the storage.
// done: true if the aggregation is done and the list operation should
// abort, otherwise the list operation will continue
Aggregate(key string, err error) bool
// Err returns the aggregated error
Err() error
}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store {
versioner := storage.APIObjectVersioner{}
// defaultListErrorAggregatorFactory returns the default list error
// aggregator that maintains backward compatibility, which is abort
// the list operation as soon as it encounters the first error
func defaultListErrorAggregatorFactory() ListErrorAggregator { return &abortOnFirstError{} }
// LIST aborts on the first error it encounters (backward compatible)
type abortOnFirstError struct {
err error
}
func (a *abortOnFirstError) Aggregate(key string, err error) bool {
a.err = err
return true
}
func (a *abortOnFirstError) Err() error { return a.err }
// New returns an etcd3 implementation of storage.Interface.
func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) storage.Interface {
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
transformer = WithCorruptObjErrorHandlingTransformer(transformer)
decoder = WithCorruptObjErrorHandlingDecoder(decoder)
}
var store storage.Interface
store = newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig, decoder, versioner)
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
store = NewStoreWithUnsafeCorruptObjectDeletion(store, groupResource)
}
return store
}
func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig, decoder Decoder, versioner storage.Versioner) *store {
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
@ -114,8 +155,13 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
pathPrefix += "/"
}
listErrAggrFactory := defaultListErrorAggregatorFactory
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
listErrAggrFactory = corruptObjErrAggregatorFactory(100)
}
w := &watcher{
client: c,
client: c.Client,
codec: codec,
newFunc: newFunc,
groupResource: groupResource,
@ -136,7 +182,9 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func
groupResource: groupResource,
groupResourceString: groupResource.String(),
watcher: w,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
decoder: decoder,
listErrAggrFactory: listErrAggrFactory,
}
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
@ -160,29 +208,28 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
return err
}
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, preparedKey)
getResp, err := s.client.Kubernetes.Get(ctx, preparedKey, kubernetes.GetOptions{})
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil {
return err
}
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if getResp.KV == nil {
if opts.IgnoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(preparedKey, 0)
}
kv := getResp.Kvs[0]
data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(preparedKey))
data, _, err := s.transformer.TransformFromStorage(ctx, getResp.KV.Value, authenticatedDataString(preparedKey))
if err != nil {
return storage.NewInternalError(err.Error())
return storage.NewInternalError(err)
}
err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
err = s.decoder.Decode(data, out, getResp.KV.ModRevision)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
@ -217,24 +264,23 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
}
span.AddEvent("Encode succeeded", attribute.Int("len", len(data)))
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
var lease clientv3.LeaseID
if ttl != 0 {
lease, err = s.leaseManager.GetLease(ctx, int64(ttl))
if err != nil {
return err
}
}
newData, err := s.transformer.TransformToStorage(ctx, data, authenticatedDataString(preparedKey))
if err != nil {
span.AddEvent("TransformToStorage failed", attribute.String("err", err.Error()))
return storage.NewInternalError(err.Error())
return storage.NewInternalError(err)
}
span.AddEvent("TransformToStorage succeeded")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(preparedKey),
).Then(
clientv3.OpPut(preparedKey, string(newData), opts...),
).Commit()
txnResp, err := s.client.Kubernetes.OptimisticPut(ctx, preparedKey, newData, 0, kubernetes.PutOptions{LeaseID: lease})
metrics.RecordEtcdRequest("create", s.groupResourceString, err, startTime)
if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
@ -247,8 +293,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
err = s.decoder.Decode(data, out, txnResp.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
@ -262,7 +307,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
// Delete implements storage.Interface.Delete.
func (s *store) Delete(
ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
preparedKey, err := s.prepareKey(key)
if err != nil {
return err
@ -271,13 +316,18 @@ func (s *store) Delete(
if err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject)
skipTransformDecode := false
if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
skipTransformDecode = opts.IgnoreStoreReadError
}
return s.conditionalDelete(ctx, preparedKey, out, v, preconditions, validateDeletion, cachedExistingObject, skipTransformDecode)
}
func (s *store) conditionalDelete(
ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
getCurrentState := s.getCurrentState(ctx, key, v, false)
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, skipTransformDecode bool) error {
getCurrentState := s.getCurrentState(ctx, key, v, false, skipTransformDecode)
var origState *objState
var err error
@ -347,21 +397,16 @@ func (s *store) conditionalDelete(
}
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpDelete(key),
).Else(
clientv3.OpGet(key),
).Commit()
txnResp, err := s.client.Kubernetes.OptimisticDelete(ctx, key, origState.rev, kubernetes.DeleteOptions{
GetOnFailure: true,
})
metrics.RecordEtcdRequest("delete", s.groupResourceString, err, startTime)
if err != nil {
return err
}
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(ctx, getResp, key, v, false)
origState, err = s.getState(ctx, txnResp.KV, key, v, false, skipTransformDecode)
if err != nil {
return err
}
@ -369,17 +414,12 @@ func (s *store) conditionalDelete(
continue
}
if len(txnResp.Responses) == 0 || txnResp.Responses[0].GetResponseDeleteRange() == nil {
return errors.New(fmt.Sprintf("invalid DeleteRange response: %v", txnResp.Responses))
}
deleteResp := txnResp.Responses[0].GetResponseDeleteRange()
if deleteResp.Header == nil {
return errors.New("invalid DeleteRange response - nil header")
}
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
if !skipTransformDecode {
err = s.decoder.Decode(origState.data, out, txnResp.Revision)
if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
}
}
return nil
}
@ -405,7 +445,8 @@ func (s *store) GuaranteedUpdate(
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound)
skipTransformDecode := false
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound, skipTransformDecode)
var origState *objState
var origStateIsCurrent bool
@ -491,7 +532,7 @@ func (s *store) GuaranteedUpdate(
}
// recheck that the data from etcd is not stale before short-circuiting a write
if !origState.stale {
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev)
err = s.decoder.Decode(origState.data, destination, origState.rev)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
@ -503,24 +544,25 @@ func (s *store) GuaranteedUpdate(
newData, err := s.transformer.TransformToStorage(ctx, data, transformContext)
if err != nil {
span.AddEvent("TransformToStorage failed", attribute.String("err", err.Error()))
return storage.NewInternalError(err.Error())
return storage.NewInternalError(err)
}
span.AddEvent("TransformToStorage succeeded")
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
var lease clientv3.LeaseID
if ttl != 0 {
lease, err = s.leaseManager.GetLease(ctx, int64(ttl))
if err != nil {
return err
}
}
span.AddEvent("Transaction prepared")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(preparedKey), "=", origState.rev),
).Then(
clientv3.OpPut(preparedKey, string(newData), opts...),
).Else(
clientv3.OpGet(preparedKey),
).Commit()
txnResp, err := s.client.Kubernetes.OptimisticPut(ctx, preparedKey, newData, origState.rev, kubernetes.PutOptions{
GetOnFailure: true,
LeaseID: lease,
})
metrics.RecordEtcdRequest("update", s.groupResourceString, err, startTime)
if err != nil {
span.AddEvent("Txn call failed", attribute.String("err", err.Error()))
@ -529,9 +571,8 @@ func (s *store) GuaranteedUpdate(
span.AddEvent("Txn call completed")
span.AddEvent("Transaction committed")
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", preparedKey)
origState, err = s.getState(ctx, getResp, preparedKey, v, ignoreNotFound)
origState, err = s.getState(ctx, txnResp.KV, preparedKey, v, ignoreNotFound, skipTransformDecode)
if err != nil {
return err
}
@ -539,9 +580,8 @@ func (s *store) GuaranteedUpdate(
origStateIsCurrent = true
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
err = s.decoder.Decode(data, destination, txnResp.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
@ -583,12 +623,12 @@ func (s *store) Count(key string) (int64, error) {
}
startTime := time.Now()
getResp, err := s.client.KV.Get(context.Background(), preparedKey, clientv3.WithRange(clientv3.GetPrefixRangeEnd(preparedKey)), clientv3.WithCountOnly())
count, err := s.client.Kubernetes.Count(context.Background(), preparedKey, kubernetes.CountOptions{})
metrics.RecordEtcdRequest("listWithCount", preparedKey, err, startTime)
if err != nil {
return 0, err
}
return getResp.Count, nil
return count, nil
}
// ReadinessCheck implements storage.Interface.
@ -639,7 +679,7 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto
// GetList implements storage.Interface.
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
preparedKey, err := s.prepareKey(key)
keyPrefix, err := s.prepareKey(key)
if err != nil {
return err
}
@ -664,27 +704,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
// "/a/b" which is the correct answer.
if opts.Recursive && !strings.HasSuffix(preparedKey, "/") {
preparedKey += "/"
if opts.Recursive && !strings.HasSuffix(keyPrefix, "/") {
keyPrefix += "/"
}
keyPrefix := preparedKey
// set the appropriate clientv3 options to filter the returned data set
var limitOption *clientv3.OpOption
limit := opts.Predicate.Limit
var paging bool
options := make([]clientv3.OpOption, 0, 4)
if opts.Predicate.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(limit))
limitOption = &options[len(options)-1]
}
if opts.Recursive {
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
}
paging := opts.Predicate.Limit > 0
newItemFunc := getNewItemFunc(listObj, v)
var continueRV, withRev int64
@ -694,20 +720,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
preparedKey = continueKey
}
if withRev, err = s.resolveGetListRev(continueKey, continueRV, opts); err != nil {
return err
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
}
// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
var hasMore bool
var getResp *clientv3.GetResponse
var getResp kubernetes.ListResponse
var numFetched int
var numEvald int
// Because these metrics are for understanding the costs of handling LIST requests,
@ -722,26 +743,30 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
metricsOp = "list"
}
aggregator := s.listErrAggrFactory()
for {
startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, preparedKey, options...)
getResp, err = s.getList(ctx, keyPrefix, opts.Recursive, kubernetes.ListOptions{
Revision: withRev,
Limit: limit,
Continue: continueKey,
})
metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime)
if err != nil {
return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix)
}
numFetched += len(getResp.Kvs)
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Revision)); err != nil {
return err
}
hasMore = getResp.More
hasMore = int64(len(getResp.Kvs)) < getResp.Count
if len(getResp.Kvs) == 0 && getResp.More {
if len(getResp.Kvs) == 0 && hasMore {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
}
// indicate to the client which resource version was returned, and use the same resource version for subsequent requests.
if withRev == 0 {
withRev = getResp.Header.Revision
options = append(options, clientv3.WithRev(withRev))
withRev = getResp.Revision
}
// avoid small allocations for the result slice, since this can be called in many
@ -762,7 +787,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
data, _, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(kv.Key))
if err != nil {
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
if done := aggregator.Aggregate(string(kv.Key), storage.NewInternalError(fmt.Errorf("unable to transform key %q: %w", kv.Key, err))); done {
return aggregator.Err()
}
continue
}
// Check if the request has already timed out before decode object
@ -773,10 +801,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
default:
}
obj, err := decodeListItem(ctx, data, uint64(kv.ModRevision), s.codec, s.versioner, newItemFunc)
obj, err := s.decoder.DecodeListItem(ctx, data, uint64(kv.ModRevision), newItemFunc)
if err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key))
return err
if done := aggregator.Aggregate(string(kv.Key), err); done {
return aggregator.Err()
}
continue
}
// being unable to set the version does not prevent the object from being extracted
@ -789,6 +820,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// free kv early. Long lists can take O(seconds) to decode.
getResp.Kvs[i] = nil
}
continueKey = string(lastKey) + "\x00"
// no more results remain or we didn't request paging
if !hasMore || !paging {
@ -806,9 +838,11 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if limit > maxLimit {
limit = maxLimit
}
*limitOption = clientv3.WithLimit(limit)
}
preparedKey = string(lastKey) + "\x00"
}
if err := aggregator.Err(); err != nil {
return err
}
if v.IsNil() {
@ -823,6 +857,26 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
return s.versioner.UpdateList(listObj, uint64(withRev), continueValue, remainingItemCount)
}
func (s *store) getList(ctx context.Context, keyPrefix string, recursive bool, options kubernetes.ListOptions) (kubernetes.ListResponse, error) {
if recursive {
return s.client.Kubernetes.List(ctx, keyPrefix, options)
}
getResp, err := s.client.Kubernetes.Get(ctx, keyPrefix, kubernetes.GetOptions{
Revision: options.Revision,
})
var resp kubernetes.ListResponse
if getResp.KV != nil {
resp.Kvs = []*mvccpb.KeyValue{getResp.KV}
resp.Count = 1
resp.Revision = getResp.Revision
} else {
resp.Kvs = []*mvccpb.KeyValue{}
resp.Count = 0
resp.Revision = getResp.Revision
}
return resp, err
}
// growSlice takes a slice value and grows its capacity up
// to the maximum of the passed sizes or maxCapacity, whichever
// is smaller. Above maxCapacity decisions about allocation are left
@ -878,19 +932,25 @@ func (s *store) watchContext(ctx context.Context) context.Context {
return clientv3.WithRequireLeader(ctx)
}
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool) func() (*objState, error) {
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) func() (*objState, error) {
return func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
getResp, err := s.client.Kubernetes.Get(ctx, key, kubernetes.GetOptions{})
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil {
return nil, err
}
return s.getState(ctx, getResp, key, v, ignoreNotFound)
return s.getState(ctx, getResp.KV, key, v, ignoreNotFound, skipTransformDecode)
}
}
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
// getState constructs a new objState from the given response from the storage.
// skipTransformDecode: if true, the function will neither transform the data
// from the storage nor decode it into an object; otherwise, data from the
// storage will be transformed and decoded.
// NOTE: when skipTransformDecode is true, the 'data', and the 'obj' fields
// of the objState will be nil, and 'stale' will be set to true.
func (s *store) getState(ctx context.Context, kv *mvccpb.KeyValue, key string, v reflect.Value, ignoreNotFound bool, skipTransformDecode bool) (*objState, error) {
state := &objState{
meta: &storage.ResponseMeta{},
}
@ -901,7 +961,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
state.obj = reflect.New(v.Type()).Interface().(runtime.Object)
}
if len(getResp.Kvs) == 0 {
if kv == nil {
if !ignoreNotFound {
return nil, storage.NewKeyNotFoundError(key, 0)
}
@ -909,15 +969,25 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
return nil, err
}
} else {
data, stale, err := s.transformer.TransformFromStorage(ctx, getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return nil, storage.NewInternalError(err.Error())
}
state.rev = getResp.Kvs[0].ModRevision
state.rev = kv.ModRevision
state.meta.ResourceVersion = uint64(state.rev)
if skipTransformDecode {
// be explicit that we don't have the object
state.obj = nil
state.stale = true // this seems a more sane value here
return state, nil
}
data, stale, err := s.transformer.TransformFromStorage(ctx, kv.Value, authenticatedDataString(key))
if err != nil {
return nil, storage.NewInternalError(err)
}
state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
if err := s.decoder.Decode(state.data, state.obj, state.rev); err != nil {
recordDecodeError(s.groupResourceString, key)
return nil, err
}
@ -969,19 +1039,6 @@ func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtim
return ret, ttl, nil
}
// ttlOpts returns client options based on given ttl.
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
if ttl == 0 {
return nil, nil
}
id, err := s.leaseManager.GetLease(ctx, ttl)
if err != nil {
return nil, err
}
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
}
// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
// greater than the most recent actualRevision available from storage.
func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
@ -1024,52 +1081,12 @@ func (s *store) prepareKey(key string) (string, error) {
return s.pathPrefix + key[startIndex:], nil
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
_, _, err := codec.Decode(value, nil, objPtr)
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return nil
}
// decodeListItem decodes bytes value in array into object.
func decodeListItem(ctx context.Context, data []byte, rev uint64, codec runtime.Codec, versioner storage.Versioner, newItemFunc func() runtime.Object) (runtime.Object, error) {
startedAt := time.Now()
defer func() {
endpointsrequest.TrackDecodeLatency(ctx, time.Since(startedAt))
}()
obj, _, err := codec.Decode(data, nil, newItemFunc())
if err != nil {
return nil, err
}
if err := versioner.UpdateObject(obj, rev); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return obj, nil
}
// recordDecodeError record decode error split by object type.
func recordDecodeError(resource string, key string) {
metrics.RecordDecodeError(resource)
klog.V(4).Infof("Decoding %s \"%s\" failed", resource, key)
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}
// getTypeName returns type name of an object for reporting purposes.
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()

View File

@ -686,18 +686,40 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
return nil, nil, wc.watcher.transformIfCorruptObjectError(e, err)
}
// Note that this sends the *old* object with the etcd revision for the time at
// which it gets deleted.
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
if err != nil {
return nil, nil, err
return nil, nil, wc.watcher.transformIfCorruptObjectError(e, err)
}
}
return curObj, oldObj, nil
}
type corruptObjectDeletedError struct {
err error
}
func (e *corruptObjectDeletedError) Error() string {
return fmt.Sprintf("saw a DELETED event, but object data is corrupt - %v", e.err)
}
func (e *corruptObjectDeletedError) Unwrap() error { return e.err }
func (w *watcher) transformIfCorruptObjectError(e *event, err error) error {
var corruptObjErr *corruptObjectError
if !e.isDeleted || !errors.As(err, &corruptObjErr) {
return err
}
// if we are here it means we received a DELETED event but the object
// associated with it is corrupt because we failed to transform or
// decode the data associated with the object.
// wrap the original error so we can send a proper watch Error event.
return &corruptObjectDeletedError{err: corruptObjErr}
}
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {
obj, err := runtime.Decode(codec, []byte(data))
if err != nil {

View File

@ -108,6 +108,8 @@ type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Obj
// ValidateObjectFunc is a function to act on a given object. An error may be returned
// if the hook cannot be completed. The function may NOT transform the provided
// object.
// NOTE: the object in obj may be nil if it cannot be read from the
// storage, due to transformation or decode error.
type ValidateObjectFunc func(ctx context.Context, obj runtime.Object) error
// ValidateAllObjectFunc is a "admit everything" instance of ValidateObjectFunc.
@ -137,11 +139,11 @@ func (p *Preconditions) Check(key string, obj runtime.Object) error {
}
objMeta, err := meta.Accessor(obj)
if err != nil {
return NewInternalErrorf(
"can't enforce preconditions %v on un-introspectable object %v, got error: %v",
*p,
obj,
err)
return NewInternalError(
fmt.Errorf("can't enforce preconditions %v on un-introspectable object %v, got error: %w",
*p,
obj,
err))
}
if p.UID != nil && *p.UID != objMeta.GetUID() {
err := fmt.Sprintf(
@ -178,7 +180,7 @@ type Interface interface {
// However, the implementations have to retry in case suggestion is stale.
Delete(
ctx context.Context, key string, out runtime.Object, preconditions *Preconditions,
validateDeletion ValidateObjectFunc, cachedExistingObject runtime.Object) error
validateDeletion ValidateObjectFunc, cachedExistingObject runtime.Object, opts DeleteOptions) error
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items selected by 'p' are sent down to returned watch.Interface.
@ -312,3 +314,14 @@ type ListOptions struct {
// continues streaming events.
SendInitialEvents *bool
}
// DeleteOptions provides the options that may be provided for storage delete operations.
type DeleteOptions struct {
// IgnoreStoreReadError, if enabled, will ignore store read error
// such as transformation or decode failure and go ahead with the
// deletion of the object.
// NOTE: for normal deletion flow it should always be false, it may be
// enabled by the caller only to facilitate unsafe deletion of corrupt
// object which otherwise can not be deleted using the normal flow
IgnoreStoreReadError bool
}

View File

@ -33,6 +33,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/kubernetes"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -228,7 +229,7 @@ func newETCD3ProberMonitor(c storagebackend.Config) (*etcd3ProberMonitor, error)
return nil, err
}
return &etcd3ProberMonitor{
client: client,
client: client.Client,
prefix: c.Prefix,
endpoints: c.Transport.ServerList,
}, nil
@ -282,7 +283,7 @@ func (t *etcd3ProberMonitor) Monitor(ctx context.Context) (metrics.StorageMetric
}, nil
}
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
var newETCD3Client = func(c storagebackend.TransportConfig) (*kubernetes.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
@ -352,7 +353,7 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e
Logger: etcd3ClientLogger,
}
return clientv3.New(cfg)
return kubernetes.New(cfg)
}
type runningCompactor struct {
@ -378,12 +379,17 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
compactorsMu.Lock()
defer compactorsMu.Unlock()
if interval == 0 {
// short circuit, if the compaction request from apiserver is disabled
return func() {}, nil
}
key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
compactorClient, err := newETCD3Client(c)
client, err := newETCD3Client(c)
if err != nil {
return nil, err
}
compactorClient := client.Client
if foundBefore {
// replace compactor
@ -435,7 +441,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
// decorate the KV instance so we can track etcd latency per request.
client.KV = etcd3.NewETCDLatencyTracker(client.KV)
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client, c.DBMetricPollInterval)
stopDBSizeMonitor, err := startDBSizeMonitorPerEndpoint(client.Client, c.DBMetricPollInterval)
if err != nil {
return nil, nil, err
}
@ -455,7 +461,11 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
if transformer == nil {
transformer = identity.NewEncryptCheckTransformer()
}
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig), destroyFunc, nil
versioner := storage.APIObjectVersioner{}
decoder := etcd3.NewDefaultDecoder(c.Codec, versioner)
store := etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner)
return store, destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -58,6 +58,7 @@ func NewGRPCService(ctx context.Context, endpoint, providerName string, callTime
s := &gRPCService{callTimeout: callTimeout}
s.connection, err = grpc.Dial(
addr,
grpc.WithAuthority("localhost"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
grpc.WithContextDialer(

View File

@ -61,10 +61,10 @@ var (
Namespace: namespace,
Subsystem: subsystem,
Name: "transformation_operations_total",
Help: "Total number of transformations. Successful transformation will have a status 'OK' and a varied status string when the transformation fails. This status and transformation_type fields may be used for alerting on encryption/decryption failure using transformation_type from_storage for decryption and to_storage for encryption",
Help: "Total number of transformations. Successful transformation will have a status 'OK' and a varied status string when the transformation fails. The status, resource, and transformation_type fields can be used for alerting purposes. For example, you can monitor for encryption/decryption failures using the transformation_type (e.g., from_storage for decryption and to_storage for encryption). Additionally, these fields can be used to ensure that the correct transformers are applied to each resource.",
StabilityLevel: metrics.ALPHA,
},
[]string{"transformation_type", "transformer_prefix", "status"},
[]string{"resource", "transformation_type", "transformer_prefix", "status"},
)
envelopeTransformationCacheMissTotal = metrics.NewCounter(
@ -113,8 +113,8 @@ func RegisterMetrics() {
// RecordTransformation records latencies and count of TransformFromStorage and TransformToStorage operations.
// Note that transformation_failures_total metric is deprecated, use transformation_operations_total instead.
func RecordTransformation(transformationType, transformerPrefix string, elapsed time.Duration, err error) {
transformerOperationsTotal.WithLabelValues(transformationType, transformerPrefix, getErrorCode(err)).Inc()
func RecordTransformation(resource, transformationType, transformerPrefix string, elapsed time.Duration, err error) {
transformerOperationsTotal.WithLabelValues(resource, transformationType, transformerPrefix, getErrorCode(err)).Inc()
if err == nil {
transformerLatencies.WithLabelValues(transformationType, transformerPrefix).Observe(elapsed.Seconds())

View File

@ -105,6 +105,7 @@ func NewPrefixTransformers(err error, transformers ...PrefixTransformer) Transfo
func (t *prefixTransformers) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, bool, error) {
start := time.Now()
var errs []error
resource := getResourceFromContext(ctx)
for i, transformer := range t.transformers {
if bytes.HasPrefix(data, transformer.Prefix) {
result, stale, err := transformer.Transformer.TransformFromStorage(ctx, data[len(transformer.Prefix):], dataCtx)
@ -116,9 +117,9 @@ func (t *prefixTransformers) TransformFromStorage(ctx context.Context, data []by
continue
}
if len(transformer.Prefix) == 0 {
RecordTransformation("from_storage", "identity", time.Since(start), err)
RecordTransformation(resource, "from_storage", "identity", time.Since(start), err)
} else {
RecordTransformation("from_storage", string(transformer.Prefix), time.Since(start), err)
RecordTransformation(resource, "from_storage", string(transformer.Prefix), time.Since(start), err)
}
// It is valid to have overlapping prefixes when the same encryption provider
@ -163,7 +164,7 @@ func (t *prefixTransformers) TransformFromStorage(ctx context.Context, data []by
logTransformErr(ctx, err, "failed to decrypt data")
return nil, false, err
}
RecordTransformation("from_storage", "unknown", time.Since(start), t.err)
RecordTransformation(resource, "from_storage", "unknown", time.Since(start), t.err)
return nil, false, t.err
}
@ -171,8 +172,9 @@ func (t *prefixTransformers) TransformFromStorage(ctx context.Context, data []by
func (t *prefixTransformers) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, error) {
start := time.Now()
transformer := t.transformers[0]
resource := getResourceFromContext(ctx)
result, err := transformer.Transformer.TransformToStorage(ctx, data, dataCtx)
RecordTransformation("to_storage", string(transformer.Prefix), time.Since(start), err)
RecordTransformation(resource, "to_storage", string(transformer.Prefix), time.Since(start), err)
if err != nil {
logTransformErr(ctx, err, "failed to encrypt data")
return nil, err
@ -209,5 +211,11 @@ func getRequestInfoFromContext(ctx context.Context) *genericapirequest.RequestIn
if reqInfo, found := genericapirequest.RequestInfoFrom(ctx); found {
return reqInfo
}
klog.V(4).InfoSDepth(1, "no request info on context")
return &genericapirequest.RequestInfo{}
}
func getResourceFromContext(ctx context.Context) string {
reqInfo := getRequestInfoFromContext(ctx)
return schema.GroupResource{Group: reqInfo.APIGroup, Resource: reqInfo.Resource}.String()
}