Update to kube v1.17

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal
2020-01-14 16:08:55 +05:30
committed by mergify[bot]
parent 327fcd1b1b
commit 3af1e26d7c
1710 changed files with 289562 additions and 168638 deletions

View File

@ -19,6 +19,8 @@ package storage
import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
)
@ -168,3 +170,31 @@ func NewInternalError(reason string) InternalError {
func NewInternalErrorf(format string, a ...interface{}) InternalError {
return InternalError{fmt.Sprintf(format, a...)}
}
var tooLargeResourceVersionCauseMsg = "Too large resource version"
// NewTooLargeResourceVersionError returns a timeout error with the given retrySeconds for a request for
// a minimum resource version that is larger than the largest currently available resource version for a requested resource.
func NewTooLargeResourceVersionError(minimumResourceVersion, currentRevision uint64, retrySeconds int) error {
err := errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %d, current: %d", minimumResourceVersion, currentRevision), retrySeconds)
err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: tooLargeResourceVersionCauseMsg}}
return err
}
// IsTooLargeResourceVersion returns true if the error is a TooLargeResourceVersion error.
func IsTooLargeResourceVersion(err error) bool {
if !errors.IsTimeout(err) {
return false
}
switch t := err.(type) {
case errors.APIStatus:
if d := t.Status().Details; d != nil {
for _, cause := range d.Causes {
if cause.Message == tooLargeResourceVersionCauseMsg {
return true
}
}
}
}
return false
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
package etcd3
import (
"strconv"

162
vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go generated vendored Normal file
View File

@ -0,0 +1,162 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"strconv"
"sync"
"time"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog"
)
const (
compactRevKey = "compact_rev_key"
)
var (
endpointsMapMu sync.Mutex
endpointsMap map[string]struct{}
)
func init() {
endpointsMap = make(map[string]struct{})
}
// StartCompactor starts a compactor in the background to compact old version of keys that's not needed.
// By default, we save the most recent 10 minutes data and compact versions > 10minutes ago.
// It should be enough for slow watchers and to tolerate burst.
// TODO: We might keep a longer history (12h) in the future once storage API can take advantage of past version of keys.
func StartCompactor(ctx context.Context, client *clientv3.Client, compactInterval time.Duration) {
endpointsMapMu.Lock()
defer endpointsMapMu.Unlock()
// In one process, we can have only one compactor for one cluster.
// Currently we rely on endpoints to differentiate clusters.
for _, ep := range client.Endpoints() {
if _, ok := endpointsMap[ep]; ok {
klog.V(4).Infof("compactor already exists for endpoints %v", client.Endpoints())
return
}
}
for _, ep := range client.Endpoints() {
endpointsMap[ep] = struct{}{}
}
if compactInterval != 0 {
go compactor(ctx, client, compactInterval)
}
}
// compactor periodically compacts historical versions of keys in etcd.
// It will compact keys with versions older than given interval.
// In other words, after compaction, it will only contain keys set during last interval.
// Any API call for the older versions of keys will return error.
// Interval is the time interval between each compaction. The first compaction happens after "interval".
func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
// Technical definitions:
// We have a special key in etcd defined as *compactRevKey*.
// compactRevKey's value will be set to the string of last compacted revision.
// compactRevKey's version will be used as logical time for comparison. THe version is referred as compact time.
// Initially, because the key doesn't exist, the compact time (version) is 0.
//
// Algorithm:
// - Compare to see if (local compact_time) = (remote compact_time).
// - If yes, increment both local and remote compact_time, and do a compaction.
// - If not, set local to remote compact_time.
//
// Technical details/insights:
//
// The protocol here is lease based. If one compactor CAS successfully, the others would know it when they fail in
// CAS later and would try again in 10 minutes. If an APIServer crashed, another one would "take over" the lease.
//
// For example, in the following diagram, we have a compactor C1 doing compaction in t1, t2. Another compactor C2
// at t1' (t1 < t1' < t2) would CAS fail, set its known oldRev to rev at t1', and try again in t2' (t2' > t2).
// If C1 crashed and wouldn't compact at t2, C2 would CAS successfully at t2'.
//
// oldRev(t2) curRev(t2)
// +
// oldRev curRev |
// + + |
// | | |
// | | t1' | t2'
// +---v-------------v----^---------v------^---->
// t0 t1 t2
//
// We have the guarantees:
// - in normal cases, the interval is 10 minutes.
// - in failover, the interval is >10m and <20m
//
// FAQ:
// - What if time is not accurate? We don't care as long as someone did the compaction. Atomicity is ensured using
// etcd API.
// - What happened under heavy load scenarios? Initially, each apiserver will do only one compaction
// every 10 minutes. This is very unlikely affecting or affected w.r.t. server load.
var compactTime int64
var rev int64
var err error
for {
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
compactTime, rev, err = compact(ctx, client, compactTime, rev)
if err != nil {
klog.Errorf("etcd: endpoint (%v) compact failed: %v", client.Endpoints(), err)
continue
}
}
}
// compact compacts etcd store and returns current rev.
// It will return the current compact time and global revision if no error occurred.
// Note that CAS fail will not incur any error.
func compact(ctx context.Context, client *clientv3.Client, t, rev int64) (int64, int64, error) {
resp, err := client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.Version(compactRevKey), "=", t),
).Then(
clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10)), // Expect side effect: increment Version
).Else(
clientv3.OpGet(compactRevKey),
).Commit()
if err != nil {
return t, rev, err
}
curRev := resp.Header.Revision
if !resp.Succeeded {
curTime := resp.Responses[0].GetResponseRange().Kvs[0].Version
return curTime, curRev, nil
}
curTime := t + 1
if rev == 0 {
// We don't compact on bootstrap.
return curTime, curRev, nil
}
if _, err = client.Compact(ctx, rev); err != nil {
return curTime, curRev, err
}
klog.V(4).Infof("etcd: compacted rev (%d), endpoints (%v)", rev, client.Endpoints())
return curTime, curRev, nil
}

71
vendor/k8s.io/apiserver/pkg/storage/etcd3/errors.go generated vendored Normal file
View File

