rebase: bump k8s.io/kubernetes from 1.26.2 to 1.27.2

Bumps [k8s.io/kubernetes](https://github.com/kubernetes/kubernetes) from 1.26.2 to 1.27.2.
- [Release notes](https://github.com/kubernetes/kubernetes/releases)
- [Commits](https://github.com/kubernetes/kubernetes/compare/v1.26.2...v1.27.2)

---
updated-dependencies:
- dependency-name: k8s.io/kubernetes
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-05-29 21:03:29 +00:00
committed by mergify[bot]
parent 0e79135419
commit 07b05616a0
1072 changed files with 208716 additions and 198880 deletions

4
vendor/k8s.io/apiserver/pkg/storage/etcd3/OWNERS generated vendored Normal file
View File

@ -0,0 +1,4 @@
# See the OWNERS docs at https://go.k8s.io/owners
reviewers:
- wojtek-t

162
vendor/k8s.io/apiserver/pkg/storage/etcd3/compact.go generated vendored Normal file
View File

@ -0,0 +1,162 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"strconv"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/klog/v2"
)
// compactRevKey is the etcd key used to coordinate compaction: its value holds
// the last compacted revision and its version acts as the logical compact time.
const compactRevKey = "compact_rev_key"
var (
	// endpointsMapMu must be held while reading or writing endpointsMap.
	endpointsMapMu sync.Mutex
	// endpointsMap is the set of etcd endpoints for which a compactor has
	// already been requested in this process.
	endpointsMap = make(map[string]struct{})
)
// StartCompactor starts a compactor in the background to compact old versions of keys that are not needed.
// By default, we save the most recent 5 minutes data and compact versions > 5minutes ago.
// It should be enough for slow watchers and to tolerate burst.
// TODO: We might keep a longer history (12h) in the future once storage API can take advantage of past version of keys.
//
// At most one compactor is started per process for a given cluster; clusters
// are told apart by their endpoints. If any endpoint of client is already
// registered the call is a no-op. A compactInterval of 0 registers the
// endpoints but does not start a compactor.
func StartCompactor(ctx context.Context, client *clientv3.Client, compactInterval time.Duration) {
	endpointsMapMu.Lock()
	defer endpointsMapMu.Unlock()

	// Take a single snapshot of the endpoints so that the duplicate check, the
	// registration and the log message all operate on the same set, even if
	// the client's endpoints are changed concurrently.
	endpoints := client.Endpoints()

	// In one process, we can have only one compactor for one cluster.
	// Currently we rely on endpoints to differentiate clusters.
	for _, ep := range endpoints {
		if _, ok := endpointsMap[ep]; ok {
			klog.V(4).Infof("compactor already exists for endpoints %v", endpoints)
			return
		}
	}
	for _, ep := range endpoints {
		endpointsMap[ep] = struct{}{}
	}

	if compactInterval != 0 {
		go compactor(ctx, client, compactInterval)
	}
}
// compactor periodically compacts historical versions of keys in etcd until
// ctx is done.
// Versions older than the given interval are compacted, so after a compaction
// only keys set during the last interval remain; API calls for older versions
// return an error.
// interval is the time between compactions; the first compaction attempt
// happens after one interval has elapsed.
func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
	// Technical definitions:
	// We have a special key in etcd defined as *compactRevKey*.
	// compactRevKey's value will be set to the string of last compacted revision.
	// compactRevKey's version will be used as logical time for comparison. The version is referred as compact time.
	// Initially, because the key doesn't exist, the compact time (version) is 0.
	//
	// Algorithm:
	// - Compare to see if (local compact_time) = (remote compact_time).
	// - If yes, increment both local and remote compact_time, and do a compaction.
	// - If not, set local to remote compact_time.
	//
	// Technical details/insights:
	//
	// The protocol here is lease based. If one compactor CAS successfully, the others would know it when they fail in
	// CAS later and would try again in 5 minutes. If an APIServer crashed, another one would "take over" the lease.
	//
	// For example, in the following diagram, we have a compactor C1 doing compaction in t1, t2. Another compactor C2
	// at t1' (t1 < t1' < t2) would CAS fail, set its known oldRev to rev at t1', and try again in t2' (t2' > t2).
	// If C1 crashed and wouldn't compact at t2, C2 would CAS successfully at t2'.
	//
	//                 oldRev(t2)     curRev(t2)
	//                                  +
	//   oldRev        curRev           |
	//     +             +              |
	//     |             |              |
	//     |             |    t1'       |     t2'
	// +---v-------------v----^---------v------^---->
	//     t0           t1             t2
	//
	// We have the guarantees:
	// - in normal cases, the interval is 5 minutes.
	// - in failover, the interval is >5m and <10m
	//
	// FAQ:
	// - What if time is not accurate? We don't care as long as someone did the compaction. Atomicity is ensured using
	//   etcd API.
	// - What happened under heavy load scenarios? Initially, each apiserver will do only one compaction
	//   every 5 minutes. This is very unlikely affecting or affected w.r.t. server load.

	var (
		compactTime int64
		rev         int64
	)
	for {
		// Wait for the next tick, or stop as soon as the context is done.
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
		}

		// The returned values are kept even when compact reports an error,
		// so the next round continues from whatever state compact returned.
		var err error
		compactTime, rev, err = compact(ctx, client, compactTime, rev)
		if err != nil {
			klog.Errorf("etcd: endpoint (%v) compact failed: %v", client.Endpoints(), err)
		}
	}
}
// compact performs one round of the compaction protocol.
//
// It atomically checks that the version of compactRevKey equals t (the
// caller's compact time). On success it stores rev as the key's value, which
// bumps the key's version, and then compacts etcd at rev (unless rev is 0,
// i.e. on bootstrap). On CAS failure it only reads the key back.
//
// It returns the current compact time and the current global revision, which
// the caller passes to the next round. A CAS failure is not an error. If the
// transaction itself fails, t and rev are returned unchanged together with
// the error; if only the compaction fails, the updated values are returned
// together with the error.
func compact(ctx context.Context, client *clientv3.Client, t, rev int64) (int64, int64, error) {
	resp, err := client.KV.Txn(ctx).If(
		clientv3.Compare(clientv3.Version(compactRevKey), "=", t),
	).Then(
		clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10)), // Expect side effect: increment Version
	).Else(
		clientv3.OpGet(compactRevKey),
	).Commit()
	if err != nil {
		return t, rev, err
	}

	curRev := resp.Header.Revision

	if !resp.Succeeded {
		// Somebody else advanced the compact time; adopt the remote value.
		// The key may not exist at all (for example it was deleted while our
		// local compact time was > 0). A missing key has version 0, so fall
		// back to 0 instead of indexing into an empty result and panicking.
		var curTime int64
		if len(resp.Responses) > 0 {
			if kvs := resp.Responses[0].GetResponseRange().GetKvs(); len(kvs) > 0 {
				curTime = kvs[0].Version
			}
		}
		return curTime, curRev, nil
	}
	curTime := t + 1

	if rev == 0 {
		// We don't compact on bootstrap.
		return curTime, curRev, nil
	}
	if _, err = client.Compact(ctx, rev); err != nil {
		return curTime, curRev, err
	}
	klog.V(4).Infof("etcd: compacted rev (%d), endpoints (%v)", rev, client.Endpoints())
	return curTime, curRev, nil
}

