rebase: bump sigs.k8s.io/controller-runtime

Bumps the k8s-dependencies group with 1 update in the / directory: [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime).


Updates `sigs.k8s.io/controller-runtime` from 0.19.4 to 0.20.1
- [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases)
- [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md)
- [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.19.4...v0.20.1)

---
updated-dependencies:
- dependency-name: sigs.k8s.io/controller-runtime
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: k8s-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot] 2025-01-27 20:58:32 +00:00 committed by GitHub
parent ec5fefcc6c
commit 636fa26ab8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 2279 additions and 561 deletions

6
go.mod
View File

@ -42,7 +42,7 @@ require (
k8s.io/mount-utils v0.32.1 k8s.io/mount-utils v0.32.1
k8s.io/pod-security-admission v0.32.1 k8s.io/pod-security-admission v0.32.1
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
sigs.k8s.io/controller-runtime v0.19.4 sigs.k8s.io/controller-runtime v0.20.1
) )
require ( require (
@ -101,7 +101,7 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/google/btree v1.0.1 // indirect github.com/google/btree v1.1.3 // indirect
github.com/google/cadvisor v0.51.0 // indirect github.com/google/cadvisor v0.51.0 // indirect
github.com/google/cel-go v0.22.0 // indirect github.com/google/cel-go v0.22.0 // indirect
github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gnostic-models v0.6.8 // indirect
@ -188,7 +188,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.31.1 // indirect k8s.io/apiextensions-apiserver v0.32.0 // indirect
k8s.io/apiserver v0.32.1 // indirect k8s.io/apiserver v0.32.1 // indirect
k8s.io/component-base v0.32.1 // indirect k8s.io/component-base v0.32.1 // indirect
k8s.io/component-helpers v0.32.1 // indirect k8s.io/component-helpers v0.32.1 // indirect

7
go.sum
View File

@ -1745,8 +1745,9 @@ github.com/gomodules/jsonpatch/v2 v2.2.0 h1:QBjDK/nX43P4z/Os3gnk8VeFdLDgBuMns1Wl
github.com/gomodules/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY= github.com/gomodules/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/cadvisor v0.51.0 h1:BspqSPdZoLKrnvuZNOvM/KiJ/A+RdixwagN20n+2H8k= github.com/google/cadvisor v0.51.0 h1:BspqSPdZoLKrnvuZNOvM/KiJ/A+RdixwagN20n+2H8k=
github.com/google/cadvisor v0.51.0/go.mod h1:czGE/c/P/i0QFpVNKTFrIEzord9Y10YfpwuaSWXELc0= github.com/google/cadvisor v0.51.0/go.mod h1:czGE/c/P/i0QFpVNKTFrIEzord9Y10YfpwuaSWXELc0=
github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g=
@ -3657,8 +3658,8 @@ rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 h1:CPT0ExVicCzcpeN4baWEV2ko2Z/AsiZgEdwgcfwLgMo= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 h1:CPT0ExVicCzcpeN4baWEV2ko2Z/AsiZgEdwgcfwLgMo=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
sigs.k8s.io/controller-runtime v0.2.2/go.mod h1:9dyohw3ZtoXQuV1e766PHUn+cmrRCIcBh6XIMFNMZ+I= sigs.k8s.io/controller-runtime v0.2.2/go.mod h1:9dyohw3ZtoXQuV1e766PHUn+cmrRCIcBh6XIMFNMZ+I=
sigs.k8s.io/controller-runtime v0.19.4 h1:SUmheabttt0nx8uJtoII4oIP27BVVvAKFvdvGFwV/Qo= sigs.k8s.io/controller-runtime v0.20.1 h1:JbGMAG/X94NeM3xvjenVUaBjy6Ui4Ogd/J5ZtjZnHaE=
sigs.k8s.io/controller-runtime v0.19.4/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= sigs.k8s.io/controller-runtime v0.20.1/go.mod h1:BrP3w158MwvB3ZbNpaAcIKkHQ7YGpYnzpoSTZ8E14WU=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8=
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo=

View File

@ -1 +0,0 @@
language: go

View File

@ -1,7 +1,5 @@
# BTree implementation for Go # BTree implementation for Go
![Travis CI Build Status](https://api.travis-ci.org/google/btree.svg?branch=master)
This package provides an in-memory B-Tree implementation for Go, useful as This package provides an in-memory B-Tree implementation for Go, useful as
an ordered, mutable data structure. an ordered, mutable data structure.

View File

@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
//go:build !go1.18
// +build !go1.18
// Package btree implements in-memory B-Trees of arbitrary degree. // Package btree implements in-memory B-Trees of arbitrary degree.
// //
// btree implements an in-memory B-Tree for use as an ordered data structure. // btree implements an in-memory B-Tree for use as an ordered data structure.
@ -476,7 +479,7 @@ func (n *node) growChildAndRemove(i int, item Item, minItems int, typ toRemove)
child := n.mutableChild(i) child := n.mutableChild(i)
// merge with right child // merge with right child
mergeItem := n.items.removeAt(i) mergeItem := n.items.removeAt(i)
mergeChild := n.children.removeAt(i + 1) mergeChild := n.children.removeAt(i + 1).mutableFor(n.cow)
child.items = append(child.items, mergeItem) child.items = append(child.items, mergeItem)
child.items = append(child.items, mergeChild.items...) child.items = append(child.items, mergeChild.items...)
child.children = append(child.children, mergeChild.children...) child.children = append(child.children, mergeChild.children...)

1083
vendor/github.com/google/btree/btree_generic.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

94
vendor/golang.org/x/exp/maps/maps.go generated vendored
View File

@ -1,94 +0,0 @@
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package maps defines various functions useful with maps of any type.
package maps
// Keys returns the keys of the map m.
// The keys will be in an indeterminate order.
func Keys[M ~map[K]V, K comparable, V any](m M) []K {
r := make([]K, 0, len(m))
for k := range m {
r = append(r, k)
}
return r
}
// Values returns the values of the map m.
// The values will be in an indeterminate order.
func Values[M ~map[K]V, K comparable, V any](m M) []V {
r := make([]V, 0, len(m))
for _, v := range m {
r = append(r, v)
}
return r
}
// Equal reports whether two maps contain the same key/value pairs.
// Values are compared using ==.
func Equal[M1, M2 ~map[K]V, K, V comparable](m1 M1, m2 M2) bool {
if len(m1) != len(m2) {
return false
}
for k, v1 := range m1 {
if v2, ok := m2[k]; !ok || v1 != v2 {
return false
}
}
return true
}
// EqualFunc is like Equal, but compares values using eq.
// Keys are still compared with ==.
func EqualFunc[M1 ~map[K]V1, M2 ~map[K]V2, K comparable, V1, V2 any](m1 M1, m2 M2, eq func(V1, V2) bool) bool {
if len(m1) != len(m2) {
return false
}
for k, v1 := range m1 {
if v2, ok := m2[k]; !ok || !eq(v1, v2) {
return false
}
}
return true
}
// Clear removes all entries from m, leaving it empty.
func Clear[M ~map[K]V, K comparable, V any](m M) {
for k := range m {
delete(m, k)
}
}
// Clone returns a copy of m. This is a shallow clone:
// the new keys and values are set using ordinary assignment.
func Clone[M ~map[K]V, K comparable, V any](m M) M {
// Preserve nil in case it matters.
if m == nil {
return nil
}
r := make(M, len(m))
for k, v := range m {
r[k] = v
}
return r
}
// Copy copies all key/value pairs in src adding them to dst.
// When a key in src is already present in dst,
// the value in dst will be overwritten by the value associated
// with the key in src.
func Copy[M1 ~map[K]V, M2 ~map[K]V, K comparable, V any](dst M1, src M2) {
for k, v := range src {
dst[k] = v
}
}
// DeleteFunc deletes any key/value pairs from m for which del returns true.
func DeleteFunc[M ~map[K]V, K comparable, V any](m M, del func(K, V) bool) {
for k, v := range m {
if del(k, v) {
delete(m, k)
}
}
}

135
vendor/golang.org/x/sync/errgroup/errgroup.go generated vendored Normal file
View File

@ -0,0 +1,135 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
//
// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
// returning errors.
package errgroup
import (
"context"
"fmt"
"sync"
)
type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
cancel func(error)
wg sync.WaitGroup
sem chan token
errOnce sync.Once
err error
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := withCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
return g.err
}
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
}
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
return true
}
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}

13
vendor/golang.org/x/sync/errgroup/go120.go generated vendored Normal file
View File

@ -0,0 +1,13 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
return context.WithCancelCause(parent)
}

14
vendor/golang.org/x/sync/errgroup/pre_go120.go generated vendored Normal file
View File

@ -0,0 +1,14 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
ctx, cancel := context.WithCancel(parent)
return ctx, func(error) { cancel() }
}

14
vendor/modules.txt vendored
View File