@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"k8s.io/apimachinery/pkg/api/errors"
etcdrpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)
func interpretWatchError(err error) error {
switch {
case err == etcdrpc.ErrCompacted:
return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
}
return err
}
const (
expired string = "The resourceVersion for the provided list is too old."
continueExpired string = "The provided continue parameter is too old " +
"to display a consistent list result. You can start a new list without " +
"the continue parameter."
inconsistentContinue string = "The provided continue parameter is too old " +
"to display a consistent list result. You can start a new list without " +
"the continue parameter, or use the continue token in this response to " +
"retrieve the remainder of the results. Continuing with the provided " +
"token results in an inconsistent list - objects that were created, " +
"modified, or deleted between the time the first chunk was returned " +
"and now may show up in the list."
)
func interpretListError(err error, paging bool, continueKey, keyPrefix string) error {
switch {
case err == etcdrpc.ErrCompacted:
if paging {
return handleCompactedErrorForPaging(continueKey, keyPrefix)
}
return errors.NewResourceExpired(expired)
}
return err
}
func handleCompactedErrorForPaging(continueKey, keyPrefix string) error {
// continueToken.ResoureVersion=-1 means that the apiserver can
// continue the list at the latest resource version. We don't use rv=0
// for this purpose to distinguish from a bad token that has empty rv.
newToken, err := encodeContinue(continueKey, keyPrefix, -1)
if err != nil {
utilruntime.HandleError(err)
return errors.NewResourceExpired(continueExpired)
}
statusError := errors.NewResourceExpired(inconsistentContinue)
statusError.ErrStatus.ListMeta.Continue = newToken
return statusError
}

63
vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go generated vendored Normal file
View File

@ -0,0 +1,63 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"fmt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
)
type event struct {
key string
value []byte
prevValue []byte
rev int64
isDeleted bool
isCreated bool
}
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
func parseKV(kv *mvccpb.KeyValue) *event {
return &event{
key: string(kv.Key),
value: kv.Value,
prevValue: nil,
rev: kv.ModRevision,
isDeleted: false,
isCreated: true,
}
}
func parseEvent(e *clientv3.Event) (*event, error) {
if !e.IsCreate() && e.PrevKv == nil {
// If the previous value is nil, error. One example of how this is possible is if the previous value has been compacted already.
return nil, fmt.Errorf("etcd event received with PrevKv=nil (key=%q, modRevision=%d, type=%s)", string(e.Kv.Key), e.Kv.ModRevision, e.Type.String())
}
ret := &event{
key: string(e.Kv.Key),
value: e.Kv.Value,
rev: e.Kv.ModRevision,
isDeleted: e.Type == clientv3.EventTypeDelete,
isCreated: e.IsCreate(),
}
if e.PrevKv != nil {
ret.prevValue = e.PrevKv.Value
}
return ret, nil
}

View File

@ -14,4 +14,27 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd // import "k8s.io/apiserver/pkg/storage/etcd"
package etcd3
import (
"encoding/json"
"fmt"
)
// etcdHealth encodes data returned from etcd /healthz handler.
type etcdHealth struct {
// Note this has to be public so the json library can modify it.
Health string `json:"health"`
}
// EtcdHealthCheck decodes data returned from etcd /healthz handler.
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
return err
}
if obj.Health != "true" {
return fmt.Errorf("Unhealthy status: %s", obj.Health)
}
return nil
}

View File

@ -0,0 +1,102 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"sync"
"time"
"go.etcd.io/etcd/clientv3"
)
// leaseManager is used to manage leases requested from etcd. If a new write
// needs a lease that has similar expiration time to the previous one, the old
// lease will be reused to reduce the overhead of etcd, since lease operations
// are expensive. In the implementation, we only store one previous lease,
// since all the events have the same ttl.
type leaseManager struct {
client *clientv3.Client // etcd client used to grant leases
leaseMu sync.Mutex
prevLeaseID clientv3.LeaseID
prevLeaseExpirationTime time.Time
// The period of time in seconds and percent of TTL that each lease is
// reused. The minimum of them is used to avoid unreasonably large
// numbers. We use var instead of const for testing purposes.
leaseReuseDurationSeconds int64
leaseReuseDurationPercent float64
}
// newDefaultLeaseManager creates a new lease manager using default setting.
func newDefaultLeaseManager(client *clientv3.Client) *leaseManager {
return newLeaseManager(client, 60, 0.05)
}
// newLeaseManager creates a new lease manager with the number of buffered
// leases, lease reuse duration in seconds and percentage. The percentage
// value x means x*100%.
func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64) *leaseManager {
return &leaseManager{
client: client,
leaseReuseDurationSeconds: leaseReuseDurationSeconds,
leaseReuseDurationPercent: leaseReuseDurationPercent,
}
}
// setLeaseReuseDurationSeconds is used for testing purpose. It is used to
// reduce the extra lease duration to avoid unnecessary timeout in testing.
func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) {
l.leaseMu.Lock()
defer l.leaseMu.Unlock()
l.leaseReuseDurationSeconds = duration
}
// GetLease returns a lease based on requested ttl: if the cached previous
// lease can be reused, reuse it; otherwise request a new one from etcd.
func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
now := time.Now()
l.leaseMu.Lock()
defer l.leaseMu.Unlock()
// check if previous lease can be reused
reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl)
valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime)
sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime)
if valid && sufficient {
return l.prevLeaseID, nil
}
// request a lease with a little extra ttl from etcd
ttl += reuseDurationSeconds
lcr, err := l.client.Lease.Grant(ctx, ttl)
if err != nil {
return clientv3.LeaseID(0), err
}
// cache the new lease id
l.prevLeaseID = lcr.ID
l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second)
return lcr.ID, nil
}
// getReuseDurationSecondsLocked returns the reusable duration in seconds
// based on the configuration. Lock has to be acquired before calling this
// function.
func (l *leaseManager) getReuseDurationSecondsLocked(ttl int64) int64 {
reuseDurationSeconds := int64(l.leaseReuseDurationPercent * float64(ttl))
if reuseDurationSeconds > l.leaseReuseDurationSeconds {
reuseDurationSeconds = l.leaseReuseDurationSeconds
}
return reuseDurationSeconds
}

84
vendor/k8s.io/apiserver/pkg/storage/etcd3/logger.go generated vendored Normal file
View File