72
vendor/k8s.io/apiserver/pkg/storage/etcd3/errors.go generated vendored Normal file
View File

@ -0,0 +1,72 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apiserver/pkg/storage"
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)
// interpretWatchError translates etcd's "compacted" error into an API
// "resource expired" error for watches; any other error is returned unchanged.
func interpretWatchError(err error) error {
	if err == etcdrpc.ErrCompacted {
		return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
	}
	return err
}
// Messages returned to clients when a list request hits compacted history.
const (
	// expired is used for non-paginated lists.
	expired string = "The resourceVersion for the provided list is too old."

	// continueExpired is used when no replacement continue token could be built.
	continueExpired string = "The provided continue parameter is too old to display a consistent list result. " +
		"You can start a new list without the continue parameter."

	// inconsistentContinue accompanies a replacement continue token.
	inconsistentContinue string = "The provided continue parameter is too old to display a consistent list result. " +
		"You can start a new list without the continue parameter, " +
		"or use the continue token in this response to retrieve the remainder of the results. " +
		"Continuing with the provided token results in an inconsistent list - " +
		"objects that were created, modified, or deleted between the time the first chunk was returned and now " +
		"may show up in the list."
)
// interpretListError translates etcd's "compacted" error into an API
// "resource expired" error for list requests. For paginated requests the
// resulting error is built by handleCompactedErrorForPaging from continueKey
// and keyPrefix. Any other error is returned unchanged.
func interpretListError(err error, paging bool, continueKey, keyPrefix string) error {
	if err != etcdrpc.ErrCompacted {
		return err
	}
	if !paging {
		return errors.NewResourceExpired(expired)
	}
	return handleCompactedErrorForPaging(continueKey, keyPrefix)
}
// handleCompactedErrorForPaging builds the "resource expired" error returned
// when a paginated list hits compacted history. When a fresh continue token
// can be encoded it is attached to the error; otherwise the encoding failure
// is reported through utilruntime.HandleError and a plain expired error is
// returned.
func handleCompactedErrorForPaging(continueKey, keyPrefix string) error {
	// continueToken.ResourceVersion=-1 means that the apiserver can
	// continue the list at the latest resource version. We don't use rv=0
	// for this purpose to distinguish from a bad token that has empty rv.
	newToken, encodeErr := storage.EncodeContinue(continueKey, keyPrefix, -1)
	if encodeErr == nil {
		expiredErr := errors.NewResourceExpired(inconsistentContinue)
		expiredErr.ErrStatus.ListMeta.Continue = newToken
		return expiredErr
	}

	utilruntime.HandleError(encodeErr)
	return errors.NewResourceExpired(continueExpired)
}

71
vendor/k8s.io/apiserver/pkg/storage/etcd3/event.go generated vendored Normal file
View File

@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"fmt"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)
// event is the package-internal description of a single change (or progress
// notification) received from etcd.
type event struct {
	// key is the etcd key the event refers to.
	key string
	// value is the value carried by the event; prevValue is the value before
	// the change, when one is available.
	value, prevValue []byte
	// rev is the etcd revision associated with the event.
	rev int64
	// isDeleted and isCreated classify the change; isProgressNotify marks an
	// event that only carries a revision.
	isDeleted, isCreated, isProgressNotify bool
}
// parseKV turns a KeyValue obtained from an initial sync() listing into a
// synthetic "created" event: it carries the key, value and mod revision of kv,
// has no previous value and is never a deletion.
func parseKV(kv *mvccpb.KeyValue) *event {
	ev := new(event)
	ev.key = string(kv.Key)
	ev.value = kv.Value
	ev.rev = kv.ModRevision
	ev.isCreated = true
	return ev
}
// parseEvent converts an etcd watch event into the package-internal event.
// Every event other than a creation must carry the previous key-value; if it
// does not, an error is returned.
func parseEvent(e *clientv3.Event) (*event, error) {
	created := e.IsCreate()
	if !created && e.PrevKv == nil {
		// If the previous value is nil, error. One example of how this is possible is if the previous value has been compacted already.
		return nil, fmt.Errorf("etcd event received with PrevKv=nil (key=%q, modRevision=%d, type=%s)", string(e.Kv.Key), e.Kv.ModRevision, e.Type.String())
	}

	var previous []byte
	if e.PrevKv != nil {
		previous = e.PrevKv.Value
	}
	return &event{
		key:       string(e.Kv.Key),
		value:     e.Kv.Value,
		prevValue: previous,
		rev:       e.Kv.ModRevision,
		isDeleted: e.Type == clientv3.EventTypeDelete,
		isCreated: created,
	}, nil
}
// progressNotifyEvent returns an event that carries only the given revision
// and is flagged as a progress notification.
func progressNotifyEvent(rev int64) *event {
	ev := &event{isProgressNotify: true}
	ev.rev = rev
	return ev
}

View File