@ -364,8 +364,8 @@ github.com/golang/protobuf/ptypes
github.com/golang/protobuf/ptypes/any github.com/golang/protobuf/ptypes/any
github.com/golang/protobuf/ptypes/duration github.com/golang/protobuf/ptypes/duration
github.com/golang/protobuf/ptypes/timestamp github.com/golang/protobuf/ptypes/timestamp
# github.com/google/btree v1.0.1 # github.com/google/btree v1.1.3
## explicit; go 1.12 ## explicit; go 1.18
github.com/google/btree github.com/google/btree
# github.com/google/cadvisor v0.51.0 # github.com/google/cadvisor v0.51.0
## explicit; go 1.21 ## explicit; go 1.21
@ -856,7 +856,6 @@ golang.org/x/crypto/ssh/internal/bcrypt_pbkdf
# golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 # golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
## explicit; go 1.20 ## explicit; go 1.20
golang.org/x/exp/constraints golang.org/x/exp/constraints
golang.org/x/exp/maps
golang.org/x/exp/slices golang.org/x/exp/slices
# golang.org/x/net v0.34.0 # golang.org/x/net v0.34.0
## explicit; go 1.18 ## explicit; go 1.18
@ -879,6 +878,7 @@ golang.org/x/oauth2
golang.org/x/oauth2/internal golang.org/x/oauth2/internal
# golang.org/x/sync v0.10.0 # golang.org/x/sync v0.10.0
## explicit; go 1.18 ## explicit; go 1.18
golang.org/x/sync/errgroup
golang.org/x/sync/singleflight golang.org/x/sync/singleflight
# golang.org/x/sys v0.29.0 # golang.org/x/sys v0.29.0
## explicit; go 1.18 ## explicit; go 1.18
@ -1124,7 +1124,7 @@ k8s.io/api/storage/v1
k8s.io/api/storage/v1alpha1 k8s.io/api/storage/v1alpha1
k8s.io/api/storage/v1beta1 k8s.io/api/storage/v1beta1
k8s.io/api/storagemigration/v1alpha1 k8s.io/api/storagemigration/v1alpha1
# k8s.io/apiextensions-apiserver v0.31.1 => k8s.io/apiextensions-apiserver v0.32.1 # k8s.io/apiextensions-apiserver v0.32.0 => k8s.io/apiextensions-apiserver v0.32.1
## explicit; go 1.23.0 ## explicit; go 1.23.0
k8s.io/apiextensions-apiserver/pkg/apis/apiextensions k8s.io/apiextensions-apiserver/pkg/apis/apiextensions
k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1 k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1
@ -2035,8 +2035,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client
# sigs.k8s.io/controller-runtime v0.19.4 # sigs.k8s.io/controller-runtime v0.20.1
## explicit; go 1.22.0 ## explicit; go 1.23.0
sigs.k8s.io/controller-runtime/pkg/cache sigs.k8s.io/controller-runtime/pkg/cache
sigs.k8s.io/controller-runtime/pkg/cache/internal sigs.k8s.io/controller-runtime/pkg/cache/internal
sigs.k8s.io/controller-runtime/pkg/certwatcher sigs.k8s.io/controller-runtime/pkg/certwatcher
@ -2047,6 +2047,7 @@ sigs.k8s.io/controller-runtime/pkg/client/config
sigs.k8s.io/controller-runtime/pkg/cluster sigs.k8s.io/controller-runtime/pkg/cluster
sigs.k8s.io/controller-runtime/pkg/config sigs.k8s.io/controller-runtime/pkg/config
sigs.k8s.io/controller-runtime/pkg/controller sigs.k8s.io/controller-runtime/pkg/controller
sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue
sigs.k8s.io/controller-runtime/pkg/event sigs.k8s.io/controller-runtime/pkg/event
sigs.k8s.io/controller-runtime/pkg/handler sigs.k8s.io/controller-runtime/pkg/handler
sigs.k8s.io/controller-runtime/pkg/healthz sigs.k8s.io/controller-runtime/pkg/healthz
@ -2055,6 +2056,7 @@ sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics
sigs.k8s.io/controller-runtime/pkg/internal/field/selector sigs.k8s.io/controller-runtime/pkg/internal/field/selector
sigs.k8s.io/controller-runtime/pkg/internal/httpserver sigs.k8s.io/controller-runtime/pkg/internal/httpserver
sigs.k8s.io/controller-runtime/pkg/internal/log sigs.k8s.io/controller-runtime/pkg/internal/log
sigs.k8s.io/controller-runtime/pkg/internal/metrics
sigs.k8s.io/controller-runtime/pkg/internal/recorder sigs.k8s.io/controller-runtime/pkg/internal/recorder
sigs.k8s.io/controller-runtime/pkg/internal/source sigs.k8s.io/controller-runtime/pkg/internal/source
sigs.k8s.io/controller-runtime/pkg/internal/syncs sigs.k8s.io/controller-runtime/pkg/internal/syncs

View File

