rebase: update k8s.io packages to v0.29.0

Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
Niels de Vos
2023-12-20 13:23:59 +01:00
committed by mergify[bot]
parent 328a264202
commit f080b9e0c9
367 changed files with 21340 additions and 11878 deletions

View File

@ -22,7 +22,6 @@ import (
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -177,7 +176,6 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely, we simply terminate it.
@ -365,17 +363,10 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
if event.Type == watch.Bookmark {
e := &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
if !c.wasBookmarkAfterRvSent() {
objMeta, err := meta.Accessor(e.Object)
if err != nil {
if err := storage.AnnotateInitialEventsEndBookmark(e.Object); err != nil {
utilruntime.HandleError(fmt.Errorf("error while accessing object's metadata gr: %v, identifier: %v, obj: %#v, err: %v", c.groupResource, c.identifier, e.Object, err))
return nil
}
objAnnotations := objMeta.GetAnnotations()
if objAnnotations == nil {
objAnnotations = map[string]string{}
}
objAnnotations["k8s.io/initial-events-end"] = "true"
objMeta.SetAnnotations(objAnnotations)
}
return e
}

View File

@ -21,7 +21,6 @@ import (
"fmt"
"net/http"
"reflect"
"strconv"
"sync"
"time"
@ -113,11 +112,8 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
if watcher, ok := wm[number]; ok {
delete(wm, number)
done(watcher)
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
}
func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
@ -148,14 +144,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac
}
}
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) {
if supported {
i.valueWatchers[value].deleteWatcher(number, done)
i.valueWatchers[value].deleteWatcher(number)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers[scope].deleteWatcher(number, done)
i.allWatchers[scope].deleteWatcher(number)
if len(i.allWatchers[scope]) == 0 {
delete(i.allWatchers, scope)
}
@ -725,15 +721,14 @@ func shouldDelegateList(opts storage.ListOptions) bool {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
match := opts.ResourceVersionMatch
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
// Serve consistent reads from storage if ConsistentListFromCache is disabled
consistentReadFromStorage := resourceVersion == "" && !consistentListFromCacheEnabled
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasContinuation := len(pred.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
hasLimit := pred.Limit > 0 && resourceVersion != "0"
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
@ -773,7 +768,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
return c.storage.GetList(ctx, key, opts, listObj)
}
if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
listRV, err = c.getCurrentResourceVersionFromStorage(ctx)
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err
}
@ -1225,7 +1220,8 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
c.stopWatcherLocked(w)
}
}
@ -1249,48 +1245,12 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
return c.versioner.ParseResourceVersion(resourceVersion)
}
// getCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
// this method issues an empty list request and reads only the ResourceVersion from the object metadata
func (c *Cacher) getCurrentResourceVersionFromStorage(ctx context.Context) (uint64, error) {
if c.newListFunc == nil {
return 0, fmt.Errorf("newListFunction wasn't provided for %v", c.objectType)
}
emptyList := c.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1, // just in case we actually hit something
}
err := c.storage.GetList(ctx, c.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList)
if err != nil {
return 0, err
}
emptyListAccessor, err := meta.ListAccessor(emptyList)
if err != nil {
return 0, err
}
if emptyListAccessor == nil {
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
}
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
if err != nil {
return 0, err
}
if currentResourceVersion == 0 {
return 0, fmt.Errorf("the current resource version must be greater than 0")
}
return uint64(currentResourceVersion), nil
}
// getBookmarkAfterResourceVersionLockedFunc returns a function that
// spits a ResourceVersion after which the bookmark event will be delivered.
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents == false || !opts.Predicate.AllowWatchBookmarks {
if opts.SendInitialEvents == nil || !*opts.SendInitialEvents || !opts.Predicate.AllowWatchBookmarks {
return func() uint64 { return 0 }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts)
@ -1305,7 +1265,7 @@ func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context,
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents == true {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents {
return func() uint64 { return parsedWatchResourceVersion }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts)
@ -1318,7 +1278,7 @@ func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context,
func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
switch {
case len(opts.ResourceVersion) == 0:
rv, err := c.getCurrentResourceVersionFromStorage(ctx)
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return nil, err
}
@ -1336,7 +1296,7 @@ func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedW
// Additionally, it instructs the caller whether it should ask for
// all events from the cache (full state) or not.
func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents == true {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents {
err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
defer c.watchCache.RUnlock()
return err == nil, err

View File

@ -17,13 +17,16 @@ limitations under the License.
package storage
import (
"errors"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
)
var ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
const (
ErrCodeKeyNotFound int = iota + 1
ErrCodeKeyExists
@ -176,7 +179,7 @@ var tooLargeResourceVersionCauseMsg = "Too large resource version"
// NewTooLargeResourceVersionError returns a timeout error with the given retrySeconds for a request for
// a minimum resource version that is larger than the largest currently available resource version for a requested resource.
func NewTooLargeResourceVersionError(minimumResourceVersion, currentRevision uint64, retrySeconds int) error {
err := errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %d, current: %d", minimumResourceVersion, currentRevision), retrySeconds)
err := apierrors.NewTimeoutError(fmt.Sprintf("Too large resource version: %d, current: %d", minimumResourceVersion, currentRevision), retrySeconds)
err.ErrStatus.Details.Causes = []metav1.StatusCause{
{
Type: metav1.CauseTypeResourceVersionTooLarge,
@ -188,8 +191,8 @@ func NewTooLargeResourceVersionError(minimumResourceVersion, currentRevision uin
// IsTooLargeResourceVersion returns true if the error is a TooLargeResourceVersion error.
func IsTooLargeResourceVersion(err error) bool {
if !errors.IsTimeout(err) {
if !apierrors.IsTimeout(err) {
return false
}
return errors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
return apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge)
}

View File

@ -30,6 +30,17 @@ type event struct {
isDeleted bool
isCreated bool
isProgressNotify bool
// isInitialEventsEndBookmark helps us keep track
// of whether we have sent an annotated bookmark event.
//
// when this variable is set to true,
// a special annotation will be added
// to the bookmark event.
//
// note that we decided to extend the event
// struct field to eliminate contention
// between startWatching and processEvent
isInitialEventsEndBookmark bool
}
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.

View File

@ -69,7 +69,7 @@ var (
objectCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "apiserver_storage_objects",
Help: "Number of stored objects at the time of last check split by kind.",
Help: "Number of stored objects at the time of last check split by kind. In case of a fetching error, the value will be -1.",
StabilityLevel: compbasemetrics.STABLE,
},
[]string{"resource"},
@ -228,7 +228,7 @@ func UpdateEtcdDbSize(ep string, size int64) {
// SetStorageMonitorGetter sets monitor getter to allow monitoring etcd stats.
func SetStorageMonitorGetter(getter func() ([]Monitor, error)) {
storageMonitor.monitorGetter = getter
storageMonitor.setGetter(getter)
}
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
@ -258,9 +258,22 @@ type StorageMetrics struct {
type monitorCollector struct {
compbasemetrics.BaseStableCollector
mutex sync.Mutex
monitorGetter func() ([]Monitor, error)
}
func (m *monitorCollector) setGetter(monitorGetter func() ([]Monitor, error)) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.monitorGetter = monitorGetter
}
func (m *monitorCollector) getGetter() func() ([]Monitor, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.monitorGetter
}
// DescribeWithStability implements compbasemetrics.StableColletor
func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc) {
ch <- storageSizeDescription
@ -268,7 +281,7 @@ func (c *monitorCollector) DescribeWithStability(ch chan<- *compbasemetrics.Desc
// CollectWithStability implements compbasemetrics.StableColletor
func (c *monitorCollector) CollectWithStability(ch chan<- compbasemetrics.Metric) {
monitors, err := c.monitorGetter()
monitors, err := c.getGetter()()
if err != nil {
return
}

View File

@ -32,19 +32,15 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
)
@ -81,7 +77,6 @@ type store struct {
groupResource schema.GroupResource
groupResourceString string
watcher *watcher
pagingEnabled bool
leaseManager *leaseManager
}
@ -100,11 +95,11 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface {
return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig)
}
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store {
versioner := storage.APIObjectVersioner{}
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
@ -114,19 +109,36 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
// Ensure the pathPrefix ends in "/" here to simplify key concatenation later.
pathPrefix += "/"
}
result := &store{
w := &watcher{
client: c,
codec: codec,
newFunc: newFunc,
groupResource: groupResource,
versioner: versioner,
transformer: transformer,
}
if newFunc == nil {
w.objectType = "<unknown>"
} else {
w.objectType = reflect.TypeOf(newFunc()).String()
}
s := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
pathPrefix: pathPrefix,
groupResource: groupResource,
groupResourceString: groupResource.String(),
watcher: newWatcher(c, codec, groupResource, newFunc, versioner),
watcher: w,
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
}
return result
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
}
return s
}
// Versioner implements storage.Interface.Versioner.
@ -185,7 +197,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
)
defer span.End(500 * time.Millisecond)
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
return storage.ErrResourceVersionSetOnCreate
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
@ -258,15 +270,7 @@ func (s *store) Delete(
func (s *store) conditionalDelete(
ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil {
return nil, err
}
return s.getState(ctx, getResp, key, v, false)
}
getCurrentState := s.getCurrentState(ctx, key, v, false)
var origState *objState
var err error
@ -394,15 +398,7 @@ func (s *store) GuaranteedUpdate(
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, preparedKey)
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil {
return nil, err
}
return s.getState(ctx, getResp, preparedKey, v, ignoreNotFound)
}
getCurrentState := s.getCurrentState(ctx, preparedKey, v, ignoreNotFound)
var origState *objState
var origStateIsCurrent bool
@ -594,17 +590,13 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if err != nil {
return err
}
recursive := opts.Recursive
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate
ctx, span := tracing.Start(ctx, fmt.Sprintf("List(recursive=%v) etcd3", recursive),
ctx, span := tracing.Start(ctx, fmt.Sprintf("List(recursive=%v) etcd3", opts.Recursive),
attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)),
attribute.String("key", key),
attribute.String("resourceVersion", resourceVersion),
attribute.String("resourceVersionMatch", string(match)),
attribute.Int("limit", int(pred.Limit)),
attribute.String("continue", pred.Continue))
attribute.String("resourceVersion", opts.ResourceVersion),
attribute.String("resourceVersionMatch", string(opts.ResourceVersionMatch)),
attribute.Int("limit", int(opts.Predicate.Limit)),
attribute.String("continue", opts.Predicate.Continue))
defer span.End(500 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
@ -619,97 +611,68 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
// "/a/b" which is the correct answer.
if recursive && !strings.HasSuffix(preparedKey, "/") {
if opts.Recursive && !strings.HasSuffix(preparedKey, "/") {
preparedKey += "/"
}
keyPrefix := preparedKey
// set the appropriate clientv3 options to filter the returned data set
var limitOption *clientv3.OpOption
limit := pred.Limit
limit := opts.Predicate.Limit
var paging bool
options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && pred.Limit > 0 {
if opts.Predicate.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(limit))
limitOption = &options[len(options)-1]
}
newItemFunc := getNewItemFunc(listObj, v)
var fromRV *uint64
if len(resourceVersion) > 0 {
parsedRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
fromRV = &parsedRV
if opts.Recursive {
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
}
var returnedRV, continueRV, withRev int64
newItemFunc := getNewItemFunc(listObj, v)
var continueRV, withRev int64
var continueKey string
switch {
case recursive && s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err = storage.DecodeContinue(pred.Continue, keyPrefix)
case opts.Recursive && len(opts.Predicate.Continue) > 0:
continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if len(resourceVersion) > 0 && resourceVersion != "0" {
if len(opts.ResourceVersion) > 0 && opts.ResourceVersion != "0" {
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
preparedKey = continueKey
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
withRev = continueRV
returnedRV = continueRV
}
case recursive && s.pagingEnabled && pred.Limit > 0:
if fromRV != nil {
switch match {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
returnedRV = int64(*fromRV)
withRev = returnedRV
case "": // legacy case
if *fromRV > 0 {
returnedRV = int64(*fromRV)
withRev = returnedRV
}
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
case len(opts.ResourceVersion) > 0:
parsedRV, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
switch opts.ResourceVersionMatch {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
withRev = int64(parsedRV)
case "": // legacy case
if opts.Recursive && opts.Predicate.Limit > 0 && parsedRV > 0 {
withRev = int64(parsedRV)
}
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
default:
if fromRV != nil {
switch match {
case metav1.ResourceVersionMatchNotOlderThan:
// The not older than constraint is checked after we get a response from etcd,
// and returnedRV is then set to the revision we get from the etcd response.
case metav1.ResourceVersionMatchExact:
returnedRV = int64(*fromRV)
withRev = returnedRV
case "": // legacy case
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
}
}
if recursive {
options = append(options, clientv3.WithPrefix())
default:
return fmt.Errorf("unknown ResourceVersionMatch value: %v", opts.ResourceVersionMatch)
}
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
}
@ -728,7 +691,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
}()
metricsOp := "get"
if recursive {
if opts.Recursive {
metricsOp = "list"
}
@ -737,10 +700,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
getResp, err = s.client.KV.Get(ctx, preparedKey, options...)
metrics.RecordEtcdRequest(metricsOp, s.groupResourceString, err, startTime)
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
return interpretListError(err, len(opts.Predicate.Continue) > 0, continueKey, keyPrefix)
}
numFetched += len(getResp.Kvs)
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
hasMore = getResp.More
@ -748,10 +711,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
if len(getResp.Kvs) == 0 && getResp.More {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
}
// indicate to the client which resource version was returned, and use the same resource version for subsequent requests.
if withRev == 0 {
withRev = getResp.Header.Revision
options = append(options, clientv3.WithRev(withRev))
}
// avoid small allocations for the result slice, since this can be called in many
// different contexts and we don't know how significantly the result will be filtered
if pred.Empty() {
if opts.Predicate.Empty() {
growSlice(v, len(getResp.Kvs))
} else {
growSlice(v, 2048, len(getResp.Kvs))
@ -759,7 +727,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// take items from the response until the bucket is full, filtering as we go
for i, kv := range getResp.Kvs {
if paging && int64(v.Len()) >= pred.Limit {
if paging && int64(v.Len()) >= opts.Predicate.Limit {
hasMore = true
break
}
@ -770,7 +738,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
}
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
if err := appendListItem(v, data, uint64(kv.ModRevision), opts.Predicate, s.codec, s.versioner, newItemFunc); err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key))
return err
}
@ -780,17 +748,12 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
getResp.Kvs[i] = nil
}
// indicate to the client which resource version was returned
if returnedRV == 0 {
returnedRV = getResp.Header.Revision
}
// no more results remain or we didn't request paging
if !hasMore || !paging {
break
}
// we're paging but we have filled our bucket
if int64(v.Len()) >= pred.Limit {
if int64(v.Len()) >= opts.Predicate.Limit {
break
}
@ -804,11 +767,8 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
*limitOption = clientv3.WithLimit(limit)
}
preparedKey = string(lastKey) + "\x00"
if withRev == 0 {
withRev = returnedRV
options = append(options, clientv3.WithRev(withRev))
}
}
if v.IsNil() {
// Ensure that we never return a nil Items pointer in the result for consistency.
v.Set(reflect.MakeSlice(v.Type(), 0, 0))
@ -818,7 +778,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// we never return a key that the client wouldn't be allowed to see
if hasMore {
// we want to start immediately after the last key
next, err := storage.EncodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
next, err := storage.EncodeContinue(string(lastKey)+"\x00", keyPrefix, withRev)
if err != nil {
return err
}
@ -826,17 +786,15 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
// getResp.Count counts in objects that do not match the pred.
// Instead of returning inaccurate count for non-empty selectors, we return nil.
// Only set remainingItemCount if the predicate is empty.
if utilfeature.DefaultFeatureGate.Enabled(features.RemainingItemCount) {
if pred.Empty() {
c := int64(getResp.Count - pred.Limit)
remainingItemCount = &c
}
if opts.Predicate.Empty() {
c := int64(getResp.Count - opts.Predicate.Limit)
remainingItemCount = &c
}
return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount)
return s.versioner.UpdateList(listObj, uint64(withRev), next, remainingItemCount)
}
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "", nil)
return s.versioner.UpdateList(listObj, uint64(withRev), "", nil)
}
// growSlice takes a slice value and grows its capacity up
@ -871,18 +829,7 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
}
// Watch implements storage.Interface.Watch.
// TODO(#115478): In order to graduate the WatchList feature to beta, the etcd3 implementation must/should also support it.
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
// it is safe to skip SendInitialEvents if the request is backward compatible
// see https://github.com/kubernetes/kubernetes/blob/267eb25e60955fe8e438c6311412e7cf7d028acb/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go#L260
compatibility := opts.Predicate.AllowWatchBookmarks == false && (opts.ResourceVersion == "" || opts.ResourceVersion == "0")
if opts.SendInitialEvents != nil && !compatibility {
return nil, apierrors.NewInvalid(
schema.GroupKind{Group: s.groupResource.Group, Kind: s.groupResource.Resource},
"",
field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is unsupported by an etcd cluster")},
)
}
preparedKey, err := s.prepareKey(key)
if err != nil {
return nil, err
@ -891,7 +838,7 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions)
if err != nil {
return nil, err
}
return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate)
return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts)
}
func (s *store) watchContext(ctx context.Context) context.Context {
@ -905,6 +852,18 @@ func (s *store) watchContext(ctx context.Context) context.Context {
return clientv3.WithRequireLeader(ctx)
}
func (s *store) getCurrentState(ctx context.Context, key string, v reflect.Value, ignoreNotFound bool) func() (*objState, error) {
return func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequest("get", s.groupResourceString, err, startTime)
if err != nil {
return nil, err
}
return s.getState(ctx, getResp, key, v, ignoreNotFound)
}
}
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
state := &objState{
meta: &storage.ResponseMeta{},

View File

@ -18,27 +18,29 @@ package etcd3
import (
"context"
"errors"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/klog/v2"
)
@ -48,6 +50,9 @@ const (
outgoingBufSize = 100
)
// defaultWatcherMaxLimit is used to facilitate construction tests
var defaultWatcherMaxLimit int64 = maxLimit
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
var fatalOnDecodeError = false
@ -63,18 +68,19 @@ func TestOnlySetFatalOnDecodeError(b bool) {
}
type watcher struct {
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
groupResource schema.GroupResource
versioner storage.Versioner
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
groupResource schema.GroupResource
versioner storage.Versioner
transformer value.Transformer
getCurrentStorageRV func(context.Context) (uint64, error)
}
// watchChan implements watch.Interface.
type watchChan struct {
watcher *watcher
transformer value.Transformer
key string
initialRev int64
recursive bool
@ -87,35 +93,26 @@ type watchChan struct {
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner) *watcher {
res := &watcher{
client: client,
codec: codec,
groupResource: groupResource,
newFunc: newFunc,
versioner: versioner,
}
if newFunc == nil {
res.objectType = "<unknown>"
} else {
res.objectType = reflect.TypeOf(newFunc()).String()
}
return res
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
// If rev is zero, it will return the existing object(s) and then start watching from
// the maximum revision+1 from returned objects.
// If rev is non-zero, it will watch events happened after given revision.
// If recursive is false, it watches on given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only if pred matches the change, it will be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") {
// If opts.Recursive is false, it watches on given key.
// If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only if opts.Predicate matches the change, it will be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage.ListOptions) (watch.Interface, error) {
if opts.Recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, transformer, pred)
go wc.run()
if opts.ProgressNotify && w.newFunc == nil {
return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
}
startWatchRV, err := w.getStartWatchResourceVersion(ctx, rev, opts)
if err != nil {
return nil, err
}
wc := w.createWatchChan(ctx, key, startWatchRV, opts.Recursive, opts.ProgressNotify, opts.Predicate)
go wc.run(isInitialEventsEndBookmarkRequired(opts), areInitialEventsRequired(rev, opts))
// For etcd watch we don't have an easy way to answer whether the watch
// has already caught up. So in the initial version (given that watchcache
@ -127,10 +124,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p
return wc, nil
}
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan {
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
wc := &watchChan{
watcher: w,
transformer: transformer,
key: key,
initialRev: rev,
recursive: recursive,
@ -148,6 +144,62 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
return wc
}
// getStartWatchResourceVersion returns a ResourceVersion
// the watch will be started from.
// Depending on the input parameters the semantics of the returned ResourceVersion are:
// - start at Exact (return resourceVersion)
// - start at Most Recent (return an RV from etcd)
func (w *watcher) getStartWatchResourceVersion(ctx context.Context, resourceVersion int64, opts storage.ListOptions) (int64, error) {
if resourceVersion > 0 {
return resourceVersion, nil
}
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
return 0, nil
}
if opts.SendInitialEvents == nil || *opts.SendInitialEvents {
// note that when opts.SendInitialEvents=true
// we will be issuing a consistent LIST request
// against etcd followed by the special bookmark event
return 0, nil
}
// at this point the clients is interested
// only in getting a stream of events
// starting at the MostRecent point in time (RV)
currentStorageRV, err := w.getCurrentStorageRV(ctx)
if err != nil {
return 0, err
}
// currentStorageRV is taken from resp.Header.Revision (int64)
// and cast to uint64, so it is safe to do reverse
// at some point we should unify the interface but that
// would require changing Versioner.UpdateList
return int64(currentStorageRV), nil
}
// isInitialEventsEndBookmarkRequired since there is no way to directly set
// opts.ProgressNotify from the API and the etcd3 impl doesn't support
// notification for external clients we simply return initialEventsEndBookmarkRequired
// to only send the bookmark event after the initial list call.
//
// see: https://github.com/kubernetes/kubernetes/issues/120348
func isInitialEventsEndBookmarkRequired(opts storage.ListOptions) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
return false
}
return opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks
}
// areInitialEventsRequired returns true if all events from the etcd should be returned.
func areInitialEventsRequired(resourceVersion int64, opts storage.ListOptions) bool {
if opts.SendInitialEvents == nil && resourceVersion == 0 {
return true // legacy case
}
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
return false
}
return opts.SendInitialEvents != nil && *opts.SendInitialEvents
}
type etcdError interface {
Code() grpccodes.Code
Error() string
@ -173,9 +225,9 @@ func isCancelError(err error) bool {
return false
}
func (wc *watchChan) run() {
func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) {
watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
@ -225,17 +277,58 @@ func (wc *watchChan) RequestWatchProgress() error {
func (wc *watchChan) sync() error {
opts := []clientv3.OpOption{}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
opts = append(opts, clientv3.WithLimit(defaultWatcherMaxLimit))
rangeEnd := clientv3.GetPrefixRangeEnd(wc.key)
opts = append(opts, clientv3.WithRange(rangeEnd))
}
getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
if err != nil {
return err
var err error
var lastKey []byte
var withRev int64
var getResp *clientv3.GetResponse
metricsOp := "get"
if wc.recursive {
metricsOp = "list"
}
wc.initialRev = getResp.Header.Revision
for _, kv := range getResp.Kvs {
wc.sendEvent(parseKV(kv))
preparedKey := wc.key
for {
startTime := time.Now()
getResp, err = wc.watcher.client.KV.Get(wc.ctx, preparedKey, opts...)
metrics.RecordEtcdRequest(metricsOp, wc.watcher.groupResource.String(), err, startTime)
if err != nil {
return interpretListError(err, true, preparedKey, wc.key)
}
if len(getResp.Kvs) == 0 && getResp.More {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
}
// send items from the response until no more results
for i, kv := range getResp.Kvs {
lastKey = kv.Key
wc.sendEvent(parseKV(kv))
// free kv early. Long lists can take O(seconds) to decode.
getResp.Kvs[i] = nil
}
if withRev == 0 {
wc.initialRev = getResp.Header.Revision
}
// no more results remain
if !getResp.More {
return nil
}
preparedKey = string(lastKey) + "\x00"
if withRev == 0 {
withRev = getResp.Header.Revision
opts = append(opts, clientv3.WithRev(withRev))
}
}
return nil
}
func logWatchChannelErr(err error) {
@ -253,14 +346,44 @@ func logWatchChannelErr(err error) {
// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.initialRev == 0 {
//
// initialEventsEndBookmarkSent helps us keep track
// of whether we have sent an annotated bookmark event.
//
// it's important to note that we don't
// need to track the actual RV because
// we only send the bookmark event
// after the initial list call.
//
// when this variable is set to false,
// it means we don't have any specific
// preferences for delivering bookmark events.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEndBookmarkRequired, forceInitialEvents bool) {
if wc.initialRev > 0 && forceInitialEvents {
currentStorageRV, err := wc.watcher.getCurrentStorageRV(wc.ctx)
if err != nil {
wc.sendError(err)
return
}
if uint64(wc.initialRev) > currentStorageRV {
wc.sendError(storage.NewTooLargeResourceVersionError(uint64(wc.initialRev), currentStorageRV, int(wait.Jitter(1*time.Second, 3).Seconds())))
return
}
}
if forceInitialEvents {
if err := wc.sync(); err != nil {
klog.Errorf("failed to sync with latest state: %v", err)
wc.sendError(err)
return
}
}
if initialEventsEndBookmarkRequired {
wc.sendEvent(func() *event {
e := progressNotifyEvent(wc.initialRev)
e.isInitialEventsEndBookmark = true
return e
}())
}
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
@ -352,14 +475,17 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
switch {
case e.isProgressNotify:
if wc.watcher.newFunc == nil {
return nil
}
object := wc.watcher.newFunc()
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
klog.Errorf("failed to propagate object version: %v", err)
return nil
}
if e.isInitialEventsEndBookmark {
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
return nil
}
}
res = &watch.Event{
Type: watch.Bookmark,
Object: object,
@ -447,7 +573,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
}
if !e.isDeleted {
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
}
@ -462,7 +588,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
// we need the object only to compute whether it was filtered out
// before).
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
}

View File

@ -282,6 +282,19 @@ type ListOptions struct {
Recursive bool
// ProgressNotify determines whether storage-originated bookmark (progress notify) events should
// be delivered to the users. The option is ignored for non-watch requests.
//
// Firstly, note that this field is different from the Predicate.AllowWatchBookmarks field.
// Secondly, this field is intended for internal clients only such as the watch cache.
//
// This means that external clients do not have the ability to set this field directly.
// For example by setting the allowWatchBookmarks query parameter.
//
// The motivation for this approach is the fact that the frequency
// of bookmark events from a storage like etcd might be very high.
// As the number of watch requests increases, the server load would also increase.
//
// Furthermore, the server is not obligated to provide bookmark events at all,
// as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/956-watch-bookmark#proposal
ProgressNotify bool
// SendInitialEvents, when set together with Watch option,
// begin the watch stream with synthetic init events to build the

View File

@ -62,11 +62,6 @@ type Config struct {
Prefix string
// Transport holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
Transport TransportConfig
// Paging indicates whether the server implementation should allow paging (if it is
// supported). This is generally configured by feature gating, or by a specific
// resource type not wishing to allow paging, and is not intended for end users to
// set.
Paging bool
Codec runtime.Codec
// EncodeVersioner is the same groupVersioner used to build the
@ -115,7 +110,6 @@ func (config *Config) ForResource(resource schema.GroupResource) *ConfigForResou
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
return &Config{
Paging: true,
Prefix: prefix,
Codec: codec,
CompactionInterval: DefaultCompactInterval,

View File

@ -419,7 +419,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}, nil
}
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil {
return nil, nil, err
@ -454,7 +454,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
if transformer == nil {
transformer = identity.NewEncryptCheckTransformer()
}
return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig), destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -30,12 +30,12 @@ import (
type DestroyFunc func()
// Create creates a storage backend based on given config.
func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
func Create(c storagebackend.ConfigForResource, newFunc, newListFunc func() runtime.Object, resourcePrefix string) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return nil, nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c, newFunc)
return newETCD3Storage(c, newFunc, newListFunc, resourcePrefix)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}

View File

@ -17,14 +17,25 @@ limitations under the License.
package storage
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation/path"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)
const (
// initialEventsAnnotationKey the name of the key
// under which an annotation marking the end of list stream
// is kept.
initialEventsAnnotationKey = "k8s.io/initial-events-end"
)
type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error)
// SimpleUpdateFunc converts SimpleUpdateFunc into UpdateFunc
@ -79,3 +90,72 @@ func (hwm *HighWaterMark) Update(current int64) bool {
}
}
}
// GetCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
// This method issues an empty list request and reads only the ResourceVersion from the object metadata
func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) {
if storage == nil {
return 0, fmt.Errorf("storage wasn't provided for %s", objectType)
}
if newListFunc == nil {
return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType)
}
emptyList := newListFunc()
pred := SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1, // just in case we actually hit something
}
err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList)
if err != nil {
return 0, err
}
emptyListAccessor, err := meta.ListAccessor(emptyList)
if err != nil {
return 0, err
}
if emptyListAccessor == nil {
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
}
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
if err != nil {
return 0, err
}
if currentResourceVersion == 0 {
return 0, fmt.Errorf("the current resource version must be greater than 0")
}
return uint64(currentResourceVersion), nil
}
// AnnotateInitialEventsEndBookmark adds a special annotation to the given object
// which indicates that the initial events have been sent.
//
// Note that this function assumes that the obj's annotation
// field is a reference type (i.e. a map).
func AnnotateInitialEventsEndBookmark(obj runtime.Object) error {
objMeta, err := meta.Accessor(obj)
if err != nil {
return err
}
objAnnotations := objMeta.GetAnnotations()
if objAnnotations == nil {
objAnnotations = map[string]string{}
}
objAnnotations[initialEventsAnnotationKey] = "true"
objMeta.SetAnnotations(objAnnotations)
return nil
}
// HasInitialEventsEndBookmarkAnnotation checks the presence of the
// special annotation which marks that the initial events have been sent.
func HasInitialEventsEndBookmarkAnnotation(obj runtime.Object) (bool, error) {
objMeta, err := meta.Accessor(obj)
if err != nil {
return false, err
}
objAnnotations := objMeta.GetAnnotations()
return objAnnotations[initialEventsAnnotationKey] == "true", nil
}