@ -0,0 +1,40 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"encoding/json"
"fmt"
)
// etcdHealth mirrors the JSON document returned by the etcd /healthz handler.
type etcdHealth struct {
	// Health has to be exported so that encoding/json can populate it.
	Health string `json:"health"`
}

// EtcdHealthCheck decodes the payload returned by the etcd /healthz handler.
// It returns the decoding error if data is not valid JSON, an error describing
// the reported status if the "health" field is anything other than "true",
// and nil otherwise.
func EtcdHealthCheck(data []byte) error {
	var status etcdHealth
	err := json.Unmarshal(data, &status)
	switch {
	case err != nil:
		return err
	case status.Health == "true":
		return nil
	default:
		return fmt.Errorf("Unhealthy status: %s", status.Health)
	}
}

View File

@ -0,0 +1,108 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
)
// NewETCDLatencyTracker wraps the given 'delegate' clientv3.KV in a
// clientv3.KV that times each call made through it, so that the latency
// incurred in etcd can be tracked.
func NewETCDLatencyTracker(delegate clientv3.KV) clientv3.KV {
	tracker := new(clientV3KVLatencyTracker)
	tracker.KV = delegate
	return tracker
}
// clientV3KVLatencyTracker decorates a clientv3.KV instance and times
// each call so we can track the latency an API request incurs in etcd
// round trips (the time it takes to send data to etcd and get the
// complete response back).
//
// If an API request involves N (N>=1) round trips to etcd, then we will sum
// up the latency incurred in each round trip.
// It uses the context associated with the request in flight, so there
// are no states shared among the requests in flight, and so there is no
// concurrency overhead.
// If the goroutine executing the request handler makes concurrent calls
// to the underlying storage layer, that is protected since the latency
// tracking function TrackStorageLatency is thread safe.
//
// NOTE: Compact is an asynchronous process and is not associated with
// any request, so we will not be tracking its latency.
type clientV3KVLatencyTracker struct {
// KV is the wrapped delegate; methods not overridden by this type are
// promoted from it unchanged.
clientv3.KV
}
// Put forwards to the wrapped KV and, when the call returns, reports the
// elapsed time for ctx through endpointsrequest.TrackStorageLatency.
func (c *clientV3KVLatencyTracker) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
	// The argument is evaluated when the defer statement runs, i.e. before the call.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(ctx, time.Since(begin))
	}(time.Now())
	return c.KV.Put(ctx, key, val, opts...)
}
// Get forwards to the wrapped KV and, when the call returns, reports the
// elapsed time for ctx through endpointsrequest.TrackStorageLatency.
func (c *clientV3KVLatencyTracker) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
	// The argument is evaluated when the defer statement runs, i.e. before the call.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(ctx, time.Since(begin))
	}(time.Now())
	return c.KV.Get(ctx, key, opts...)
}
// Delete forwards to the wrapped KV and, when the call returns, reports the
// elapsed time for ctx through endpointsrequest.TrackStorageLatency.
func (c *clientV3KVLatencyTracker) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
	// The argument is evaluated when the defer statement runs, i.e. before the call.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(ctx, time.Since(begin))
	}(time.Now())
	return c.KV.Delete(ctx, key, opts...)
}
// Do forwards to the wrapped KV and, when the call returns, reports the
// elapsed time for ctx through endpointsrequest.TrackStorageLatency.
func (c *clientV3KVLatencyTracker) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
	// The argument is evaluated when the defer statement runs, i.e. before the call.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(ctx, time.Since(begin))
	}(time.Now())
	return c.KV.Do(ctx, op)
}
// Txn starts a transaction on the wrapped KV and returns it wrapped in a
// clientV3TxnTracker that remembers ctx, so that Commit can be timed.
func (c *clientV3KVLatencyTracker) Txn(ctx context.Context) clientv3.Txn {
	tracked := &clientV3TxnTracker{ctx: ctx}
	tracked.Txn = c.KV.Txn(ctx)
	return tracked
}
// clientV3TxnTracker decorates a clientv3.Txn so that the time spent in
// Commit can be reported for the context the transaction was created with.
type clientV3TxnTracker struct {
// ctx is the context passed to Txn; Commit reports latency against it.
ctx context.Context
// Txn is the wrapped transaction; methods other than Commit are promoted
// from it unchanged.
clientv3.Txn
}
// Commit commits the wrapped transaction and, when the call returns, reports
// the elapsed time for the stored context through
// endpointsrequest.TrackStorageLatency.
func (t *clientV3TxnTracker) Commit() (*clientv3.TxnResponse, error) {
	// The argument is evaluated when the defer statement runs, i.e. before the call.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(t.ctx, time.Since(begin))
	}(time.Now())
	return t.Txn.Commit()
}

View File

@ -0,0 +1,131 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
)
// defaultLeaseReuseDurationSeconds is the default number of seconds a lease is reused.
const defaultLeaseReuseDurationSeconds = 60

// defaultLeaseMaxObjectCount is the default number of objects a lease can attach.
const defaultLeaseMaxObjectCount = 1000

// LeaseManagerConfig is configuration for creating a lease manager.
type LeaseManagerConfig struct {
	// ReuseDurationSeconds specifies time in seconds that each lease is reused
	ReuseDurationSeconds int64
	// MaxObjectCount specifies how many objects that a lease can attach
	MaxObjectCount int64
}