@ -19,11 +19,12 @@ package cache
import ( import (
"context" "context"
"fmt" "fmt"
"maps"
"net/http" "net/http"
"slices"
"sort" "sort"
"time" "time"
"golang.org/x/exp/maps"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -231,15 +232,16 @@ type Options struct {
// This will be used for all object types, unless it is set in ByObject or // This will be used for all object types, unless it is set in ByObject or
// DefaultNamespaces. // DefaultNamespaces.
// //
// Defaults to false. // Defaults to true.
DefaultEnableWatchBookmarks *bool DefaultEnableWatchBookmarks *bool
// ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object. // ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object.
// If unset, this will fall through to the Default* settings. // If unset, this will fall through to the Default* settings.
ByObject map[client.Object]ByObject ByObject map[client.Object]ByObject
// newInformer allows overriding of NewSharedIndexInformer for testing. // NewInformer allows overriding of NewSharedIndexInformer, for example for testing
newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer // or if someone wants to write their own Informer.
NewInformer func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer
} }
// ByObject offers more fine-grained control over the cache's ListWatch by object. // ByObject offers more fine-grained control over the cache's ListWatch by object.
@ -291,7 +293,7 @@ type ByObject struct {
// assume bookmarks are returned at any specific interval, nor may they // assume bookmarks are returned at any specific interval, nor may they
// assume the server will send any BOOKMARK event during a session. // assume the server will send any BOOKMARK event during a session.
// //
// Defaults to false. // Defaults to true.
EnableWatchBookmarks *bool EnableWatchBookmarks *bool
} }
@ -326,7 +328,7 @@ type Config struct {
// assume bookmarks are returned at any specific interval, nor may they // assume bookmarks are returned at any specific interval, nor may they
// assume the server will send any BOOKMARK event during a session. // assume the server will send any BOOKMARK event during a session.
// //
// Defaults to false. // Defaults to true.
EnableWatchBookmarks *bool EnableWatchBookmarks *bool
} }
@ -430,8 +432,8 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
Transform: config.Transform, Transform: config.Transform,
WatchErrorHandler: opts.DefaultWatchErrorHandler, WatchErrorHandler: opts.DefaultWatchErrorHandler,
UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false),
EnableWatchBookmarks: ptr.Deref(config.EnableWatchBookmarks, false), EnableWatchBookmarks: ptr.Deref(config.EnableWatchBookmarks, true),
NewInformer: opts.newInformer, NewInformer: opts.NewInformer,
}), }),
readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
} }
@ -467,6 +469,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
} }
} }
opts.ByObject = maps.Clone(opts.ByObject)
opts.DefaultNamespaces = maps.Clone(opts.DefaultNamespaces)
for obj, byObject := range opts.ByObject { for obj, byObject := range opts.ByObject {
isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper) isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper)
if err != nil { if err != nil {
@ -478,6 +482,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
if isNamespaced && byObject.Namespaces == nil { if isNamespaced && byObject.Namespaces == nil {
byObject.Namespaces = maps.Clone(opts.DefaultNamespaces) byObject.Namespaces = maps.Clone(opts.DefaultNamespaces)
} else {
byObject.Namespaces = maps.Clone(byObject.Namespaces)
} }
// Default the namespace-level configs first, because they need to use the undefaulted type-level config // Default the namespace-level configs first, because they need to use the undefaulted type-level config
@ -485,7 +491,6 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
for namespace, config := range byObject.Namespaces { for namespace, config := range byObject.Namespaces {
// 1. Default from the undefaulted type-level config // 1. Default from the undefaulted type-level config
config = defaultConfig(config, byObjectToConfig(byObject)) config = defaultConfig(config, byObjectToConfig(byObject))
// 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but // 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but
// might not have an entry for the current namespace. // might not have an entry for the current namespace.
if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace { if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace {
@ -498,7 +503,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
if namespace == metav1.NamespaceAll { if namespace == metav1.NamespaceAll {
config.FieldSelector = fields.AndSelectors( config.FieldSelector = fields.AndSelectors(
appendIfNotNil( appendIfNotNil(
namespaceAllSelector(maps.Keys(byObject.Namespaces)), namespaceAllSelector(slices.Collect(maps.Keys(byObject.Namespaces))),
config.FieldSelector, config.FieldSelector,
)..., )...,
) )
@ -529,7 +534,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
if namespace == metav1.NamespaceAll { if namespace == metav1.NamespaceAll {
cfg.FieldSelector = fields.AndSelectors( cfg.FieldSelector = fields.AndSelectors(
appendIfNotNil( appendIfNotNil(
namespaceAllSelector(maps.Keys(opts.DefaultNamespaces)), namespaceAllSelector(slices.Collect(maps.Keys(opts.DefaultNamespaces))),
cfg.FieldSelector, cfg.FieldSelector,
)..., )...,
) )

View File

@ -18,10 +18,11 @@ package cache
import ( import (
"context" "context"
"maps"
"slices"
"strings" "strings"
"sync" "sync"
"golang.org/x/exp/maps"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
@ -73,7 +74,7 @@ func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk sch
} }
func (dbt *delegatingByGVKCache) Start(ctx context.Context) error { func (dbt *delegatingByGVKCache) Start(ctx context.Context) error {
allCaches := maps.Values(dbt.caches) allCaches := slices.Collect(maps.Values(dbt.caches))
allCaches = append(allCaches, dbt.defaultCache) allCaches = append(allCaches, dbt.defaultCache)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
@ -100,7 +101,7 @@ func (dbt *delegatingByGVKCache) Start(ctx context.Context) error {
func (dbt *delegatingByGVKCache) WaitForCacheSync(ctx context.Context) bool { func (dbt *delegatingByGVKCache) WaitForCacheSync(ctx context.Context) bool {
synced := true synced := true
for _, cache := range append(maps.Values(dbt.caches), dbt.defaultCache) { for _, cache := range append(slices.Collect(maps.Values(dbt.caches)), dbt.defaultCache) {
if !cache.WaitForCacheSync(ctx) { if !cache.WaitForCacheSync(ctx) {
synced = false synced = false
} }

View File

@ -47,7 +47,7 @@ type InformersOpts struct {
Mapper meta.RESTMapper Mapper meta.RESTMapper
ResyncPeriod time.Duration ResyncPeriod time.Duration
Namespace string Namespace string
NewInformer *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer NewInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
Selector Selector Selector Selector
Transform cache.TransformFunc Transform cache.TransformFunc
UnsafeDisableDeepCopy bool UnsafeDisableDeepCopy bool
@ -59,7 +59,7 @@ type InformersOpts struct {
func NewInformers(config *rest.Config, options *InformersOpts) *Informers { func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
newInformer := cache.NewSharedIndexInformer newInformer := cache.NewSharedIndexInformer
if options.NewInformer != nil { if options.NewInformer != nil {
newInformer = *options.NewInformer newInformer = options.NewInformer
} }
return &Informers{ return &Informers{
config: config, config: config,
@ -585,7 +585,7 @@ func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) wa
// hammer the apiserver with list requests simultaneously. // hammer the apiserver with list requests simultaneously.
func calculateResyncPeriod(resync time.Duration) time.Duration { func calculateResyncPeriod(resync time.Duration) time.Duration {
// the factor will fall into [0.9, 1.1) // the factor will fall into [0.9, 1.1)
factor := rand.Float64()/5.0 + 0.9 //nolint:gosec factor := rand.Float64()/5.0 + 0.9
return time.Duration(float64(resync.Nanoseconds()) * factor) return time.Duration(float64(resync.Nanoseconds()) * factor)
} }

View File

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper" "k8s.io/client-go/restmapper"
"k8s.io/utils/ptr"
) )
// NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic // NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic
@ -41,6 +42,7 @@ func NewDynamicRESTMapper(cfg *rest.Config, httpClient *http.Client) (meta.RESTM
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &mapper{ return &mapper{
mapper: restmapper.NewDiscoveryRESTMapper([]*restmapper.APIGroupResources{}), mapper: restmapper.NewDiscoveryRESTMapper([]*restmapper.APIGroupResources{}),
client: client, client: client,
@ -53,11 +55,15 @@ func NewDynamicRESTMapper(cfg *rest.Config, httpClient *http.Client) (meta.RESTM
// client for discovery information to do REST mappings. // client for discovery information to do REST mappings.
type mapper struct { type mapper struct {
mapper meta.RESTMapper mapper meta.RESTMapper
client discovery.DiscoveryInterface client discovery.AggregatedDiscoveryInterface
knownGroups map[string]*restmapper.APIGroupResources knownGroups map[string]*restmapper.APIGroupResources
apiGroups map[string]*metav1.APIGroup apiGroups map[string]*metav1.APIGroup
initialDiscoveryDone bool
// mutex to provide thread-safe mapper reloading. // mutex to provide thread-safe mapper reloading.
// It protects all fields in the mapper as well as methods
// that have the `Locked` suffix.
mu sync.RWMutex mu sync.RWMutex
} }
@ -159,28 +165,42 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
versions = nil versions = nil
} }
m.mu.Lock()
defer m.mu.Unlock()
// If no specific versions are set by user, we will scan all available ones for the API group. // If no specific versions are set by user, we will scan all available ones for the API group.
// This operation requires 2 requests: /api and /apis, but only once. For all subsequent calls // This operation requires 2 requests: /api and /apis, but only once. For all subsequent calls
// this data will be taken from cache. // this data will be taken from cache.
if len(versions) == 0 { //
apiGroup, err := m.findAPIGroupByName(groupName) // We always run this once, because if the server supports aggregated discovery, this will
// load everything with two api calls which we assume is overall cheaper.
if len(versions) == 0 || !m.initialDiscoveryDone {
apiGroup, didAggregatedDiscovery, err := m.findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked(groupName)
if err != nil { if err != nil {
return err return err
} }
if apiGroup != nil { if apiGroup != nil && len(versions) == 0 {
for _, version := range apiGroup.Versions { for _, version := range apiGroup.Versions {
versions = append(versions, version.Version) versions = append(versions, version.Version)
} }
} }
}
m.mu.Lock() // No need to do anything further if aggregatedDiscovery is supported and we did a lookup
defer m.mu.Unlock() if didAggregatedDiscovery {
failedGroups := make(map[schema.GroupVersion]error)
// Create or fetch group resources from cache. for _, version := range versions {
groupResources := &restmapper.APIGroupResources{ if m.knownGroups[groupName] == nil || m.knownGroups[groupName].VersionedResources[version] == nil {
Group: metav1.APIGroup{Name: groupName}, failedGroups[schema.GroupVersion{Group: groupName, Version: version}] = &meta.NoResourceMatchError{
VersionedResources: make(map[string][]metav1.APIResource), PartialResource: schema.GroupVersionResource{
Group: groupName,
Version: version,
}}
}
}
if len(failedGroups) > 0 {
return ptr.To(ErrResourceDiscoveryFailed(failedGroups))
}
return nil
}
} }
// Update information for group resources about versioned resources. // Update information for group resources about versioned resources.
@ -194,13 +214,26 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
return fmt.Errorf("failed to get API group resources: %w", err) return fmt.Errorf("failed to get API group resources: %w", err)
} }
if _, ok := m.knownGroups[groupName]; ok { m.addGroupVersionResourcesToCacheAndReloadLocked(groupVersionResources)
groupResources = m.knownGroups[groupName] return nil
} }
// addGroupVersionResourcesToCacheAndReloadLocked does what the name suggests. The mutex must be held when
// calling it.
func (m *mapper) addGroupVersionResourcesToCacheAndReloadLocked(gvr map[schema.GroupVersion]*metav1.APIResourceList) {
// Update information for group resources about the API group by adding new versions. // Update information for group resources about the API group by adding new versions.
// Ignore the versions that are already registered. // Ignore the versions that are already registered
for groupVersion, resources := range groupVersionResources { for groupVersion, resources := range gvr {
var groupResources *restmapper.APIGroupResources
if _, ok := m.knownGroups[groupVersion.Group]; ok {
groupResources = m.knownGroups[groupVersion.Group]
} else {
groupResources = &restmapper.APIGroupResources{
Group: metav1.APIGroup{Name: groupVersion.Group},
VersionedResources: make(map[string][]metav1.APIResource),
}
}
version := groupVersion.Version version := groupVersion.Version
groupResources.VersionedResources[version] = resources.APIResources groupResources.VersionedResources[version] = resources.APIResources
@ -214,60 +247,56 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
if !found { if !found {
groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{ groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{
GroupVersion: metav1.GroupVersion{Group: groupName, Version: version}.String(), GroupVersion: metav1.GroupVersion{Group: groupVersion.Group, Version: version}.String(),
Version: version, Version: version,
}) })
} }
// Update data in the cache.
m.knownGroups[groupVersion.Group] = groupResources
} }
// Update data in the cache. // Finally, reload the mapper.
m.knownGroups[groupName] = groupResources
// Finally, update the group with received information and regenerate the mapper.
updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups)) updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups))
for _, agr := range m.knownGroups { for _, agr := range m.knownGroups {
updatedGroupResources = append(updatedGroupResources, agr) updatedGroupResources = append(updatedGroupResources, agr)
} }
m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources) m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources)
return nil
} }
// findAPIGroupByNameLocked returns API group by its name. // findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked tries to find the passed apiGroup.
func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error) { // If the server supports aggregated discovery, it will always perform that.
// Looking in the cache first. func (m *mapper) findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked(groupName string) (_ *metav1.APIGroup, didAggregatedDiscovery bool, _ error) {
{ // Looking in the cache first
m.mu.RLock() group, ok := m.apiGroups[groupName]
group, ok := m.apiGroups[groupName] if ok {
m.mu.RUnlock() return group, false, nil
if ok {
return group, nil
}
} }
// Update the cache if nothing was found. // Update the cache if nothing was found.
apiGroups, err := m.client.ServerGroups() apiGroups, maybeResources, _, err := m.client.GroupsAndMaybeResources()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get server groups: %w", err) return nil, false, fmt.Errorf("failed to get server groups: %w", err)
} }
if len(apiGroups.Groups) == 0 { if len(apiGroups.Groups) == 0 {
return nil, fmt.Errorf("received an empty API groups list") return nil, false, fmt.Errorf("received an empty API groups list")
} }
m.mu.Lock() m.initialDiscoveryDone = true
if len(maybeResources) > 0 {
didAggregatedDiscovery = true
m.addGroupVersionResourcesToCacheAndReloadLocked(maybeResources)
}
for i := range apiGroups.Groups { for i := range apiGroups.Groups {
group := &apiGroups.Groups[i] group := &apiGroups.Groups[i]
m.apiGroups[group.Name] = group m.apiGroups[group.Name] = group
} }
m.mu.Unlock()
// Looking in the cache again. // Looking in the cache again.
m.mu.RLock()
defer m.mu.RUnlock()
// Don't return an error here if the API group is not present. // Don't return an error here if the API group is not present.
// The reloaded RESTMapper will take care of returning a NoMatchError. // The reloaded RESTMapper will take care of returning a NoMatchError.
return m.apiGroups[groupName], nil return m.apiGroups[groupName], didAggregatedDiscovery, nil
} }
// fetchGroupVersionResourcesLocked fetches the resources for the specified group and its versions. // fetchGroupVersionResourcesLocked fetches the resources for the specified group and its versions.
@ -283,10 +312,10 @@ func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ...
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
// If the version is not found, we remove the group from the cache // If the version is not found, we remove the group from the cache
// so it gets refreshed on the next call. // so it gets refreshed on the next call.
if m.isAPIGroupCached(groupVersion) { if m.isAPIGroupCachedLocked(groupVersion) {
delete(m.apiGroups, groupName) delete(m.apiGroups, groupName)
} }
if m.isGroupVersionCached(groupVersion) { if m.isGroupVersionCachedLocked(groupVersion) {
delete(m.knownGroups, groupName) delete(m.knownGroups, groupName)
} }
continue continue
@ -308,8 +337,8 @@ func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ...
return groupVersionResources, nil return groupVersionResources, nil
} }
// isGroupVersionCached checks if a version for a group is cached in the known groups cache. // isGroupVersionCachedLocked checks if a version for a group is cached in the known groups cache.
func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool { func (m *mapper) isGroupVersionCachedLocked(gv schema.GroupVersion) bool {
if cachedGroup, ok := m.knownGroups[gv.Group]; ok { if cachedGroup, ok := m.knownGroups[gv.Group]; ok {
_, cached := cachedGroup.VersionedResources[gv.Version] _, cached := cachedGroup.VersionedResources[gv.Version]
return cached return cached
@ -318,8 +347,8 @@ func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool {
return false return false
} }
// isAPIGroupCached checks if a version for a group is cached in the api groups cache. // isAPIGroupCachedLocked checks if a version for a group is cached in the api groups cache.
func (m *mapper) isAPIGroupCached(gv schema.GroupVersion) bool { func (m *mapper) isAPIGroupCachedLocked(gv schema.GroupVersion) bool {
cachedGroup, ok := m.apiGroups[gv.Group] cachedGroup, ok := m.apiGroups[gv.Group]
if !ok { if !ok {
return false return false

View File

@ -113,11 +113,11 @@ func newClient(config *rest.Config, options Options) (*client, error) {
} }
if config.WarningHandler == nil { if config.WarningHandler == nil {
// By default, we de-duplicate and surface warnings. // By default, we surface warnings.
config.WarningHandler = log.NewKubeAPIWarningLogger( config.WarningHandler = log.NewKubeAPIWarningLogger(
log.Log.WithName("KubeAPIWarningLogger"), log.Log.WithName("KubeAPIWarningLogger"),
log.KubeAPIWarningLoggerOptions{ log.KubeAPIWarningLoggerOptions{
Deduplicate: true, Deduplicate: false,
}, },
) )
} }

View File

@ -94,16 +94,16 @@ type SubResourceClientConstructor interface {
// - ServiceAccount token creation: // - ServiceAccount token creation:
// sa := &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // sa := &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}}
// token := &authenticationv1.TokenRequest{} // token := &authenticationv1.TokenRequest{}
// c.SubResourceClient("token").Create(ctx, sa, token) // c.SubResource("token").Create(ctx, sa, token)
// //
// - Pod eviction creation: // - Pod eviction creation:
// pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}}
// c.SubResourceClient("eviction").Create(ctx, pod, &policyv1.Eviction{}) // c.SubResource("eviction").Create(ctx, pod, &policyv1.Eviction{})
// //
// - Pod binding creation: // - Pod binding creation:
// pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}}
// binding := &corev1.Binding{Target: corev1.ObjectReference{Name: "my-node"}} // binding := &corev1.Binding{Target: corev1.ObjectReference{Name: "my-node"}}
// c.SubResourceClient("binding").Create(ctx, pod, binding) // c.SubResource("binding").Create(ctx, pod, binding)
// //
// - CertificateSigningRequest approval: // - CertificateSigningRequest approval:
// csr := &certificatesv1.CertificateSigningRequest{ // csr := &certificatesv1.CertificateSigningRequest{
@ -115,17 +115,17 @@ type SubResourceClientConstructor interface {
// }}, // }},
// }, // },
// } // }
// c.SubResourceClient("approval").Update(ctx, csr) // c.SubResource("approval").Update(ctx, csr)
// //
// - Scale retrieval: // - Scale retrieval:
// dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}}
// scale := &autoscalingv1.Scale{} // scale := &autoscalingv1.Scale{}
// c.SubResourceClient("scale").Get(ctx, dep, scale) // c.SubResource("scale").Get(ctx, dep, scale)
// //
// - Scale update: // - Scale update:
// dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}}
// scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 2}} // scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 2}}
// c.SubResourceClient("scale").Update(ctx, dep, client.WithSubResourceBody(scale)) // c.SubResource("scale").Update(ctx, dep, client.WithSubResourceBody(scale))
SubResource(subResource string) SubResourceClient SubResource(subResource string) SubResourceClient
} }
@ -193,7 +193,7 @@ type IndexerFunc func(Object) []string
// FieldIndexer knows how to index over a particular "field" such that it // FieldIndexer knows how to index over a particular "field" such that it
// can later be used by a field selector. // can later be used by a field selector.
type FieldIndexer interface { type FieldIndexer interface {
// IndexFields adds an index with the given field name on the given object type // IndexField adds an index with the given field name on the given object type
// by using the given function to extract the value for that field. If you want // by using the given function to extract the value for that field. If you want
// compatibility with the Kubernetes API server, only return one key, and only use // compatibility with the Kubernetes API server, only return one key, and only use
// fields that the API server supports. Otherwise, you can return multiple keys, // fields that the API server supports. Otherwise, you can return multiple keys,

View File

@ -20,7 +20,6 @@ import (
"context" "context"
"errors" "errors"
"net/http" "net/http"
"time"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
@ -65,8 +64,8 @@ type Cluster interface {
// GetRESTMapper returns a RESTMapper // GetRESTMapper returns a RESTMapper
GetRESTMapper() meta.RESTMapper GetRESTMapper() meta.RESTMapper
// GetAPIReader returns a reader that will be configured to use the API server. // GetAPIReader returns a reader that will be configured to use the API server directly.
// This should be used sparingly and only when the client does not fit your // This should be used sparingly and only when the cached client does not fit your
// use case. // use case.
GetAPIReader() client.Reader GetAPIReader() client.Reader
@ -88,16 +87,6 @@ type Options struct {
// If none is set, it defaults to log.Log global logger. // If none is set, it defaults to log.Log global logger.
Logger logr.Logger Logger logr.Logger
// SyncPeriod determines the minimum frequency at which watched resources are
// reconciled. A lower period will correct entropy more quickly, but reduce
// responsiveness to change if there are many watched resources. Change this
// value only if you know what you are doing. Defaults to 10 hours if unset.
// there will a 10 percent jitter between the SyncPeriod of all controllers
// so that all controllers will not send list requests simultaneously.
//
// Deprecated: Use Cache.SyncPeriod instead.
SyncPeriod *time.Duration
// HTTPClient is the http client that will be used to create the default // HTTPClient is the http client that will be used to create the default
// Cache and Client. If not set the rest.HTTPClientFor function will be used // Cache and Client. If not set the rest.HTTPClientFor function will be used
// to create the http client. // to create the http client.
@ -194,9 +183,6 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
if cacheOpts.HTTPClient == nil { if cacheOpts.HTTPClient == nil {
cacheOpts.HTTPClient = options.HTTPClient cacheOpts.HTTPClient = options.HTTPClient
} }
if cacheOpts.SyncPeriod == nil {
cacheOpts.SyncPeriod = options.SyncPeriod
}
} }
cache, err := options.NewCache(config, cacheOpts) cache, err := options.NewCache(config, cacheOpts)
if err != nil { if err != nil {

View File

@ -53,4 +53,10 @@ type Controller struct {
// NeedLeaderElection indicates whether the controller needs to use leader election. // NeedLeaderElection indicates whether the controller needs to use leader election.
// Defaults to true, which means the controller will use leader election. // Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool NeedLeaderElection *bool
// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
// priority queue.
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue *bool
} }

View File

@ -24,7 +24,9 @@ import (
"github.com/go-logr/logr" "github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
@ -189,11 +191,21 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
} }
if options.RateLimiter == nil { if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]() if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
} else {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
}
} }
if options.NewQueue == nil { if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
o.Log = mgr.GetLogger().WithValues("controller", controllerName)
o.RateLimiter = rateLimiter
})
}
return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{ return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
Name: controllerName, Name: controllerName,
}) })

View File

@ -0,0 +1,146 @@
package priorityqueue
import (
"sync"
"time"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
)
// This file is mostly a copy of unexported code from
// https://github.com/kubernetes/kubernetes/blob/1d8828ce707ed9dd7a6a9756385419cce1d202ac/staging/src/k8s.io/client-go/util/workqueue/metrics.go
//
// The only two differences are the addition of mapLock in defaultQueueMetrics and converging retryMetrics into queueMetrics.
type queueMetrics[T comparable] interface {
add(item T)
get(item T)
done(item T)
updateUnfinishedWork()
retry()
}
func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, clock clock.Clock) queueMetrics[T] {
if len(name) == 0 {
return noMetrics[T]{}
}
return &defaultQueueMetrics[T]{
clock: clock,
depth: mp.NewDepthMetric(name),
adds: mp.NewAddsMetric(name),
latency: mp.NewLatencyMetric(name),
workDuration: mp.NewWorkDurationMetric(name),
unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
addTimes: map[T]time.Time{},
processingStartTimes: map[T]time.Time{},
retries: mp.NewRetriesMetric(name),
}
}
// defaultQueueMetrics expects the caller to lock before setting any metrics.
type defaultQueueMetrics[T comparable] struct {
clock clock.Clock
// current depth of a workqueue
depth workqueue.GaugeMetric
// total number of adds handled by a workqueue
adds workqueue.CounterMetric
// how long an item stays in a workqueue
latency workqueue.HistogramMetric
// how long processing an item from a workqueue takes
workDuration workqueue.HistogramMetric
mapLock sync.RWMutex
addTimes map[T]time.Time
processingStartTimes map[T]time.Time
// how long have current threads been working?
unfinishedWorkSeconds workqueue.SettableGaugeMetric
longestRunningProcessor workqueue.SettableGaugeMetric
retries workqueue.CounterMetric
}
// add is called for ready items only
func (m *defaultQueueMetrics[T]) add(item T) {
if m == nil {
return
}
m.adds.Inc()
m.depth.Inc()
m.mapLock.Lock()
defer m.mapLock.Unlock()
if _, exists := m.addTimes[item]; !exists {
m.addTimes[item] = m.clock.Now()
}
}
func (m *defaultQueueMetrics[T]) get(item T) {
if m == nil {
return
}
m.depth.Dec()
m.mapLock.Lock()
defer m.mapLock.Unlock()
m.processingStartTimes[item] = m.clock.Now()
if startTime, exists := m.addTimes[item]; exists {
m.latency.Observe(m.sinceInSeconds(startTime))
delete(m.addTimes, item)
}
}
func (m *defaultQueueMetrics[T]) done(item T) {
if m == nil {
return
}
m.mapLock.Lock()
defer m.mapLock.Unlock()
if startTime, exists := m.processingStartTimes[item]; exists {
m.workDuration.Observe(m.sinceInSeconds(startTime))
delete(m.processingStartTimes, item)
}
}
func (m *defaultQueueMetrics[T]) updateUnfinishedWork() {
m.mapLock.RLock()
defer m.mapLock.RUnlock()
// Note that a summary metric would be better for this, but prometheus
// doesn't seem to have non-hacky ways to reset the summary metrics.
var total float64
var oldest float64
for _, t := range m.processingStartTimes {
age := m.sinceInSeconds(t)
total += age
if age > oldest {
oldest = age
}
}
m.unfinishedWorkSeconds.Set(total)
m.longestRunningProcessor.Set(oldest)
}
// Gets the time since the specified start in seconds.
func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 {
return m.clock.Since(start).Seconds()
}
func (m *defaultQueueMetrics[T]) retry() {
m.retries.Inc()
}
type noMetrics[T any] struct{}
func (noMetrics[T]) add(item T) {}
func (noMetrics[T]) get(item T) {}
func (noMetrics[T]) done(item T) {}
func (noMetrics[T]) updateUnfinishedWork() {}
func (noMetrics[T]) retry() {}

View File

@ -0,0 +1,401 @@
package priorityqueue
import (
"sync"
"sync/atomic"
"time"
"github.com/go-logr/logr"
"github.com/google/btree"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/internal/metrics"
)
// AddOpts describes the options for adding items to the queue.
type AddOpts struct {
After time.Duration
RateLimited bool
Priority int
}
// PriorityQueue is a priority queue for a controller. It
// internally de-duplicates all items that are added to
// it. It will use the max of the passed priorities and the
// min of possible durations.
type PriorityQueue[T comparable] interface {
workqueue.TypedRateLimitingInterface[T]
AddWithOpts(o AddOpts, Items ...T)
GetWithPriority() (item T, priority int, shutdown bool)
}
// Opts contains the options for a PriorityQueue.
type Opts[T comparable] struct {
// Ratelimiter is being used when AddRateLimited is called. Defaults to a per-item exponential backoff
// limiter with an initial delay of five milliseconds and a max delay of 1000 seconds.
RateLimiter workqueue.TypedRateLimiter[T]
MetricProvider workqueue.MetricsProvider
Log logr.Logger
}
// Opt allows to configure a PriorityQueue.
type Opt[T comparable] func(*Opts[T])
// New constructs a new PriorityQueue.
func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
opts := &Opts[T]{}
for _, f := range o {
f(opts)
}
if opts.RateLimiter == nil {
opts.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, 1000*time.Second)
}
if opts.MetricProvider == nil {
opts.MetricProvider = metrics.WorkqueueMetricsProvider{}
}
pq := &priorityqueue[T]{
log: opts.Log,
items: map[T]*item[T]{},
queue: btree.NewG(32, less[T]),
becameReady: sets.Set[T]{},
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
// itemOrWaiterAdded indicates that an item or
// waiter was added. It must be buffered, because
// if we currently process items we can't tell
// if that included the new item/waiter.
itemOrWaiterAdded: make(chan struct{}, 1),
rateLimiter: opts.RateLimiter,
locked: sets.Set[T]{},
done: make(chan struct{}),
get: make(chan item[T]),
now: time.Now,
tick: time.Tick,
}
go pq.spin()
go pq.logState()
if _, ok := pq.metrics.(noMetrics[T]); !ok {
go pq.updateUnfinishedWorkLoop()
}
return pq
}
type priorityqueue[T comparable] struct {
log logr.Logger
// lock has to be acquired for any access any of items, queue, addedCounter
// or becameReady
lock sync.Mutex
items map[T]*item[T]
queue bTree[*item[T]]
// addedCounter is a counter of elements added, we need it
// because unixNano is not guaranteed to be unique.
addedCounter uint64
// becameReady holds items that are in the queue, were added
// with non-zero after and became ready. We need it to call the
// metrics add exactly once for them.
becameReady sets.Set[T]
metrics queueMetrics[T]
itemOrWaiterAdded chan struct{}
rateLimiter workqueue.TypedRateLimiter[T]
// locked contains the keys we handed out through Get() and that haven't
// yet been returned through Done().
locked sets.Set[T]
lockedLock sync.RWMutex
shutdown atomic.Bool
done chan struct{}
get chan item[T]
// waiters is the number of routines blocked in Get, we use it to determine
// if we can push items.
waiters atomic.Int64
// Configurable for testing
now func() time.Time
tick func(time.Duration) <-chan time.Time
}
func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
w.lock.Lock()
defer w.lock.Unlock()
for _, key := range items {
if o.RateLimited {
after := w.rateLimiter.When(key)
if o.After == 0 || after < o.After {
o.After = after
}
}
var readyAt *time.Time
if o.After > 0 {
readyAt = ptr.To(w.now().Add(o.After))
w.metrics.retry()
}
if _, ok := w.items[key]; !ok {
item := &item[T]{
Key: key,
AddedCounter: w.addedCounter,
Priority: o.Priority,
ReadyAt: readyAt,
}
w.items[key] = item
w.queue.ReplaceOrInsert(item)
if item.ReadyAt == nil {
w.metrics.add(key)
}
w.addedCounter++
continue
}
// The b-tree de-duplicates based on ordering and any change here
// will affect the order - Just delete and re-add.
item, _ := w.queue.Delete(w.items[key])
if o.Priority > item.Priority {
item.Priority = o.Priority
}
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
if readyAt == nil && !w.becameReady.Has(key) {
w.metrics.add(key)
}
item.ReadyAt = readyAt
}
w.queue.ReplaceOrInsert(item)
}
if len(items) > 0 {
w.notifyItemOrWaiterAdded()
}
}
func (w *priorityqueue[T]) notifyItemOrWaiterAdded() {
select {
case w.itemOrWaiterAdded <- struct{}{}:
default:
}
}
func (w *priorityqueue[T]) spin() {
blockForever := make(chan time.Time)
var nextReady <-chan time.Time
nextReady = blockForever
for {
select {
case <-w.done:
return
case <-w.itemOrWaiterAdded:
case <-nextReady:
}
nextReady = blockForever
func() {
w.lock.Lock()
defer w.lock.Unlock()
w.lockedLock.Lock()
defer w.lockedLock.Unlock()
// manipulating the tree from within Ascend might lead to panics, so
// track what we want to delete and do it after we are done ascending.
var toDelete []*item[T]
w.queue.Ascend(func(item *item[T]) bool {
if item.ReadyAt != nil {
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
nextReady = w.tick(readyAt)
return false
}
if !w.becameReady.Has(item.Key) {
w.metrics.add(item.Key)
w.becameReady.Insert(item.Key)
}
}
if w.waiters.Load() == 0 {
// Have to keep iterating here to ensure we update metrics
// for further items that became ready and set nextReady.
return true
}
// Item is locked, we can not hand it out
if w.locked.Has(item.Key) {
return true
}
w.metrics.get(item.Key)
w.locked.Insert(item.Key)
w.waiters.Add(-1)
delete(w.items, item.Key)
toDelete = append(toDelete, item)
w.becameReady.Delete(item.Key)
w.get <- *item
return true
})
for _, item := range toDelete {
w.queue.Delete(item)
}
}()
}
}
func (w *priorityqueue[T]) Add(item T) {
w.AddWithOpts(AddOpts{}, item)
}
func (w *priorityqueue[T]) AddAfter(item T, after time.Duration) {
w.AddWithOpts(AddOpts{After: after}, item)
}
func (w *priorityqueue[T]) AddRateLimited(item T) {
w.AddWithOpts(AddOpts{RateLimited: true}, item)
}
func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) {
w.waiters.Add(1)
w.notifyItemOrWaiterAdded()
item := <-w.get
return item.Key, item.Priority, w.shutdown.Load()
}
func (w *priorityqueue[T]) Get() (item T, shutdown bool) {
key, _, shutdown := w.GetWithPriority()
return key, shutdown
}
func (w *priorityqueue[T]) Forget(item T) {
w.rateLimiter.Forget(item)
}
func (w *priorityqueue[T]) NumRequeues(item T) int {
return w.rateLimiter.NumRequeues(item)
}
func (w *priorityqueue[T]) ShuttingDown() bool {
return w.shutdown.Load()
}
func (w *priorityqueue[T]) Done(item T) {
w.lockedLock.Lock()
defer w.lockedLock.Unlock()
w.locked.Delete(item)
w.metrics.done(item)
w.notifyItemOrWaiterAdded()
}
func (w *priorityqueue[T]) ShutDown() {
w.shutdown.Store(true)
close(w.done)
}
// ShutDownWithDrain just calls ShutDown, as the draining
// functionality is not used by controller-runtime.
func (w *priorityqueue[T]) ShutDownWithDrain() {
w.ShutDown()
}
// Len returns the number of items that are ready to be
// picked up. It does not include items that are not yet
// ready.
func (w *priorityqueue[T]) Len() int {
w.lock.Lock()
defer w.lock.Unlock()
var result int
w.queue.Ascend(func(item *item[T]) bool {
if item.ReadyAt == nil || item.ReadyAt.Compare(w.now()) <= 0 {
result++
return true
}
return false
})
return result
}
func (w *priorityqueue[T]) logState() {
t := time.Tick(10 * time.Second)
for {
select {
case <-w.done:
return
case <-t:
}
// Log level may change at runtime, so keep the
// loop going even if a given level is currently
// not enabled.
if !w.log.V(5).Enabled() {
continue
}
w.lock.Lock()
items := make([]*item[T], 0, len(w.items))
w.queue.Ascend(func(item *item[T]) bool {
items = append(items, item)
return true
})
w.lock.Unlock()
w.log.V(5).Info("workqueue_items", "items", items)
}
}
func less[T comparable](a, b *item[T]) bool {
if a.ReadyAt == nil && b.ReadyAt != nil {
return true
}
if b.ReadyAt == nil && a.ReadyAt != nil {
return false
}
if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) {
return a.ReadyAt.Before(*b.ReadyAt)
}
if a.Priority != b.Priority {
return a.Priority > b.Priority
}
return a.AddedCounter < b.AddedCounter
}
type item[T comparable] struct {
Key T `json:"key"`
AddedCounter uint64 `json:"addedCounter"`
Priority int `json:"priority"`
ReadyAt *time.Time `json:"readyAt,omitempty"`
}
func (w *priorityqueue[T]) updateUnfinishedWorkLoop() {
t := time.Tick(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
for {
select {
case <-w.done:
return
case <-t:
}
w.metrics.updateUnfinishedWork()
}
}
type bTree[T any] interface {
ReplaceOrInsert(item T) (_ T, _ bool)
Delete(item T) (T, bool)
Ascend(iterator btree.ItemIteratorG[T])
}

View File

@ -18,9 +18,11 @@ package handler
import ( import (
"context" "context"
"time"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
) )
@ -133,3 +135,67 @@ func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedG
h.GenericFunc(ctx, e, q) h.GenericFunc(ctx, e, q)
} }
} }
// LowPriority is the priority set by WithLowPriorityWhenUnchanged
const LowPriority = -100
// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if
// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing.
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] {
return TypedFuncs[object, request]{
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
// Due to how the handlers are factored, we have to wrap the workqueue to be able
// to inject custom behavior.
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: trli,
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}
var priority int
if isObjectUnchanged(tce) {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
},
})
},
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: trli,
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}
var priority int
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
},
})
},
DeleteFunc: u.Delete,
GenericFunc: u.Generic,
}
}
type workqueueWithCustomAddFunc[request comparable] struct {
workqueue.TypedRateLimitingInterface[request]
addFunc func(item request, q workqueue.TypedRateLimitingInterface[request])
}
func (w workqueueWithCustomAddFunc[request]) Add(item request) {
w.addFunc(item, w.TypedRateLimitingInterface)
}
// isObjectUnchanged checks if the object in a create event is unchanged, for example because
// we got it in our initial listwatch. The heuristic it uses is to check if the object is older
// than one minute.
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
}