@ -0,0 +1,84 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"fmt"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog"
)
func init() {
clientv3.SetLogger(klogWrapper{})
}
type klogWrapper struct{}
const klogWrapperDepth = 4
func (klogWrapper) Info(args ...interface{}) {
klog.InfoDepth(klogWrapperDepth, args...)
}
func (klogWrapper) Infoln(args ...interface{}) {
klog.InfoDepth(klogWrapperDepth, fmt.Sprintln(args...))
}
func (klogWrapper) Infof(format string, args ...interface{}) {
klog.InfoDepth(klogWrapperDepth, fmt.Sprintf(format, args...))
}
func (klogWrapper) Warning(args ...interface{}) {
klog.WarningDepth(klogWrapperDepth, args...)
}
func (klogWrapper) Warningln(args ...interface{}) {
klog.WarningDepth(klogWrapperDepth, fmt.Sprintln(args...))
}
func (klogWrapper) Warningf(format string, args ...interface{}) {
klog.WarningDepth(klogWrapperDepth, fmt.Sprintf(format, args...))
}
func (klogWrapper) Error(args ...interface{}) {
klog.ErrorDepth(klogWrapperDepth, args...)
}
func (klogWrapper) Errorln(args ...interface{}) {
klog.ErrorDepth(klogWrapperDepth, fmt.Sprintln(args...))
}
func (klogWrapper) Errorf(format string, args ...interface{}) {
klog.ErrorDepth(klogWrapperDepth, fmt.Sprintf(format, args...))
}
func (klogWrapper) Fatal(args ...interface{}) {
klog.FatalDepth(klogWrapperDepth, args...)
}
func (klogWrapper) Fatalln(args ...interface{}) {
klog.FatalDepth(klogWrapperDepth, fmt.Sprintln(args...))
}
func (klogWrapper) Fatalf(format string, args ...interface{}) {
klog.FatalDepth(klogWrapperDepth, fmt.Sprintf(format, args...))
}
func (klogWrapper) V(l int) bool {
return bool(klog.V(klog.Level(l)))
}

View File

@ -0,0 +1,104 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"time"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/20190404-kubernetes-control-plane-metrics-stability.md#stability-classes)
*
* Promoting the stability level of the metric is a responsibility of the component owner, since it
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
* the metric stability policy.
*/
var (
etcdRequestLatency = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Name: "etcd_request_duration_seconds",
Help: "Etcd request latency in seconds for each operation and object type.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"operation", "type"},
)
objectCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_object_counts",
Help: "Number of stored objects at the time of last check split by kind.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
deprecatedEtcdRequestLatenciesSummary = compbasemetrics.NewSummaryVec(
&compbasemetrics.SummaryOpts{
Name: "etcd_request_latencies_summary",
Help: "Etcd request latency summary in microseconds for each operation and object type.",
StabilityLevel: compbasemetrics.ALPHA,
DeprecatedVersion: "1.14.0",
},
[]string{"operation", "type"},
)
)
var registerMetrics sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
legacyregistry.MustRegister(etcdRequestLatency)
legacyregistry.MustRegister(objectCounts)
// TODO(danielqsj): Remove the following metrics, they are deprecated
legacyregistry.MustRegister(deprecatedEtcdRequestLatenciesSummary)
})
}
// UpdateObjectCount sets the etcd_object_counts metric.
func UpdateObjectCount(resourcePrefix string, count int64) {
objectCounts.WithLabelValues(resourcePrefix).Set(float64(count))
}
// RecordEtcdRequestLatency sets the etcd_request_duration_seconds metrics.
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
deprecatedEtcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(sinceInMicroseconds(startTime))
}
// Reset resets the etcd_request_duration_seconds metric.
func Reset() {
etcdRequestLatency.Reset()
deprecatedEtcdRequestLatenciesSummary.Reset()
}
// sinceInMicroseconds gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
// sinceInSeconds gets the time since the specified start in seconds.
func sinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds()
}

857
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go generated vendored Normal file
View File