// NewDefaultLeaseManagerConfig returns a LeaseManagerConfig populated with
// the package defaults for both fields.
func NewDefaultLeaseManagerConfig() LeaseManagerConfig {
	var cfg LeaseManagerConfig
	cfg.ReuseDurationSeconds = defaultLeaseReuseDurationSeconds
	cfg.MaxObjectCount = defaultLeaseMaxObjectCount
	return cfg
}
// leaseManager is used to manage leases requested from etcd. If a new write
// needs a lease that has similar expiration time to the previous one, the old
// lease will be reused to reduce the overhead of etcd, since lease operations
// are expensive. In the implementation, we only store one previous lease,
// since all the events have the same ttl.
type leaseManager struct {
client *clientv3.Client // etcd client used to grant leases
// leaseMu is held by GetLease while it reads and updates the fields below.
leaseMu sync.Mutex
// prevLeaseID and prevLeaseExpirationTime describe the most recently
// granted lease, which is the only candidate for reuse.
prevLeaseID clientv3.LeaseID
prevLeaseExpirationTime time.Time
// The period of time in seconds and percent of TTL that each lease is
// reused. The minimum of them is used to avoid unreasonably large
// numbers.
leaseReuseDurationSeconds int64
leaseReuseDurationPercent float64
// leaseMaxAttachedObjectCount is the maximum number of GetLease calls that
// may share one lease before a new one is requested.
leaseMaxAttachedObjectCount int64
// leaseAttachedObjectCount counts the GetLease calls made since the
// current lease was granted.
leaseAttachedObjectCount int64
}
// newDefaultLeaseManager creates a lease manager from config, using a reuse
// percentage of 0.05. A non-positive MaxObjectCount is replaced by
// defaultLeaseMaxObjectCount.
func newDefaultLeaseManager(client *clientv3.Client, config LeaseManagerConfig) *leaseManager {
	maxObjects := config.MaxObjectCount
	if maxObjects <= 0 {
		maxObjects = defaultLeaseMaxObjectCount
	}
	return newLeaseManager(client, config.ReuseDurationSeconds, 0.05, maxObjects)
}
// newLeaseManager creates a lease manager that grants leases through client.
// leaseReuseDurationSeconds and leaseReuseDurationPercent bound how long a
// lease is reused (the percentage value x means x*100% of the ttl), and
// maxObjectCount bounds how many objects may be attached to one lease.
func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64, maxObjectCount int64) *leaseManager {
	manager := new(leaseManager)
	manager.client = client
	manager.leaseReuseDurationSeconds = leaseReuseDurationSeconds
	manager.leaseReuseDurationPercent = leaseReuseDurationPercent
	manager.leaseMaxAttachedObjectCount = maxObjectCount
	return manager
}
// GetLease returns a lease suitable for an object with the requested ttl (in
// seconds). The previously granted lease is returned when it is still usable;
// otherwise a new lease is requested from etcd and cached for later calls.
// An error from etcd is returned together with a zero lease ID.
func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
	now := time.Now()
	l.leaseMu.Lock()
	defer l.leaseMu.Unlock()

	// The cached lease is reusable when it expires after now+ttl (valid) but
	// before now+ttl+slack (sufficient).
	slack := l.getReuseDurationSecondsLocked(ttl)
	earliestExpiry := now.Add(time.Duration(ttl) * time.Second)
	latestExpiry := now.Add(time.Duration(ttl+slack) * time.Second)

	// We count all operations that happened in the same lease, regardless of success or failure.
	// Currently each GetLease call only attach 1 object
	l.leaseAttachedObjectCount++

	reusable := earliestExpiry.Before(l.prevLeaseExpirationTime) &&
		latestExpiry.After(l.prevLeaseExpirationTime) &&
		l.leaseAttachedObjectCount <= l.leaseMaxAttachedObjectCount
	if reusable {
		return l.prevLeaseID, nil
	}

	// Request a lease with a little extra ttl from etcd.
	grantTTL := ttl + slack
	granted, err := l.client.Lease.Grant(ctx, grantTTL)
	if err != nil {
		return clientv3.LeaseID(0), err
	}

	// Report how many objects the outgoing lease accumulated, then start
	// counting afresh for the new one.
	metrics.UpdateLeaseObjectCount(l.leaseAttachedObjectCount)
	l.leaseAttachedObjectCount = 1

	// Cache the new lease.
	l.prevLeaseID = granted.ID
	l.prevLeaseExpirationTime = now.Add(time.Duration(grantTTL) * time.Second)
	return granted.ID, nil
}
// getReuseDurationSecondsLocked returns the reusable duration in seconds for
// the given ttl: the configured percentage of ttl, capped at the configured
// number of seconds. The lock has to be held by the caller.
func (l *leaseManager) getReuseDurationSecondsLocked(ttl int64) int64 {
	byPercent := int64(l.leaseReuseDurationPercent * float64(ttl))
	if byPercent <= l.leaseReuseDurationSeconds {
		return byPercent
	}
	return l.leaseReuseDurationSeconds
}

90
vendor/k8s.io/apiserver/pkg/storage/etcd3/logger.go generated vendored Normal file
View File

@ -0,0 +1,90 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"fmt"
"google.golang.org/grpc/grpclog"
"k8s.io/klog/v2"
)
// init installs klogWrapper as the grpclog V2 logger for the process.
func init() {
grpclog.SetLoggerV2(klogWrapper{})
}
// klogWrapper is the logger passed to grpclog.SetLoggerV2; its methods
// forward to klog.
type klogWrapper struct{}
// klogWrapperDepth is the call depth passed to klog's *Depth functions by
// every klogWrapper method.
const klogWrapperDepth = 4
// Info logs args, formatted with fmt.Sprint, when klog verbosity 5 is enabled.
func (klogWrapper) Info(args ...interface{}) {
	verbose := klog.V(5)
	if !verbose.Enabled() {
		return
	}
	verbose.InfoSDepth(klogWrapperDepth, fmt.Sprint(args...))
}
// Infoln logs args, formatted with fmt.Sprintln, when klog verbosity 5 is enabled.
func (klogWrapper) Infoln(args ...interface{}) {
	verbose := klog.V(5)
	if !verbose.Enabled() {
		return
	}
	verbose.InfoSDepth(klogWrapperDepth, fmt.Sprintln(args...))
}
// Infof logs the formatted message when klog verbosity 5 is enabled.
func (klogWrapper) Infof(format string, args ...interface{}) {
	verbose := klog.V(5)
	if !verbose.Enabled() {
		return
	}
	verbose.InfoSDepth(klogWrapperDepth, fmt.Sprintf(format, args...))
}
// Warning passes args unchanged to klog.WarningDepth.
func (klogWrapper) Warning(args ...interface{}) {
klog.WarningDepth(klogWrapperDepth, args...)
}
// Warningln formats args with fmt.Sprintln and logs the result through
// klog.WarningDepth.
func (klogWrapper) Warningln(args ...interface{}) {
	msg := fmt.Sprintln(args...)
	klog.WarningDepth(klogWrapperDepth, msg)
}
// Warningf formats the message with fmt.Sprintf and logs the result through
// klog.WarningDepth.
func (klogWrapper) Warningf(format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	klog.WarningDepth(klogWrapperDepth, msg)
}
// Error passes args unchanged to klog.ErrorDepth.
func (klogWrapper) Error(args ...interface{}) {
klog.ErrorDepth(klogWrapperDepth, args...)
}
// Errorln formats args with fmt.Sprintln and logs the result through
// klog.ErrorDepth.
func (klogWrapper) Errorln(args ...interface{}) {
	msg := fmt.Sprintln(args...)
	klog.ErrorDepth(klogWrapperDepth, msg)
}
// Errorf formats the message with fmt.Sprintf and logs the result through
// klog.ErrorDepth.
func (klogWrapper) Errorf(format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	klog.ErrorDepth(klogWrapperDepth, msg)
}
// Fatal passes args unchanged to klog.FatalDepth.
func (klogWrapper) Fatal(args ...interface{}) {
klog.FatalDepth(klogWrapperDepth, args...)
}
// Fatalln formats args with fmt.Sprintln and logs the result through
// klog.FatalDepth.
func (klogWrapper) Fatalln(args ...interface{}) {
	msg := fmt.Sprintln(args...)
	klog.FatalDepth(klogWrapperDepth, msg)
}
// Fatalf formats the message with fmt.Sprintf and logs the result through
// klog.FatalDepth.
func (klogWrapper) Fatalf(format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	klog.FatalDepth(klogWrapperDepth, msg)
}
// V reports whether klog logging at verbosity level l is enabled.
func (klogWrapper) V(l int) bool {
	level := klog.Level(l)
	return klog.V(level).Enabled()
}