View File

@ -21,9 +21,11 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
@ -169,43 +171,66 @@ func (c *Controller[request]) Start(ctx context.Context) error {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// NB(directxman12): launch the sources *before* trying to wait for the // NB(directxman12): launch the sources *before* trying to wait for the
// caches to sync so that they have a chance to register their intendeded // caches to sync so that they have a chance to register their intended
// caches. // caches.
errGroup := &errgroup.Group{}
for _, watch := range c.startWatches { for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch)) log := c.LogConstructor(nil)
_, ok := watch.(interface {
String() string
})
if err := watch.Start(ctx, c.Queue); err != nil {
return err
}
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.LogConstructor(nil).Info("Starting Controller")
for _, watch := range c.startWatches {
syncingSource, ok := watch.(source.SyncingSource)
if !ok { if !ok {
continue log = log.WithValues("source", fmt.Sprintf("%T", watch))
} else {
log = log.WithValues("source", fmt.Sprintf("%s", watch))
} }
didStartSyncingSource := &atomic.Bool{}
if err := func() error { errGroup.Go(func() error {
// use a context with timeout for launching sources and syncing caches. // Use a timeout for starting and syncing the source to avoid silently
// blocking startup indefinitely if it doesn't come up.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel() defer cancel()
// WaitForSync waits for a definitive timeout, and returns if there sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
// is an error or a timeout go func() {
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { defer close(sourceStartErrChan)
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) log.Info("Starting EventSource")
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync") if err := watch.Start(ctx, c.Queue); err != nil {
return err sourceStartErrChan <- err
} return
}
syncingSource, ok := watch.(source.TypedSyncingSource[request])
if !ok {
return
}
didStartSyncingSource.Store(true)
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
log.Error(err, "Could not wait for Cache to sync")
sourceStartErrChan <- err
}
}()
return nil select {
}(); err != nil { case err := <-sourceStartErrChan:
return err return err
} case <-sourceStartCtx.Done():
if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
return <-sourceStartErrChan
}
if ctx.Err() != nil { // Don't return an error if the root context got cancelled
return nil
}
return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
}
})
} }
if err := errGroup.Wait(); err != nil {
return err
}
c.LogConstructor(nil).Info("Starting Controller")
// All the watches have been started, we can reset the local slice. // All the watches have been started, we can reset the local slice.
// //
@ -311,7 +336,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
if !result.IsZero() { if !result.IsZero() {
log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler") log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes requeuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
} }
log.Error(err, "Reconciler error") log.Error(err, "Reconciler error")
case result.RequeueAfter > 0: case result.RequeueAfter > 0:

View File

@ -0,0 +1,131 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
// This file is copied and adapted from k8s.io/component-base/metrics/prometheus/workqueue
// which registers metrics to the k8s legacy Registry. We require very
// similar functionality, but must register metrics to a different Registry.
// Metrics subsystem and all keys used by the workqueue.
const (
WorkQueueSubsystem = metrics.WorkQueueSubsystem
DepthKey = metrics.DepthKey
AddsKey = metrics.AddsKey
QueueLatencyKey = metrics.QueueLatencyKey
WorkDurationKey = metrics.WorkDurationKey
UnfinishedWorkKey = metrics.UnfinishedWorkKey
LongestRunningProcessorKey = metrics.LongestRunningProcessorKey
RetriesKey = metrics.RetriesKey
)
var (
depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: DepthKey,
Help: "Current depth of workqueue",
}, []string{"name", "controller"})
adds = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: AddsKey,
Help: "Total number of adds handled by workqueue",
}, []string{"name", "controller"})
latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name", "controller"})
workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name", "controller"})
unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: UnfinishedWorkKey,
Help: "How many seconds of work has been done that " +
"is in progress and hasn't been observed by work_duration. Large " +
"values indicate stuck threads. One can deduce the number of stuck " +
"threads by observing the rate at which this increases.",
}, []string{"name", "controller"})
longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: LongestRunningProcessorKey,
Help: "How many seconds has the longest running " +
"processor for workqueue been running.",
}, []string{"name", "controller"})
retries = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: RetriesKey,
Help: "Total number of retries handled by workqueue",
}, []string{"name", "controller"})
)
func init() {
metrics.Registry.MustRegister(depth)
metrics.Registry.MustRegister(adds)
metrics.Registry.MustRegister(latency)
metrics.Registry.MustRegister(workDuration)
metrics.Registry.MustRegister(unfinished)
metrics.Registry.MustRegister(longestRunningProcessor)
metrics.Registry.MustRegister(retries)
workqueue.SetProvider(WorkqueueMetricsProvider{})
}
type WorkqueueMetricsProvider struct{}
func (WorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
return depth.WithLabelValues(name, name)
}
func (WorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
return adds.WithLabelValues(name, name)
}
func (WorkqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
return latency.WithLabelValues(name, name)
}
func (WorkqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
return workDuration.WithLabelValues(name, name)
}
func (WorkqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
return unfinished.WithLabelValues(name, name)
}
func (WorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
return longestRunningProcessor.WithLabelValues(name, name)
}
func (WorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
return retries.WithLabelValues(name, name)
}