@ -0,0 +1,857 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"path"
"reflect"
"strings"
"time"
"go.etcd.io/etcd/clientv3"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
utiltrace "k8s.io/utils/trace"
)
// authenticatedDataString satisfies the value.Context interface. It uses the key to
// authenticate the stored data. This does not defend against reuse of previously
// encrypted values under the same key, but will prevent an attacker from using an
// encrypted value from a different key. A stronger authenticated data segment would
// include the etcd3 Version field (which is incremented on each write to a key and
// reset when the key is deleted), but an attacker with write access to etcd can
// force deletion and recreation of keys to weaken that angle.
type authenticatedDataString string
// AuthenticatedData implements the value.Context interface.
func (d authenticatedDataString) AuthenticatedData() []byte {
return []byte(string(d))
}
var _ value.Context = authenticatedDataString("")
type store struct {
client *clientv3.Client
// getOpts contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
pathPrefix string
watcher *watcher
pagingEnabled bool
leaseManager *leaseManager
}
type objState struct {
obj runtime.Object
meta *storage.ResponseMeta
rev int64
data []byte
stale bool
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, pagingEnabled, codec, prefix, transformer)
}
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := APIObjectVersioner{}
result := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, versioner, transformer),
leaseManager: newDefaultLeaseManager(c),
}
return result
}
// Versioner implements storage.Interface.Versioner.
func (s *store) Versioner() storage.Versioner {
return s.versioner
}
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return err
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if ignoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
}
kv := getResp.Kvs[0]
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
key = path.Join(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
// Delete implements storage.Interface.Delete.
func (s *store) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
v, err := conversion.EnforcePtr(out)
if err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
key = path.Join(s.pathPrefix, key)
return s.conditionalDelete(ctx, key, out, v, preconditions, validateDeletion)
}
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return err
}
for {
origState, err := s.getState(getResp, key, v, false)
if err != nil {
return err
}
if preconditions != nil {
if err := preconditions.Check(key, origState.obj); err != nil {
return err
}
}
if err := validateDeletion(ctx, origState.obj); err != nil {
return err
}
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpDelete(key),
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if err != nil {
return err
}
if !txnResp.Succeeded {
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
continue
}
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
func (s *store) GuaranteedUpdate(
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
trace := utiltrace.New("GuaranteedUpdate etcd3", utiltrace.Field{"type", getTypeName(out)})
defer trace.LogIfLong(500 * time.Millisecond)
v, err := conversion.EnforcePtr(out)
if err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
key = path.Join(s.pathPrefix, key)
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return nil, err
}
return s.getState(getResp, key, v, ignoreNotFound)
}
var origState *objState
var mustCheckData bool
if len(suggestion) == 1 && suggestion[0] != nil {
origState, err = s.getStateFromObject(suggestion[0])
if err != nil {
return err
}
mustCheckData = true
} else {
origState, err = getCurrentState()
if err != nil {
return err
}
}
trace.Step("initial value restored")
transformContext := authenticatedDataString(key)
for {
if err := preconditions.Check(key, origState.obj); err != nil {
// If our data is already up to date, return the error
if !mustCheckData {
return err
}
// It's possible we were working with stale data
// Actually fetch
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}
ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
// If our data is already up to date, return the error
if !mustCheckData {
return err
}
// It's possible we were working with stale data
// Actually fetch
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}
data, err := runtime.Encode(s.codec, ret)
if err != nil {
return err
}
if !origState.stale && bytes.Equal(data, origState.data) {
// if we skipped the original Get in this loop, we must refresh from
// etcd in order to be sure the data in the store is equivalent to
// our desired serialization
if mustCheckData {
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
if !bytes.Equal(data, origState.data) {
// original data changed, restart loop
continue
}
}
// recheck that the data from etcd is not stale before short-circuiting a write
if !origState.stale {
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
newData, err := s.transformer.TransformToStorage(data, transformContext)
if err != nil {
return storage.NewInternalError(err.Error())
}
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
trace.Step("Transaction prepared")
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
if err != nil {
return err
}
trace.Step("Transaction committed")
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
return err
}
trace.Step("Retry value restored")
mustCheckData = false
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
}
// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
trace := utiltrace.New("GetToList etcd3",
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
return fmt.Errorf("need ptr to slice: %v", err)
}
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
if err != nil {
return err
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
if len(getResp.Kvs) > 0 {
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil {
return err
}
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", nil)
}
func (s *store) Count(key string) (int64, error) {
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
metrics.RecordEtcdRequestLatency("listWithCount", key, startTime)
if err != nil {
return 0, err
}
return getResp.Count, nil
}
// continueToken is a simple structured object for encoding the state of a continue token.
// TODO: if we change the version of the encoded from, we can't start encoding the new version
// until all other servers are upgraded (i.e. we need to support rolling schema)
// This is a public API struct and cannot change.
type continueToken struct {
APIVersion string `json:"v"`
ResourceVersion int64 `json:"rv"`
StartKey string `json:"start"`
}
// parseFrom transforms an encoded predicate from into a versioned struct.
// TODO: return a typed error that instructs clients that they must relist
func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
data, err := base64.RawURLEncoding.DecodeString(continueValue)
if err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
var c continueToken
if err := json.Unmarshal(data, &c); err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
switch c.APIVersion {
case "meta.k8s.io/v1":
if c.ResourceVersion == 0 {
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
}
if len(c.StartKey) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
}
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
// continue start key that is fully qualified and cannot range over anything less specific than
// keyPrefix.
key := c.StartKey
if !strings.HasPrefix(key, "/") {
key = "/" + key
}
cleaned := path.Clean(key)
if cleaned != key {
return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
}
return keyPrefix + cleaned[1:], c.ResourceVersion, nil
default:
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
}
}
// encodeContinue returns a string representing the encoded continuation of the current query.
func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
nextKey := strings.TrimPrefix(key, keyPrefix)
if nextKey == key {
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
}
out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(out), nil
}
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
trace := utiltrace.New("List etcd3",
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
return fmt.Errorf("need ptr to slice: %v", err)
}
if s.pathPrefix != "" {
key = path.Join(s.pathPrefix, key)
}
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(key, "/") {
key += "/"
}
keyPrefix := key
// set the appropriate clientv3 options to filter the returned data set
var paging bool
options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && pred.Limit > 0 {
paging = true
options = append(options, clientv3.WithLimit(pred.Limit))
}
var returnedRV, continueRV int64
var continueKey string
switch {
case s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
}
if len(resourceVersion) > 0 && resourceVersion != "0" {
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
key = continueKey
// If continueRV > 0, the LIST request needs a specific resource version.
// continueRV==0 is invalid.
// If continueRV < 0, the request is for the latest resource version.
if continueRV > 0 {
options = append(options, clientv3.WithRev(continueRV))
returnedRV = continueRV
}
case s.pagingEnabled && pred.Limit > 0:
if len(resourceVersion) > 0 {
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
if fromRV > 0 {
options = append(options, clientv3.WithRev(int64(fromRV)))
}
returnedRV = int64(fromRV)
}
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
default:
options = append(options, clientv3.WithPrefix())
}
// loop until we have filled the requested limit from etcd or there are no more results
var lastKey []byte
var hasMore bool
var getResp *clientv3.GetResponse
for {
startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, key, options...)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
hasMore = getResp.More
if len(getResp.Kvs) == 0 && getResp.More {
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
}
// avoid small allocations for the result slice, since this can be called in many
// different contexts and we don't know how significantly the result will be filtered
if pred.Empty() {
growSlice(v, len(getResp.Kvs))
} else {
growSlice(v, 2048, len(getResp.Kvs))
}
// take items from the response until the bucket is full, filtering as we go
for _, kv := range getResp.Kvs {
if paging && int64(v.Len()) >= pred.Limit {
hasMore = true
break
}
lastKey = kv.Key
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
if err != nil {
return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
}
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil {
return err
}
}
// indicate to the client which resource version was returned
if returnedRV == 0 {
returnedRV = getResp.Header.Revision
}
// no more results remain or we didn't request paging
if !hasMore || !paging {
break
}
// we're paging but we have filled our bucket
if int64(v.Len()) >= pred.Limit {
break
}
key = string(lastKey) + "\x00"
}
// instruct the client to begin querying from immediately after the last key we returned
// we never return a key that the client wouldn't be allowed to see
if hasMore {
// we want to start immediately after the last key
next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
if err != nil {
return err
}
var remainingItemCount *int64
// getResp.Count counts in objects that do not match the pred.
// Instead of returning inaccurate count for non-empty selectors, we return nil.
// Only set remainingItemCount if the predicate is empty.
if utilfeature.DefaultFeatureGate.Enabled(features.RemainingItemCount) {
if pred.Empty() {
c := int64(getResp.Count - pred.Limit)
remainingItemCount = &c
}
}
return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount)
}
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "", nil)
}
// growSlice takes a slice value and grows its capacity up
// to the maximum of the passed sizes or maxCapacity, whichever
// is smaller. Above maxCapacity decisions about allocation are left
// to the Go runtime on append. This allows a caller to make an
// educated guess about the potential size of the total list while
// still avoiding overly aggressive initial allocation. If sizes
// is empty maxCapacity will be used as the size to grow.
func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
cap := v.Cap()
max := cap
for _, size := range sizes {
if size > max {
max = size
}
}
if len(sizes) == 0 || max > maxCapacity {
max = maxCapacity
}
if max <= cap {
return
}
if v.Len() > 0 {
extra := reflect.MakeSlice(v.Type(), 0, max)
reflect.Copy(extra, v)
v.Set(extra)
} else {
extra := reflect.MakeSlice(v.Type(), 0, max)
v.Set(extra)
}
}
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, false)
}
// WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, true)
}
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
rev, err := s.versioner.ParseResourceVersion(rv)
if err != nil {
return nil, err
}
key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
state := &objState{
meta: &storage.ResponseMeta{},
}
if u, ok := v.Addr().Interface().(runtime.Unstructured); ok {
state.obj = u.NewEmptyInstance()
} else {
state.obj = reflect.New(v.Type()).Interface().(runtime.Object)
}
if len(getResp.Kvs) == 0 {
if !ignoreNotFound {
return nil, storage.NewKeyNotFoundError(key, 0)
}
if err := runtime.SetZeroValue(state.obj); err != nil {
return nil, err
}
} else {
data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return nil, storage.NewInternalError(err.Error())
}
state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev)
state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
return nil, err
}
}
return state, nil
}
func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
state := &objState{
obj: obj,
meta: &storage.ResponseMeta{},
}
rv, err := s.versioner.ObjectResourceVersion(obj)
if err != nil {
return nil, fmt.Errorf("couldn't get resource version: %v", err)
}
state.rev = int64(rv)
state.meta.ResourceVersion = uint64(state.rev)
// Compute the serialized form - for that we need to temporarily clean
// its resource version field (those are not stored in etcd).
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return nil, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
state.data, err = runtime.Encode(s.codec, obj)
if err != nil {
return nil, err
}
if err := s.versioner.UpdateObject(state.obj, uint64(rv)); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return state, nil
}
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
if err != nil {
return nil, 0, err
}
if err := s.versioner.PrepareObjectForStorage(ret); err != nil {
return nil, 0, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
var ttl uint64
if ttlPtr != nil {
ttl = *ttlPtr
}
return ret, ttl, nil
}
// ttlOpts returns client options based on given ttl.
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
if ttl == 0 {
return nil, nil
}
id, err := s.leaseManager.GetLease(ctx, ttl)
if err != nil {
return nil, err
}
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
}
// ensureMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
// greater than the most recent actualRevision available from storage.
func (s *store) ensureMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
if minimumResourceVersion == "" {
return nil
}
minimumRV, err := s.versioner.ParseResourceVersion(minimumResourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
// Enforce the storage.Interface guarantee that the resource version of the returned data
// "will be at least 'resourceVersion'".
if minimumRV > actualRevision {
return storage.NewTooLargeResourceVersionError(minimumRV, actualRevision, 0)
}
return nil
}
// decode decodes value of bytes into object. It will also set the object resource version to rev.
// On success, objPtr would be set to the object.
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
if _, err := conversion.EnforcePtr(objPtr); err != nil {
return fmt.Errorf("unable to convert output object to pointer: %v", err)
}
_, _, err := codec.Decode(value, nil, objPtr)
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
if err := versioner.UpdateObject(objPtr, uint64(rev)); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
return nil
}
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error {
obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
if err != nil {
return err
}
// being unable to set the version does not prevent the object from being extracted
if err := versioner.UpdateObject(obj, rev); err != nil {
klog.Errorf("failed to update object version: %v", err)
}
if matched, err := pred.Matches(obj); err == nil && matched {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
return nil
}
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}
// getTypeName returns type name of an object for reporting purposes.
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}