View File

@ -0,0 +1,4 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- logicalhan

View File

@ -0,0 +1,179 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"time"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
/*
* By default, all the following metrics are defined as falling under
* ALPHA stability level (https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/kubernetes-control-plane-metrics-stability.md#stability-classes)
*
* Promoting the stability level of the metric is a responsibility of the component owner, since it
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
* the metric stability policy.
*/
var (
// etcdRequestLatency is a histogram of etcd request durations in seconds,
// labelled by operation and object type.
etcdRequestLatency = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Name: "etcd_request_duration_seconds",
Help: "Etcd request latency in seconds for each operation and object type.",
// Etcd request latency in seconds for each operation and object type.
// This metric is used for verifying etcd api call latencies SLO
// keep consistent with apiserver metric 'requestLatencies' in
// staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3,
4, 5, 6, 8, 10, 15, 20, 30, 45, 60},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"operation", "type"},
)
// objectCounts is a gauge of the number of stored objects, labelled by
// resource. Unlike the other metrics here it is declared STABLE.
objectCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "apiserver_storage_objects",
Help: "Number of stored objects at the time of last check split by kind.",
StabilityLevel: compbasemetrics.STABLE,
},
[]string{"resource"},
)
// dbTotalSize is a gauge of the storage database file size in bytes,
// labelled by endpoint.
dbTotalSize = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Subsystem: "apiserver",
Name: "storage_db_total_size_in_bytes",
Help: "Total size of the storage database file physically allocated in bytes.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"endpoint"},
)
// etcdBookmarkCounts is a gauge of etcd bookmarks (progress notify
// events), labelled by resource.
etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_bookmark_counts",
Help: "Number of etcd bookmarks (progress notify events) split by kind.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
// etcdLeaseObjectCounts is an unlabelled histogram of the number of
// objects attached to a single etcd lease.
etcdLeaseObjectCounts = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Name: "etcd_lease_object_counts",
Help: "Number of objects attached to a single etcd lease.",
Buckets: []float64{10, 50, 100, 500, 1000, 2500, 5000},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{},
)
// listStorageCount counts LIST requests served from storage, labelled by
// resource.
listStorageCount = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_storage_list_total",
Help: "Number of LIST requests served from storage",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
// listStorageNumFetched counts objects read from storage while serving
// LIST requests, labelled by resource.
listStorageNumFetched = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_storage_list_fetched_objects_total",
Help: "Number of objects read from storage in the course of serving a LIST request",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
// listStorageNumSelectorEvals counts objects tested while serving LIST
// requests from storage, labelled by resource.
listStorageNumSelectorEvals = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_storage_list_evaluated_objects_total",
Help: "Number of objects tested in the course of serving a LIST request from storage",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
// listStorageNumReturned counts objects returned for LIST requests served
// from storage, labelled by resource.
listStorageNumReturned = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Name: "apiserver_storage_list_returned_objects_total",
Help: "Number of objects returned for a LIST request from storage",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
)
var registerMetrics sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
legacyregistry.MustRegister(etcdRequestLatency)
legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(dbTotalSize)
legacyregistry.MustRegister(etcdBookmarkCounts)
legacyregistry.MustRegister(etcdLeaseObjectCounts)
legacyregistry.MustRegister(listStorageCount)
legacyregistry.MustRegister(listStorageNumFetched)
legacyregistry.MustRegister(listStorageNumSelectorEvals)
legacyregistry.MustRegister(listStorageNumReturned)
})
}
// UpdateObjectCount sets the apiserver_storage_objects gauge for the given
// resource prefix to count.
func UpdateObjectCount(resourcePrefix string, count int64) {
	gauge := objectCounts.WithLabelValues(resourcePrefix)
	gauge.Set(float64(count))
}
// RecordEtcdRequestLatency sets the etcd_request_duration_seconds metrics.
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
}
// RecordEtcdBookmark increments the etcd_bookmark_counts metric for the given
// resource.
func RecordEtcdBookmark(resource string) {
	gauge := etcdBookmarkCounts.WithLabelValues(resource)
	gauge.Inc()
}
// Reset resets the etcd_request_duration_seconds metric. None of the other
// metrics in this package are touched.
func Reset() {
etcdRequestLatency.Reset()
}
// sinceInSeconds returns the time elapsed since start, expressed in seconds.
func sinceInSeconds(start time.Time) float64 {
	elapsed := time.Since(start)
	return elapsed.Seconds()
}
// UpdateEtcdDbSize sets the apiserver_storage_db_total_size_in_bytes gauge
// for endpoint ep to size.
func UpdateEtcdDbSize(ep string, size int64) {
	gauge := dbTotalSize.WithLabelValues(ep)
	gauge.Set(float64(size))
}
// UpdateLeaseObjectCount records count as one observation in the
// etcd_lease_object_counts histogram.
func UpdateLeaseObjectCount(count int64) {
	// Currently we only store one previous lease, since all the events have the same ttl.
	// See pkg/storage/etcd3/lease_manager.go
	histogram := etcdLeaseObjectCounts.WithLabelValues()
	histogram.Observe(float64(count))
}
// RecordStorageListMetrics notes various metrics of the cost to serve a LIST
// request for resource: it counts the request itself and adds the number of
// objects fetched, evaluated and returned to the corresponding counters.
func RecordStorageListMetrics(resource string, numFetched, numEvald, numReturned int) {
	fetched, evaluated, returned := float64(numFetched), float64(numEvald), float64(numReturned)

	listStorageCount.WithLabelValues(resource).Inc()
	listStorageNumFetched.WithLabelValues(resource).Add(fetched)
	listStorageNumSelectorEvals.WithLabelValues(resource).Add(evaluated)
	listStorageNumReturned.WithLabelValues(resource).Add(returned)
}