View File

@ -52,7 +52,7 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues). // sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx) ctx, ks.startCancel = context.WithCancel(ctx)
ks.startedErr = make(chan error) ks.startedErr = make(chan error, 1) // Buffer chan to not leak goroutines if WaitForSync isn't called
go func() { go func() {
var ( var (
i cache.Informer i cache.Informer

View File

@ -275,7 +275,7 @@ func (s *defaultServer) createListener(ctx context.Context, log logr.Logger) (ne
return s.options.ListenConfig.Listen(ctx, "tcp", s.options.BindAddress) return s.options.ListenConfig.Listen(ctx, "tcp", s.options.BindAddress)
} }
cfg := &tls.Config{ //nolint:gosec cfg := &tls.Config{
NextProtos: []string{"h2"}, NextProtos: []string{"h2"},
} }
// fallback TLS config ready, will now mutate if passer wants full control over it // fallback TLS config ready, will now mutate if passer wants full control over it

View File

@ -16,15 +16,6 @@ limitations under the License.
package metrics package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/util/workqueue"
)
// This file is copied and adapted from k8s.io/component-base/metrics/prometheus/workqueue
// which registers metrics to the k8s legacy Registry. We require very
// similar functionality, but must register metrics to a different Registry.
// Metrics subsystem and all keys used by the workqueue. // Metrics subsystem and all keys used by the workqueue.
const ( const (
WorkQueueSubsystem = "workqueue" WorkQueueSubsystem = "workqueue"
@ -36,95 +27,3 @@ const (
LongestRunningProcessorKey = "longest_running_processor_seconds" LongestRunningProcessorKey = "longest_running_processor_seconds"
RetriesKey = "retries_total" RetriesKey = "retries_total"
) )
var (
depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: DepthKey,
Help: "Current depth of workqueue",
}, []string{"name", "controller"})
adds = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: AddsKey,
Help: "Total number of adds handled by workqueue",
}, []string{"name", "controller"})
latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name", "controller"})
workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name", "controller"})
unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: UnfinishedWorkKey,
Help: "How many seconds of work has been done that " +
"is in progress and hasn't been observed by work_duration. Large " +
"values indicate stuck threads. One can deduce the number of stuck " +
"threads by observing the rate at which this increases.",
}, []string{"name", "controller"})
longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: LongestRunningProcessorKey,
Help: "How many seconds has the longest running " +
"processor for workqueue been running.",
}, []string{"name", "controller"})
retries = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: RetriesKey,
Help: "Total number of retries handled by workqueue",
}, []string{"name", "controller"})
)
func init() {
Registry.MustRegister(depth)
Registry.MustRegister(adds)
Registry.MustRegister(latency)
Registry.MustRegister(workDuration)
Registry.MustRegister(unfinished)
Registry.MustRegister(longestRunningProcessor)
Registry.MustRegister(retries)
workqueue.SetProvider(workqueueMetricsProvider{})
}
type workqueueMetricsProvider struct{}
func (workqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
return depth.WithLabelValues(name, name)
}
func (workqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
return adds.WithLabelValues(name, name)
}
func (workqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
return latency.WithLabelValues(name, name)
}
func (workqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
return workDuration.WithLabelValues(name, name)
}
func (workqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
return unfinished.WithLabelValues(name, name)
}
func (workqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
return longestRunningProcessor.WithLabelValues(name, name)
}
func (workqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
return retries.WithLabelValues(name, name)
}