414
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go generated vendored Normal file
View File

@ -0,0 +1,414 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/value"
"go.etcd.io/etcd/clientv3"
"k8s.io/klog"
)
const (
// We have set a buffer in order to reduce times of context switches.
incomingBufSize = 100
outgoingBufSize = 100
)
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
var fatalOnDecodeError = false
// errTestingDecode is the only error that testingDeferOnDecodeError catches during a panic
var errTestingDecode = errors.New("sentinel error only used during testing to indicate watch decoding error")
// testingDeferOnDecodeError is used during testing to recover from a panic caused by errTestingDecode, all other values continue to panic
func testingDeferOnDecodeError() {
if r := recover(); r != nil && r != errTestingDecode {
panic(r)
}
}
func init() {
// check to see if we are running in a test environment
TestOnlySetFatalOnDecodeError(true)
fatalOnDecodeError, _ = strconv.ParseBool(os.Getenv("KUBE_PANIC_WATCH_DECODE_ERROR"))
}
// TestOnlySetFatalOnDecodeError should only be used for cases where decode errors are expected and need to be tested. e.g. conversion webhooks.
func TestOnlySetFatalOnDecodeError(b bool) {
fatalOnDecodeError = b
}
type watcher struct {
client *clientv3.Client
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
}
// watchChan implements watch.Interface.
type watchChan struct {
watcher *watcher
key string
initialRev int64
recursive bool
internalPred storage.SelectionPredicate
ctx context.Context
cancel context.CancelFunc
incomingEventChan chan *event
resultChan chan watch.Event
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
return &watcher{
client: client,
codec: codec,
versioner: versioner,
transformer: transformer,
}
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
// If rev is zero, it will return the existing object(s) and then start watching from
// the maximum revision+1 from returned objects.
// If rev is non-zero, it will watch events happened after given revision.
// If recursive is false, it watches on given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only if pred matches the change, it will be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, pred)
go wc.run()
return wc, nil
}
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
wc := &watchChan{
watcher: w,
key: key,
initialRev: rev,
recursive: recursive,
internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
errChan: make(chan error, 1),
}
if pred.Empty() {
// The filter doesn't filter out any object.
wc.internalPred = storage.Everything
}
wc.ctx, wc.cancel = context.WithCancel(ctx)
return wc
}
func (wc *watchChan) run() {
watchClosedCh := make(chan struct{})
go wc.startWatching(watchClosedCh)
var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
go wc.processEvent(&resultChanWG)
select {
case err := <-wc.errChan:
if err == context.Canceled {
break
}
errResult := transformErrorToEvent(err)
if errResult != nil {
// error result is guaranteed to be received by user before closing ResultChan.
select {
case wc.resultChan <- *errResult:
case <-wc.ctx.Done(): // user has given up all results
}
}
case <-watchClosedCh:
case <-wc.ctx.Done(): // user cancel
}
// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
// It's fine to double cancel.
wc.cancel()
// we need to wait until resultChan wouldn't be used anymore
resultChanWG.Wait()
close(wc.resultChan)
}
func (wc *watchChan) Stop() {
wc.cancel()
}
func (wc *watchChan) ResultChan() <-chan watch.Event {
return wc.resultChan
}
// sync tries to retrieve existing data and send them to process.
// The revision to watch will be set to the revision in response.
// All events sent will have isCreated=true
func (wc *watchChan) sync() error {
opts := []clientv3.OpOption{}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...)
if err != nil {
return err
}
wc.initialRev = getResp.Header.Revision
for _, kv := range getResp.Kvs {
wc.sendEvent(parseKV(kv))
}
return nil
}
// startWatching does:
// - get current objects if initialRev=0; set initialRev to current rev
// - watch on given key and send events to process.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.initialRev == 0 {
if err := wc.sync(); err != nil {
klog.Errorf("failed to sync with latest state: %v", err)
wc.sendError(err)
return
}
}
opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
for wres := range wch {
if wres.Err() != nil {
err := wres.Err()
// If there is an error on server (e.g. compaction), the channel will return it before closed.
klog.Errorf("watch chan error: %v", err)
wc.sendError(err)
return
}
for _, e := range wres.Events {
parsedEvent, err := parseEvent(e)
if err != nil {
klog.Errorf("watch chan error: %v", err)
wc.sendError(err)
return
}
wc.sendEvent(parsedEvent)
}
}
// When we come to this point, it's only possible that client side ends the watch.
// e.g. cancel the context, close the client.
// If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
// We should notify the main thread that this goroutine has exited.
close(watchClosedCh)
}
// processEvent processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case e := <-wc.incomingEventChan:
res := wc.transform(e)
if res == nil {
continue
}
if len(wc.resultChan) == outgoingBufSize {
klog.V(3).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow dispatching events to watchers", outgoingBufSize)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case wc.resultChan <- *res:
case <-wc.ctx.Done():
return
}
case <-wc.ctx.Done():
return
}
}
}
func (wc *watchChan) filter(obj runtime.Object) bool {
if wc.internalPred.Empty() {
return true
}
matched, err := wc.internalPred.Matches(obj)
return err == nil && matched
}
func (wc *watchChan) acceptAll() bool {
return wc.internalPred.Empty()
}
// transform transforms an event into a result for user if not filtered.
func (wc *watchChan) transform(e *event) (res *watch.Event) {
curObj, oldObj, err := wc.prepareObjs(e)
if err != nil {
klog.Errorf("failed to prepare current and previous objects: %v", err)
wc.sendError(err)
return nil
}
switch {
case e.isDeleted:
if !wc.filter(oldObj) {
return nil
}
res = &watch.Event{
Type: watch.Deleted,
Object: oldObj,
}
case e.isCreated:
if !wc.filter(curObj) {
return nil
}
res = &watch.Event{
Type: watch.Added,
Object: curObj,
}
default:
if wc.acceptAll() {
res = &watch.Event{
Type: watch.Modified,
Object: curObj,
}
return res
}
curObjPasses := wc.filter(curObj)
oldObjPasses := wc.filter(oldObj)
switch {
case curObjPasses && oldObjPasses:
res = &watch.Event{
Type: watch.Modified,
Object: curObj,
}
case curObjPasses && !oldObjPasses:
res = &watch.Event{
Type: watch.Added,
Object: curObj,
}
case !curObjPasses && oldObjPasses:
res = &watch.Event{
Type: watch.Deleted,
Object: oldObj,
}
}
}
return res
}
func transformErrorToEvent(err error) *watch.Event {
err = interpretWatchError(err)
if _, ok := err.(apierrs.APIStatus); !ok {
err = apierrs.NewInternalError(err)
}
status := err.(apierrs.APIStatus).Status()
return &watch.Event{
Type: watch.Error,
Object: &status,
}
}
func (wc *watchChan) sendError(err error) {
select {
case wc.errChan <- err:
case <-wc.ctx.Done():
}
}
func (wc *watchChan) sendEvent(e *event) {
if len(wc.incomingEventChan) == incomingBufSize {
klog.V(3).Infof("Fast watcher, slow processing. Number of buffered events: %d."+
"Probably caused by slow decoding, user not receiving fast, or other processing logic",
incomingBufSize)
}
select {
case wc.incomingEventChan <- e:
case <-wc.ctx.Done():
}
}
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
if !e.isDeleted {
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
}
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
if err != nil {
return nil, nil, err
}
}
// We need to decode prevValue, only if this is deletion event or
// the underlying filter doesn't accept all objects (otherwise we
// know that the filter for previous object will return true and
// we need the object only to compute whether it was filtered out
// before).
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue, authenticatedDataString(e.key))
if err != nil {
return nil, nil, err
}
// Note that this sends the *old* object with the etcd revision for the time at
// which it gets deleted.
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
if err != nil {
return nil, nil, err
}
}
return curObj, oldObj, nil
}
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {
obj, err := runtime.Decode(codec, []byte(data))
if err != nil {
if fatalOnDecodeError {
// catch watch decode error iff we caused it on
// purpose during a unit test
defer testingDeferOnDecodeError()
// we are running in a test environment and thus an
// error here is due to a coder mistake if the defer
// does not catch it
panic(err)
}
return nil, err
}
// ensure resource version is set on the object we load from etcd
if err := versioner.UpdateObject(obj, uint64(rev)); err != nil {
return nil, fmt.Errorf("failure to version api object (%d) %#v: %v", rev, obj, err)
}
return obj, nil
}