View File

@ -26,6 +26,7 @@ import (
utilcache "k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
"k8s.io/utils/clock"
)
@ -38,10 +39,13 @@ type simpleCache struct {
ttl time.Duration
// hashPool is a per cache pool of hash.Hash (to avoid allocations from building the Hash)
// SHA-256 is used to prevent collisions
hashPool *sync.Pool
hashPool *sync.Pool
providerName string
mu sync.Mutex // guards call to set
recordCacheSize func(providerName string, size int) // for unit tests
}
func newSimpleCache(clock clock.Clock, ttl time.Duration) *simpleCache {
func newSimpleCache(clock clock.Clock, ttl time.Duration, providerName string) *simpleCache {
cache := utilcache.NewExpiringWithClock(clock)
cache.AllowExpiredGet = true // for a given key, the value (the decryptTransformer) is always the same
return &simpleCache{
@ -52,6 +56,8 @@ func newSimpleCache(clock clock.Clock, ttl time.Duration) *simpleCache {
return sha256.New()
},
},
providerName: providerName,
recordCacheSize: metrics.RecordDekSourceCacheSize,
}
}
@ -66,6 +72,8 @@ func (c *simpleCache) get(key []byte) value.Read {
// set caches the record for the key
func (c *simpleCache) set(key []byte, transformer value.Read) {
c.mu.Lock()
defer c.mu.Unlock()
if len(key) == 0 {
panic("key must not be empty")
}
@ -73,6 +81,8 @@ func (c *simpleCache) set(key []byte, transformer value.Read) {
panic("transformer must not be nil")
}
c.cache.Set(c.keyFunc(key), transformer, c.ttl)
// Add metrics for cache size
c.recordCacheSize(c.providerName, c.cache.Len())
}
// keyFunc generates a string key by hashing the inputs.

View File

@ -28,6 +28,7 @@ import (
"unsafe"
"github.com/gogo/protobuf/proto"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/crypto/cryptobyte"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -39,21 +40,22 @@ import (
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
kmsservice "k8s.io/kms/pkg/service"
"k8s.io/utils/clock"
)
// TODO integration test with old AES GCM data recorded and new KDF data recorded
func init() {
value.RegisterMetrics()
metrics.RegisterMetrics()
}
const (
// KMSAPIVersion is the version of the KMS API.
KMSAPIVersion = "v2beta1"
// KMSAPIVersionv2 is a version of the KMS API.
KMSAPIVersionv2 = "v2"
// KMSAPIVersionv2beta1 is a version of the KMS API.
KMSAPIVersionv2beta1 = "v2beta1"
// annotationsMaxSize is the maximum size of the annotations.
annotationsMaxSize = 32 * 1024 // 32 kB
// KeyIDMaxSize is the maximum size of the keyID.
@ -112,32 +114,51 @@ type envelopeTransformer struct {
stateFunc StateFunc
// cache is a thread-safe expiring lru cache which caches decrypted DEKs indexed by their encrypted form.
cache *simpleCache
cache *simpleCache
apiServerID string
}
// NewEnvelopeTransformer returns a transformer which implements a KEK-DEK based envelope encryption scheme.
// It uses envelopeService to encrypt and decrypt DEKs. Respective DEKs (in encrypted form) are prepended to
// the data items they encrypt.
func NewEnvelopeTransformer(envelopeService kmsservice.Service, providerName string, stateFunc StateFunc) value.Transformer {
return newEnvelopeTransformerWithClock(envelopeService, providerName, stateFunc, cacheTTL, clock.RealClock{})
func NewEnvelopeTransformer(envelopeService kmsservice.Service, providerName string, stateFunc StateFunc, apiServerID string) value.Transformer {
return newEnvelopeTransformerWithClock(envelopeService, providerName, stateFunc, apiServerID, cacheTTL, clock.RealClock{})
}
func newEnvelopeTransformerWithClock(envelopeService kmsservice.Service, providerName string, stateFunc StateFunc, cacheTTL time.Duration, clock clock.Clock) value.Transformer {
func newEnvelopeTransformerWithClock(envelopeService kmsservice.Service, providerName string, stateFunc StateFunc, apiServerID string, cacheTTL time.Duration, clock clock.Clock) value.Transformer {
return &envelopeTransformer{
envelopeService: envelopeService,
providerName: providerName,
stateFunc: stateFunc,
cache: newSimpleCache(clock, cacheTTL),
cache: newSimpleCache(clock, cacheTTL, providerName),
apiServerID: apiServerID,
}
}
// TransformFromStorage decrypts data encrypted by this transformer using envelope encryption.
func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
ctx, span := tracing.Start(ctx, "TransformFromStorage with envelopeTransformer",
attribute.String("transformer.provider.name", t.providerName),
// The service.instance_id of the apiserver is already available in the trace
/*
{
"key": "service.instance.id",
"type": "string",
"value": "apiserver-zsteyir5lyrtdcmqqmd5kzze6m"
}
*/
)
defer span.End(500 * time.Millisecond)
span.AddEvent("About to decode encrypted object")
// Deserialize the EncryptedObject from the data.
encryptedObject, err := t.doDecode(data)
if err != nil {
span.AddEvent("Decoding encrypted object failed")
span.RecordError(err)
return nil, false, err
}
span.AddEvent("Decoded encrypted object")
useSeed := encryptedObject.EncryptedDEKSourceType == kmstypes.EncryptedDEKSourceType_HKDF_SHA256_XNONCE_AES_GCM_SEED
@ -158,6 +179,7 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
// fallback to the envelope service if we do not have the transformer locally
if transformer == nil {
span.AddEvent("About to decrypt DEK using remote service")
value.RecordCacheMiss()
requestInfo := getRequestInfoFromContext(ctx)
@ -172,21 +194,28 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
Annotations: encryptedObject.Annotations,
})
if err != nil {
span.AddEvent("DEK decryption failed")
span.RecordError(err)
return nil, false, fmt.Errorf("failed to decrypt DEK, error: %w", err)
}
span.AddEvent("DEK decryption succeeded")
transformer, err = t.addTransformerForDecryption(encryptedObjectCacheKey, key, useSeed)
if err != nil {
return nil, false, err
}
}
metrics.RecordKeyID(metrics.FromStorageLabel, t.providerName, encryptedObject.KeyID)
metrics.RecordKeyID(metrics.FromStorageLabel, t.providerName, encryptedObject.KeyID, t.apiServerID)
span.AddEvent("About to decrypt data using DEK")
out, stale, err := transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx)
if err != nil {
span.AddEvent("Data decryption failed")
span.RecordError(err)
return nil, false, err
}
span.AddEvent("Data decryption succeeded")
// data is considered stale if the key ID does not match our current write transformer
return out,
stale ||
@ -197,6 +226,19 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
// TransformToStorage encrypts data to be written to disk using envelope encryption.
func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
ctx, span := tracing.Start(ctx, "TransformToStorage with envelopeTransformer",
attribute.String("transformer.provider.name", t.providerName),
// The service.instance_id of the apiserver is already available in the trace
/*
{
"key": "service.instance.id",
"type": "string",
"value": "apiserver-zsteyir5lyrtdcmqqmd5kzze6m"
}
*/
)
defer span.End(500 * time.Millisecond)
state, err := t.stateFunc()
if err != nil {
return nil, err
@ -208,7 +250,6 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt
// this prevents a cache miss every time the DEK rotates
// this has the side benefit of causing the cache to perform a GC
// TODO see if we can do this inside the stateFunc control loop
// TODO(aramase): Add metrics for cache size.
t.cache.set(state.CacheKey, state.Transformer)
requestInfo := getRequestInfoFromContext(ctx)
@ -216,18 +257,31 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt
"group", requestInfo.APIGroup, "version", requestInfo.APIVersion, "resource", requestInfo.Resource, "subresource", requestInfo.Subresource,
"verb", requestInfo.Verb, "namespace", requestInfo.Namespace, "name", requestInfo.Name)
span.AddEvent("About to encrypt data using DEK")
result, err := state.Transformer.TransformToStorage(ctx, data, dataCtx)
if err != nil {
span.AddEvent("Data encryption failed")
span.RecordError(err)
return nil, err
}
span.AddEvent("Data encryption succeeded")
metrics.RecordKeyID(metrics.ToStorageLabel, t.providerName, state.EncryptedObject.KeyID)
metrics.RecordKeyID(metrics.ToStorageLabel, t.providerName, state.EncryptedObject.KeyID, t.apiServerID)
encObjectCopy := state.EncryptedObject
encObjectCopy.EncryptedData = result
span.AddEvent("About to encode encrypted object")
// Serialize the EncryptedObject to a byte array.
return t.doEncode(&encObjectCopy)
out, err := t.doEncode(&encObjectCopy)
if err != nil {
span.AddEvent("Encoding encrypted object failed")
span.RecordError(err)
return nil, err
}
span.AddEvent("Encoded encrypted object")
return out, nil
}
// addTransformerForDecryption inserts a new transformer to the Envelope cache of DEKs for future reads.
@ -250,7 +304,6 @@ func (t *envelopeTransformer) addTransformerForDecryption(cacheKey []byte, key [
if err != nil {
return nil, err
}
// TODO(aramase): Add metrics for cache size.
t.cache.set(cacheKey, transformer)
return transformer, nil
}

View File

@ -71,11 +71,20 @@ type EncryptedObject struct {
// EncryptedData is the encrypted data.
EncryptedData []byte `protobuf:"bytes,1,opt,name=encryptedData,proto3" json:"encryptedData,omitempty"`
// KeyID is the KMS key ID used for encryption operations.
// keyID must satisfy the following constraints:
// 1. The keyID is not empty.
// 2. The size of keyID is less than 1 kB.
KeyID string `protobuf:"bytes,2,opt,name=keyID,proto3" json:"keyID,omitempty"`
// EncryptedDEKSource is the ciphertext of the source of the DEK used to encrypt the data stored in encryptedData.
// encryptedDEKSourceType defines the process of using the plaintext of this field to determine the aforementioned DEK.
// encryptedDEKSource must satisfy the following constraints:
// 1. The encrypted DEK source is not empty.
// 2. The size of encrypted DEK source is less than 1 kB.
EncryptedDEKSource []byte `protobuf:"bytes,3,opt,name=encryptedDEKSource,proto3" json:"encryptedDEKSource,omitempty"`
// Annotations is additional metadata that was provided by the KMS plugin.
// Annotations must satisfy the following constraints:
// 1. Annotation key must be a fully qualified domain name that conforms to the definition in DNS (RFC 1123).
// 2. The size of annotations keys + values is less than 32 kB.
Annotations map[string][]byte `protobuf:"bytes,4,rep,name=annotations,proto3" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// encryptedDEKSourceType defines the process of using the plaintext of encryptedDEKSource to determine the DEK.
EncryptedDEKSourceType EncryptedDEKSourceType `protobuf:"varint,5,opt,name=encryptedDEKSourceType,proto3,enum=v2.EncryptedDEKSourceType" json:"encryptedDEKSourceType,omitempty"`

View File

@ -26,13 +26,22 @@ message EncryptedObject {
bytes encryptedData = 1;
// KeyID is the KMS key ID used for encryption operations.
// keyID must satisfy the following constraints:
// 1. The keyID is not empty.
// 2. The size of keyID is less than 1 kB.
string keyID = 2;
// EncryptedDEKSource is the ciphertext of the source of the DEK used to encrypt the data stored in encryptedData.
// encryptedDEKSourceType defines the process of using the plaintext of this field to determine the aforementioned DEK.
// encryptedDEKSource must satisfy the following constraints:
// 1. The encrypted DEK source is not empty.
// 2. The size of encrypted DEK source is less than 1 kB.
bytes encryptedDEKSource = 3;
// Annotations is additional metadata that was provided by the KMS plugin.
// Annotations must satisfy the following constraints:
// 1. Annotation key must be a fully qualified domain name that conforms to the definition in DNS (RFC 1123).
// 2. The size of annotations keys + values is less than 32 kB.
map<string, bytes> annotations = 4;
// encryptedDEKSourceType defines the process of using the plaintext of encryptedDEKSource to determine the DEK.

View File

@ -44,6 +44,7 @@ type metricLabels struct {
transformationType string
providerName string
keyIDHash string
apiServerIDHash string
}
/*
@ -107,21 +108,21 @@ var (
// keyIDHashTotal is the number of times a keyID is used
// e.g. apiserver_envelope_encryption_key_id_hash_total counter
// apiserver_envelope_encryption_key_id_hash_total{key_id_hash="sha256",
// apiserver_envelope_encryption_key_id_hash_total{apiserver_id_hash="sha256",key_id_hash="sha256",
// provider_name="providerName",transformation_type="from_storage"} 1
KeyIDHashTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "key_id_hash_total",
Help: "Number of times a keyID is used split by transformation type and provider.",
Help: "Number of times a keyID is used split by transformation type, provider, and apiserver identity.",
StabilityLevel: metrics.ALPHA,
},
[]string{"transformation_type", "provider_name", "key_id_hash"},
[]string{"transformation_type", "provider_name", "key_id_hash", "apiserver_id_hash"},
)
// keyIDHashLastTimestampSeconds is the last time in seconds when a keyID was used
// e.g. apiserver_envelope_encryption_key_id_hash_last_timestamp_seconds{key_id_hash="sha256", provider_name="providerName",transformation_type="from_storage"} 1.674865558833728e+09
// e.g. apiserver_envelope_encryption_key_id_hash_last_timestamp_seconds{apiserver_id_hash="sha256",key_id_hash="sha256", provider_name="providerName",transformation_type="from_storage"} 1.674865558833728e+09
KeyIDHashLastTimestampSeconds = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Namespace: namespace,
@ -130,11 +131,11 @@ var (
Help: "The last time in seconds when a keyID was used.",
StabilityLevel: metrics.ALPHA,
},
[]string{"transformation_type", "provider_name", "key_id_hash"},
[]string{"transformation_type", "provider_name", "key_id_hash", "apiserver_id_hash"},
)
// keyIDHashStatusLastTimestampSeconds is the last time in seconds when a keyID was returned by the Status RPC call.
// e.g. apiserver_envelope_encryption_key_id_hash_status_last_timestamp_seconds{key_id_hash="sha256", provider_name="providerName"} 1.674865558833728e+09
// e.g. apiserver_envelope_encryption_key_id_hash_status_last_timestamp_seconds{apiserver_id_hash="sha256",key_id_hash="sha256", provider_name="providerName"} 1.674865558833728e+09
KeyIDHashStatusLastTimestampSeconds = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Namespace: namespace,
@ -143,7 +144,7 @@ var (
Help: "The last time in seconds when a keyID was returned by the Status RPC call.",
StabilityLevel: metrics.ALPHA,
},
[]string{"provider_name", "key_id_hash"},
[]string{"provider_name", "key_id_hash", "apiserver_id_hash"},
)
InvalidKeyIDFromStatusTotal = metrics.NewCounterVec(
@ -156,6 +157,17 @@ var (
},
[]string{"provider_name", "error"},
)
DekSourceCacheSize = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dek_source_cache_size",
Help: "Number of records in data encryption key (DEK) source cache. On a restart, this value is an approximation of the number of decrypt RPC calls the server will make to the KMS plugin.",
StabilityLevel: metrics.ALPHA,
},
[]string{"provider_name"},
)
)
var registerMetricsFunc sync.Once
@ -171,19 +183,19 @@ func registerLRUMetrics() {
keyIDHashTotalMetricLabels = lru.NewWithEvictionFunc(cacheSize, func(key lru.Key, _ interface{}) {
item := key.(metricLabels)
if deleted := KeyIDHashTotal.DeleteLabelValues(item.transformationType, item.providerName, item.keyIDHash); deleted {
if deleted := KeyIDHashTotal.DeleteLabelValues(item.transformationType, item.providerName, item.keyIDHash, item.apiServerIDHash); deleted {
klog.InfoS("Deleted keyIDHashTotalMetricLabels", "transformationType", item.transformationType,
"providerName", item.providerName, "keyIDHash", item.keyIDHash)
"providerName", item.providerName, "keyIDHash", item.keyIDHash, "apiServerIDHash", item.apiServerIDHash)
}
if deleted := KeyIDHashLastTimestampSeconds.DeleteLabelValues(item.transformationType, item.providerName, item.keyIDHash); deleted {
if deleted := KeyIDHashLastTimestampSeconds.DeleteLabelValues(item.transformationType, item.providerName, item.keyIDHash, item.apiServerIDHash); deleted {
klog.InfoS("Deleted keyIDHashLastTimestampSecondsMetricLabels", "transformationType", item.transformationType,
"providerName", item.providerName, "keyIDHash", item.keyIDHash)
"providerName", item.providerName, "keyIDHash", item.keyIDHash, "apiServerIDHash", item.apiServerIDHash)
}
})
keyIDHashStatusLastTimestampSecondsMetricLabels = lru.NewWithEvictionFunc(cacheSize, func(key lru.Key, _ interface{}) {
item := key.(metricLabels)
if deleted := KeyIDHashStatusLastTimestampSeconds.DeleteLabelValues(item.providerName, item.keyIDHash); deleted {
klog.InfoS("Deleted keyIDHashStatusLastTimestampSecondsMetricLabels", "providerName", item.providerName, "keyIDHash", item.keyIDHash)
if deleted := KeyIDHashStatusLastTimestampSeconds.DeleteLabelValues(item.providerName, item.keyIDHash, item.apiServerIDHash); deleted {
klog.InfoS("Deleted keyIDHashStatusLastTimestampSecondsMetricLabels", "providerName", item.providerName, "keyIDHash", item.keyIDHash, "apiServerIDHash", item.apiServerIDHash)
}
})
}
@ -197,6 +209,7 @@ func RegisterMetrics() {
}
legacyregistry.MustRegister(dekCacheFillPercent)
legacyregistry.MustRegister(dekCacheInterArrivals)
legacyregistry.MustRegister(DekSourceCacheSize)
legacyregistry.MustRegister(KeyIDHashTotal)
legacyregistry.MustRegister(KeyIDHashLastTimestampSeconds)
legacyregistry.MustRegister(KeyIDHashStatusLastTimestampSeconds)
@ -206,22 +219,22 @@ func RegisterMetrics() {
}
// RecordKeyID records total count and last time in seconds when a KeyID was used for TransformFromStorage and TransformToStorage operations
func RecordKeyID(transformationType, providerName, keyID string) {
func RecordKeyID(transformationType, providerName, keyID, apiServerID string) {
lockRecordKeyID.Lock()
defer lockRecordKeyID.Unlock()
keyIDHash := addLabelToCache(keyIDHashTotalMetricLabels, transformationType, providerName, keyID)
KeyIDHashTotal.WithLabelValues(transformationType, providerName, keyIDHash).Inc()
KeyIDHashLastTimestampSeconds.WithLabelValues(transformationType, providerName, keyIDHash).SetToCurrentTime()
keyIDHash, apiServerIDHash := addLabelToCache(keyIDHashTotalMetricLabels, transformationType, providerName, keyID, apiServerID)
KeyIDHashTotal.WithLabelValues(transformationType, providerName, keyIDHash, apiServerIDHash).Inc()
KeyIDHashLastTimestampSeconds.WithLabelValues(transformationType, providerName, keyIDHash, apiServerIDHash).SetToCurrentTime()
}
// RecordKeyIDFromStatus records last time in seconds when a KeyID was returned by the Status RPC call.
func RecordKeyIDFromStatus(providerName, keyID string) {
func RecordKeyIDFromStatus(providerName, keyID, apiServerID string) {
lockRecordKeyIDStatus.Lock()
defer lockRecordKeyIDStatus.Unlock()
keyIDHash := addLabelToCache(keyIDHashStatusLastTimestampSecondsMetricLabels, "", providerName, keyID)
KeyIDHashStatusLastTimestampSeconds.WithLabelValues(providerName, keyIDHash).SetToCurrentTime()
keyIDHash, apiServerIDHash := addLabelToCache(keyIDHashStatusLastTimestampSecondsMetricLabels, "", providerName, keyID, apiServerID)
KeyIDHashStatusLastTimestampSeconds.WithLabelValues(providerName, keyIDHash, apiServerIDHash).SetToCurrentTime()
}
func RecordInvalidKeyIDFromStatus(providerName, errCode string) {
@ -255,6 +268,10 @@ func RecordDekCacheFillPercent(percent float64) {
dekCacheFillPercent.Set(percent)
}
func RecordDekSourceCacheSize(providerName string, size int) {
DekSourceCacheSize.WithLabelValues(providerName).Set(float64(size))
}
// RecordKMSOperationLatency records the latency of KMS operation.
func RecordKMSOperationLatency(providerName, methodName string, duration time.Duration, err error) {
KMSOperationsLatencyMetric.WithLabelValues(providerName, methodName, getErrorCode(err)).Observe(duration.Seconds())
@ -281,24 +298,25 @@ func getErrorCode(err error) string {
}
func getHash(data string) string {
if len(data) == 0 {
return ""
}
h := hashPool.Get().(hash.Hash)
h.Reset()
h.Write([]byte(data))
result := fmt.Sprintf("sha256:%x", h.Sum(nil))
dataHash := fmt.Sprintf("sha256:%x", h.Sum(nil))
hashPool.Put(h)
return result
return dataHash
}
func addLabelToCache(c *lru.Cache, transformationType, providerName, keyID string) string {
keyIDHash := ""
// only get hash if the keyID is not empty
if len(keyID) > 0 {
keyIDHash = getHash(keyID)
}
func addLabelToCache(c *lru.Cache, transformationType, providerName, keyID, apiServerID string) (string, string) {
keyIDHash := getHash(keyID)
apiServerIDHash := getHash(apiServerID)
c.Add(metricLabels{
transformationType: transformationType,
providerName: providerName,
keyIDHash: keyIDHash,
apiServerIDHash: apiServerIDHash,
}, nil) // value is irrelevant, this is a set and not a map
return keyIDHash
return keyIDHash, apiServerIDHash
}