1028
vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

466
vendor/k8s.io/apiserver/pkg/storage/etcd3/watcher.go generated vendored Normal file
View File

@ -0,0 +1,466 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"sync"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/klog/v2"
)
// Buffer sizes of the watch channels. We have set a buffer in order to reduce
// times of context switches.
const (
	incomingBufSize, outgoingBufSize = 100, 100
)
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
var fatalOnDecodeError bool

func init() {
	// check to see if we are running in a test environment
	TestOnlySetFatalOnDecodeError(true)
	// The value parsed from the environment replaces the value set just
	// above; an unset or unparsable variable yields false.
	fromEnv, _ := strconv.ParseBool(os.Getenv("KUBE_PANIC_WATCH_DECODE_ERROR"))
	fatalOnDecodeError = fromEnv
}

// TestOnlySetFatalOnDecodeError should only be used for cases where decode errors are expected and need to be tested. e.g. conversion webhooks.
func TestOnlySetFatalOnDecodeError(b bool) {
	fatalOnDecodeError = b
}
// watcher holds the dependencies shared by every watch it starts; Watch
// creates one watchChan per call.
type watcher struct {
client *clientv3.Client
codec runtime.Codec
// newFunc, when non-nil, returns a new object of the watched type.
newFunc func() runtime.Object
// objectType is the reflected type name of the object returned by newFunc,
// or "<unknown>" when newFunc is nil.
objectType string
groupResource schema.GroupResource
versioner storage.Versioner
}
// watchChan implements watch.Interface.
// One instance is created per Watch call and carries the parameters of that
// call together with its channels.
type watchChan struct {
// watcher is the watcher that created this watchChan.
watcher *watcher
transformer value.Transformer
// key is the watched key; for recursive watches it ends with "/".
key string
// initialRev is the revision passed to Watch.
initialRev int64
recursive bool
progressNotify bool
// internalPred is the predicate passed to Watch, or storage.Everything
// when that predicate is empty.
internalPred storage.SelectionPredicate
ctx context.Context
cancel context.CancelFunc
// incomingEventChan is buffered with incomingBufSize entries.
incomingEventChan chan *event
// resultChan is buffered with outgoingBufSize entries.
resultChan chan watch.Event
// errChan has a buffer of one error.
errChan chan error
}
// newWatcher builds a watcher from the given dependencies. The watcher's
// objectType is the reflected type name of the object produced by newFunc, or
// "<unknown>" when newFunc is nil.
func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner) *watcher {
	objectType := "<unknown>"
	if newFunc != nil {
		objectType = reflect.TypeOf(newFunc()).String()
	}
	return &watcher{
		client:        client,
		codec:         codec,
		newFunc:       newFunc,
		objectType:    objectType,
		groupResource: groupResource,
		versioner:     versioner,
	}
}
// Watch starts watching key and returns a watch.Interface that delivers the
// relevant notifications.
//
//   - If rev is zero, the objects currently stored under key are read and
//     queued first, and the watch then continues from the revision following
//     the one reported by that read.
//   - If rev is non-zero, only events that happened after rev are delivered.
//   - If recursive is false, exactly the given key is watched.
//   - If recursive is true, key is treated as a directory prefix (a trailing
//     "/" is appended when missing) and everything under it is watched,
//     excluding the root key itself.
//   - If progressNotify is true, etcd progress notifications are requested.
//
// Only changes matching pred are delivered. The returned error is always nil;
// failures are reported asynchronously as error events on the result channel.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) (watch.Interface, error) {
	needsSlash := recursive && !strings.HasSuffix(key, "/")
	if needsSlash {
		key += "/"
	}

	ch := w.createWatchChan(ctx, key, rev, recursive, progressNotify, transformer, pred)
	go ch.run()

	// For etcd watch we don't have an easy way to answer whether the watch
	// has already caught up. So in the initial version (given that watchcache
	// is by default enabled for all resources but Events), we just deliver
	// the initialization signal immediately. Improving this will be explored
	// in the future.
	utilflowcontrol.WatchInitialized(ctx)
	return ch, nil
}
// createWatchChan assembles the watchChan for a single watch: it picks the
// effective predicate, derives a cancellable context from ctx and allocates
// the buffered channels. It does not start any goroutine.
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan {
	// An empty predicate doesn't filter out any object, so use the canonical
	// accept-everything predicate in that case.
	effectivePred := pred
	if pred.Empty() {
		effectivePred = storage.Everything
	}

	// The etcd server waits until it cannot find a leader for 3 election
	// timeouts to cancel existing streams. 3 is currently a hard coded
	// constant. The election timeout defaults to 1000ms. If the cluster is
	// healthy, when the leader is stopped, the leadership transfer should be
	// smooth. (leader transfers its leadership before stopping). If leader is
	// hard killed, other servers will take an election timeout to realize
	// leader lost and start campaign.
	watchCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))

	return &watchChan{
		watcher:           w,
		transformer:       transformer,
		key:               key,
		initialRev:        rev,
		recursive:         recursive,
		progressNotify:    progressNotify,
		internalPred:      effectivePred,
		ctx:               watchCtx,
		cancel:            cancel,
		incomingEventChan: make(chan *event, incomingBufSize),
		resultChan:        make(chan watch.Event, outgoingBufSize),
		errChan:           make(chan error, 1),
	}
}
// run supervises a watch for its whole lifetime. It starts the goroutine that
// reads from etcd and the one that processes events, then blocks until an
// error is reported, the etcd watch ends, or the context is cancelled.
// Afterwards it cancels the context, waits for event processing to finish and
// closes the result channel.
func (wc *watchChan) run() {
	etcdWatchDone := make(chan struct{})
	go wc.startWatching(etcdWatchDone)

	var processing sync.WaitGroup
	processing.Add(1)
	go wc.processEvent(&processing)

	select {
	case err := <-wc.errChan:
		// A plain context cancellation is not reported to the consumer.
		if err != context.Canceled {
			if errEvent := transformErrorToEvent(err); errEvent != nil {
				// error result is guaranteed to be received by user before closing ResultChan.
				select {
				case wc.resultChan <- *errEvent:
				case <-wc.ctx.Done(): // user has given up all results
				}
			}
		}
	case <-etcdWatchDone:
	case <-wc.ctx.Done(): // user cancel
	}

	// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
	// It's fine to double cancel.
	wc.cancel()

	// we need to wait until resultChan wouldn't be used anymore
	processing.Wait()
	close(wc.resultChan)
}
// Stop cancels the watch's context, which signals the goroutines of this
// watch to exit; run then closes the result channel.
func (wc *watchChan) Stop() {
wc.cancel()
}
// ResultChan returns the receive-only channel on which watch events are
// delivered. run closes it once event processing has stopped.
func (wc *watchChan) ResultChan() <-chan watch.Event {
return wc.resultChan
}
// sync reads the data currently stored under the watched key (or prefix, for
// recursive watches) and queues one event per returned key-value pair.
// initialRev is set to the revision reported in the response header so the
// subsequent watch can continue from there.
// All events sent will have isCreated=true.
// It returns the error from the etcd read, if any.
func (wc *watchChan) sync() error {
	var getOpts []clientv3.OpOption
	if wc.recursive {
		getOpts = append(getOpts, clientv3.WithPrefix())
	}

	resp, err := wc.watcher.client.Get(wc.ctx, wc.key, getOpts...)
	if err != nil {
		return err
	}

	wc.initialRev = resp.Header.Revision
	for i := range resp.Kvs {
		wc.sendEvent(parseKV(resp.Kvs[i]))
	}
	return nil
}
// logWatchChannelErr logs an error received on the etcd watch channel.
// Errors caused by mvcc revision compaction are logged as warnings; every
// other error is logged at error level. err must be non-nil.
func logWatchChannelErr(err error) {
	if strings.Contains(err.Error(), "mvcc: required revision has been compacted") {
		klog.Warningf("watch chan error: %v", err)
		return
	}
	klog.Errorf("watch chan error: %v", err)
}
// startWatching does:
//   - get current objects if initialRev=0; set initialRev to current rev
//   - watch on given key and send events to process.
//
// Any error (initial sync, etcd watch response, event parsing) is reported
// through sendError and ends the goroutine without closing watchClosedCh.
// watchClosedCh is closed only when the etcd watch channel itself is closed.
func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
	if wc.initialRev == 0 {
		if err := wc.sync(); err != nil {
			klog.Errorf("failed to sync with latest state: %v", err)
			wc.sendError(err)
			return
		}
	}

	// Start right after the initial revision and ask for the previous value
	// of every changed key.
	watchOpts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
	if wc.recursive {
		watchOpts = append(watchOpts, clientv3.WithPrefix())
	}
	if wc.progressNotify {
		watchOpts = append(watchOpts, clientv3.WithProgressNotify())
	}

	for resp := range wc.watcher.client.Watch(wc.ctx, wc.key, watchOpts...) {
		if err := resp.Err(); err != nil {
			// If there is an error on server (e.g. compaction), the channel will return it before closed.
			logWatchChannelErr(err)
			wc.sendError(err)
			return
		}

		if resp.IsProgressNotify() {
			wc.sendEvent(progressNotifyEvent(resp.Header.GetRevision()))
			metrics.RecordEtcdBookmark(wc.watcher.groupResource.String())
			continue
		}

		for _, etcdEvent := range resp.Events {
			parsed, err := parseEvent(etcdEvent)
			if err != nil {
				logWatchChannelErr(err)
				wc.sendError(err)
				return
			}
			wc.sendEvent(parsed)
		}
	}

	// When we come to this point, it's only possible that client side ends the watch.
	// e.g. cancel the context, close the client.
	// If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
	// We should notify the main thread that this goroutine has exited.
	close(watchClosedCh)
}
// processEvent consumes events queued by the etcd watcher, transforms each
// into a watch.Event and forwards it to resultChan. Events for which
// transform returns nil are dropped. The loop ends, and wg is marked done,
// as soon as the watch context is cancelled.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		var incoming *event
		select {
		case incoming = <-wc.incomingEventChan:
		case <-wc.ctx.Done():
			return
		}

		out := wc.transform(incoming)
		if out == nil {
			continue
		}

		if len(wc.resultChan) == outgoingBufSize {
			klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
		}

		// If user couldn't receive results fast enough, we also block incoming events from watcher.
		// Because storing events in local will cause more memory usage.
		// The worst case would be closing the fast watcher.
		select {
		case wc.resultChan <- *out:
		case <-wc.ctx.Done():
			return
		}
	}
}
// filter reports whether obj should be delivered to the consumer. An empty
// predicate accepts everything; otherwise obj must match the predicate, and a
// matching error counts as "does not match".
func (wc *watchChan) filter(obj runtime.Object) bool {
	if wc.internalPred.Empty() {
		return true
	}
	if ok, err := wc.internalPred.Matches(obj); err != nil || !ok {
		return false
	}
	return true
}
// acceptAll reports whether the internal predicate is empty, in which case
// filter accepts every object without evaluating it.
func (wc *watchChan) acceptAll() bool {
return wc.internalPred.Empty()
}
// transform converts an internal event into the watch.Event to hand to the
// consumer, or returns nil when nothing should be delivered.
//
//   - If the objects cannot be prepared, the error is logged and reported via
//     sendError, and nil is returned.
//   - Progress notifications become Bookmark events carrying an empty object
//     stamped with the event revision (dropped when newFunc is nil or the
//     revision cannot be set).
//   - Deletions and creations become Deleted/Added events if the relevant
//     object passes the filter.
//   - Modifications are reported as Modified, Added or Deleted depending on
//     whether the current and the previous object pass the filter; if neither
//     does, nil is returned.
func (wc *watchChan) transform(e *event) (res *watch.Event) {
	curObj, oldObj, err := wc.prepareObjs(e)
	if err != nil {
		klog.Errorf("failed to prepare current and previous objects: %v", err)
		wc.sendError(err)
		return nil
	}

	if e.isProgressNotify {
		if wc.watcher.newFunc == nil {
			return nil
		}
		bookmark := wc.watcher.newFunc()
		if err := wc.watcher.versioner.UpdateObject(bookmark, uint64(e.rev)); err != nil {
			klog.Errorf("failed to propagate object version: %v", err)
			return nil
		}
		return &watch.Event{Type: watch.Bookmark, Object: bookmark}
	}

	if e.isDeleted {
		if !wc.filter(oldObj) {
			return nil
		}
		return &watch.Event{Type: watch.Deleted, Object: oldObj}
	}

	if e.isCreated {
		if !wc.filter(curObj) {
			return nil
		}
		return &watch.Event{Type: watch.Added, Object: curObj}
	}

	// Plain modification from here on.
	if wc.acceptAll() {
		return &watch.Event{Type: watch.Modified, Object: curObj}
	}

	// With a real filter, a modification may move the object into or out of
	// the set the consumer is interested in.
	matchesNow := wc.filter(curObj)
	matchedBefore := wc.filter(oldObj)
	switch {
	case matchesNow && matchedBefore:
		return &watch.Event{Type: watch.Modified, Object: curObj}
	case matchesNow:
		return &watch.Event{Type: watch.Added, Object: curObj}
	case matchedBefore:
		return &watch.Event{Type: watch.Deleted, Object: oldObj}
	}
	return nil
}
// transformErrorToEvent wraps err into a watch.Error event. The error is
// first passed through interpretWatchError; if the result does not already
// carry an API status it is converted into an internal error. The event's
// object is the resulting status.
func transformErrorToEvent(err error) *watch.Event {
	interpreted := interpretWatchError(err)

	apiStatus, ok := interpreted.(apierrors.APIStatus)
	if !ok {
		apiStatus = apierrors.NewInternalError(interpreted)
	}

	status := apiStatus.Status()
	return &watch.Event{
		Type:   watch.Error,
		Object: &status,
	}
}
// sendError hands err to run through errChan. If the watch context is
// cancelled before the error can be delivered, the error is dropped so the
// caller does not block on a watch that is already shutting down.
func (wc *watchChan) sendError(err error) {
select {
case wc.errChan <- err:
case <-wc.ctx.Done():
}
}
// sendEvent queues e for processing. When the incoming buffer is already full
// a verbose log entry is emitted and the call then blocks until there is room
// or the watch context is cancelled, in which case e is dropped.
func (wc *watchChan) sendEvent(e *event) {
	if bufferFull := len(wc.incomingEventChan) == incomingBufSize; bufferFull {
		klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
	}

	select {
	case wc.incomingEventChan <- e:
	case <-wc.ctx.Done():
	}
}
// prepareObjs decodes the current and/or previous object carried by e.
//
// curObj is decoded from e.value unless the event is a deletion. oldObj is
// decoded from e.prevValue only when a previous value is present and it is
// actually needed (see below). Either result may be nil. For progress
// notifications both are nil. On any transformation or decoding failure both
// objects are nil and the error is returned.
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
	if e.isProgressNotify {
		// progressNotify events contain neither the current nor the previous object version.
		return nil, nil, nil
	}

	// load turns a raw stored value into a decoded object stamped with the
	// event's revision.
	load := func(raw []byte) (runtime.Object, error) {
		data, _, transformErr := wc.transformer.TransformFromStorage(wc.ctx, raw, authenticatedDataString(e.key))
		if transformErr != nil {
			return nil, transformErr
		}
		return decodeObj(wc.watcher.codec, wc.watcher.versioner, data, e.rev)
	}

	if !e.isDeleted {
		if curObj, err = load(e.value); err != nil {
			return nil, nil, err
		}
	}

	// We need to decode prevValue, only if this is deletion event or
	// the underlying filter doesn't accept all objects (otherwise we
	// know that the filter for previous object will return true and
	// we need the object only to compute whether it was filtered out
	// before).
	needPrev := len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll())
	if needPrev {
		// Note that this sends the *old* object with the etcd revision for the time at
		// which it gets deleted.
		if oldObj, err = load(e.prevValue); err != nil {
			return nil, nil, err
		}
	}
	return curObj, oldObj, nil
}
// decodeObj decodes data with codec and stamps the etcd revision rev onto the
// resulting object using versioner.
//
// It returns the decoded object, or a nil object and an error when decoding
// or versioning fails. The versioning error is wrapped, so the underlying
// cause remains reachable through errors.Is / errors.As. When
// fatalOnDecodeError is set, a decoding failure panics instead of being
// returned.
func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (_ runtime.Object, err error) {
	obj, err := runtime.Decode(codec, data)
	if err != nil {
		if fatalOnDecodeError {
			// Panicking is only enabled for test environments, where a decode
			// failure points at a coding mistake and should fail loudly
			// rather than surface as an ordinary watch error.
			panic(err)
		}
		return nil, err
	}
	// ensure resource version is set on the object we load from etcd
	if err := versioner.UpdateObject(obj, uint64(rev)); err != nil {
		return nil, fmt.Errorf("failure to version api object (%d) %#v: %w", rev, obj, err)
	}
	return obj, nil
}