View File

@ -73,16 +73,13 @@ type ResponseMeta struct {
ResourceVersion uint64
}
// MatchValue defines a pair (<index name>, <value for that index>).
type MatchValue struct {
IndexName string
Value string
}
// IndexerFunc is a function that for a given object computes
// <value of an index> for a particular <index>.
type IndexerFunc func(obj runtime.Object) string
// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs
// (<index name>, <index value for the given object>) for all indexes known
// to that function.
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
// IndexerFuncs is a mapping from <index name> to function that
// for a given object computes <value for that index>.
type IndexerFuncs map[string]IndexerFunc
// Everything accepts all objects.
var Everything = SelectionPredicate{
@ -98,10 +95,10 @@ type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Obj
// ValidateObjectFunc is a function to act on a given object. An error may be returned
// if the hook cannot be completed. The function may NOT transform the provided
// object.
type ValidateObjectFunc func(obj runtime.Object) error
type ValidateObjectFunc func(ctx context.Context, obj runtime.Object) error
// ValidateAllObjectFunc is a "admit everything" instance of ValidateObjectFunc.
func ValidateAllObjectFunc(obj runtime.Object) error {
func ValidateAllObjectFunc(ctx context.Context, obj runtime.Object) error {
return nil
}

View File

@ -124,20 +124,6 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) {
return "", false
}
// For any index defined by IndexFields, if a matcher can match only (a subset)
// of objects that return <value> for a given index, a pair (<index name>, <value>)
// wil be returned.
// TODO: Consider supporting also labels.
func (s *SelectionPredicate) MatcherIndex() []MatchValue {
var result []MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, MatchValue{IndexName: field, Value: value})
}
}
return result
}
// Empty returns true if the predicate performs no filtering.
func (s *SelectionPredicate) Empty() bool {
return s.Label.Empty() && s.Field.Empty()

View File

@ -39,14 +39,6 @@ func EverythingFunc(runtime.Object) bool {
return true
}
func NoTriggerFunc() []MatchValue {
return nil
}
func NoTriggerPublisher(runtime.Object) []MatchValue {
return nil
}
func NamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {

189
vendor/k8s.io/apiserver/pkg/storage/value/metrics.go generated vendored Normal file
View File

@ -0,0 +1,189 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package value
import (
"sync"
"time"
"google.golang.org/grpc/status"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
namespace = "apiserver"
subsystem = "storage"
)
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/20190404-kubernetes-control-plane-metrics-stability.md#stability-classes)
*
* Promoting the stability level of the metric is a responsibility of the component owner, since it
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
* the metric stability policy.
*/
var (
transformerLatencies = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "transformation_duration_seconds",
Help: "Latencies in seconds of value transformation operations.",
// In-process transformations (ex. AES CBC) complete on the order of 20 microseconds. However, when
// external KMS is involved latencies may climb into milliseconds.
Buckets: metrics.ExponentialBuckets(5e-6, 2, 14),
StabilityLevel: metrics.ALPHA,
},
[]string{"transformation_type"},
)
deprecatedTransformerLatencies = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "transformation_latencies_microseconds",
Help: "Latencies in microseconds of value transformation operations.",
// In-process transformations (ex. AES CBC) complete on the order of 20 microseconds. However, when
// external KMS is involved latencies may climb into milliseconds.
Buckets: metrics.ExponentialBuckets(5, 2, 14),
StabilityLevel: metrics.ALPHA,
DeprecatedVersion: "1.14.0",
},
[]string{"transformation_type"},
)
transformerOperationsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "transformation_operations_total",
Help: "Total number of transformations.",
StabilityLevel: metrics.ALPHA,
},
[]string{"transformation_type", "transformer_prefix", "status"},
)
deprecatedTransformerFailuresTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "transformation_failures_total",
Help: "Total number of failed transformation operations.",
StabilityLevel: metrics.ALPHA,
DeprecatedVersion: "1.15.0",
},
[]string{"transformation_type"},
)
envelopeTransformationCacheMissTotal = metrics.NewCounter(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "envelope_transformation_cache_misses_total",
Help: "Total number of cache misses while accessing key decryption key(KEK).",
StabilityLevel: metrics.ALPHA,
},
)
dataKeyGenerationLatencies = metrics.NewHistogram(
&metrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "data_key_generation_duration_seconds",
Help: "Latencies in seconds of data encryption key(DEK) generation operations.",
Buckets: metrics.ExponentialBuckets(5e-6, 2, 14),
StabilityLevel: metrics.ALPHA,
},
)
deprecatedDataKeyGenerationLatencies = metrics.NewHistogram(
&metrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "data_key_generation_latencies_microseconds",
Help: "Latencies in microseconds of data encryption key(DEK) generation operations.",
Buckets: metrics.ExponentialBuckets(5, 2, 14),
StabilityLevel: metrics.ALPHA,
DeprecatedVersion: "1.14.0",
},
)
dataKeyGenerationFailuresTotal = metrics.NewCounter(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "data_key_generation_failures_total",
Help: "Total number of failed data encryption key(DEK) generation operations.",
StabilityLevel: metrics.ALPHA,
},
)
)
var registerMetrics sync.Once
func RegisterMetrics() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(transformerLatencies)
legacyregistry.MustRegister(deprecatedTransformerLatencies)
legacyregistry.MustRegister(transformerOperationsTotal)
legacyregistry.MustRegister(deprecatedTransformerFailuresTotal)
legacyregistry.MustRegister(envelopeTransformationCacheMissTotal)
legacyregistry.MustRegister(dataKeyGenerationLatencies)
legacyregistry.MustRegister(deprecatedDataKeyGenerationLatencies)
legacyregistry.MustRegister(dataKeyGenerationFailuresTotal)
})
}
// RecordTransformation records latencies and count of TransformFromStorage and TransformToStorage operations.
// Note that transformation_failures_total metric is deprecated, use transformation_operations_total instead.
func RecordTransformation(transformationType, transformerPrefix string, start time.Time, err error) {
transformerOperationsTotal.WithLabelValues(transformationType, transformerPrefix, status.Code(err).String()).Inc()
switch {
case err == nil:
transformerLatencies.WithLabelValues(transformationType).Observe(sinceInSeconds(start))
deprecatedTransformerLatencies.WithLabelValues(transformationType).Observe(sinceInMicroseconds(start))
default:
deprecatedTransformerFailuresTotal.WithLabelValues(transformationType).Inc()
}
}
// RecordCacheMiss records a miss on Key Encryption Key(KEK) - call to KMS was required to decrypt KEK.
func RecordCacheMiss() {
envelopeTransformationCacheMissTotal.Inc()
}
// RecordDataKeyGeneration records latencies and count of Data Encryption Key generation operations.
func RecordDataKeyGeneration(start time.Time, err error) {
if err != nil {
dataKeyGenerationFailuresTotal.Inc()
return
}
dataKeyGenerationLatencies.Observe(sinceInSeconds(start))
deprecatedDataKeyGenerationLatencies.Observe(sinceInMicroseconds(start))
}
// sinceInMicroseconds gets the time since the specified start in microseconds.
func sinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
// sinceInSeconds gets the time since the specified start in seconds.
func sinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds()
}