View File

@ -173,7 +173,8 @@ func (TypedResourceVersionChangedPredicate[T]) Update(e event.TypedUpdateEvent[T
// The metadata.generation field of an object is incremented by the API server when writes are made to the spec field of an object. // The metadata.generation field of an object is incremented by the API server when writes are made to the spec field of an object.
// This allows a controller to ignore update events where the spec is unchanged, and only the metadata and/or status fields are changed. // This allows a controller to ignore update events where the spec is unchanged, and only the metadata and/or status fields are changed.
// //
// For CustomResource objects the Generation is only incremented when the status subresource is enabled. // For CustomResource objects the Generation is incremented when spec is changed, or status changed and status not modeled as subresource.
// subresource status update will not increase Generation.
// //
// Caveats: // Caveats:
// //
@ -191,7 +192,8 @@ type GenerationChangedPredicate = TypedGenerationChangedPredicate[client.Object]
// The metadata.generation field of an object is incremented by the API server when writes are made to the spec field of an object. // The metadata.generation field of an object is incremented by the API server when writes are made to the spec field of an object.
// This allows a controller to ignore update events where the spec is unchanged, and only the metadata and/or status fields are changed. // This allows a controller to ignore update events where the spec is unchanged, and only the metadata and/or status fields are changed.
// //
// For CustomResource objects the Generation is only incremented when the status subresource is enabled. // For CustomResource objects the Generation is incremented when spec is changed, or status changed and status not modeled as subresource.
// subresource status update will not increase Generation.
// //
// Caveats: // Caveats:
// //

View File

@ -1,84 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"context"
"encoding/json"
"net/http"
admissionv1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
// Defaulter defines functions for setting defaults on resources.
// Deprecated: Ue CustomDefaulter instead.
type Defaulter interface {
runtime.Object
Default()
}
// DefaultingWebhookFor creates a new Webhook for Defaulting the provided type.
// Deprecated: Use WithCustomDefaulter instead.
func DefaultingWebhookFor(scheme *runtime.Scheme, defaulter Defaulter) *Webhook {
return &Webhook{
Handler: &mutatingHandler{defaulter: defaulter, decoder: NewDecoder(scheme)},
}
}
type mutatingHandler struct {
defaulter Defaulter
decoder Decoder
}
// Handle handles admission requests.
func (h *mutatingHandler) Handle(ctx context.Context, req Request) Response {
if h.decoder == nil {
panic("decoder should never be nil")
}
if h.defaulter == nil {
panic("defaulter should never be nil")
}
// always skip when a DELETE operation received in mutation handler
// describe in https://github.com/kubernetes-sigs/controller-runtime/issues/1762
if req.Operation == admissionv1.Delete {
return Response{AdmissionResponse: admissionv1.AdmissionResponse{
Allowed: true,
Result: &metav1.Status{
Code: http.StatusOK,
},
}}
}
// Get the object in the request
obj := h.defaulter.DeepCopyObject().(Defaulter)
if err := h.decoder.Decode(req, obj); err != nil {
return Errored(http.StatusBadRequest, err)
}
// Default the object
obj.Default()
marshalled, err := json.Marshal(obj)
if err != nil {
return Errored(http.StatusInternalServerError, err)
}
// Create the patch
return PatchResponseFromRaw(req.Object.Raw, marshalled)
}

View File

@ -21,11 +21,14 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"net/http" "net/http"
"slices"
"gomodules.xyz/jsonpatch/v2"
admissionv1 "k8s.io/api/admission/v1" admissionv1 "k8s.io/api/admission/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
) )
// CustomDefaulter defines functions for setting defaults on resources. // CustomDefaulter defines functions for setting defaults on resources.
@ -33,17 +36,41 @@ type CustomDefaulter interface {
Default(ctx context.Context, obj runtime.Object) error Default(ctx context.Context, obj runtime.Object) error
} }
type defaulterOptions struct {
removeUnknownOrOmitableFields bool
}
// DefaulterOption defines the type of a CustomDefaulter's option
type DefaulterOption func(*defaulterOptions)
// DefaulterRemoveUnknownOrOmitableFields makes the defaulter prune fields that are in the json object retrieved by the
// webhook but not in the local go type json representation. This happens for example when the CRD in the apiserver has
// fields that our go type doesn't know about, because it's outdated, or the field has a zero value and is `omitempty`.
func DefaulterRemoveUnknownOrOmitableFields(o *defaulterOptions) {
o.removeUnknownOrOmitableFields = true
}
// WithCustomDefaulter creates a new Webhook for a CustomDefaulter interface. // WithCustomDefaulter creates a new Webhook for a CustomDefaulter interface.
func WithCustomDefaulter(scheme *runtime.Scheme, obj runtime.Object, defaulter CustomDefaulter) *Webhook { func WithCustomDefaulter(scheme *runtime.Scheme, obj runtime.Object, defaulter CustomDefaulter, opts ...DefaulterOption) *Webhook {
options := &defaulterOptions{}
for _, o := range opts {
o(options)
}
return &Webhook{ return &Webhook{
Handler: &defaulterForType{object: obj, defaulter: defaulter, decoder: NewDecoder(scheme)}, Handler: &defaulterForType{
object: obj,
defaulter: defaulter,
decoder: NewDecoder(scheme),
removeUnknownOrOmitableFields: options.removeUnknownOrOmitableFields,
},
} }
} }
type defaulterForType struct { type defaulterForType struct {
defaulter CustomDefaulter defaulter CustomDefaulter
object runtime.Object object runtime.Object
decoder Decoder decoder Decoder
removeUnknownOrOmitableFields bool
} }
// Handle handles admission requests. // Handle handles admission requests.
@ -76,6 +103,12 @@ func (h *defaulterForType) Handle(ctx context.Context, req Request) Response {
return Errored(http.StatusBadRequest, err) return Errored(http.StatusBadRequest, err)
} }
// Keep a copy of the object if needed
var originalObj runtime.Object
if !h.removeUnknownOrOmitableFields {
originalObj = obj.DeepCopyObject()
}
// Default the object // Default the object
if err := h.defaulter.Default(ctx, obj); err != nil { if err := h.defaulter.Default(ctx, obj); err != nil {
var apiStatus apierrors.APIStatus var apiStatus apierrors.APIStatus
@ -90,5 +123,43 @@ func (h *defaulterForType) Handle(ctx context.Context, req Request) Response {
if err != nil { if err != nil {
return Errored(http.StatusInternalServerError, err) return Errored(http.StatusInternalServerError, err)
} }
return PatchResponseFromRaw(req.Object.Raw, marshalled)
handlerResponse := PatchResponseFromRaw(req.Object.Raw, marshalled)
if !h.removeUnknownOrOmitableFields {
handlerResponse = h.dropSchemeRemovals(handlerResponse, originalObj, req.Object.Raw)
}
return handlerResponse
}
func (h *defaulterForType) dropSchemeRemovals(r Response, original runtime.Object, raw []byte) Response {
const opRemove = "remove"
if !r.Allowed || r.PatchType == nil {
return r
}
// If we don't have removals in the patch.
if !slices.ContainsFunc(r.Patches, func(o jsonpatch.JsonPatchOperation) bool { return o.Operation == opRemove }) {
return r
}
// Get the raw to original patch
marshalledOriginal, err := json.Marshal(original)
if err != nil {
return Errored(http.StatusInternalServerError, err)
}
patchOriginal, err := jsonpatch.CreatePatch(raw, marshalledOriginal)
if err != nil {
return Errored(http.StatusInternalServerError, err)
}
removedByScheme := sets.New(slices.DeleteFunc(patchOriginal, func(p jsonpatch.JsonPatchOperation) bool { return p.Operation != opRemove })...)
r.Patches = slices.DeleteFunc(r.Patches, func(p jsonpatch.JsonPatchOperation) bool {
return p.Operation == opRemove && removedByScheme.Has(p)
})
if len(r.Patches) == 0 {
r.PatchType = nil
}
return r
} }

View File

@ -1,127 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admission
import (
"context"
"errors"
"fmt"
"net/http"
v1 "k8s.io/api/admission/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
)
// Warnings represents warning messages.
type Warnings []string
// Validator defines functions for validating an operation.
// The custom resource kind which implements this interface can validate itself.
// To validate the custom resource with another specific struct, use CustomValidator instead.
// Deprecated: Use CustomValidator instead.
type Validator interface {
runtime.Object
// ValidateCreate validates the object on creation.
// The optional warnings will be added to the response as warning messages.
// Return an error if the object is invalid.
ValidateCreate() (warnings Warnings, err error)
// ValidateUpdate validates the object on update. The oldObj is the object before the update.
// The optional warnings will be added to the response as warning messages.
// Return an error if the object is invalid.
ValidateUpdate(old runtime.Object) (warnings Warnings, err error)
// ValidateDelete validates the object on deletion.
// The optional warnings will be added to the response as warning messages.
// Return an error if the object is invalid.
ValidateDelete() (warnings Warnings, err error)
}
// ValidatingWebhookFor creates a new Webhook for validating the provided type.
// Deprecated: Use WithCustomValidator instead.
func ValidatingWebhookFor(scheme *runtime.Scheme, validator Validator) *Webhook {
return &Webhook{
Handler: &validatingHandler{validator: validator, decoder: NewDecoder(scheme)},
}
}
type validatingHandler struct {
validator Validator
decoder Decoder
}
// Handle handles admission requests.
func (h *validatingHandler) Handle(ctx context.Context, req Request) Response {
if h.decoder == nil {
panic("decoder should never be nil")
}
if h.validator == nil {
panic("validator should never be nil")
}
// Get the object in the request
obj := h.validator.DeepCopyObject().(Validator)
var err error
var warnings []string
switch req.Operation {
case v1.Connect:
// No validation for connect requests.
// TODO(vincepri): Should we validate CONNECT requests? In what cases?
case v1.Create:
if err = h.decoder.Decode(req, obj); err != nil {
return Errored(http.StatusBadRequest, err)
}
warnings, err = obj.ValidateCreate()
case v1.Update:
oldObj := obj.DeepCopyObject()
err = h.decoder.DecodeRaw(req.Object, obj)
if err != nil {
return Errored(http.StatusBadRequest, err)
}
err = h.decoder.DecodeRaw(req.OldObject, oldObj)
if err != nil {
return Errored(http.StatusBadRequest, err)
}
warnings, err = obj.ValidateUpdate(oldObj)
case v1.Delete:
// In reference to PR: https://github.com/kubernetes/kubernetes/pull/76346
// OldObject contains the object being deleted
err = h.decoder.DecodeRaw(req.OldObject, obj)
if err != nil {
return Errored(http.StatusBadRequest, err)
}
warnings, err = obj.ValidateDelete()
default:
return Errored(http.StatusBadRequest, fmt.Errorf("unknown operation %q", req.Operation))
}
if err != nil {
var apiStatus apierrors.APIStatus
if errors.As(err, &apiStatus) {
return validationResponseFromStatus(false, apiStatus.Status()).WithWarnings(warnings...)
}
return Denied(err.Error()).WithWarnings(warnings...)
}
return Allowed("").WithWarnings(warnings...)
}

View File

@ -27,6 +27,9 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
) )
// Warnings represents warning messages.
type Warnings []string
// CustomValidator defines functions for validating an operation. // CustomValidator defines functions for validating an operation.
// The object to be validated is passed into methods as a parameter. // The object to be validated is passed into methods as a parameter.
type CustomValidator interface { type CustomValidator interface {

View File

@ -23,14 +23,6 @@ import (
// define some aliases for common bits of the webhook functionality // define some aliases for common bits of the webhook functionality
// Defaulter defines functions for setting defaults on resources.
// Deprecated: Use CustomDefaulter instead.
type Defaulter = admission.Defaulter
// Validator defines functions for validating an operation.
// Deprecated: Use CustomValidator instead.
type Validator = admission.Validator
// CustomDefaulter defines functions for setting defaults on resources. // CustomDefaulter defines functions for setting defaults on resources.
type CustomDefaulter = admission.CustomDefaulter type CustomDefaulter = admission.CustomDefaulter

View File

@ -190,7 +190,7 @@ func (s *DefaultServer) Start(ctx context.Context) error {
log.Info("Starting webhook server") log.Info("Starting webhook server")
cfg := &tls.Config{ //nolint:gosec cfg := &tls.Config{
NextProtos: []string{"h2"}, NextProtos: []string{"h2"},
} }
// fallback TLS config ready, will now mutate if passer wants full control over it // fallback TLS config ready, will now mutate if passer wants full control over it
@ -272,7 +272,7 @@ func (s *DefaultServer) Start(ctx context.Context) error {
// server has been started. // server has been started.
func (s *DefaultServer) StartedChecker() healthz.Checker { func (s *DefaultServer) StartedChecker() healthz.Checker {
config := &tls.Config{ config := &tls.Config{
InsecureSkipVerify: true, //nolint:gosec // config is used to connect to our own webhook port. InsecureSkipVerify: true,
} }
return func(req *http.Request) error { return func(req *http.Request) error {
s.mu.Lock() s.mu.Lock()