rebase: update all k8s packages to 0.27.2

Signed-off-by: Niels de Vos <ndevos@ibm.com>
Niels de Vos
2023-06-01 18:58:10 +02:00
committed by mergify[bot]
parent 07b05616a0
commit 2551a0b05f
618 changed files with 42944 additions and 16168 deletions

View File

@ -0,0 +1,543 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/klog/v2"
)
// possible states of the cache watcher
const (
// cacheWatcherWaitingForBookmark indicates the cacher
// is waiting for a bookmark event with a specific RV set
cacheWatcherWaitingForBookmark = iota
// cacheWatcherBookmarkReceived indicates that the cacher
// has received a bookmark event with required RV
cacheWatcherBookmarkReceived
// cacheWatcherBookmarkSent indicates that the cacher
// has already sent a bookmark event to a client
cacheWatcherBookmarkSent
)
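// Illustrative sketch of the intended transitions (for a watcher that requested
// a specific bookmarkAfterResourceVersion; hypothetical, for orientation only):
//
//	cacheWatcherWaitingForBookmark --(bookmark with RV >= bookmarkAfterResourceVersion is queued)--> cacheWatcherBookmarkReceived
//	cacheWatcherBookmarkReceived   --(that bookmark is written to the result channel)-------------> cacheWatcherBookmarkSent
//
// A watcher created without a specific RV starts directly in cacheWatcherBookmarkSent
// (see setBookmarkAfterResourceVersion).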
// cacheWatcher implements watch.Interface
// this is not thread-safe
type cacheWatcher struct {
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
filter filterWithAttrsFunc
stopped bool
forget func(bool)
versioner storage.Versioner
// The watcher will be closed by server after the deadline,
// save it here to send bookmark events before that.
deadline time.Time
allowWatchBookmarks bool
groupResource schema.GroupResource
// human-readable identifier that helps associate a cacheWatcher
// instance with a request
identifier string
// drainInputBuffer indicates whether we should delay closing this watcher
// and send all events in the input buffer.
drainInputBuffer bool
// bookmarkAfterResourceVersion holds an RV that indicates
// when we should start delivering bookmark events.
// If this field holds the value of 0, it means
// we don't have any special preference about delivering bookmark events.
// Note that this field is used in conjunction with the state field.
// It should not be changed once the watcher has been started.
bookmarkAfterResourceVersion uint64
// stateMutex protects state
stateMutex sync.Mutex
// state holds a numeric value indicating the current state of the watcher
state int
}
func newCacheWatcher(
chanSize int,
filter filterWithAttrsFunc,
forget func(bool),
versioner storage.Versioner,
deadline time.Time,
allowWatchBookmarks bool,
groupResource schema.GroupResource,
identifier string,
) *cacheWatcher {
return &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
versioner: versioner,
deadline: deadline,
allowWatchBookmarks: allowWatchBookmarks,
groupResource: groupResource,
identifier: identifier,
}
}
// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
return c.result
}
// Implements watch.Interface.
func (c *cacheWatcher) Stop() {
c.forget(false)
}
// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
func (c *cacheWatcher) stopLocked() {
if !c.stopped {
c.stopped = true
// stop without draining the input channel was requested.
if !c.drainInputBuffer {
close(c.done)
}
close(c.input)
}
// Even if the watcher was already stopped, if it previously was
// using draining mode and it's not using it now, we need to
// close the done channel now. Otherwise we could leak the
// processing goroutine: if it keeps trying to put more objects
// into the result channel, the channel will fill up and there will
// no longer be anyone processing the events on the receiving end.
if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
close(c.done)
}
}
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
// if the bookmarkAfterResourceVersion hasn't been seen yet,
// we will try to deliver a bookmark event every second.
// the following check discards a bookmark event
// if its RV is lower than the bookmarkAfterResourceVersion,
// so that we don't pollute the input channel
if event.Type == watch.Bookmark && event.ResourceVersion < c.bookmarkAfterResourceVersion {
return false
}
select {
case c.input <- event:
c.markBookmarkAfterRvAsReceived(event)
return true
default:
return false
}
}
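// For illustration (hypothetical values): with bookmarkAfterResourceVersion == 100,
// a bookmark event carrying ResourceVersion 90 is dropped by the check above,
// while one carrying ResourceVersion 100 or higher is queued (buffer permitting)
// and markBookmarkAfterRvAsReceived moves the watcher to cacheWatcherBookmarkReceived.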
// A nil timer means that add will not block (if it can't send the event immediately, it will break the watcher).
//
// Note that bookmark events are never added via the add method, only via nonblockingAdd.
// Changing this behaviour would require moving the markBookmarkAfterRvAsReceived method.
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// Try to send the event immediately, without blocking.
if c.nonblockingAdd(event) {
return true
}
closeFunc := func() {
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely, we simply terminate it.
// we are graceful = false, when:
//
// (a) The bookmarkAfterResourceVersionReceived hasn't been received,
// we can safely terminate the watcher. Because the client is waiting
// for this specific bookmark, and we haven't even received one.
// (b) We have seen the bookmarkAfterResourceVersion, and it was sent already to the client.
// We can simply terminate the watcher.
// we are graceful = true, when:
//
// (a) We have seen a bookmark, but it hasn't been sent to the client yet.
// That means we should drain the input buffer which contains
// the bookmarkAfterResourceVersion we want. We do that to make progress
// as clients can re-establish a new watch with the given RV and receive
// further notifications.
graceful := func() bool {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.state == cacheWatcherBookmarkReceived
}()
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v, graceful = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result), graceful)
c.forget(graceful)
}
if timer == nil {
closeFunc()
return false
}
// OK, block sending, but only until timer fires.
select {
case c.input <- event:
return true
case <-timer.C:
closeFunc()
return false
}
}
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
// We try to send bookmarks:
//
// (a) right before the watcher timeout - for now we simply set it 2s before
// the deadline
//
// (b) roughly every minute
//
// (c) immediately when the bookmarkAfterResourceVersion wasn't confirmed
// in this scenario the client has already seen (or is still being sent)
// all initial data and is interested in seeing
// a specific RV value (aka. the bookmarkAfterResourceVersion)
// since we don't know when the cacher will see that RV, we increase the frequency
//
// (b) gives us periodicity if the watch breaks due to unexpected
// conditions, (a) ensures that on timeout the watcher is as close to
// now as possible - this covers 99% of cases.
if !c.wasBookmarkAfterRvReceived() {
return time.Time{}, true // schedule immediately
}
heartbeatTime := now.Add(bookmarkFrequency)
if c.deadline.IsZero() {
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
// apiserver if properly configured. So this shouldn't happen in practice.
return heartbeatTime, true
}
if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) {
heartbeatTime = pretimeoutTime
}
if heartbeatTime.Before(now) {
return time.Time{}, false
}
return heartbeatTime, true
}
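// Worked example (hypothetical values): with state == cacheWatcherBookmarkReceived,
// bookmarkFrequency == time.Minute and a deadline 30s from now, the heartbeat
// candidate is now+60s, the pre-timeout candidate is deadline-2s == now+28s,
// so nextBookmarkTime returns (now+28s, true). If the bookmarkAfterResourceVersion
// hasn't been received yet, it returns (time.Time{}, true) instead, scheduling a
// bookmark immediately.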
// wasBookmarkAfterRvReceived is the same as wasBookmarkAfterRvReceivedLocked but acquires the lock first
func (c *cacheWatcher) wasBookmarkAfterRvReceived() bool {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.wasBookmarkAfterRvReceivedLocked()
}
// wasBookmarkAfterRvReceivedLocked checks if the given cacheWatcher
// has seen a bookmark event >= bookmarkAfterResourceVersion
func (c *cacheWatcher) wasBookmarkAfterRvReceivedLocked() bool {
return c.state != cacheWatcherWaitingForBookmark
}
// markBookmarkAfterRvAsReceived indicates that the given cacheWatcher
// has seen a bookmark event >= bookmarkAfterResourceVersion
func (c *cacheWatcher) markBookmarkAfterRvAsReceived(event *watchCacheEvent) {
if event.Type == watch.Bookmark {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
if c.wasBookmarkAfterRvReceivedLocked() {
return
}
// bookmark events are scheduled by the startDispatchingBookmarkEvents method.
// since we received a bookmark event, that means we have
// converged towards the expected RV and it is okay to update the state so that
// this cacheWatcher can be scheduled for regular bookmark events
c.state = cacheWatcherBookmarkReceived
}
}
// wasBookmarkAfterRvSentLocked checks if a bookmark event
// with an RV >= the bookmarkAfterResourceVersion has been sent by this watcher
func (c *cacheWatcher) wasBookmarkAfterRvSentLocked() bool {
return c.state == cacheWatcherBookmarkSent
}
// wasBookmarkAfterRvSent is the same as wasBookmarkAfterRvSentLocked but acquires the lock first
func (c *cacheWatcher) wasBookmarkAfterRvSent() bool {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
return c.wasBookmarkAfterRvSentLocked()
}
// markBookmarkAfterRvSent indicates that the given cacheWatcher
// has sent a bookmark event with an RV >= the bookmarkAfterResourceVersion
//
// this function relies on the fact that the nonblockingAdd method
// won't admit a bookmark event with an RV < the bookmarkAfterResourceVersion
// so the first received bookmark event is considered to match the bookmarkAfterResourceVersion
func (c *cacheWatcher) markBookmarkAfterRvSent(event *watchCacheEvent) {
// note that bookmark events are not so common, so we will acquire the lock only every ~60 seconds or so
if event.Type == watch.Bookmark {
c.stateMutex.Lock()
defer c.stateMutex.Unlock()
if !c.wasBookmarkAfterRvSentLocked() {
c.state = cacheWatcherBookmarkSent
}
}
}
// setBookmarkAfterResourceVersion sets the bookmarkAfterResourceVersion and the state associated with it
func (c *cacheWatcher) setBookmarkAfterResourceVersion(bookmarkAfterResourceVersion uint64) {
state := cacheWatcherWaitingForBookmark
if bookmarkAfterResourceVersion == 0 {
state = cacheWatcherBookmarkSent // if no specific RV was requested we assume no-op
}
c.state = state
c.bookmarkAfterResourceVersion = bookmarkAfterResourceVersion
}
// setDrainInputBufferLocked if set to true indicates that we should delay closing this watcher
// until we send all events residing in the input buffer.
func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
c.drainInputBuffer = drain
}
// isDoneChannelClosedLocked checks if the c.done channel is closed
func (c *cacheWatcher) isDoneChannelClosedLocked() bool {
select {
case <-c.done:
return true
default:
}
return false
}
func getMutableObject(object runtime.Object) runtime.Object {
if _, ok := object.(*cachingObject); ok {
// It is safe to return without deep-copy, because the underlying
// object will lazily perform deep-copy on the first try to change
// any of its fields.
return object
}
return object.DeepCopyObject()
}
func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
}
}
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
if event.Type == watch.Bookmark {
e := &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
if !c.wasBookmarkAfterRvSent() {
objMeta, err := meta.Accessor(e.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error while accessing object's metadata gr: %v, identifier: %v, obj: %#v, err: %v", c.groupResource, c.identifier, e.Object, err))
return nil
}
objAnnotations := objMeta.GetAnnotations()
if objAnnotations == nil {
objAnnotations = map[string]string{}
}
objAnnotations["k8s.io/initial-events-end"] = "true"
objMeta.SetAnnotations(objAnnotations)
}
return e
}
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
oldObjPasses := false
if event.PrevObject != nil {
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.
return nil
}
switch {
case curObjPasses && !oldObjPasses:
return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)}
case curObjPasses && oldObjPasses:
return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)}
case !curObjPasses && oldObjPasses:
// return a delete event with the previous object content, but with the event's resource version
oldObj := getMutableObject(event.PrevObject)
// We know that if oldObj is cachingObject (which can only be set via
// setCachingObjects), its resourceVersion is already set correctly and
// we don't need to update it. However, since cachingObject efficiently
// handles noop updates, we avoid this microoptimization here.
updateResourceVersion(oldObj, c.versioner, event.ResourceVersion)
return &watch.Event{Type: watch.Deleted, Object: oldObj}
}
return nil
}
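// For illustration, the filter results above map to emitted events as follows
// (curObjPasses / oldObjPasses -> event):
//
//	true  / false -> watch.Added    (object newly matches the selector)
//	true  / true  -> watch.Modified
//	false / true  -> watch.Deleted  (PrevObject, stamped with the event's RV)
//	false / false -> nil            (watcher is not interested)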
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
watchEvent := c.convertToWatchEvent(event)
if watchEvent == nil {
// Watcher is not interested in that object.
return
}
// We need to ensure that if we put event X to the c.result, all
// previous events were already put into it before, no matter whether
// c.done is closed or not.
// Thus we cannot simply select from c.done and c.result, as this
// would give us non-determinism.
// At the same time, we don't want to block infinitely on putting
// to c.result, when c.done is already closed.
//
// This ensures that with c.done already closed, we go at most once
// into the next select after this. With that, no matter which
// statement we choose there, we will deliver only consecutive
// events.
select {
case <-c.done:
return
default:
}
select {
case c.result <- *watchEvent:
c.markBookmarkAfterRvSent(event)
case <-c.done:
}
}
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
defer utilruntime.HandleCrash()
defer close(c.result)
defer c.Stop()
// Check how long we are processing initEvents.
// As long as these are not processed, we are not processing
// any incoming events, so if it takes long, we may actually
// block all watchers for some time.
// TODO: From the logs it seems that processing times can reach
// even up to 1s, which is very long. However, this doesn't
// depend that much on the number of initEvents. E.g. from the
// 2000-node Kubemark run we have logs like this, e.g.:
// ... processing 13862 initEvents took 66.808689ms
// ... processing 14040 initEvents took 993.532539ms
// We should understand what is blocking us in those cases (e.g.
// is it lack of CPU, network, or something else) and potentially
// consider increasing the size of the result buffer in those cases.
const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now()
initEventCount := 0
for {
event, err := cacheInterval.Next()
if err != nil {
// An error indicates that the cache interval
// has been invalidated and can no longer serve
// events.
//
// Initially we considered sending an "out-of-history"
// Error event in this case, but because historically
// such events weren't sent out of the watchCache, we
// decided not to. This is still ok, because on watch
// closure, the watcher will try to re-instantiate the
// watch and then will get an explicit "out-of-history"
// window. There is potential for optimization, but for
// now, in order to be on the safe side and not break
// custom clients, the cost of it is something that we
// are fully accepting.
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
return
}
if event == nil {
break
}
c.sendWatchCacheEvent(event)
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher a second time, causing it to go back in time.
//
// There is one case where events are not necessarily ordered by
// resourceVersion: watching from resourceVersion=0, which at the
// beginning returns the state of each object.
// To handle it, we take the maximum of the event's resource version
// and the one we have seen so far.
if event.ResourceVersion > resourceVersion {
resourceVersion = event.ResourceVersion
}
initEventCount++
}
if initEventCount > 0 {
metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
}
c.process(ctx, resourceVersion)
}
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
// At this point we already start processing incoming watch events.
// However, the init events can still be in flight, because their serialization
// and sending to the client happen asynchronously.
// TODO: As described in the KEP, we would like to estimate that by delaying
// the initialization signal proportionally to the number of events to
// process, but we're leaving this to the tuning phase.
utilflowcontrol.WatchInitialized(ctx)
for {
select {
case event, ok := <-c.input:
if !ok {
return
}
// only send events newer than resourceVersion
// or a bookmark event with an RV equal to resourceVersion
// if we haven't sent one to the client
if event.ResourceVersion > resourceVersion || (event.Type == watch.Bookmark && event.ResourceVersion == resourceVersion && !c.wasBookmarkAfterRvSent()) {
c.sendWatchCacheEvent(event)
}
case <-ctx.Done():
return
}
}
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"reflect"
"strconv"
"sync"
"time"
@ -34,17 +35,17 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
@ -127,29 +128,37 @@ func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
}
type indexedWatchers struct {
allWatchers watchersMap
allWatchers map[namespacedName]watchersMap
valueWatchers map[string]watchersMap
}
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespacedName, value string, supported bool) {
if supported {
if _, ok := i.valueWatchers[value]; !ok {
i.valueWatchers[value] = watchersMap{}
}
i.valueWatchers[value].addWatcher(w, number)
} else {
i.allWatchers.addWatcher(w, number)
scopedWatchers, ok := i.allWatchers[scope]
if !ok {
scopedWatchers = watchersMap{}
i.allWatchers[scope] = scopedWatchers
}
scopedWatchers.addWatcher(w, number)
}
}
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool, done func(*cacheWatcher)) {
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
if supported {
i.valueWatchers[value].deleteWatcher(number, done)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers.deleteWatcher(number, done)
i.allWatchers[scope].deleteWatcher(number, done)
if len(i.allWatchers[scope]) == 0 {
delete(i.allWatchers, scope)
}
}
}
@ -161,10 +170,13 @@ func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
klog.Warningf("Terminating all watchers from cacher %v", groupResource)
}
i.allWatchers.terminateAll(done)
for _, watchers := range i.allWatchers {
watchers.terminateAll(done)
}
for _, watchers := range i.valueWatchers {
watchers.terminateAll(done)
}
i.allWatchers = map[namespacedName]watchersMap{}
i.valueWatchers = map[string]watchersMap{}
}
@ -278,6 +290,9 @@ type Cacher struct {
// newFunc is a function that creates a new empty object for storing an object of type Type.
newFunc func() runtime.Object
// newListFunc is a function that creates a new empty list for storing objects of type Type.
newListFunc func() runtime.Object
// indexedTrigger is used for optimizing the number of watchers that need to process
// an incoming event.
indexedTrigger *indexedTriggerFunc
@ -359,10 +374,11 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
groupResource: config.GroupResource,
versioner: config.Versioner,
newFunc: config.NewFunc,
newListFunc: config.NewListFunc,
indexedTrigger: indexedTrigger,
watcherIdx: 0,
watchers: indexedWatchers{
allWatchers: make(map[int]*cacheWatcher),
allWatchers: make(map[namespacedName]watchersMap),
valueWatchers: make(map[string]watchersMap),
},
// TODO: Figure out the correct value for the buffer size.
@ -399,6 +415,9 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
// We don't want to terminate all watchers as recreating all watchers puts a high load on the api-server.
// In most of the cases, the leader is re-elected within a few cycles.
reflector.MaxInternalErrorRetryDuration = time.Second * 30
// since the watch-list is provided by the watch cache, instruct
// the reflector to issue a regular LIST against the store
reflector.UseWatchList = false
cacher.watchCache = watchCache
cacher.reflector = reflector
@ -479,18 +498,50 @@ func (c *Cacher) Delete(
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion, nil)
}
type namespacedName struct {
namespace string
name string
}
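// For illustration (hypothetical request): a watch on
// /api/v1/namespaces/ns1/pods/pod1 is scoped as namespacedName{namespace: "ns1", name: "pod1"},
// a cluster-wide watch with field selector metadata.name=pod1 as namespacedName{name: "pod1"},
// and an unscoped watch as the zero namespacedName{}. startDispatching later fans
// events out to every matching scope.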
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
pred := opts.Predicate
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
// if the watch-list feature wasn't enabled and the resourceVersion is unset,
// ensure that the RV from which the watch is being served is the latest
// one. "latest" is ensured by serving the watch from
// the underlying storage.
//
// it should never happen due to our validation but let's just be super-safe here
// and disable sendingInitialEvents when the feature wasn't enabled
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && opts.SendInitialEvents != nil {
opts.SendInitialEvents = nil
}
if opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
return c.storage.Watch(ctx, key, opts)
}
requestedWatchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
if err := c.ready.wait(); err != nil {
readyGeneration, err := c.ready.waitAndReadGeneration(ctx)
if err != nil {
return nil, errors.NewServiceUnavailable(err.Error())
}
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
scope := namespacedName{}
if requestNamespace, ok := request.NamespaceFrom(ctx); ok && len(requestNamespace) > 0 {
scope.namespace = requestNamespace
} else if selectorNamespace, ok := pred.Field.RequiresExactMatch("metadata.namespace"); ok {
scope.namespace = selectorNamespace
}
if requestInfo, ok := request.RequestInfoFrom(ctx); ok && requestInfo != nil && len(requestInfo.Name) > 0 {
scope.name = requestInfo.Name
} else if selectorName, ok := pred.Field.RequiresExactMatch("metadata.name"); ok {
scope.name = selectorName
}
triggerValue, triggerSupported := "", false
if c.indexedTrigger != nil {
for _, field := range pred.IndexFields {
@ -509,6 +560,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// watchers on our watcher having a processing hiccup
chanSize := c.watchCache.suggestedWatchChannelSize(c.indexedTrigger != nil, triggerSupported)
// Determine a function that computes the bookmarkAfterResourceVersion
bookmarkAfterResourceVersionFn, err := c.getBookmarkAfterResourceVersionLockedFunc(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
// Determine a function that computes the watchRV we should start from
startWatchResourceVersionFn, err := c.getStartResourceVersionForWatchLockedFunc(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
// Determine the watch timeout ('0' means the deadline is not set, ignore checking)
deadline, _ := ctx.Deadline()
@ -536,7 +599,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
forceAllEvents, err := c.waitUntilWatchCacheFreshAndForceAllEvents(ctx, requestedWatchRV, opts)
if err != nil {
return newErrWatcher(err), nil
}
startWatchRV := startWatchResourceVersionFn()
var cacheInterval *watchCacheInterval
if forceAllEvents {
cacheInterval, err = c.watchCache.getIntervalFromStoreLocked()
} else {
cacheInterval, err = c.watchCache.getAllEventsSinceLocked(startWatchRV)
}
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@ -544,12 +617,24 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
return newErrWatcher(err), nil
}
addedWatcher := false
func() {
c.Lock()
defer c.Unlock()
if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
// We went unready or are already on a different generation.
// Avoid registering and starting the watch as it will have to be
// terminated immediately anyway.
return
}
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
// Update the bookMarkAfterResourceVersion
watcher.setBookmarkAfterResourceVersion(bookmarkAfterResourceVersionFn())
c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
addedWatcher = true
// Add it to the queue only when the client supports watch bookmarks.
if watcher.allowWatchBookmarks {
@ -558,7 +643,15 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.watcherIdx++
}()
go watcher.processInterval(ctx, cacheInterval, watchRV)
if !addedWatcher {
// Watcher isn't really started at this point, so it's safe to just drop it.
//
// We're simulating the immediate watch termination, which boils down to simply
// closing the watcher.
return newImmediateCloseWatcher(), nil
}
go watcher.processInterval(ctx, cacheInterval, startWatchRV)
return watcher, nil
}
@ -586,7 +679,7 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
if err := c.ready.wait(); err != nil {
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
@ -621,9 +714,11 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
func shouldDelegateList(opts storage.ListOptions) bool {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
match := opts.ResourceVersionMatch
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
@ -631,7 +726,7 @@ func shouldDelegateList(opts storage.ListOptions) bool {
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
return resourceVersion == "" || hasContinuation || hasLimit || unsupportedMatch
}
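// Worked example (hypothetical options): a LIST with ResourceVersion "" (or with a
// continue token, or with a limit at a non-zero RV, or with an unsupported
// ResourceVersionMatch such as Exact) is delegated to the underlying storage,
// whereas ResourceVersion "0" or "1234" with ResourceVersionMatchNotOlderThan and
// no continue token or limit is served from the watch cache.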
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
@ -676,7 +771,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
attribute.Stringer("type", c.groupResource))
defer span.End(500 * time.Millisecond)
if err := c.ready.wait(); err != nil {
if err := c.ready.wait(ctx); err != nil {
return errors.NewServiceUnavailable(err.Error())
}
span.AddEvent("Ready")
@ -715,6 +810,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
if listVal.IsNil() {
// Ensure that we never return a nil Items pointer in the result for consistency.
listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
}
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
@ -974,10 +1073,32 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) {
return
}
// Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers {
// iterate over watchers for each applicable namespace/name tuple
namespace := event.ObjFields["metadata.namespace"]
name := event.ObjFields["metadata.name"]
if len(namespace) > 0 {
if len(name) > 0 {
// namespaced watchers scoped by name
for _, watcher := range c.watchers.allWatchers[namespacedName{namespace: namespace, name: name}] {
c.watchersBuffer = append(c.watchersBuffer, watcher)
}
}
// namespaced watchers not scoped by name
for _, watcher := range c.watchers.allWatchers[namespacedName{namespace: namespace}] {
c.watchersBuffer = append(c.watchersBuffer, watcher)
}
}
if len(name) > 0 {
// cluster-wide watchers scoped by name
for _, watcher := range c.watchers.allWatchers[namespacedName{name: name}] {
c.watchersBuffer = append(c.watchersBuffer, watcher)
}
}
// cluster-wide watchers unscoped by name
for _, watcher := range c.watchers.allWatchers[namespacedName{}] {
c.watchersBuffer = append(c.watchersBuffer, watcher)
}
if supported {
// Iterate over watchers interested in the given values of the trigger.
for _, triggerValue := range triggerValues {
@ -1059,7 +1180,7 @@ func (c *Cacher) Stop() {
c.stopWg.Wait()
}
func forgetWatcher(c *Cacher, w *cacheWatcher, index int, triggerValue string, triggerSupported bool) func(bool) {
func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName, triggerValue string, triggerSupported bool) func(bool) {
return func(drainWatcher bool) {
c.Lock()
defer c.Unlock()
@ -1069,7 +1190,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, triggerValue string, t
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers()), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherLocked)
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
}
}
@ -1085,7 +1206,7 @@ func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWit
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
if err := c.ready.wait(); err != nil {
if err := c.ready.wait(context.Background()); err != nil {
return 0, errors.NewServiceUnavailable(err.Error())
}
@ -1093,6 +1214,101 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
return c.versioner.ParseResourceVersion(resourceVersion)
}
// getCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
// this method issues an empty list request and reads only the ResourceVersion from the object metadata
func (c *Cacher) getCurrentResourceVersionFromStorage(ctx context.Context) (uint64, error) {
if c.newListFunc == nil {
return 0, fmt.Errorf("newListFunction wasn't provided for %v", c.objectType)
}
emptyList := c.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1, // just in case we actually hit something
}
err := c.storage.GetList(ctx, c.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList)
if err != nil {
return 0, err
}
emptyListAccessor, err := meta.ListAccessor(emptyList)
if err != nil {
return 0, err
}
if emptyListAccessor == nil {
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
}
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
if err != nil {
return 0, err
}
if currentResourceVersion == 0 {
return 0, fmt.Errorf("the current resource version must be greater than 0")
}
return uint64(currentResourceVersion), nil
}
// getBookmarkAfterResourceVersionLockedFunc returns a function that
// computes a ResourceVersion after which the bookmark event will be delivered.
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getBookmarkAfterResourceVersionLockedFunc(ctx context.Context, parsedResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents == false || !opts.Predicate.AllowWatchBookmarks {
return func() uint64 { return 0 }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedResourceVersion, opts)
}
// getStartResourceVersionForWatchLockedFunc returns a function that
// computes the ResourceVersion the watch will be started from.
// Depending on the input parameters the semantics of the returned ResourceVersion are:
// - start at Exact (return parsedWatchResourceVersion)
// - start at Most Recent (return an RV from etcd)
// - start at Any (return the current watchCache's RV)
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
if opts.SendInitialEvents == nil || *opts.SendInitialEvents == true {
return func() uint64 { return parsedWatchResourceVersion }, nil
}
return c.getCommonResourceVersionLockedFunc(ctx, parsedWatchResourceVersion, opts)
}
// getCommonResourceVersionLockedFunc is a helper that simply computes a ResourceVersion
// based on the input parameters. Please examine callers of this method to get more context.
//
// The returned function must be called under the watchCache lock.
func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
switch {
case len(opts.ResourceVersion) == 0:
rv, err := c.getCurrentResourceVersionFromStorage(ctx)
if err != nil {
return nil, err
}
return func() uint64 { return rv }, nil
case parsedWatchResourceVersion == 0:
// here we assume that the watchCache lock is already held
return func() uint64 { return c.watchCache.resourceVersion }, nil
default:
return func() uint64 { return parsedWatchResourceVersion }, nil
}
}
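// For illustration: with opts.ResourceVersion == "" the RV is fetched from the
// underlying storage via getCurrentResourceVersionFromStorage, with
// opts.ResourceVersion == "0" the watchCache's current resourceVersion is used,
// and with an explicit value such as "123" the parsed RV (123) is returned as-is.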
// waitUntilWatchCacheFreshAndForceAllEvents waits until cache is at least
// as fresh as the given requestedWatchRV if sendInitialEvents was requested.
// Additionally, it instructs the caller whether it should ask for
// all events from the cache (full state) or not.
func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, requestedWatchRV uint64, opts storage.ListOptions) (bool, error) {
if opts.SendInitialEvents != nil && *opts.SendInitialEvents == true {
err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
defer c.watchCache.RUnlock()
return err == nil, err
}
return false, nil
}
// cacherListerWatcher wraps storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage storage.Interface
@ -1181,339 +1397,23 @@ func (c *errWatcher) Stop() {
// no-op
}
// cacheWatcher implements watch.Interface
// this is not thread-safe
type cacheWatcher struct {
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
filter filterWithAttrsFunc
stopped bool
forget func(bool)
versioner storage.Versioner
// The watcher will be closed by server after the deadline,
// save it here to send bookmark events before that.
deadline time.Time
allowWatchBookmarks bool
groupResource schema.GroupResource
// human readable identifier that helps assigning cacheWatcher
// instance with request
identifier string
// drainInputBuffer indicates whether we should delay closing this watcher
// and send all event in the input buffer.
drainInputBuffer bool
// immediateCloseWatcher implements watch.Interface that is immediately closed
type immediateCloseWatcher struct {
result chan watch.Event
}
func newCacheWatcher(
chanSize int,
filter filterWithAttrsFunc,
forget func(bool),
versioner storage.Versioner,
deadline time.Time,
allowWatchBookmarks bool,
groupResource schema.GroupResource,
identifier string,
) *cacheWatcher {
return &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
versioner: versioner,
deadline: deadline,
allowWatchBookmarks: allowWatchBookmarks,
groupResource: groupResource,
identifier: identifier,
}
func newImmediateCloseWatcher() *immediateCloseWatcher {
watcher := &immediateCloseWatcher{result: make(chan watch.Event)}
close(watcher.result)
return watcher
}
// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
func (c *immediateCloseWatcher) ResultChan() <-chan watch.Event {
return c.result
}
// Implements watch.Interface.
func (c *cacheWatcher) Stop() {
c.forget(false)
}
// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
func (c *cacheWatcher) stopLocked() {
if !c.stopped {
c.stopped = true
// stop without draining the input channel was requested.
if !c.drainInputBuffer {
close(c.done)
}
close(c.input)
}
// Even if the watcher was already stopped, if it previously was
// using draining mode and it's not using it now we need to
// close the done channel now. Otherwise we could leak the
// processing goroutine: if it keeps trying to put more objects
// into the result channel, the channel will fill up and there will
// no longer be anyone processing the events on the receiving end.
if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
close(c.done)
}
}
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
select {
case c.input <- event:
return true
default:
return false
}
}
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
// Try to send the event immediately, without blocking.
if c.nonblockingAdd(event) {
return true
}
closeFunc := func() {
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
klog.V(1).Infof("Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v", c.groupResource.String(), c.identifier, len(c.input), len(c.result))
metrics.TerminatedWatchersCounter.WithLabelValues(c.groupResource.String()).Inc()
c.forget(false)
}
if timer == nil {
closeFunc()
return false
}
// OK, block sending, but only until timer fires.
select {
case c.input <- event:
return true
case <-timer.C:
closeFunc()
return false
}
}
func (c *cacheWatcher) nextBookmarkTime(now time.Time, bookmarkFrequency time.Duration) (time.Time, bool) {
// We try to send bookmarks:
//
// (a) right before the watcher timeout - for now we simply set it 2s before
// the deadline
//
// (b) roughly every minute
//
// (b) gives us periodicity if the watch breaks due to unexpected
// conditions, (a) ensures that on timeout the watcher is as close to
// now as possible - this covers 99% of cases.
heartbeatTime := now.Add(bookmarkFrequency)
if c.deadline.IsZero() {
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
// apiserver if properly configured. So this shouldn't happen in practice.
return heartbeatTime, true
}
if pretimeoutTime := c.deadline.Add(-2 * time.Second); pretimeoutTime.Before(heartbeatTime) {
heartbeatTime = pretimeoutTime
}
if heartbeatTime.Before(now) {
return time.Time{}, false
}
return heartbeatTime, true
}
// setDrainInputBufferLocked if set to true indicates that we should delay closing this watcher
// until we send all events residing in the input buffer.
func (c *cacheWatcher) setDrainInputBufferLocked(drain bool) {
c.drainInputBuffer = drain
}
// isDoneChannelClosed checks if c.done channel is closed
func (c *cacheWatcher) isDoneChannelClosedLocked() bool {
select {
case <-c.done:
return true
default:
}
return false
}
func getMutableObject(object runtime.Object) runtime.Object {
if _, ok := object.(*cachingObject); ok {
// It is safe to return without deep-copy, because the underlying
// object will lazily perform deep-copy on the first try to change
// any of its fields.
return object
}
return object.DeepCopyObject()
}
func updateResourceVersion(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
if err := versioner.UpdateObject(object, resourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
}
}
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
if event.Type == watch.Bookmark {
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
}
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
oldObjPasses := false
if event.PrevObject != nil {
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.
return nil
}
switch {
case curObjPasses && !oldObjPasses:
return &watch.Event{Type: watch.Added, Object: getMutableObject(event.Object)}
case curObjPasses && oldObjPasses:
return &watch.Event{Type: watch.Modified, Object: getMutableObject(event.Object)}
case !curObjPasses && oldObjPasses:
// return a delete event with the previous object content, but with the event's resource version
oldObj := getMutableObject(event.PrevObject)
// We know that if oldObj is cachingObject (which can only be set via
// setCachingObjects), its resourceVersion is already set correctly and
// we don't need to update it. However, since cachingObject efficiently
// handles noop updates, we avoid this microoptimization here.
updateResourceVersion(oldObj, c.versioner, event.ResourceVersion)
return &watch.Event{Type: watch.Deleted, Object: oldObj}
}
return nil
}
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
watchEvent := c.convertToWatchEvent(event)
if watchEvent == nil {
// Watcher is not interested in that object.
return
}
// We need to ensure that if we put event X to the c.result, all
// previous events were already put into it before, no matter whether
// c.done is close or not.
// Thus we cannot simply select from c.done and c.result and this
// would give us non-determinism.
// At the same time, we don't want to block infinitely on putting
// to c.result, when c.done is already closed.
//
// This ensures that with c.done already close, we at most once go
// into the next select after this. With that, no matter which
// statement we choose there, we will deliver only consecutive
// events.
select {
case <-c.done:
return
default:
}
select {
case c.result <- *watchEvent:
case <-c.done:
}
}
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
defer utilruntime.HandleCrash()
defer close(c.result)
defer c.Stop()
// Check how long we are processing initEvents.
// As long as these are not processed, we are not processing
// any incoming events, so if it takes long, we may actually
// block all watchers for some time.
// TODO: From the logs it seems that there happens processing
// times even up to 1s which is very long. However, this doesn't
// depend that much on the number of initEvents. E.g. from the
// 2000-node Kubemark run we have logs like this, e.g.:
// ... processing 13862 initEvents took 66.808689ms
// ... processing 14040 initEvents took 993.532539ms
// We should understand what is blocking us in those cases (e.g.
// is it lack of CPU, network, or sth else) and potentially
// consider increase size of result buffer in those cases.
const initProcessThreshold = 500 * time.Millisecond
startTime := time.Now()
initEventCount := 0
for {
event, err := cacheInterval.Next()
if err != nil {
// An error indicates that the cache interval
// has been invalidated and can no longer serve
// events.
//
// Initially we considered sending an "out-of-history"
// Error event in this case, but because historically
// such events weren't sent out of the watchCache, we
// decided not to. This is still ok, because on watch
// closure, the watcher will try to re-instantiate the
// watch and then will get an explicit "out-of-history"
// window. There is potential for optimization, but for
// now, in order to be on the safe side and not break
// custom clients, the cost of it is something that we
// are fully accepting.
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
return
}
if event == nil {
break
}
c.sendWatchCacheEvent(event)
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
resourceVersion = event.ResourceVersion
initEventCount++
}
if initEventCount > 0 {
metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
}
processingTime := time.Since(startTime)
if processingTime > initProcessThreshold {
klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
}
c.process(ctx, resourceVersion)
}
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
// At this point we already start processing incoming watch events.
// However, the init events can still be in flight, because their serialization
// and sending to the client happen asynchronously.
// TODO: As described in the KEP, we would like to estimate that by delaying
// the initialization signal proportionally to the number of events to
// process, but we're leaving this to the tuning phase.
utilflowcontrol.WatchInitialized(ctx)
for {
select {
case event, ok := <-c.input:
if !ok {
return
}
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)
}
case <-ctx.Done():
return
}
}
func (c *immediateCloseWatcher) Stop() {
// no-op
}

View File

@ -74,6 +74,17 @@ var (
[]string{"resource"},
)
EventsReceivedCounter = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "events_received_total",
Help: "Counter of events received in watch cache broken by resource type.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
EventsCounter = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
@ -147,6 +158,7 @@ func Register() {
legacyregistry.MustRegister(listCacheNumFetched)
legacyregistry.MustRegister(listCacheNumReturned)
legacyregistry.MustRegister(InitCounter)
legacyregistry.MustRegister(EventsReceivedCounter)
legacyregistry.MustRegister(EventsCounter)
legacyregistry.MustRegister(TerminatedWatchersCounter)
legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal)
@ -167,7 +179,7 @@ func RecordListCacheMetrics(resourcePrefix, indexName string, numFetched, numRet
func RecordsWatchCacheCapacityChange(objType string, old, new int) {
WatchCacheCapacity.WithLabelValues(objType).Set(float64(new))
if old < new {
WatchCacheCapacity.WithLabelValues(objType).Inc()
watchCacheCapacityIncreaseTotal.WithLabelValues(objType).Inc()
return
}
watchCacheCapacityDecreaseTotal.WithLabelValues(objType).Inc()

View File

@ -17,6 +17,7 @@ limitations under the License.
package cacher
import (
"context"
"fmt"
"sync"
)
@ -30,67 +31,127 @@ const (
)
// ready is a three state condition variable that blocks until it is Ready if it is not Stopped.
// Its initial state is Pending.
// Its initial state is Pending and its state machine diagram is as follows.
//
// Pending <------> Ready -----> Stopped
//
//	   |                            ^
//	   └----------------------------┘
type ready struct {
state status
c *sync.Cond
state status // represents the state of the variable
generation int // represents the number of times we have transitioned to ready
lock sync.RWMutex // protects the state and generation variables
restartLock sync.Mutex // protects the transition from ready to pending where the channel is recreated
waitCh chan struct{} // blocks until it is ready or stopped
}
func newReady() *ready {
return &ready{
c: sync.NewCond(&sync.RWMutex{}),
state: Pending,
waitCh: make(chan struct{}),
state: Pending,
}
}
// done returns a channel that is closed once the state is Ready or Stopped
func (r *ready) done() chan struct{} {
r.restartLock.Lock()
defer r.restartLock.Unlock()
return r.waitCh
}
// wait blocks until it is Ready or Stopped; it returns an error if it is Stopped.
func (r *ready) wait() error {
r.c.L.Lock()
defer r.c.L.Unlock()
for r.state == Pending {
r.c.Wait()
}
switch r.state {
case Ready:
return nil
case Stopped:
return fmt.Errorf("apiserver cacher is stopped")
default:
return fmt.Errorf("unexpected apiserver cache state: %v", r.state)
func (r *ready) wait(ctx context.Context) error {
_, err := r.waitAndReadGeneration(ctx)
return err
}
// waitAndReadGeneration blocks until it is Ready or Stopped and returns the number
// of times we entered the ready state if Ready, and an error otherwise.
func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
for {
// r.done() only blocks if state is Pending
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-r.done():
}
r.lock.RLock()
switch r.state {
case Pending:
// since we allow switching between the states Pending and Ready,
// a quick transition from Pending -> Ready -> Pending can unblock
// a process that was waiting, which then sees a Pending
// state again. If the state is Pending we have to wait again to
// avoid an inconsistent state on the system, with some processes not
// waiting despite the state having moved back to Pending.
r.lock.RUnlock()
case Ready:
generation := r.generation
r.lock.RUnlock()
return generation, nil
case Stopped:
r.lock.RUnlock()
return 0, fmt.Errorf("apiserver cacher is stopped")
default:
r.lock.RUnlock()
return 0, fmt.Errorf("unexpected apiserver cache state: %v", r.state)
}
}
}
// check returns true only if it is Ready.
func (r *ready) check() bool {
// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
rwMutex := r.c.L.(*sync.RWMutex)
rwMutex.RLock()
defer rwMutex.RUnlock()
return r.state == Ready
_, ok := r.checkAndReadGeneration()
return ok
}
// checkAndReadGeneration returns the current generation and whether it is Ready.
func (r *ready) checkAndReadGeneration() (int, bool) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.generation, r.state == Ready
}
// set sets the state to Pending (false) or Ready (true); it has no effect if the state is Stopped.
func (r *ready) set(ok bool) {
r.c.L.Lock()
defer r.c.L.Unlock()
r.lock.Lock()
defer r.lock.Unlock()
if r.state == Stopped {
return
}
if ok {
if ok && r.state == Pending {
r.state = Ready
} else {
r.generation++
select {
case <-r.waitCh:
default:
close(r.waitCh)
}
} else if !ok && r.state == Ready {
// creating the waitCh can be racy if
// something enters the wait() method
select {
case <-r.waitCh:
r.restartLock.Lock()
r.waitCh = make(chan struct{})
r.restartLock.Unlock()
default:
}
r.state = Pending
}
r.c.Broadcast()
}
// stop sets the state to Stopped. This state is irreversible.
func (r *ready) stop() {
r.c.L.Lock()
defer r.c.L.Unlock()
r.lock.Lock()
defer r.lock.Unlock()
if r.state != Stopped {
r.state = Stopped
r.c.Broadcast()
}
select {
case <-r.waitCh:
default:
close(r.waitCh)
}
}
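// Minimal usage sketch (illustrative only; ctx is assumed to be supplied by the caller):
//
//	r := newReady()
//	r.set(true) // cacher finished its initial list/watch
//	if gen, err := r.waitAndReadGeneration(ctx); err == nil {
//		// serve the request; comparing gen against checkAndReadGeneration()
//		// later detects whether the cacher went unready in between.
//		_ = gen
//	}
//	r.stop() // terminal: pending and future waiters receive an error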

View File

@ -156,14 +156,15 @@ type watchCache struct {
// getAttrsFunc is used to get labels and fields of an object.
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
// cache is used as a cyclic buffer - its first element (with the smallest
// resourceVersion) is defined by startIndex, its last element is defined
// by endIndex (if cache is full it will be startIndex + capacity).
// Both startIndex and endIndex can be greater than buffer capacity -
// you should always apply modulo capacity to get an index in cache array.
// cache is used as a cyclic buffer - the "current" contents of it are
// stored in [start_index%capacity, end_index%capacity) - so the
// "current" contents have exactly end_index-start_index items.
cache []*watchCacheEvent
startIndex int
endIndex int
// removedEventSinceRelist holds the information whether any of the events
// were already removed from the `cache` cyclic buffer since the last relist
removedEventSinceRelist bool
// store will effectively support LIST operation from the "end of cache
// history" i.e. from the moment just after the newest cached watched event.
@ -280,6 +281,8 @@ func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Ob
// processEvent is safe as long as there is at most one call to it in flight
// at any point in time.
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
metrics.EventsReceivedCounter.WithLabelValues(w.groupResource.String()).Inc()
key, err := w.keyFunc(event.Object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
@ -344,6 +347,7 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
if w.isCacheFullLocked() {
// Cache is full - remove the oldest element.
w.startIndex++
w.removedEventSinceRelist = true
}
w.cache[w.endIndex%w.capacity] = event
w.endIndex++
@ -463,6 +467,20 @@ func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion
return nil
}
type sortableStoreElements []interface{}
func (s sortableStoreElements) Len() int {
return len(s)
}
func (s sortableStoreElements) Less(i, j int) bool {
return s[i].(*storeElement).Key < s[j].(*storeElement).Key
}
func (s sortableStoreElements) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
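// sortableStoreElements exists so that sort.Sort can order the items returned by
// WaitUntilFreshAndList below by their store key, roughly (illustrative):
//
//	sort.Sort(sortableStoreElements(result))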
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
@ -472,16 +490,21 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
return nil, 0, "", err
}
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
// want - they will be filtered out later. The fact that we return fewer items is only a further performance improvement.
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
return result, w.resourceVersion, matchValue.IndexName, nil
result, rv, index, err := func() ([]interface{}, uint64, string, error) {
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
// want - they will be filtered out later. The fact that we return fewer items is only a further performance improvement.
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range matchValues {
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
return result, w.resourceVersion, matchValue.IndexName, nil
}
}
}
return w.store.List(), w.resourceVersion, "", nil
return w.store.List(), w.resourceVersion, "", nil
}()
sort.Sort(sortableStoreElements(result))
return result, rv, index, err
}
// WaitUntilFreshAndGet returns a pointer to a <storeElement> object.
@ -551,8 +574,15 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
w.Lock()
defer w.Unlock()
w.startIndex = 0
w.endIndex = 0
// Ensure startIndex never decreases, so that existing watchCacheInterval
// instances get "invalid" errors if they try to download from the buffer
// using their own start/end indexes calculated from previous buffer
// content.
// Empty the cyclic buffer, ensuring startIndex doesn't decrease.
w.startIndex = w.endIndex
w.removedEventSinceRelist = false
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
@ -643,7 +673,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
size := w.endIndex - w.startIndex
var oldest uint64
switch {
case w.listResourceVersion > 0 && w.startIndex == 0:
case w.listResourceVersion > 0 && !w.removedEventSinceRelist:
// If no event was removed from the buffer since last relist, the oldest watch
// event we can deliver is one greater than the resource version of the list.
oldest = w.listResourceVersion + 1
@ -665,11 +695,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
// current state and only then start watching from that point.
//
// TODO: In v2 api, we should stop returning the current state - #13969.
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
if err != nil {
return nil, err
}
return ci, nil
return w.getIntervalFromStoreLocked()
}
if resourceVersion < oldest-1 {
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
@ -686,3 +712,14 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
return ci, nil
}
// getIntervalFromStoreLocked returns a watchCacheInterval
// that covers the entire storage state.
// This function assumes it is called under the watchCache lock.
func (w *watchCache) getIntervalFromStoreLocked() (*watchCacheInterval, error) {
ci, err := newCacheIntervalFromStore(w.resourceVersion, w.store, w.getAttrsFunc)
if err != nil {
return nil, err
}
return ci, nil
}
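
The removedEventSinceRelist switch above picks the oldest resource version a watch can resume from: one past the list RV as long as nothing has been evicted since the relist, otherwise the RV of the oldest event still buffered, and any request older than oldest-1 is rejected as expired. A hedged sketch of that decision with plain values (not the real watchCache fields):

package main

import "fmt"

// oldestDeliverable mirrors the idea in getAllEventsSinceLocked: if no event
// has been evicted since the last relist, watches can resume from listRV+1;
// otherwise they can only resume from the oldest event still in the buffer.
func oldestDeliverable(listRV uint64, removedSinceRelist bool, oldestBufferedRV uint64) uint64 {
	if listRV > 0 && !removedSinceRelist {
		return listRV + 1
	}
	return oldestBufferedRV
}

func main() {
	oldest := oldestDeliverable(100, true, 150)
	requested := uint64(120)
	if requested < oldest-1 {
		fmt.Printf("too old resource version: %d (%d)\n", requested, oldest-1)
	}
}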


@ -47,8 +47,7 @@ func NewETCDLatencyTracker(delegate clientv3.KV) clientv3.KV {
// tracking function TrackStorageLatency is thread safe.
//
// NOTE: Compact is an asynchronous process and is not associated with
//
// any request, so we will not be tracking its latency.
// any request, so we will not be tracking its latency.
type clientV3KVLatencyTracker struct {
clientv3.KV
}


@ -64,6 +64,15 @@ var (
},
[]string{"endpoint"},
)
etcdEventsReceivedCounts = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Subsystem: "apiserver",
Name: "storage_events_received_total",
Help: "Number of etcd events received split by kind.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_bookmark_counts",
@ -113,6 +122,15 @@ var (
},
[]string{"resource"},
)
decodeErrorCounts = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: "apiserver",
Name: "storage_decode_errors_total",
Help: "Number of stored object decode errors split by object type",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
)
var registerMetrics sync.Once
@ -130,6 +148,7 @@ func Register() {
legacyregistry.MustRegister(listStorageNumFetched)
legacyregistry.MustRegister(listStorageNumSelectorEvals)
legacyregistry.MustRegister(listStorageNumReturned)
legacyregistry.MustRegister(decodeErrorCounts)
})
}
@ -143,11 +162,21 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
}
// RecordEtcdEvent updates the etcd_events_received_total metric.
func RecordEtcdEvent(resource string) {
etcdEventsReceivedCounts.WithLabelValues(resource).Inc()
}
// RecordEtcdBookmark updates the etcd_bookmark_counts metric.
func RecordEtcdBookmark(resource string) {
etcdBookmarkCounts.WithLabelValues(resource).Inc()
}
// RecordDecodeError increments the storage_decode_errors_total metric.
func RecordDecodeError(resource string) {
decodeErrorCounts.WithLabelValues(resource).Inc()
}
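
These helpers follow the usual component-base metrics pattern: declare a CounterVec at ALPHA stability, register it once via the legacy registry, and increment it with WithLabelValues. A compact sketch of that wiring; the metric name here is made up for illustration:

package main

import (
	compbasemetrics "k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

// exampleErrors is an illustrative counter following the same shape as
// decodeErrorCounts: one "resource" label, ALPHA stability.
var exampleErrors = compbasemetrics.NewCounterVec(
	&compbasemetrics.CounterOpts{
		Namespace:      "apiserver",
		Name:           "example_errors_total",
		Help:           "Illustrative counter, not a real apiserver metric.",
		StabilityLevel: compbasemetrics.ALPHA,
	},
	[]string{"resource"},
)

func main() {
	legacyregistry.MustRegister(exampleErrors)
	exampleErrors.WithLabelValues("pods").Inc()
}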
// Reset resets the etcd_request_duration_seconds metric.
func Reset() {
etcdRequestLatency.Reset()


@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/conversion"
@ -156,7 +157,12 @@ func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, ou
return storage.NewInternalError(err.Error())
}
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
err = decode(s.codec, s.versioner, data, out, kv.ModRevision)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
return nil
}
// Create implements storage.Interface.Create.
@ -220,6 +226,7 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
err = decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
@ -352,7 +359,12 @@ func (s *store) conditionalDelete(
if deleteResp.Header == nil {
return errors.New("invalid DeleteRange response - nil header")
}
return decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
err = decode(s.codec, s.versioner, origState.data, out, deleteResp.Header.Revision)
if err != nil {
recordDecodeError(s.groupResourceString, key)
return err
}
return nil
}
}
@ -470,7 +482,12 @@ func (s *store) GuaranteedUpdate(
}
// recheck that the data from etcd is not stale before short-circuiting a write
if !origState.stale {
return decode(s.codec, s.versioner, origState.data, destination, origState.rev)
err = decode(s.codec, s.versioner, origState.data, destination, origState.rev)
if err != nil {
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
return nil
}
}
@ -518,6 +535,7 @@ func (s *store) GuaranteedUpdate(
err = decode(s.codec, s.versioner, data, destination, putResp.Header.Revision)
if err != nil {
span.AddEvent("decode failed", attribute.Int("len", len(data)), attribute.String("err", err.Error()))
recordDecodeError(s.groupResourceString, preparedKey)
return err
}
span.AddEvent("decode succeeded", attribute.Int("len", len(data)))
@ -745,6 +763,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
}
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
recordDecodeError(s.groupResourceString, string(kv.Key))
return err
}
numEvald++
@ -782,6 +801,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
options = append(options, clientv3.WithRev(withRev))
}
}
if v.IsNil() {
// Ensure that we never return a nil Items pointer in the result for consistency.
v.Set(reflect.MakeSlice(v.Type(), 0, 0))
}
// instruct the client to begin querying from immediately after the last key we returned
// we never return a key that the client wouldn't be allowed to see
@ -841,6 +864,13 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
if opts.SendInitialEvents != nil {
return nil, apierrors.NewInvalid(
schema.GroupKind{Group: s.groupResource.Group, Kind: s.groupResource.Resource},
"",
field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is unsupported by an etcd cluster")},
)
}
preparedKey, err := s.prepareKey(key)
if err != nil {
return nil, err
@ -880,6 +910,7 @@ func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key
state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
recordDecodeError(s.groupResourceString, key)
return nil, err
}
}
@ -1018,6 +1049,12 @@ func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.Selec
return nil
}
// recordDecodeError records decode errors split by object type.
func recordDecodeError(resource string, key string) {
metrics.RecordDecodeError(resource)
klog.V(4).Infof("Decoding %s \"%s\" failed", resource, key)
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}


@ -25,6 +25,9 @@ import (
"strings"
"sync"
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -35,6 +38,7 @@ import (
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/klog/v2"
)
@ -152,6 +156,31 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
return wc
}
type etcdError interface {
Code() grpccodes.Code
Error() string
}
type grpcError interface {
GRPCStatus() *grpcstatus.Status
}
func isCancelError(err error) bool {
if err == nil {
return false
}
if err == context.Canceled {
return true
}
if etcdErr, ok := err.(etcdError); ok && etcdErr.Code() == grpccodes.Canceled {
return true
}
if grpcErr, ok := err.(grpcError); ok && grpcErr.GRPCStatus().Code() == grpccodes.Canceled {
return true
}
return false
}
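
isCancelError treats a bare context.Canceled, an etcd client error carrying codes.Canceled, and a gRPC status error with codes.Canceled as expected shutdown noise rather than real failures. The hedged sketch below shows the same classification using errors.Is and status.FromError, a slightly more tolerant variant than the direct comparisons above:

package main

import (
	"context"
	"errors"
	"fmt"

	grpccodes "google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"
)

// classify mirrors two of the isCancelError cases: a plain context
// cancellation and a gRPC status error with codes.Canceled.
func classify(err error) bool {
	if errors.Is(err, context.Canceled) {
		return true
	}
	if s, ok := grpcstatus.FromError(err); ok && s.Code() == grpccodes.Canceled {
		return true
	}
	return false
}

func main() {
	fmt.Println(classify(context.Canceled))                                        // true
	fmt.Println(classify(grpcstatus.Error(grpccodes.Canceled, "watch closed")))    // true
	fmt.Println(classify(errors.New("mvcc: required revision has been compacted"))) // false
}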
func (wc *watchChan) run() {
watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
@ -162,7 +191,7 @@ func (wc *watchChan) run() {
select {
case err := <-wc.errChan:
if err == context.Canceled {
if isCancelError(err) {
break
}
errResult := transformErrorToEvent(err)
@ -213,12 +242,15 @@ func (wc *watchChan) sync() error {
return nil
}
// logWatchChannelErr checks whether the error is about mvcc revision compaction, which is regarded as a warning
func logWatchChannelErr(err error) {
if !strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
klog.Errorf("watch chan error: %v", err)
} else {
switch {
case strings.Contains(err.Error(), "mvcc: required revision has been compacted"):
// mvcc revision compaction which is regarded as warning, not error
klog.Warningf("watch chan error: %v", err)
case isCancelError(err):
// expected when watches close, no need to log
default:
klog.Errorf("watch chan error: %v", err)
}
}
@ -256,6 +288,7 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
}
for _, e := range wres.Events {
metrics.RecordEtcdEvent(wc.watcher.groupResource.String())
parsedEvent, err := parseEvent(e)
if err != nil {
logWatchChannelErr(err)


@ -268,4 +268,10 @@ type ListOptions struct {
// ProgressNotify determines whether storage-originated bookmark (progress notify) events should
// be delivered to the users. The option is ignored for non-watch requests.
ProgressNotify bool
// SendInitialEvents, when set together with the Watch option,
// begins the watch stream with synthetic init events to build the
// whole state of all resources followed by a synthetic "Bookmark"
// event containing a ResourceVersion after which the server
// continues streaming events.
SendInitialEvents *bool
}
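
Because SendInitialEvents is a *bool, an unset field can be told apart from an explicit false. A hedged sketch of filling in ListOptions for a watch that asks for the initial-state replay; only a subset of the struct's fields is shown:

package main

import (
	"fmt"

	"k8s.io/apiserver/pkg/storage"
)

func main() {
	sendInitialEvents := true
	opts := storage.ListOptions{
		ResourceVersion:   "0",
		Recursive:         true,
		SendInitialEvents: &sendInitialEvents, // nil means "not requested"
	}
	fmt.Println(*opts.SendInitialEvents)
}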


@ -112,6 +112,18 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set)
return matched
}
// MatchesSingleNamespace will return (namespace, true) if and only if s.Field matches on the object's
// namespace.
func (s *SelectionPredicate) MatchesSingleNamespace() (string, bool) {
if len(s.Continue) > 0 {
return "", false
}
if namespace, ok := s.Field.RequiresExactMatch("metadata.namespace"); ok {
return namespace, true
}
return "", false
}
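
MatchesSingleNamespace lets callers detect that a field selector pins the request to exactly one namespace (and that no continue token is in play). A hedged usage sketch using fields.OneTermEqualSelector:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apiserver/pkg/storage"
)

func main() {
	pred := storage.SelectionPredicate{
		// Field selector that requires an exact metadata.namespace match.
		Field: fields.OneTermEqualSelector("metadata.namespace", "kube-system"),
	}
	if ns, ok := pred.MatchesSingleNamespace(); ok {
		fmt.Println("single-namespace request for", ns) // kube-system
	}
}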
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
// name.
func (s *SelectionPredicate) MatchesSingle() (string, bool) {


@ -23,14 +23,24 @@ import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"sync/atomic"
"time"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/klog/v2"
)
// gcm implements AEAD encryption of the provided values given a cipher.Block algorithm.
type gcm struct {
aead cipher.AEAD
nonceFunc func([]byte) error
}
// NewGCMTransformer takes the given block cipher and performs encryption and decryption on the given data.
// It implements AEAD encryption of the provided values given a cipher.Block algorithm.
// The authenticated data provided as part of the value.Context method must match when the same
// value is set to and loaded from storage. In order to ensure that values cannot be copied by
// an attacker from a location under their control, use characteristics of the storage location
@ -43,44 +53,148 @@ import (
// therefore transformers using this implementation *must* ensure they allow for frequent key
// rotation. Future work should include investigation of AES-GCM-SIV as an alternative to
// random nonces.
type gcm struct {
block cipher.Block
func NewGCMTransformer(block cipher.Block) (value.Transformer, error) {
aead, err := newGCM(block)
if err != nil {
return nil, err
}
return &gcm{aead: aead, nonceFunc: randomNonce}, nil
}
// NewGCMTransformer takes the given block cipher and performs encryption and decryption on the given
// data.
func NewGCMTransformer(block cipher.Block) value.Transformer {
return &gcm{block: block}
// NewGCMTransformerWithUniqueKeyUnsafe is the same as NewGCMTransformer but is unsafe for general
// use because it makes assumptions about the key underlying the block cipher. Specifically,
// it uses a 96-bit nonce where the first 32 bits are random data and the remaining 64 bits are
// a monotonically incrementing atomic counter. This means that the key must be randomly generated
// on process startup and must never be used for encryption outside the lifetime of the process.
// Unlike NewGCMTransformer, this function is immune to the birthday attack and thus the key can
// be used for 2^64-1 writes without rotation. Furthermore, cryptographic wear out of AES-GCM with
// a sequential nonce occurs after 2^64 encryptions, which is not a concern for our use cases.
// Even if that occurs, the nonce counter would overflow and crash the process. We have no concerns
// around plaintext length because all stored items are small (less than 2 MB). To prevent the
// chance of the block cipher being accidentally re-used, it is not taken in as input. Instead,
// a new random key is generated and returned on every invocation of this function. This key is
// used as the input to the block cipher. If the key is stored and retrieved at a later point,
// it can be passed to NewGCMTransformer(aes.NewCipher(key)) to construct a transformer capable
// of decrypting values encrypted by this transformer (that transformer must not be used for encryption).
func NewGCMTransformerWithUniqueKeyUnsafe() (value.Transformer, []byte, error) {
key, err := generateKey(32)
if err != nil {
return nil, nil, err
}
block, err := aes.NewCipher(key)
if err != nil {
return nil, nil, err
}
nonceGen := &nonceGenerator{
// we start the nonce counter at one billion so that we are
// guaranteed to detect rollover across different goroutines
zero: 1_000_000_000,
fatal: die,
}
nonceGen.nonce.Add(nonceGen.zero)
transformer, err := newGCMTransformerWithUniqueKeyUnsafe(block, nonceGen)
if err != nil {
return nil, nil, err
}
return transformer, key, nil
}
func newGCMTransformerWithUniqueKeyUnsafe(block cipher.Block, nonceGen *nonceGenerator) (value.Transformer, error) {
aead, err := newGCM(block)
if err != nil {
return nil, err
}
nonceFunc := func(b []byte) error {
// we only need 8 bytes to store our 64 bit incrementing nonce
// instead of leaving the unused bytes as zeros, set those to random bits
// this mostly protects us from weird edge cases like a VM restore that rewinds our atomic counter
randNonceSize := len(b) - 8
if err := randomNonce(b[:randNonceSize]); err != nil {
return err
}
nonceGen.next(b[randNonceSize:])
return nil
}
return &gcm{aead: aead, nonceFunc: nonceFunc}, nil
}
func newGCM(block cipher.Block) (cipher.AEAD, error) {
aead, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
if nonceSize := aead.NonceSize(); nonceSize != 12 { // all data in etcd will be broken if this ever changes
return nil, fmt.Errorf("crypto/cipher.NewGCM returned unexpected nonce size: %d", nonceSize)
}
return aead, nil
}
func randomNonce(b []byte) error {
_, err := rand.Read(b)
return err
}
type nonceGenerator struct {
// even at one million encryptions per second, this counter is enough for half a million years
// using this struct avoids alignment bugs: https://pkg.go.dev/sync/atomic#pkg-note-BUG
nonce atomic.Uint64
zero uint64
fatal func(msg string)
}
func (n *nonceGenerator) next(b []byte) {
incrementingNonce := n.nonce.Add(1)
if incrementingNonce <= n.zero {
// this should never happen, and is unrecoverable if it does
n.fatal("aes-gcm detected nonce overflow - cryptographic wear out has occurred")
}
binary.LittleEndian.PutUint64(b, incrementingNonce)
}
func die(msg string) {
// nolint:logcheck // we want the stack traces, log flushing, and process exiting logic from FatalDepth
klog.FatalDepth(1, msg)
}
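
newGCMTransformerWithUniqueKeyUnsafe builds each 96-bit nonce as 4 random bytes followed by the 8-byte little-endian value of an ever-increasing counter. A standalone sketch of that layout; the overflow check that the real nonceGenerator enforces is reduced to a length check here:

package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

var counter atomic.Uint64

// nextNonce fills a 12-byte AES-GCM nonce: the first 4 bytes are random,
// the last 8 bytes are a monotonically increasing little-endian counter.
func nextNonce(b []byte) error {
	if len(b) != 12 {
		return fmt.Errorf("expected 12-byte nonce, got %d", len(b))
	}
	if _, err := rand.Read(b[:4]); err != nil {
		return err
	}
	binary.LittleEndian.PutUint64(b[4:], counter.Add(1))
	return nil
}

func main() {
	nonce := make([]byte, 12)
	if err := nextNonce(nonce); err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", nonce)
}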
// generateKey generates a random key using system randomness.
func generateKey(length int) (key []byte, err error) {
defer func(start time.Time) {
value.RecordDataKeyGeneration(start, err)
}(time.Now())
key = make([]byte, length)
if _, err = rand.Read(key); err != nil {
return nil, err
}
return key, nil
}
func (t *gcm) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
aead, err := cipher.NewGCM(t.block)
if err != nil {
return nil, false, err
}
nonceSize := aead.NonceSize()
nonceSize := t.aead.NonceSize()
if len(data) < nonceSize {
return nil, false, fmt.Errorf("the stored data was shorter than the required size")
return nil, false, errors.New("the stored data was shorter than the required size")
}
result, err := aead.Open(nil, data[:nonceSize], data[nonceSize:], dataCtx.AuthenticatedData())
result, err := t.aead.Open(nil, data[:nonceSize], data[nonceSize:], dataCtx.AuthenticatedData())
return result, false, err
}
func (t *gcm) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
aead, err := cipher.NewGCM(t.block)
if err != nil {
return nil, err
nonceSize := t.aead.NonceSize()
result := make([]byte, nonceSize+t.aead.Overhead()+len(data))
if err := t.nonceFunc(result[:nonceSize]); err != nil {
return nil, fmt.Errorf("failed to write nonce for AES-GCM: %w", err)
}
nonceSize := aead.NonceSize()
result := make([]byte, nonceSize+aead.Overhead()+len(data))
n, err := rand.Read(result[:nonceSize])
if err != nil {
return nil, err
}
if n != nonceSize {
return nil, fmt.Errorf("unable to read sufficient random bytes")
}
cipherText := aead.Seal(result[nonceSize:nonceSize], result[:nonceSize], data, dataCtx.AuthenticatedData())
cipherText := t.aead.Seal(result[nonceSize:nonceSize], result[:nonceSize], data, dataCtx.AuthenticatedData())
return result[:nonceSize+len(cipherText)], nil
}
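
Put together, NewGCMTransformer and the two Transform methods give a seal/open round trip that is bound to the authenticated-data context. A hedged usage sketch, assuming value.DefaultContext is the Context implementation (as used elsewhere in this package) and that NewGCMTransformer now returns (value.Transformer, error) per the change above:

package main

import (
	"context"
	"crypto/aes"
	"crypto/rand"
	"fmt"

	"k8s.io/apiserver/pkg/storage/value"
	aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
)

func main() {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		panic(err)
	}
	transformer, err := aestransformer.NewGCMTransformer(block)
	if err != nil {
		panic(err)
	}

	// The authenticated data must be identical when the value is written and read back.
	dataCtx := value.DefaultContext("/registry/secrets/default/example")
	sealed, err := transformer.TransformToStorage(context.Background(), []byte("hello"), dataCtx)
	if err != nil {
		panic(err)
	}
	plain, _, err := transformer.TransformFromStorage(context.Background(), sealed, dataCtx)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plain)) // hello
}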
@ -96,7 +210,7 @@ func NewCBCTransformer(block cipher.Block) value.Transformer {
}
var (
ErrInvalidBlockSize = fmt.Errorf("the stored data is not a multiple of the block size")
errInvalidBlockSize = errors.New("the stored data is not a multiple of the block size")
errInvalidPKCS7Data = errors.New("invalid PKCS7 data (empty or not padded)")
errInvalidPKCS7Padding = errors.New("invalid padding on input")
)
@ -104,13 +218,13 @@ var (
func (t *cbc) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
blockSize := aes.BlockSize
if len(data) < blockSize {
return nil, false, fmt.Errorf("the stored data was shorter than the required size")
return nil, false, errors.New("the stored data was shorter than the required size")
}
iv := data[:blockSize]
data = data[blockSize:]
if len(data)%blockSize != 0 {
return nil, false, ErrInvalidBlockSize
return nil, false, errInvalidBlockSize
}
result := make([]byte, len(data))
@ -140,7 +254,7 @@ func (t *cbc) TransformToStorage(ctx context.Context, data []byte, dataCtx value
result := make([]byte, blockSize+len(data)+paddingSize)
iv := result[:blockSize]
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
return nil, fmt.Errorf("unable to read sufficient random bytes")
return nil, errors.New("unable to read sufficient random bytes")
}
copy(result[blockSize:], data)


@ -53,7 +53,7 @@ type envelopeTransformer struct {
transformers *lru.Cache
// baseTransformerFunc creates a new transformer for encrypting the data with the DEK.
baseTransformerFunc func(cipher.Block) value.Transformer
baseTransformerFunc func(cipher.Block) (value.Transformer, error)
cacheSize int
cacheEnabled bool
@ -63,7 +63,7 @@ type envelopeTransformer struct {
// It uses envelopeService to encrypt and decrypt DEKs. Respective DEKs (in encrypted form) are prepended to
// the data items they encrypt. A cache (of size cacheSize) is maintained to store the most recently
// used decrypted DEKs in memory.
func NewEnvelopeTransformer(envelopeService Service, cacheSize int, baseTransformerFunc func(cipher.Block) value.Transformer) value.Transformer {
func NewEnvelopeTransformer(envelopeService Service, cacheSize int, baseTransformerFunc func(cipher.Block) (value.Transformer, error)) value.Transformer {
var (
cache *lru.Cache
)
@ -161,7 +161,11 @@ func (t *envelopeTransformer) addTransformer(encKey []byte, key []byte) (value.T
if err != nil {
return nil, err
}
transformer := t.baseTransformerFunc(block)
transformer, err := t.baseTransformerFunc(block)
if err != nil {
return nil, err
}
// Use base64 of encKey as the key into the cache because hashicorp/golang-lru
// cannot hash []uint8.
if t.cacheEnabled {


@ -28,9 +28,9 @@ import (
"google.golang.org/grpc/credentials/insecure"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
"k8s.io/klog/v2"
kmsapi "k8s.io/kms/apis/v1beta1"
"k8s.io/kms/pkg/util"
)
const (
@ -53,7 +53,7 @@ type gRPCService struct {
// NewGRPCService returns an envelope.Service which uses gRPC to communicate with the remote KMS provider.
func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Duration) (Service, error) {
klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint)
klog.V(4).InfoS("Configure KMS provider", "endpoint", endpoint)
addr, err := util.ParseEndpoint(endpoint)
if err != nil {
@ -72,9 +72,9 @@ func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Durat
// addr - comes from the closure
c, err := net.DialUnix(unixProtocol, nil, &net.UnixAddr{Name: addr})
if err != nil {
klog.Errorf("failed to create connection to unix socket: %s, error: %v", addr, err)
klog.ErrorS(err, "failed to create connection to unix socket", "addr", addr)
} else {
klog.V(4).Infof("Successfully dialed Unix socket %v", addr)
klog.V(4).InfoS("Successfully dialed Unix socket", "addr", addr)
}
return c, err
}))
@ -113,7 +113,7 @@ func (g *gRPCService) checkAPIVersion(ctx context.Context) error {
}
g.versionChecked = true
klog.V(4).Infof("Version of KMS provider is %s", response.Version)
klog.V(4).InfoS("KMS provider api version verified", "version", response.Version)
return nil
}


@ -0,0 +1,108 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package kmsv2 transforms values for storage at rest using an Envelope v2 provider
package kmsv2
import (
"context"
"crypto/sha256"
"hash"
"sync"
"time"
"unsafe"
utilcache "k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/utils/clock"
)
// prevent decryptTransformer from drifting from value.Transformer
var _ decryptTransformer = value.Transformer(nil)
// decryptTransformer is the decryption subset of value.Transformer.
// this exists purely to statically enforce that transformers placed in the cache are not used for encryption.
// this is relevant in the context of nonce collision since transformers that are created
// from encrypted DEKs retrieved from etcd cannot maintain their nonce counter state.
type decryptTransformer interface {
TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, stale bool, err error)
}
type simpleCache struct {
cache *utilcache.Expiring
ttl time.Duration
// hashPool is a per cache pool of hash.Hash (to avoid allocations from building the Hash)
// SHA-256 is used to prevent collisions
hashPool *sync.Pool
}
func newSimpleCache(clock clock.Clock, ttl time.Duration) *simpleCache {
return &simpleCache{
cache: utilcache.NewExpiringWithClock(clock),
ttl: ttl,
hashPool: &sync.Pool{
New: func() interface{} {
return sha256.New()
},
},
}
}
// given a key, return the transformer, or nil if it does not exist in the cache
func (c *simpleCache) get(key []byte) decryptTransformer {
record, ok := c.cache.Get(c.keyFunc(key))
if !ok {
return nil
}
return record.(decryptTransformer)
}
// set caches the record for the key
func (c *simpleCache) set(key []byte, transformer decryptTransformer) {
if len(key) == 0 {
panic("key must not be empty")
}
if transformer == nil {
panic("transformer must not be nil")
}
c.cache.Set(c.keyFunc(key), transformer, c.ttl)
}
// keyFunc generates a string key by hashing the inputs.
// This lowers the memory requirement of the cache.
func (c *simpleCache) keyFunc(s []byte) string {
h := c.hashPool.Get().(hash.Hash)
h.Reset()
if _, err := h.Write(s); err != nil {
panic(err) // Write() on hash never fails
}
key := toString(h.Sum(nil)) // skip base64 encoding to save an allocation
c.hashPool.Put(h)
return key
}
// toString performs unholy acts to avoid allocations
func toString(b []byte) string {
// unsafe.SliceData relies on cap whereas we want to rely on len
if len(b) == 0 {
return ""
}
// Copied from go 1.20.1 strings.Builder.String
// https://github.com/golang/go/blob/202a1a57064127c3f19d96df57b9f9586145e21c/src/strings/builder.go#L48
return unsafe.String(unsafe.SliceData(b), len(b))
}


@ -20,120 +20,148 @@ package kmsv2
import (
"context"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"fmt"
"sort"
"time"
"unsafe"
"github.com/gogo/protobuf/proto"
"golang.org/x/crypto/cryptobyte"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/storage/value"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2alpha1"
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
"k8s.io/klog/v2"
"k8s.io/utils/lru"
kmsservice "k8s.io/kms/pkg/service"
"k8s.io/utils/clock"
)
func init() {
value.RegisterMetrics()
metrics.RegisterMetrics()
}
const (
// KMSAPIVersion is the version of the KMS API.
KMSAPIVersion = "v2alpha1"
KMSAPIVersion = "v2beta1"
// annotationsMaxSize is the maximum size of the annotations.
annotationsMaxSize = 32 * 1024 // 32 kB
// keyIDMaxSize is the maximum size of the keyID.
keyIDMaxSize = 1 * 1024 // 1 kB
// KeyIDMaxSize is the maximum size of the keyID.
KeyIDMaxSize = 1 * 1024 // 1 kB
// encryptedDEKMaxSize is the maximum size of the encrypted DEK.
encryptedDEKMaxSize = 1 * 1024 // 1 kB
// cacheTTL is the default time-to-live for the cache entry.
// this allows the cache to grow to an infinite size for up to a day.
// this is meant as a temporary solution until the cache is re-written to not have a TTL.
// there is unlikely to be any meaningful memory impact on the server
// because the cache will likely never have more than a few thousand entries
// and each entry is roughly ~200 bytes in size. with DEK reuse
// and no storage migration, the number of entries in this cache
// would be approximated by unique key IDs used by the KMS plugin
// combined with the number of server restarts. If storage migration
// is performed after key ID changes, and the number of restarts
// is limited, this cache size may be as small as the number of API
// servers in use (once old entries expire out from the TTL).
cacheTTL = 24 * time.Hour
// error code
errKeyIDOKCode ErrCodeKeyID = "ok"
errKeyIDEmptyCode ErrCodeKeyID = "empty"
errKeyIDTooLongCode ErrCodeKeyID = "too_long"
)
// Service allows encrypting and decrypting data using an external Key Management Service.
type Service interface {
// Decrypt a given bytearray to obtain the original data as bytes.
Decrypt(ctx context.Context, uid string, req *DecryptRequest) ([]byte, error)
// Encrypt bytes to a ciphertext.
Encrypt(ctx context.Context, uid string, data []byte) (*EncryptResponse, error)
// Status returns the status of the KMS.
Status(ctx context.Context) (*StatusResponse, error)
// NowFunc is exported so tests can override it.
var NowFunc = time.Now
type StateFunc func() (State, error)
type ErrCodeKeyID string
type State struct {
Transformer value.Transformer
EncryptedDEK []byte
KeyID string
Annotations map[string][]byte
UID string
ExpirationTimestamp time.Time
// CacheKey is the key used to cache the DEK in transformer.cache.
CacheKey []byte
}
func (s *State) ValidateEncryptCapability() error {
if now := NowFunc(); now.After(s.ExpirationTimestamp) {
return fmt.Errorf("EDEK with keyID %q expired at %s (current time is %s)",
s.KeyID, s.ExpirationTimestamp.Format(time.RFC3339), now.Format(time.RFC3339))
}
return nil
}
type envelopeTransformer struct {
envelopeService Service
envelopeService kmsservice.Service
providerName string
stateFunc StateFunc
// transformers is a thread-safe LRU cache which caches decrypted DEKs indexed by their encrypted form.
transformers *lru.Cache
// baseTransformerFunc creates a new transformer for encrypting the data with the DEK.
baseTransformerFunc func(cipher.Block) value.Transformer
cacheSize int
cacheEnabled bool
}
// EncryptResponse is the response from the Envelope service when encrypting data.
type EncryptResponse struct {
Ciphertext []byte
KeyID string
Annotations map[string][]byte
}
// DecryptRequest is the request to the Envelope service when decrypting data.
type DecryptRequest struct {
Ciphertext []byte
KeyID string
Annotations map[string][]byte
}
// StatusResponse is the response from the Envelope service when getting the status of the service.
type StatusResponse struct {
Version string
Healthz string
KeyID string
// cache is a thread-safe expiring lru cache which caches decrypted DEKs indexed by their encrypted form.
cache *simpleCache
}
// NewEnvelopeTransformer returns a transformer which implements a KEK-DEK based envelope encryption scheme.
// It uses envelopeService to encrypt and decrypt DEKs. Respective DEKs (in encrypted form) are prepended to
// the data items they encrypt. A cache (of size cacheSize) is maintained to store the most recently
// used decrypted DEKs in memory.
func NewEnvelopeTransformer(envelopeService Service, cacheSize int, baseTransformerFunc func(cipher.Block) value.Transformer) value.Transformer {
var cache *lru.Cache
if cacheSize > 0 {
// TODO(aramase): Switch to using expiring cache: kubernetes/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/cache/expiring.go.
// It handles scans a lot better, doesn't have to be right sized, and doesn't have a global lock on reads.
cache = lru.New(cacheSize)
}
// the data items they encrypt.
func NewEnvelopeTransformer(envelopeService kmsservice.Service, providerName string, stateFunc StateFunc) value.Transformer {
return newEnvelopeTransformerWithClock(envelopeService, providerName, stateFunc, cacheTTL, clock.RealClock{})
}
func newEnvelopeTransformerWithClock(envelopeService kmsservice.Service, providerName string, stateFunc StateFunc, cacheTTL time.Duration, clock clock.Clock) value.Transformer {
return &envelopeTransformer{
envelopeService: envelopeService,
transformers: cache,
baseTransformerFunc: baseTransformerFunc,
cacheEnabled: cacheSize > 0,
cacheSize: cacheSize,
envelopeService: envelopeService,
providerName: providerName,
stateFunc: stateFunc,
cache: newSimpleCache(clock, cacheTTL),
}
}
// TransformFromStorage decrypts data encrypted by this transformer using envelope encryption.
func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
metrics.RecordArrival(metrics.FromStorageLabel, time.Now())
// Deserialize the EncryptedObject from the data.
encryptedObject, err := t.doDecode(data)
if err != nil {
return nil, false, err
}
// Look up the decrypted DEK from cache or Envelope.
transformer := t.getTransformer(encryptedObject.EncryptedDEK)
// TODO: consider marking state.EncryptedDEK != encryptedObject.EncryptedDEK as a stale read to support DEK defragmentation
// at a minimum we should have a metric that helps the user understand if DEK fragmentation is high
state, err := t.stateFunc() // no need to call state.ValidateEncryptCapability on reads
if err != nil {
return nil, false, err
}
encryptedObjectCacheKey, err := generateCacheKey(encryptedObject.EncryptedDEK, encryptedObject.KeyID, encryptedObject.Annotations)
if err != nil {
return nil, false, err
}
// Look up the decrypted DEK from cache first
transformer := t.cache.get(encryptedObjectCacheKey)
// fallback to the envelope service if we do not have the transformer locally
if transformer == nil {
if t.cacheEnabled {
value.RecordCacheMiss()
}
value.RecordCacheMiss()
requestInfo := getRequestInfoFromContext(ctx)
uid := string(uuid.NewUUID())
klog.V(6).InfoS("Decrypting content using envelope service", "uid", uid, "key", string(dataCtx.AuthenticatedData()))
key, err := t.envelopeService.Decrypt(ctx, uid, &DecryptRequest{
klog.V(6).InfoS("decrypting content using envelope service", "uid", uid, "key", string(dataCtx.AuthenticatedData()),
"group", requestInfo.APIGroup, "version", requestInfo.APIVersion, "resource", requestInfo.Resource, "subresource", requestInfo.Subresource,
"verb", requestInfo.Verb, "namespace", requestInfo.Namespace, "name", requestInfo.Name)
key, err := t.envelopeService.Decrypt(ctx, uid, &kmsservice.DecryptRequest{
Ciphertext: encryptedObject.EncryptedDEK,
KeyID: encryptedObject.KeyID,
Annotations: encryptedObject.Annotations,
@ -142,80 +170,79 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
return nil, false, fmt.Errorf("failed to decrypt DEK, error: %w", err)
}
transformer, err = t.addTransformer(encryptedObject.EncryptedDEK, key)
transformer, err = t.addTransformerForDecryption(encryptedObjectCacheKey, key)
if err != nil {
return nil, false, err
}
}
metrics.RecordKeyID(metrics.FromStorageLabel, t.providerName, encryptedObject.KeyID)
out, stale, err := transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx)
if err != nil {
return nil, false, err
}
// data is considered stale if the key ID does not match our current write transformer
return out, stale || encryptedObject.KeyID != state.KeyID, nil
return transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx)
}
// TransformToStorage encrypts data to be written to disk using envelope encryption.
func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
metrics.RecordArrival(metrics.ToStorageLabel, time.Now())
newKey, err := generateKey(32)
state, err := t.stateFunc()
if err != nil {
return nil, err
}
if err := state.ValidateEncryptCapability(); err != nil {
return nil, err
}
// this prevents a cache miss every time the DEK rotates
// this has the side benefit of causing the cache to perform a GC
// TODO see if we can do this inside the stateFunc control loop
// TODO(aramase): Add metrics for cache fill percentage with custom cache implementation.
t.cache.set(state.CacheKey, state.Transformer)
requestInfo := getRequestInfoFromContext(ctx)
klog.V(6).InfoS("encrypting content using DEK", "uid", state.UID, "key", string(dataCtx.AuthenticatedData()),
"group", requestInfo.APIGroup, "version", requestInfo.APIVersion, "resource", requestInfo.Resource, "subresource", requestInfo.Subresource,
"verb", requestInfo.Verb, "namespace", requestInfo.Namespace, "name", requestInfo.Name)
result, err := state.Transformer.TransformToStorage(ctx, data, dataCtx)
if err != nil {
return nil, err
}
uid := string(uuid.NewUUID())
klog.V(6).InfoS("Encrypting content using envelope service", "uid", uid, "key", string(dataCtx.AuthenticatedData()))
resp, err := t.envelopeService.Encrypt(ctx, uid, newKey)
if err != nil {
return nil, fmt.Errorf("failed to encrypt DEK, error: %w", err)
}
transformer, err := t.addTransformer(resp.Ciphertext, newKey)
if err != nil {
return nil, err
}
result, err := transformer.TransformToStorage(ctx, data, dataCtx)
if err != nil {
return nil, err
}
metrics.RecordKeyID(metrics.ToStorageLabel, t.providerName, state.KeyID)
encObject := &kmstypes.EncryptedObject{
KeyID: resp.KeyID,
EncryptedDEK: resp.Ciphertext,
KeyID: state.KeyID,
EncryptedDEK: state.EncryptedDEK,
EncryptedData: result,
Annotations: resp.Annotations,
Annotations: state.Annotations,
}
// Serialize the EncryptedObject to a byte array.
return t.doEncode(encObject)
}
// addTransformer inserts a new transformer to the Envelope cache of DEKs for future reads.
func (t *envelopeTransformer) addTransformer(encKey []byte, key []byte) (value.Transformer, error) {
// addTransformerForDecryption inserts a new transformer to the Envelope cache of DEKs for future reads.
func (t *envelopeTransformer) addTransformerForDecryption(cacheKey []byte, key []byte) (decryptTransformer, error) {
block, err := aes.NewCipher(key)
if err != nil {
return nil, err
}
transformer := t.baseTransformerFunc(block)
// Use base64 of encKey as the key into the cache because hashicorp/golang-lru
// cannot hash []uint8.
if t.cacheEnabled {
t.transformers.Add(base64.StdEncoding.EncodeToString(encKey), transformer)
metrics.RecordDekCacheFillPercent(float64(t.transformers.Len()) / float64(t.cacheSize))
// this is compatible with NewGCMTransformerWithUniqueKeyUnsafe for decryption
// it would use random nonces for encryption but we never do that
transformer, err := aestransformer.NewGCMTransformer(block)
if err != nil {
return nil, err
}
// TODO(aramase): Add metrics for cache fill percentage with custom cache implementation.
t.cache.set(cacheKey, transformer)
return transformer, nil
}
// getTransformer fetches the transformer corresponding to encKey from cache, if it exists.
func (t *envelopeTransformer) getTransformer(encKey []byte) value.Transformer {
if !t.cacheEnabled {
return nil
}
_transformer, found := t.transformers.Get(base64.StdEncoding.EncodeToString(encKey))
if found {
return _transformer.(value.Transformer)
}
return nil
}
// doEncode encodes the EncryptedObject to a byte array.
func (t *envelopeTransformer) doEncode(request *kmstypes.EncryptedObject) ([]byte, error) {
if err := validateEncryptedObject(request); err != nil {
@ -238,17 +265,34 @@ func (t *envelopeTransformer) doDecode(originalData []byte) (*kmstypes.Encrypted
return o, nil
}
// generateKey generates a random key using system randomness.
func generateKey(length int) (key []byte, err error) {
defer func(start time.Time) {
value.RecordDataKeyGeneration(start, err)
}(time.Now())
key = make([]byte, length)
if _, err = rand.Read(key); err != nil {
return nil, err
func GenerateTransformer(ctx context.Context, uid string, envelopeService kmsservice.Service) (value.Transformer, *kmsservice.EncryptResponse, []byte, error) {
transformer, newKey, err := aestransformer.NewGCMTransformerWithUniqueKeyUnsafe()
if err != nil {
return nil, nil, nil, err
}
return key, nil
klog.V(6).InfoS("encrypting content using envelope service", "uid", uid)
resp, err := envelopeService.Encrypt(ctx, uid, newKey)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to encrypt DEK, error: %w", err)
}
if err := validateEncryptedObject(&kmstypes.EncryptedObject{
KeyID: resp.KeyID,
EncryptedDEK: resp.Ciphertext,
EncryptedData: []byte{0}, // any non-empty value to pass validation
Annotations: resp.Annotations,
}); err != nil {
return nil, nil, nil, err
}
cacheKey, err := generateCacheKey(resp.Ciphertext, resp.KeyID, resp.Annotations)
if err != nil {
return nil, nil, nil, err
}
return transformer, resp, cacheKey, nil
}
func validateEncryptedObject(o *kmstypes.EncryptedObject) error {
@ -261,7 +305,7 @@ func validateEncryptedObject(o *kmstypes.EncryptedObject) error {
if err := validateEncryptedDEK(o.EncryptedDEK); err != nil {
return fmt.Errorf("failed to validate encrypted DEK: %w", err)
}
if err := validateKeyID(o.KeyID); err != nil {
if _, err := ValidateKeyID(o.KeyID); err != nil {
return fmt.Errorf("failed to validate key id: %w", err)
}
if err := validateAnnotations(o.Annotations); err != nil {
@ -301,15 +345,78 @@ func validateAnnotations(annotations map[string][]byte) error {
return utilerrors.NewAggregate(errs)
}
// validateKeyID tests the following:
// ValidateKeyID tests the following:
// 1. The keyID is not empty.
// 2. The size of keyID is less than 1 kB.
func validateKeyID(keyID string) error {
func ValidateKeyID(keyID string) (ErrCodeKeyID, error) {
if len(keyID) == 0 {
return fmt.Errorf("keyID is empty")
return errKeyIDEmptyCode, fmt.Errorf("keyID is empty")
}
if len(keyID) > keyIDMaxSize {
return fmt.Errorf("keyID is %d bytes, which exceeds the max size of %d", len(keyID), keyIDMaxSize)
if len(keyID) > KeyIDMaxSize {
return errKeyIDTooLongCode, fmt.Errorf("keyID is %d bytes, which exceeds the max size of %d", len(keyID), KeyIDMaxSize)
}
return nil
return errKeyIDOKCode, nil
}
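
ValidateKeyID is now exported and returns an ErrCodeKeyID classification alongside the error. A brief hedged usage sketch (the sample key ID strings are arbitrary):

package main

import (
	"fmt"
	"strings"

	"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2"
)

func main() {
	code, err := kmsv2.ValidateKeyID("")
	fmt.Println(code, err) // empty, keyID is empty

	code, err = kmsv2.ValidateKeyID(strings.Repeat("k", kmsv2.KeyIDMaxSize+1))
	fmt.Println(code, err) // too_long, exceeds the 1 kB limit

	code, err = kmsv2.ValidateKeyID("arn:aws:kms:example")
	fmt.Println(code, err) // ok, <nil>
}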
func getRequestInfoFromContext(ctx context.Context) *genericapirequest.RequestInfo {
if reqInfo, found := genericapirequest.RequestInfoFrom(ctx); found {
return reqInfo
}
return &genericapirequest.RequestInfo{}
}
// generateCacheKey returns a key for the cache.
// The key is a concatenation of:
// 1. encryptedDEK
// 2. keyID
// 3. length of annotations
// 4. annotations (sorted by key) - each annotation is a concatenation of:
// a. annotation key
// b. annotation value
func generateCacheKey(encryptedDEK []byte, keyID string, annotations map[string][]byte) ([]byte, error) {
// TODO(aramase): use sync pool buffer to avoid allocations
b := cryptobyte.NewBuilder(nil)
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes(encryptedDEK)
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes(toBytes(keyID))
})
if len(annotations) == 0 {
return b.Bytes()
}
// add the length of annotations to the cache key
b.AddUint32(uint32(len(annotations)))
// Sort the annotations by key.
keys := make([]string, 0, len(annotations))
for k := range annotations {
k := k
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
// The maximum size of annotations is annotationsMaxSize (32 kB) so we can safely
// assume that the length of the key and value will fit in a uint16.
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes(toBytes(k))
})
b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
b.AddBytes(annotations[k])
})
}
return b.Bytes()
}
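
generateCacheKey serializes the encrypted DEK, the key ID, the annotation count, and each sorted key/value pair with length prefixes via cryptobyte. The hedged sketch below rebuilds and re-parses the no-annotations layout purely for illustration; nothing in the package needs to parse these keys:

package main

import (
	"fmt"

	"golang.org/x/crypto/cryptobyte"
)

// buildKey mirrors the documented layout for the no-annotations case:
// a uint16-length-prefixed EDEK followed by a uint16-length-prefixed keyID.
func buildKey(edek []byte, keyID string) ([]byte, error) {
	b := cryptobyte.NewBuilder(nil)
	b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) { b.AddBytes(edek) })
	b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) { b.AddBytes([]byte(keyID)) })
	return b.Bytes()
}

func main() {
	key, err := buildKey([]byte{0xde, 0xad}, "key-1")
	if err != nil {
		panic(err)
	}

	// Read the two length-prefixed fields back out.
	s := cryptobyte.String(key)
	var edek, keyID cryptobyte.String
	if !s.ReadUint16LengthPrefixed(&edek) || !s.ReadUint16LengthPrefixed(&keyID) {
		panic("malformed cache key")
	}
	fmt.Printf("edek=%x keyID=%s\n", []byte(edek), string(keyID))
}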
// toBytes performs unholy acts to avoid allocations
func toBytes(s string) []byte {
// unsafe.StringData is unspecified for the empty string, so we provide a strict interpretation
if len(s) == 0 {
return nil
}
// Copied from go 1.20.1 os.File.WriteString
// https://github.com/golang/go/blob/202a1a57064127c3f19d96df57b9f9586145e21c/src/os/file.go#L246
return unsafe.Slice(unsafe.StringData(s), len(s))
}


@ -27,9 +27,11 @@ import (
"google.golang.org/grpc/credentials/insecure"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
"k8s.io/klog/v2"
kmsapi "k8s.io/kms/apis/v2alpha1"
kmsapi "k8s.io/kms/apis/v2"
kmsservice "k8s.io/kms/pkg/service"
"k8s.io/kms/pkg/util"
)
const (
@ -45,8 +47,8 @@ type gRPCService struct {
}
// NewGRPCService returns an envelope.Service which uses gRPC to communicate with the remote KMS provider.
func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Duration) (Service, error) {
klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint)
func NewGRPCService(ctx context.Context, endpoint, providerName string, callTimeout time.Duration) (kmsservice.Service, error) {
klog.V(4).InfoS("Configure KMS provider", "endpoint", endpoint)
addr, err := util.ParseEndpoint(endpoint)
if err != nil {
@ -64,12 +66,14 @@ func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Durat
// addr - comes from the closure
c, err := net.DialUnix(unixProtocol, nil, &net.UnixAddr{Name: addr})
if err != nil {
klog.Errorf("failed to create connection to unix socket: %s, error: %v", addr, err)
klog.ErrorS(err, "failed to create connection to unix socket", "addr", addr)
} else {
klog.V(4).Infof("Successfully dialed Unix socket %v", addr)
klog.V(4).InfoS("Successfully dialed Unix socket", "addr", addr)
}
return c, err
}))
}),
grpc.WithChainUnaryInterceptor(recordMetricsInterceptor(providerName)),
)
if err != nil {
return nil, fmt.Errorf("failed to create connection to %s, error: %v", endpoint, err)
@ -88,7 +92,7 @@ func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Durat
}
// Decrypt a given data string to obtain the original byte data.
func (g *gRPCService) Decrypt(ctx context.Context, uid string, req *DecryptRequest) ([]byte, error) {
func (g *gRPCService) Decrypt(ctx context.Context, uid string, req *kmsservice.DecryptRequest) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, g.callTimeout)
defer cancel()
@ -106,7 +110,7 @@ func (g *gRPCService) Decrypt(ctx context.Context, uid string, req *DecryptReque
}
// Encrypt bytes to a string ciphertext.
func (g *gRPCService) Encrypt(ctx context.Context, uid string, plaintext []byte) (*EncryptResponse, error) {
func (g *gRPCService) Encrypt(ctx context.Context, uid string, plaintext []byte) (*kmsservice.EncryptResponse, error) {
ctx, cancel := context.WithTimeout(ctx, g.callTimeout)
defer cancel()
@ -118,7 +122,7 @@ func (g *gRPCService) Encrypt(ctx context.Context, uid string, plaintext []byte)
if err != nil {
return nil, err
}
return &EncryptResponse{
return &kmsservice.EncryptResponse{
Ciphertext: response.Ciphertext,
KeyID: response.KeyId,
Annotations: response.Annotations,
@ -126,7 +130,7 @@ func (g *gRPCService) Encrypt(ctx context.Context, uid string, plaintext []byte)
}
// Status returns the status of the KMSv2 provider.
func (g *gRPCService) Status(ctx context.Context) (*StatusResponse, error) {
func (g *gRPCService) Status(ctx context.Context) (*kmsservice.StatusResponse, error) {
ctx, cancel := context.WithTimeout(ctx, g.callTimeout)
defer cancel()
@ -135,5 +139,15 @@ func (g *gRPCService) Status(ctx context.Context) (*StatusResponse, error) {
if err != nil {
return nil, err
}
return &StatusResponse{Version: response.Version, Healthz: response.Healthz, KeyID: response.KeyId}, nil
return &kmsservice.StatusResponse{Version: response.Version, Healthz: response.Healthz, KeyID: response.KeyId}, nil
}
func recordMetricsInterceptor(providerName string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := NowFunc()
respErr := invoker(ctx, method, req, reply, cc, opts...)
elapsed := NowFunc().Sub(start)
metrics.RecordKMSOperationLatency(providerName, method, elapsed, respErr)
return respErr
}
}


@ -17,7 +17,7 @@ limitations under the License.
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: api.proto
package v2alpha1
package v2
import (
fmt "fmt"
@ -104,25 +104,28 @@ func (m *EncryptedObject) GetAnnotations() map[string][]byte {
}
func init() {
proto.RegisterType((*EncryptedObject)(nil), "v2alpha1.EncryptedObject")
proto.RegisterMapType((map[string][]byte)(nil), "v2alpha1.EncryptedObject.AnnotationsEntry")
proto.RegisterType((*EncryptedObject)(nil), "v2.EncryptedObject")
proto.RegisterMapType((map[string][]byte)(nil), "v2.EncryptedObject.AnnotationsEntry")
}
func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) }
var fileDescriptor_00212fb1f9d3bf1c = []byte{
// 200 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4c, 0x2c, 0xc8, 0xd4,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x28, 0x33, 0x4a, 0xcc, 0x29, 0xc8, 0x48, 0x34, 0x54,
0xfa, 0xcf, 0xc8, 0xc5, 0xef, 0x9a, 0x97, 0x5c, 0x54, 0x59, 0x50, 0x92, 0x9a, 0xe2, 0x9f, 0x94,
0x95, 0x9a, 0x5c, 0x22, 0xa4, 0xc2, 0xc5, 0x9b, 0x0a, 0x13, 0x72, 0x49, 0x2c, 0x49, 0x94, 0x60,
0x54, 0x60, 0xd4, 0xe0, 0x09, 0x42, 0x15, 0x14, 0x12, 0xe1, 0x62, 0xcd, 0x4e, 0xad, 0xf4, 0x74,
0x91, 0x60, 0x52, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x70, 0x84, 0x94, 0xb8, 0x78, 0x10, 0xca, 0x5c,
0xbd, 0x25, 0x98, 0xc1, 0x5a, 0x51, 0xc4, 0x84, 0x7c, 0xb8, 0xb8, 0x13, 0xf3, 0xf2, 0xf2, 0x4b,
0x12, 0x4b, 0x32, 0xf3, 0xf3, 0x8a, 0x25, 0x58, 0x14, 0x98, 0x35, 0xb8, 0x8d, 0xb4, 0xf4, 0x60,
0x6e, 0xd2, 0x43, 0x73, 0x8f, 0x9e, 0x23, 0x42, 0xb1, 0x6b, 0x5e, 0x49, 0x51, 0x65, 0x10, 0xb2,
0x76, 0x29, 0x3b, 0x2e, 0x01, 0x74, 0x05, 0x42, 0x02, 0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x60, 0x77,
0x73, 0x06, 0x81, 0x98, 0x20, 0xd7, 0x96, 0x25, 0xe6, 0x94, 0xa6, 0x82, 0x5d, 0xcb, 0x13, 0x04,
0xe1, 0x58, 0x31, 0x59, 0x30, 0x26, 0xb1, 0x81, 0x83, 0xc4, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff,
0x88, 0x8c, 0xbb, 0x4e, 0x1f, 0x01, 0x00, 0x00,
// 244 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x90, 0xb1, 0x4b, 0x03, 0x31,
0x14, 0xc6, 0xc9, 0x9d, 0x0a, 0x97, 0x9e, 0x58, 0x82, 0xc3, 0xe1, 0x74, 0x94, 0x0e, 0x37, 0x25,
0x10, 0x97, 0x22, 0x52, 0x50, 0x7a, 0x82, 0x38, 0x08, 0x19, 0xdd, 0xd2, 0xfa, 0x28, 0x67, 0x6a,
0x12, 0x92, 0x18, 0xc8, 0x9f, 0xee, 0x26, 0x4d, 0x95, 0xda, 0xdb, 0xde, 0xf7, 0xf1, 0xfb, 0xe0,
0xc7, 0xc3, 0x95, 0xb4, 0x03, 0xb5, 0xce, 0x04, 0x43, 0x8a, 0xc8, 0x67, 0xdf, 0x08, 0x5f, 0xf5,
0x7a, 0xe3, 0x92, 0x0d, 0xf0, 0xfe, 0xba, 0xfe, 0x80, 0x4d, 0x20, 0x73, 0x7c, 0x09, 0x7f, 0xd5,
0x4a, 0x06, 0xd9, 0xa0, 0x16, 0x75, 0xb5, 0x38, 0x2d, 0xc9, 0x35, 0x3e, 0x57, 0x90, 0x9e, 0x57,
0x4d, 0xd1, 0xa2, 0xae, 0x12, 0x87, 0x40, 0x66, 0xb8, 0x3e, 0x62, 0xfd, 0x4b, 0x53, 0xe6, 0xe9,
0x49, 0x47, 0x9e, 0xf0, 0x44, 0x6a, 0x6d, 0x82, 0x0c, 0x83, 0xd1, 0xbe, 0x39, 0x6b, 0xcb, 0x6e,
0xc2, 0xe7, 0x34, 0x72, 0x3a, 0x32, 0xa1, 0x0f, 0x47, 0xac, 0xd7, 0xc1, 0x25, 0xf1, 0x7f, 0x78,
0xb3, 0xc4, 0xd3, 0x31, 0x40, 0xa6, 0xb8, 0x54, 0x90, 0xb2, 0x71, 0x25, 0xf6, 0xe7, 0xde, 0x33,
0xca, 0xdd, 0x17, 0x64, 0xcf, 0x5a, 0x1c, 0xc2, 0x5d, 0xb1, 0x40, 0x8f, 0xcb, 0xb7, 0x7b, 0xb5,
0xf0, 0x74, 0x30, 0x4c, 0xda, 0xc1, 0x83, 0x8b, 0xe0, 0x98, 0x55, 0x5b, 0xe6, 0x83, 0x71, 0x72,
0x0b, 0x2c, 0x93, 0xec, 0x57, 0x9d, 0x81, 0x8e, 0xb0, 0x33, 0x16, 0x98, 0xfa, 0xf4, 0x91, 0xb3,
0xc8, 0xd7, 0x17, 0xf9, 0x8d, 0xb7, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x80, 0x43, 0x93,
0x53, 0x01, 0x00, 0x00,
}


@ -14,10 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// To regenerate api.pb.go run hack/update-generated-kms.sh
// To regenerate api.pb.go run `hack/update-codegen.sh protobindings`
syntax = "proto3";
package v2alpha1;
package v2;
option go_package = "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2";
// EncryptedObject is the representation of data stored in etcd after envelope encryption.
message EncryptedObject {


@ -14,5 +14,5 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package v2alpha1 contains definition of kms-plugin's serialized types.
package v2alpha1
// Package v2 contains definition of kms-plugin's serialized types.
package v2


@ -17,11 +17,20 @@ limitations under the License.
package metrics
import (
"crypto/sha256"
"errors"
"fmt"
"hash"
"sync"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
"k8s.io/utils/lru"
)
const (
@ -31,6 +40,12 @@ const (
ToStorageLabel = "to_storage"
)
type metricLabels struct {
transformationType string
providerName string
keyIDHash string
}
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/kubernetes-control-plane-metrics-stability.md#stability-classes)
@ -40,12 +55,18 @@ const (
* the metric stability policy.
*/
var (
lockLastFromStorage sync.Mutex
lockLastToStorage sync.Mutex
lockLastFromStorage sync.Mutex
lockLastToStorage sync.Mutex
lockRecordKeyID sync.Mutex
lockRecordKeyIDStatus sync.Mutex
lastFromStorage time.Time
lastToStorage time.Time
lastFromStorage time.Time
lastToStorage time.Time
keyIDHashTotalMetricLabels *lru.Cache
keyIDHashStatusLastTimestampSecondsMetricLabels *lru.Cache
cacheSize = 100
// This metric is only used for KMS v1 API.
dekCacheFillPercent = metrics.NewGauge(
&metrics.GaugeOpts{
Namespace: namespace,
@ -56,6 +77,7 @@ var (
},
)
// This metric is only used for KMS v1 API.
dekCacheInterArrivals = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Namespace: namespace,
@ -67,17 +89,145 @@ var (
},
[]string{"transformation_type"},
)
// These metrics are made public to be used by unit tests.
KMSOperationsLatencyMetric = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "kms_operations_latency_seconds",
Help: "KMS operation duration with gRPC error code status total.",
StabilityLevel: metrics.ALPHA,
// Use custom buckets to avoid the default buckets which are too small for KMS operations.
// Start 0.1ms with the last bucket being [~52s, +Inf)
Buckets: metrics.ExponentialBuckets(0.0001, 2, 20),
},
[]string{"provider_name", "method_name", "grpc_status_code"},
)
// keyIDHashTotal is the number of times a keyID is used
// e.g. apiserver_envelope_encryption_key_id_hash_total counter
// apiserver_envelope_encryption_key_id_hash_total{key_id_hash="sha256",
// provider_name="providerName",transformation_type="from_storage"} 1
KeyIDHashTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "key_id_hash_total",
Help: "Number of times a keyID is used split by transformation type and provider.",
StabilityLevel: metrics.ALPHA,
},
[]string{"transformation_type", "provider_name", "key_id_hash"},
)
// KeyIDHashLastTimestampSeconds is the last time in seconds when a keyID was used
// e.g. apiserver_envelope_encryption_key_id_hash_last_timestamp_seconds{key_id_hash="sha256", provider_name="providerName",transformation_type="from_storage"} 1.674865558833728e+09
KeyIDHashLastTimestampSeconds = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "key_id_hash_last_timestamp_seconds",
Help: "The last time in seconds when a keyID was used.",
StabilityLevel: metrics.ALPHA,
},
[]string{"transformation_type", "provider_name", "key_id_hash"},
)
// KeyIDHashStatusLastTimestampSeconds is the last time in seconds when a keyID was returned by the Status RPC call.
// e.g. apiserver_envelope_encryption_key_id_hash_status_last_timestamp_seconds{key_id_hash="sha256", provider_name="providerName"} 1.674865558833728e+09
KeyIDHashStatusLastTimestampSeconds = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "key_id_hash_status_last_timestamp_seconds",
Help: "The last time in seconds when a keyID was returned by the Status RPC call.",
StabilityLevel: metrics.ALPHA,
},
[]string{"provider_name", "key_id_hash"},
)
InvalidKeyIDFromStatusTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "invalid_key_id_from_status_total",
Help: "Number of times an invalid keyID is returned by the Status RPC call split by error.",
StabilityLevel: metrics.ALPHA,
},
[]string{"provider_name", "error"},
)
)
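As a quick sanity check of the bucket comment on KMSOperationsLatencyMetric above, the exponential series can be reproduced outside the vendored code; this is a standalone sketch, not part of the commit:

    package main

    import "fmt"

    func main() {
        // Reproduce metrics.ExponentialBuckets(0.0001, 2, 20): 20 buckets,
        // starting at 0.1ms and doubling each step.
        buckets := make([]float64, 20)
        v := 0.0001
        for i := range buckets {
            buckets[i] = v
            v *= 2
        }
        // 0.0001s * 2^19 = 52.4288s, so the implicit +Inf bucket starts at ~52s.
        fmt.Printf("first=%gs last=%gs\n", buckets[0], buckets[19])
    }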
var registerMetricsFunc sync.Once
var hashPool *sync.Pool
func registerLRUMetrics() {
if keyIDHashTotalMetricLabels != nil {
keyIDHashTotalMetricLabels.Clear()
}
if keyIDHashStatusLastTimestampSecondsMetricLabels != nil {
keyIDHashStatusLastTimestampSecondsMetricLabels.Clear()
}
keyIDHashTotalMetricLabels = lru.NewWithEvictionFunc(cacheSize, func(key lru.Key, _ interface{}) {
item := key.(metricLabels)
if deleted := KeyIDHashTotal.DeleteLabelValues(item.transformationType, item.providerName, item.keyIDHash); deleted {
klog.InfoS("Deleted keyIDHashTotalMetricLabels", "transformationType", item.transformationType,
"providerName", item.providerName, "keyIDHash", item.keyIDHash)
}
if deleted := KeyIDHashLastTimestampSeconds.DeleteLabelValues(item.transformationType, item.providerName, item.keyIDHash); deleted {
klog.InfoS("Deleted keyIDHashLastTimestampSecondsMetricLabels", "transformationType", item.transformationType,
"providerName", item.providerName, "keyIDHash", item.keyIDHash)
}
})
keyIDHashStatusLastTimestampSecondsMetricLabels = lru.NewWithEvictionFunc(cacheSize, func(key lru.Key, _ interface{}) {
item := key.(metricLabels)
if deleted := KeyIDHashStatusLastTimestampSeconds.DeleteLabelValues(item.providerName, item.keyIDHash); deleted {
klog.InfoS("Deleted keyIDHashStatusLastTimestampSecondsMetricLabels", "providerName", item.providerName, "keyIDHash", item.keyIDHash)
}
})
}
func RegisterMetrics() {
registerMetricsFunc.Do(func() {
registerLRUMetrics()
hashPool = &sync.Pool{
New: func() interface{} {
return sha256.New()
},
}
legacyregistry.MustRegister(dekCacheFillPercent)
legacyregistry.MustRegister(dekCacheInterArrivals)
legacyregistry.MustRegister(KeyIDHashTotal)
legacyregistry.MustRegister(KeyIDHashLastTimestampSeconds)
legacyregistry.MustRegister(KeyIDHashStatusLastTimestampSeconds)
legacyregistry.MustRegister(InvalidKeyIDFromStatusTotal)
legacyregistry.MustRegister(KMSOperationsLatencyMetric)
})
}
// RecordKeyID records the total count and the last time in seconds when a KeyID was used for TransformFromStorage and TransformToStorage operations.
func RecordKeyID(transformationType, providerName, keyID string) {
lockRecordKeyID.Lock()
defer lockRecordKeyID.Unlock()
keyIDHash := addLabelToCache(keyIDHashTotalMetricLabels, transformationType, providerName, keyID)
KeyIDHashTotal.WithLabelValues(transformationType, providerName, keyIDHash).Inc()
KeyIDHashLastTimestampSeconds.WithLabelValues(transformationType, providerName, keyIDHash).SetToCurrentTime()
}
// RecordKeyIDFromStatus records last time in seconds when a KeyID was returned by the Status RPC call.
func RecordKeyIDFromStatus(providerName, keyID string) {
lockRecordKeyIDStatus.Lock()
defer lockRecordKeyIDStatus.Unlock()
keyIDHash := addLabelToCache(keyIDHashStatusLastTimestampSecondsMetricLabels, "", providerName, keyID)
KeyIDHashStatusLastTimestampSeconds.WithLabelValues(providerName, keyIDHash).SetToCurrentTime()
}
func RecordInvalidKeyIDFromStatus(providerName, errCode string) {
InvalidKeyIDFromStatusTotal.WithLabelValues(providerName, errCode).Inc()
}
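A minimal sketch of how the Record* helpers above might be driven by a KMS v2 caller; the provider name, key ID, error label, and the import path k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics are illustrative assumptions, not taken from this commit:

    package main

    import (
        "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
    )

    func main() {
        // Safe to call from multiple code paths; registration is guarded by sync.Once.
        metrics.RegisterMetrics()

        // Hypothetical values a KMS v2 plugin might report.
        providerName, keyID := "example-kms", "key-1"

        // Key ID used while decrypting data read from etcd; the raw key ID is
        // hashed (sha256) before it becomes a metric label.
        metrics.RecordKeyID(metrics.FromStorageLabel, providerName, keyID)

        // Key ID returned by the plugin's Status RPC.
        metrics.RecordKeyIDFromStatus(providerName, keyID)

        // Status returned a key ID that failed validation; the error label here
        // is a made-up placeholder.
        metrics.RecordInvalidKeyIDFromStatus(providerName, "invalid")
    }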
func RecordArrival(transformationType string, start time.Time) {
switch transformationType {
case FromStorageLabel:
@ -104,3 +254,51 @@ func RecordArrival(transformationType string, start time.Time) {
func RecordDekCacheFillPercent(percent float64) {
dekCacheFillPercent.Set(percent)
}
// RecordKMSOperationLatency records the latency of KMS operation.
func RecordKMSOperationLatency(providerName, methodName string, duration time.Duration, err error) {
KMSOperationsLatencyMetric.WithLabelValues(providerName, methodName, getErrorCode(err)).Observe(duration.Seconds())
}
type gRPCError interface {
GRPCStatus() *status.Status
}
func getErrorCode(err error) string {
if err == nil {
return codes.OK.String()
}
// handle errors wrapped with fmt.Errorf and similar
var s gRPCError
if errors.As(err, &s) {
return s.GRPCStatus().Code().String()
}
// This is not a gRPC error. The operation must have failed before the gRPC
// method was called; otherwise we would have received a gRPC error.
return "unknown-non-grpc"
}
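getErrorCode tolerates wrapped errors because it matches against the GRPCStatus interface via errors.As; a small standalone sketch of the two paths it distinguishes (the error code and messages below are arbitrary):

    package main

    import (
        "errors"
        "fmt"

        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    func main() {
        // A gRPC status error wrapped by an intermediate layer.
        wrapped := fmt.Errorf("encrypt failed: %w", status.Error(codes.Unavailable, "kms-plugin unreachable"))

        // errors.As unwraps down to the value implementing GRPCStatus(),
        // which is exactly what getErrorCode relies on.
        var s interface{ GRPCStatus() *status.Status }
        if errors.As(wrapped, &s) {
            fmt.Println(s.GRPCStatus().Code().String()) // Unavailable
        }

        // A plain error never carries a gRPC status, so getErrorCode would
        // report it as "unknown-non-grpc".
        fmt.Println(errors.As(errors.New("bad config"), &s)) // false
    }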
func getHash(data string) string {
h := hashPool.Get().(hash.Hash)
h.Reset()
h.Write([]byte(data))
result := fmt.Sprintf("sha256:%x", h.Sum(nil))
hashPool.Put(h)
return result
}
func addLabelToCache(c *lru.Cache, transformationType, providerName, keyID string) string {
keyIDHash := ""
// only get hash if the keyID is not empty
if len(keyID) > 0 {
keyIDHash = getHash(keyID)
}
c.Add(metricLabels{
transformationType: transformationType,
providerName: providerName,
keyIDHash: keyIDHash,
}, nil) // value is irrelevant, this is a set and not a map
return keyIDHash
}
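The two LRU caches bound label cardinality: once more than cacheSize distinct label sets have been added, eviction removes the oldest series from the metric vectors via DeleteLabelValues. A standalone sketch of that eviction mechanism using the same k8s.io/utils/lru package (the cache size and keys are placeholders):

    package main

    import (
        "fmt"

        "k8s.io/utils/lru"
    )

    func main() {
        // A cache of size 2: adding a third distinct key evicts the oldest entry
        // and runs the eviction callback, which in the metrics package above
        // translates into DeleteLabelValues on the Prometheus vectors.
        var evicted []string
        c := lru.NewWithEvictionFunc(2, func(key lru.Key, _ interface{}) {
            evicted = append(evicted, key.(string))
        })
        c.Add("hash-a", nil)
        c.Add("hash-b", nil)
        c.Add("hash-c", nil) // evicts "hash-a"
        fmt.Println(evicted) // [hash-a]
    }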

View File

@ -1,54 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net/url"
"strings"
)
const (
// unixProtocol is the only supported protocol for remote KMS provider.
unixProtocol = "unix"
)
// ParseEndpoint parses the endpoint to extract the scheme, host, or path.
func ParseEndpoint(endpoint string) (string, error) {
if len(endpoint) == 0 {
return "", fmt.Errorf("remote KMS provider can't use empty string as endpoint")
}
u, err := url.Parse(endpoint)
if err != nil {
return "", fmt.Errorf("invalid endpoint %q for remote KMS provider, error: %v", endpoint, err)
}
if u.Scheme != unixProtocol {
return "", fmt.Errorf("unsupported scheme %q for remote KMS provider", u.Scheme)
}
// Linux abstract namespace socket - no physical file required
// Warning: Linux abstract sockets have no concept of ACLs (unlike traditional file-based sockets).
// However, Linux abstract sockets are subject to the Linux networking namespace, so they will only be accessible to
// containers within the same pod (unless host networking is used).
if strings.HasPrefix(u.Path, "/@") {
return strings.TrimPrefix(u.Path, "/"), nil
}
return u.Path, nil
}
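For reference, the removed ParseEndpoint accepted only unix:// endpoints and special-cased abstract sockets; a small usage sketch of the behavior shown above (the import path k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util is assumed from this tree's layout before the rebase dropped the file):

    package main

    import (
        "fmt"

        "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
    )

    func main() {
        // Regular unix socket: the scheme is stripped and the path is returned.
        p, _ := util.ParseEndpoint("unix:///tmp/kms.sock")
        fmt.Println(p) // /tmp/kms.sock

        // Abstract namespace socket: the leading "/" is trimmed so the "@"
        // form reaches the dialer unchanged.
        p, _ = util.ParseEndpoint("unix:///@kms.sock")
        fmt.Println(p) // @kms.sock

        // Anything other than unix:// is rejected.
        _, err := util.ParseEndpoint("tcp://127.0.0.1:1234")
        fmt.Println(err) // unsupported scheme "tcp" for remote KMS provider
    }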

View File

@ -51,7 +51,7 @@ var (
Buckets: metrics.ExponentialBuckets(5e-6, 2, 25),
StabilityLevel: metrics.ALPHA,
},
[]string{"transformation_type"},
[]string{"transformation_type", "transformer_prefix"},
)
transformerOperationsTotal = metrics.NewCounterVec(
@ -111,12 +111,11 @@ func RegisterMetrics() {
// RecordTransformation records the latencies and counts of TransformFromStorage and TransformToStorage operations.
// Note that the transformation_failures_total metric is deprecated; use transformation_operations_total instead.
func RecordTransformation(transformationType, transformerPrefix string, start time.Time, err error) {
func RecordTransformation(transformationType, transformerPrefix string, elapsed time.Duration, err error) {
transformerOperationsTotal.WithLabelValues(transformationType, transformerPrefix, status.Code(err).String()).Inc()
switch {
case err == nil:
transformerLatencies.WithLabelValues(transformationType).Observe(sinceInSeconds(start))
if err == nil {
transformerLatencies.WithLabelValues(transformationType, transformerPrefix).Observe(elapsed.Seconds())
}
}

View File

@ -100,9 +100,9 @@ func (t *prefixTransformers) TransformFromStorage(ctx context.Context, data []by
continue
}
if len(transformer.Prefix) == 0 {
RecordTransformation("from_storage", "identity", start, err)
RecordTransformation("from_storage", "identity", time.Since(start), err)
} else {
RecordTransformation("from_storage", string(transformer.Prefix), start, err)
RecordTransformation("from_storage", string(transformer.Prefix), time.Since(start), err)
}
// It is valid to have overlapping prefixes when the same encryption provider
@ -146,7 +146,7 @@ func (t *prefixTransformers) TransformFromStorage(ctx context.Context, data []by
if err := errors.Reduce(errors.NewAggregate(errs)); err != nil {
return nil, false, err
}
RecordTransformation("from_storage", "unknown", start, t.err)
RecordTransformation("from_storage", "unknown", time.Since(start), t.err)
return nil, false, t.err
}
@ -155,7 +155,7 @@ func (t *prefixTransformers) TransformToStorage(ctx context.Context, data []byte
start := time.Now()
transformer := t.transformers[0]
result, err := transformer.Transformer.TransformToStorage(ctx, data, dataCtx)
RecordTransformation("to_storage", string(transformer.Prefix), start, err)
RecordTransformation("to_storage", string(transformer.Prefix), time.Since(start), err)
if err != nil {
return nil, err
}