View File

@ -0,0 +1,209 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package value contains methods for assisting with transformation of values in storage.
package value
import (
"bytes"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/errors"
)
func init() {
RegisterMetrics()
}
// Context is additional information that a storage transformation may need to verify the data at rest.
type Context interface {
// AuthenticatedData should return an array of bytes that describes the current value. If the value changes,
// the transformer may report the value as unreadable or tampered. This may be nil if no such description exists
// or is needed. For additional verification, set this to data that strongly identifies the value, such as
// the key and creation version of the stored data.
AuthenticatedData() []byte
}
// Transformer allows a value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other.
type Transformer interface {
// TransformFromStorage may transform the provided data from its underlying storage representation or return an error.
// Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
// have not changed.
TransformFromStorage(data []byte, context Context) (out []byte, stale bool, err error)
// TransformToStorage may transform the provided data into the appropriate form in storage or return an error.
TransformToStorage(data []byte, context Context) (out []byte, err error)
}
type identityTransformer struct{}
// IdentityTransformer performs no transformation of the provided data.
var IdentityTransformer Transformer = identityTransformer{}
func (identityTransformer) TransformFromStorage(b []byte, ctx Context) ([]byte, bool, error) {
return b, false, nil
}
func (identityTransformer) TransformToStorage(b []byte, ctx Context) ([]byte, error) {
return b, nil
}
// DefaultContext is a simple implementation of Context for a slice of bytes.
type DefaultContext []byte
// AuthenticatedData returns itself.
func (c DefaultContext) AuthenticatedData() []byte { return []byte(c) }
// MutableTransformer allows a transformer to be changed safely at runtime.
type MutableTransformer struct {
lock sync.RWMutex
transformer Transformer
}
// NewMutableTransformer creates a transformer that can be updated at any time by calling Set()
func NewMutableTransformer(transformer Transformer) *MutableTransformer {
return &MutableTransformer{transformer: transformer}
}
// Set updates the nested transformer.
func (t *MutableTransformer) Set(transformer Transformer) {
t.lock.Lock()
t.transformer = transformer
t.lock.Unlock()
}
func (t *MutableTransformer) TransformFromStorage(data []byte, context Context) (out []byte, stale bool, err error) {
t.lock.RLock()
transformer := t.transformer
t.lock.RUnlock()
return transformer.TransformFromStorage(data, context)
}
func (t *MutableTransformer) TransformToStorage(data []byte, context Context) (out []byte, err error) {
t.lock.RLock()
transformer := t.transformer
t.lock.RUnlock()
return transformer.TransformToStorage(data, context)
}
// PrefixTransformer holds a transformer interface and the prefix that the transformation is located under.
type PrefixTransformer struct {
Prefix []byte
Transformer Transformer
}
type prefixTransformers struct {
transformers []PrefixTransformer
err error
}
var _ Transformer = &prefixTransformers{}
// NewPrefixTransformers supports the Transformer interface by checking the incoming data against the provided
// prefixes in order. The first matching prefix will be used to transform the value (the prefix is stripped
// before the Transformer interface is invoked). The first provided transformer will be used when writing to
// the store.
func NewPrefixTransformers(err error, transformers ...PrefixTransformer) Transformer {
if err == nil {
err = fmt.Errorf("the provided value does not match any of the supported transformers")
}
return &prefixTransformers{
transformers: transformers,
err: err,
}
}
// TransformFromStorage finds the first transformer with a prefix matching the provided data and returns
// the result of transforming the value. It will always mark any transformation as stale that is not using
// the first transformer.
func (t *prefixTransformers) TransformFromStorage(data []byte, context Context) ([]byte, bool, error) {
start := time.Now()
var errs []error
for i, transformer := range t.transformers {
if bytes.HasPrefix(data, transformer.Prefix) {
result, stale, err := transformer.Transformer.TransformFromStorage(data[len(transformer.Prefix):], context)
// To migrate away from encryption, user can specify an identity transformer higher up
// (in the config file) than the encryption transformer. In that scenario, the identity transformer needs to
// identify (during reads from disk) whether the data being read is encrypted or not. If the data is encrypted,
// it shall throw an error, but that error should not prevent the next subsequent transformer from being tried.
if len(transformer.Prefix) == 0 && err != nil {
continue
}
if len(transformer.Prefix) == 0 {
RecordTransformation("from_storage", "identity", start, err)
} else {
RecordTransformation("from_storage", string(transformer.Prefix), start, err)
}
// It is valid to have overlapping prefixes when the same encryption provider
// is specified multiple times but with different keys (the first provider is
// being rotated to and some later provider is being rotated away from).
//
// Example:
//
// {
// "aescbc": {
// "keys": [
// {
// "name": "2",
// "secret": "some key 2"
// }
// ]
// }
// },
// {
// "aescbc": {
// "keys": [
// {
// "name": "1",
// "secret": "some key 1"
// }
// ]
// }
// },
//
// The transformers for both aescbc configs share the prefix k8s:enc:aescbc:v1:
// but a failure in the first one should not prevent a later match from being attempted.
// Thus we never short-circuit on a prefix match that results in an error.
if err != nil {
errs = append(errs, err)
continue
}
return result, stale || i != 0, err
}
}
if err := errors.Reduce(errors.NewAggregate(errs)); err != nil {
return nil, false, err
}
RecordTransformation("from_storage", "unknown", start, t.err)
return nil, false, t.err
}
// TransformToStorage uses the first transformer and adds its prefix to the data.
func (t *prefixTransformers) TransformToStorage(data []byte, context Context) ([]byte, error) {
start := time.Now()
transformer := t.transformers[0]
prefixedData := make([]byte, len(transformer.Prefix), len(data)+len(transformer.Prefix))
copy(prefixedData, transformer.Prefix)
result, err := transformer.Transformer.TransformToStorage(data, context)
RecordTransformation("to_storage", string(transformer.Prefix), start, err)
if err != nil {
return nil, err
}
prefixedData = append(prefixedData, result...)
return prefixedData, nil
}