From 636fa26ab8d63c93bc9b4e910ebfcccc0ff7ccb3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 27 Jan 2025 20:58:32 +0000 Subject: [PATCH] rebase: bump sigs.k8s.io/controller-runtime Bumps the k8s-dependencies group with 1 update in the / directory: [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime). Updates `sigs.k8s.io/controller-runtime` from 0.19.4 to 0.20.1 - [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases) - [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md) - [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.19.4...v0.20.1) --- updated-dependencies: - dependency-name: sigs.k8s.io/controller-runtime dependency-type: direct:production update-type: version-update:semver-minor dependency-group: k8s-dependencies ... Signed-off-by: dependabot[bot] --- go.mod | 6 +- go.sum | 7 +- vendor/github.com/google/btree/.travis.yml | 1 - vendor/github.com/google/btree/README.md | 2 - vendor/github.com/google/btree/btree.go | 5 +- .../github.com/google/btree/btree_generic.go | 1083 +++++++++++++++++ vendor/golang.org/x/exp/maps/maps.go | 94 -- vendor/golang.org/x/sync/errgroup/errgroup.go | 135 ++ vendor/golang.org/x/sync/errgroup/go120.go | 13 + .../golang.org/x/sync/errgroup/pre_go120.go | 14 + vendor/modules.txt | 14 +- .../controller-runtime/pkg/cache/cache.go | 27 +- .../pkg/cache/delegating_by_gvk_cache.go | 7 +- .../pkg/cache/internal/informers.go | 6 +- .../pkg/client/apiutil/restmapper.go | 125 +- .../controller-runtime/pkg/client/client.go | 4 +- .../pkg/client/interfaces.go | 14 +- .../controller-runtime/pkg/cluster/cluster.go | 18 +- .../pkg/config/controller.go | 6 + .../pkg/controller/controller.go | 14 +- .../pkg/controller/priorityqueue/metrics.go | 146 +++ .../controller/priorityqueue/priorityqueue.go | 401 ++++++ .../pkg/handler/eventhandler.go | 66 + .../pkg/internal/controller/controller.go | 81 +- .../pkg/internal/metrics/workqueue.go | 131 ++ .../pkg/internal/source/kind.go | 2 +- .../pkg/metrics/server/server.go | 2 +- .../pkg/metrics/workqueue.go | 101 -- .../pkg/predicate/predicate.go | 6 +- .../pkg/webhook/admission/defaulter.go | 84 -- .../pkg/webhook/admission/defaulter_custom.go | 83 +- .../pkg/webhook/admission/validator.go | 127 -- .../pkg/webhook/admission/validator_custom.go | 3 + .../controller-runtime/pkg/webhook/alias.go | 8 - .../controller-runtime/pkg/webhook/server.go | 4 +- 35 files changed, 2279 insertions(+), 561 deletions(-) delete mode 100644 vendor/github.com/google/btree/.travis.yml create mode 100644 vendor/github.com/google/btree/btree_generic.go delete mode 100644 vendor/golang.org/x/exp/maps/maps.go create mode 100644 vendor/golang.org/x/sync/errgroup/errgroup.go create mode 100644 vendor/golang.org/x/sync/errgroup/go120.go create mode 100644 vendor/golang.org/x/sync/errgroup/pre_go120.go create mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go create mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go create mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/internal/metrics/workqueue.go delete mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter.go delete mode 100644 vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator.go diff --git a/go.mod b/go.mod index 0b889cee9..7114f8d1d 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ 
require ( k8s.io/mount-utils v0.32.1 k8s.io/pod-security-admission v0.32.1 k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 - sigs.k8s.io/controller-runtime v0.19.4 + sigs.k8s.io/controller-runtime v0.20.1 ) require ( @@ -101,7 +101,7 @@ require ( github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect - github.com/google/btree v1.0.1 // indirect + github.com/google/btree v1.1.3 // indirect github.com/google/cadvisor v0.51.0 // indirect github.com/google/cel-go v0.22.0 // indirect github.com/google/gnostic-models v0.6.8 // indirect @@ -188,7 +188,7 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.31.1 // indirect + k8s.io/apiextensions-apiserver v0.32.0 // indirect k8s.io/apiserver v0.32.1 // indirect k8s.io/component-base v0.32.1 // indirect k8s.io/component-helpers v0.32.1 // indirect diff --git a/go.sum b/go.sum index 1d78a04f3..89afa3d76 100644 --- a/go.sum +++ b/go.sum @@ -1745,8 +1745,9 @@ github.com/gomodules/jsonpatch/v2 v2.2.0 h1:QBjDK/nX43P4z/Os3gnk8VeFdLDgBuMns1Wl github.com/gomodules/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/cadvisor v0.51.0 h1:BspqSPdZoLKrnvuZNOvM/KiJ/A+RdixwagN20n+2H8k= github.com/google/cadvisor v0.51.0/go.mod h1:czGE/c/P/i0QFpVNKTFrIEzord9Y10YfpwuaSWXELc0= github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g= @@ -3657,8 +3658,8 @@ rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 h1:CPT0ExVicCzcpeN4baWEV2ko2Z/AsiZgEdwgcfwLgMo= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/controller-runtime v0.2.2/go.mod h1:9dyohw3ZtoXQuV1e766PHUn+cmrRCIcBh6XIMFNMZ+I= -sigs.k8s.io/controller-runtime v0.19.4 h1:SUmheabttt0nx8uJtoII4oIP27BVVvAKFvdvGFwV/Qo= -sigs.k8s.io/controller-runtime v0.19.4/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= +sigs.k8s.io/controller-runtime v0.20.1 h1:JbGMAG/X94NeM3xvjenVUaBjy6Ui4Ogd/J5ZtjZnHaE= +sigs.k8s.io/controller-runtime v0.20.1/go.mod h1:BrP3w158MwvB3ZbNpaAcIKkHQ7YGpYnzpoSTZ8E14WU= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= diff --git a/vendor/github.com/google/btree/.travis.yml b/vendor/github.com/google/btree/.travis.yml deleted file mode 100644 index 4f2ee4d97..000000000 --- a/vendor/github.com/google/btree/.travis.yml +++ /dev/null @@ -1 +0,0 @@ -language: go diff --git a/vendor/github.com/google/btree/README.md b/vendor/github.com/google/btree/README.md index 6062a4dac..eab5dbf7b 100644 
--- a/vendor/github.com/google/btree/README.md +++ b/vendor/github.com/google/btree/README.md @@ -1,7 +1,5 @@ # BTree implementation for Go -![Travis CI Build Status](https://api.travis-ci.org/google/btree.svg?branch=master) - This package provides an in-memory B-Tree implementation for Go, useful as an ordered, mutable data structure. diff --git a/vendor/github.com/google/btree/btree.go b/vendor/github.com/google/btree/btree.go index b83acdbc6..6f5184fef 100644 --- a/vendor/github.com/google/btree/btree.go +++ b/vendor/github.com/google/btree/btree.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !go1.18 +// +build !go1.18 + // Package btree implements in-memory B-Trees of arbitrary degree. // // btree implements an in-memory B-Tree for use as an ordered data structure. @@ -476,7 +479,7 @@ func (n *node) growChildAndRemove(i int, item Item, minItems int, typ toRemove) child := n.mutableChild(i) // merge with right child mergeItem := n.items.removeAt(i) - mergeChild := n.children.removeAt(i + 1) + mergeChild := n.children.removeAt(i + 1).mutableFor(n.cow) child.items = append(child.items, mergeItem) child.items = append(child.items, mergeChild.items...) child.children = append(child.children, mergeChild.children...) diff --git a/vendor/github.com/google/btree/btree_generic.go b/vendor/github.com/google/btree/btree_generic.go new file mode 100644 index 000000000..e44a0f488 --- /dev/null +++ b/vendor/github.com/google/btree/btree_generic.go @@ -0,0 +1,1083 @@ +// Copyright 2014-2022 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +// In Go 1.18 and beyond, a BTreeG generic is created, and BTree is a specific +// instantiation of that generic for the Item interface, with a backwards- +// compatible API. Before go1.18, generics are not supported, +// and BTree is just an implementation based around the Item interface. + +// Package btree implements in-memory B-Trees of arbitrary degree. +// +// btree implements an in-memory B-Tree for use as an ordered data structure. +// It is not meant for persistent storage solutions. +// +// It has a flatter structure than an equivalent red-black or other binary tree, +// which in some cases yields better memory usage and/or performance. +// See some discussion on the matter here: +// http://google-opensource.blogspot.com/2013/01/c-containers-that-save-memory-and-time.html +// Note, though, that this project is in no way related to the C++ B-Tree +// implementation written about there. +// +// Within this tree, each node contains a slice of items and a (possibly nil) +// slice of children. 
For basic numeric values or raw structs, this can cause +// efficiency differences when compared to equivalent C++ template code that +// stores values in arrays within the node: +// * Due to the overhead of storing values as interfaces (each +// value needs to be stored as the value itself, then 2 words for the +// interface pointing to that value and its type), resulting in higher +// memory use. +// * Since interfaces can point to values anywhere in memory, values are +// most likely not stored in contiguous blocks, resulting in a higher +// number of cache misses. +// These issues don't tend to matter, though, when working with strings or other +// heap-allocated structures, since C++-equivalent structures also must store +// pointers and also distribute their values across the heap. +// +// This implementation is designed to be a drop-in replacement to gollrb.LLRB +// trees, (http://github.com/petar/gollrb), an excellent and probably the most +// widely used ordered tree implementation in the Go ecosystem currently. +// Its functions, therefore, exactly mirror those of +// llrb.LLRB where possible. Unlike gollrb, though, we currently don't +// support storing multiple equivalent values. +// +// There are two implementations; those suffixed with 'G' are generics, usable +// for any type, and require a passed-in "less" function to define their ordering. +// Those without this prefix are specific to the 'Item' interface, and use +// its 'Less' function for ordering. +package btree + +import ( + "fmt" + "io" + "sort" + "strings" + "sync" +) + +// Item represents a single object in the tree. +type Item interface { + // Less tests whether the current item is less than the given argument. + // + // This must provide a strict weak ordering. + // If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only + // hold one of either a or b in the tree). + Less(than Item) bool +} + +const ( + DefaultFreeListSize = 32 +) + +// FreeListG represents a free list of btree nodes. By default each +// BTree has its own FreeList, but multiple BTrees can share the same +// FreeList, in particular when they're created with Clone. +// Two Btrees using the same freelist are safe for concurrent write access. +type FreeListG[T any] struct { + mu sync.Mutex + freelist []*node[T] +} + +// NewFreeListG creates a new free list. +// size is the maximum size of the returned free list. +func NewFreeListG[T any](size int) *FreeListG[T] { + return &FreeListG[T]{freelist: make([]*node[T], 0, size)} +} + +func (f *FreeListG[T]) newNode() (n *node[T]) { + f.mu.Lock() + index := len(f.freelist) - 1 + if index < 0 { + f.mu.Unlock() + return new(node[T]) + } + n = f.freelist[index] + f.freelist[index] = nil + f.freelist = f.freelist[:index] + f.mu.Unlock() + return +} + +func (f *FreeListG[T]) freeNode(n *node[T]) (out bool) { + f.mu.Lock() + if len(f.freelist) < cap(f.freelist) { + f.freelist = append(f.freelist, n) + out = true + } + f.mu.Unlock() + return +} + +// ItemIteratorG allows callers of {A/De}scend* to iterate in-order over portions of +// the tree. When this function returns false, iteration will stop and the +// associated Ascend* function will immediately return. +type ItemIteratorG[T any] func(item T) bool + +// Ordered represents the set of types for which the '<' operator work. 
+type Ordered interface { + ~int | ~int8 | ~int16 | ~int32 | ~int64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~float32 | ~float64 | ~string +} + +// Less[T] returns a default LessFunc that uses the '<' operator for types that support it. +func Less[T Ordered]() LessFunc[T] { + return func(a, b T) bool { return a < b } +} + +// NewOrderedG creates a new B-Tree for ordered types. +func NewOrderedG[T Ordered](degree int) *BTreeG[T] { + return NewG[T](degree, Less[T]()) +} + +// NewG creates a new B-Tree with the given degree. +// +// NewG(2), for example, will create a 2-3-4 tree (each node contains 1-3 items +// and 2-4 children). +// +// The passed-in LessFunc determines how objects of type T are ordered. +func NewG[T any](degree int, less LessFunc[T]) *BTreeG[T] { + return NewWithFreeListG(degree, less, NewFreeListG[T](DefaultFreeListSize)) +} + +// NewWithFreeListG creates a new B-Tree that uses the given node free list. +func NewWithFreeListG[T any](degree int, less LessFunc[T], f *FreeListG[T]) *BTreeG[T] { + if degree <= 1 { + panic("bad degree") + } + return &BTreeG[T]{ + degree: degree, + cow: ©OnWriteContext[T]{freelist: f, less: less}, + } +} + +// items stores items in a node. +type items[T any] []T + +// insertAt inserts a value into the given index, pushing all subsequent values +// forward. +func (s *items[T]) insertAt(index int, item T) { + var zero T + *s = append(*s, zero) + if index < len(*s) { + copy((*s)[index+1:], (*s)[index:]) + } + (*s)[index] = item +} + +// removeAt removes a value at a given index, pulling all subsequent values +// back. +func (s *items[T]) removeAt(index int) T { + item := (*s)[index] + copy((*s)[index:], (*s)[index+1:]) + var zero T + (*s)[len(*s)-1] = zero + *s = (*s)[:len(*s)-1] + return item +} + +// pop removes and returns the last element in the list. +func (s *items[T]) pop() (out T) { + index := len(*s) - 1 + out = (*s)[index] + var zero T + (*s)[index] = zero + *s = (*s)[:index] + return +} + +// truncate truncates this instance at index so that it contains only the +// first index items. index must be less than or equal to length. +func (s *items[T]) truncate(index int) { + var toClear items[T] + *s, toClear = (*s)[:index], (*s)[index:] + var zero T + for i := 0; i < len(toClear); i++ { + toClear[i] = zero + } +} + +// find returns the index where the given item should be inserted into this +// list. 'found' is true if the item already exists in the list at the given +// index. +func (s items[T]) find(item T, less func(T, T) bool) (index int, found bool) { + i := sort.Search(len(s), func(i int) bool { + return less(item, s[i]) + }) + if i > 0 && !less(s[i-1], item) { + return i - 1, true + } + return i, false +} + +// node is an internal node in a tree. 
+// +// It must at all times maintain the invariant that either +// * len(children) == 0, len(items) unconstrained +// * len(children) == len(items) + 1 +type node[T any] struct { + items items[T] + children items[*node[T]] + cow *copyOnWriteContext[T] +} + +func (n *node[T]) mutableFor(cow *copyOnWriteContext[T]) *node[T] { + if n.cow == cow { + return n + } + out := cow.newNode() + if cap(out.items) >= len(n.items) { + out.items = out.items[:len(n.items)] + } else { + out.items = make(items[T], len(n.items), cap(n.items)) + } + copy(out.items, n.items) + // Copy children + if cap(out.children) >= len(n.children) { + out.children = out.children[:len(n.children)] + } else { + out.children = make(items[*node[T]], len(n.children), cap(n.children)) + } + copy(out.children, n.children) + return out +} + +func (n *node[T]) mutableChild(i int) *node[T] { + c := n.children[i].mutableFor(n.cow) + n.children[i] = c + return c +} + +// split splits the given node at the given index. The current node shrinks, +// and this function returns the item that existed at that index and a new node +// containing all items/children after it. +func (n *node[T]) split(i int) (T, *node[T]) { + item := n.items[i] + next := n.cow.newNode() + next.items = append(next.items, n.items[i+1:]...) + n.items.truncate(i) + if len(n.children) > 0 { + next.children = append(next.children, n.children[i+1:]...) + n.children.truncate(i + 1) + } + return item, next +} + +// maybeSplitChild checks if a child should be split, and if so splits it. +// Returns whether or not a split occurred. +func (n *node[T]) maybeSplitChild(i, maxItems int) bool { + if len(n.children[i].items) < maxItems { + return false + } + first := n.mutableChild(i) + item, second := first.split(maxItems / 2) + n.items.insertAt(i, item) + n.children.insertAt(i+1, second) + return true +} + +// insert inserts an item into the subtree rooted at this node, making sure +// no nodes in the subtree exceed maxItems items. Should an equivalent item be +// be found/replaced by insert, it will be returned. +func (n *node[T]) insert(item T, maxItems int) (_ T, _ bool) { + i, found := n.items.find(item, n.cow.less) + if found { + out := n.items[i] + n.items[i] = item + return out, true + } + if len(n.children) == 0 { + n.items.insertAt(i, item) + return + } + if n.maybeSplitChild(i, maxItems) { + inTree := n.items[i] + switch { + case n.cow.less(item, inTree): + // no change, we want first split node + case n.cow.less(inTree, item): + i++ // we want second split node + default: + out := n.items[i] + n.items[i] = item + return out, true + } + } + return n.mutableChild(i).insert(item, maxItems) +} + +// get finds the given key in the subtree and returns it. +func (n *node[T]) get(key T) (_ T, _ bool) { + i, found := n.items.find(key, n.cow.less) + if found { + return n.items[i], true + } else if len(n.children) > 0 { + return n.children[i].get(key) + } + return +} + +// min returns the first item in the subtree. +func min[T any](n *node[T]) (_ T, found bool) { + if n == nil { + return + } + for len(n.children) > 0 { + n = n.children[0] + } + if len(n.items) == 0 { + return + } + return n.items[0], true +} + +// max returns the last item in the subtree. +func max[T any](n *node[T]) (_ T, found bool) { + if n == nil { + return + } + for len(n.children) > 0 { + n = n.children[len(n.children)-1] + } + if len(n.items) == 0 { + return + } + return n.items[len(n.items)-1], true +} + +// toRemove details what item to remove in a node.remove call. 
+type toRemove int + +const ( + removeItem toRemove = iota // removes the given item + removeMin // removes smallest item in the subtree + removeMax // removes largest item in the subtree +) + +// remove removes an item from the subtree rooted at this node. +func (n *node[T]) remove(item T, minItems int, typ toRemove) (_ T, _ bool) { + var i int + var found bool + switch typ { + case removeMax: + if len(n.children) == 0 { + return n.items.pop(), true + } + i = len(n.items) + case removeMin: + if len(n.children) == 0 { + return n.items.removeAt(0), true + } + i = 0 + case removeItem: + i, found = n.items.find(item, n.cow.less) + if len(n.children) == 0 { + if found { + return n.items.removeAt(i), true + } + return + } + default: + panic("invalid type") + } + // If we get to here, we have children. + if len(n.children[i].items) <= minItems { + return n.growChildAndRemove(i, item, minItems, typ) + } + child := n.mutableChild(i) + // Either we had enough items to begin with, or we've done some + // merging/stealing, because we've got enough now and we're ready to return + // stuff. + if found { + // The item exists at index 'i', and the child we've selected can give us a + // predecessor, since if we've gotten here it's got > minItems items in it. + out := n.items[i] + // We use our special-case 'remove' call with typ=maxItem to pull the + // predecessor of item i (the rightmost leaf of our immediate left child) + // and set it into where we pulled the item from. + var zero T + n.items[i], _ = child.remove(zero, minItems, removeMax) + return out, true + } + // Final recursive call. Once we're here, we know that the item isn't in this + // node and that the child is big enough to remove from. + return child.remove(item, minItems, typ) +} + +// growChildAndRemove grows child 'i' to make sure it's possible to remove an +// item from it while keeping it at minItems, then calls remove to actually +// remove it. +// +// Most documentation says we have to do two sets of special casing: +// 1) item is in this node +// 2) item is in child +// In both cases, we need to handle the two subcases: +// A) node has enough values that it can spare one +// B) node doesn't have enough values +// For the latter, we have to check: +// a) left sibling has node to spare +// b) right sibling has node to spare +// c) we must merge +// To simplify our code here, we handle cases #1 and #2 the same: +// If a node doesn't have enough items, we make sure it does (using a,b,c). +// We then simply redo our remove call, and the second time (regardless of +// whether we're in case 1 or 2), we'll have enough items and can guarantee +// that we hit case A. 
+func (n *node[T]) growChildAndRemove(i int, item T, minItems int, typ toRemove) (T, bool) { + if i > 0 && len(n.children[i-1].items) > minItems { + // Steal from left child + child := n.mutableChild(i) + stealFrom := n.mutableChild(i - 1) + stolenItem := stealFrom.items.pop() + child.items.insertAt(0, n.items[i-1]) + n.items[i-1] = stolenItem + if len(stealFrom.children) > 0 { + child.children.insertAt(0, stealFrom.children.pop()) + } + } else if i < len(n.items) && len(n.children[i+1].items) > minItems { + // steal from right child + child := n.mutableChild(i) + stealFrom := n.mutableChild(i + 1) + stolenItem := stealFrom.items.removeAt(0) + child.items = append(child.items, n.items[i]) + n.items[i] = stolenItem + if len(stealFrom.children) > 0 { + child.children = append(child.children, stealFrom.children.removeAt(0)) + } + } else { + if i >= len(n.items) { + i-- + } + child := n.mutableChild(i) + // merge with right child + mergeItem := n.items.removeAt(i) + mergeChild := n.children.removeAt(i + 1) + child.items = append(child.items, mergeItem) + child.items = append(child.items, mergeChild.items...) + child.children = append(child.children, mergeChild.children...) + n.cow.freeNode(mergeChild) + } + return n.remove(item, minItems, typ) +} + +type direction int + +const ( + descend = direction(-1) + ascend = direction(+1) +) + +type optionalItem[T any] struct { + item T + valid bool +} + +func optional[T any](item T) optionalItem[T] { + return optionalItem[T]{item: item, valid: true} +} +func empty[T any]() optionalItem[T] { + return optionalItem[T]{} +} + +// iterate provides a simple method for iterating over elements in the tree. +// +// When ascending, the 'start' should be less than 'stop' and when descending, +// the 'start' should be greater than 'stop'. Setting 'includeStart' to true +// will force the iterator to include the first item when it equals 'start', +// thus creating a "greaterOrEqual" or "lessThanEqual" rather than just a +// "greaterThan" or "lessThan" queries. 
+func (n *node[T]) iterate(dir direction, start, stop optionalItem[T], includeStart bool, hit bool, iter ItemIteratorG[T]) (bool, bool) { + var ok, found bool + var index int + switch dir { + case ascend: + if start.valid { + index, _ = n.items.find(start.item, n.cow.less) + } + for i := index; i < len(n.items); i++ { + if len(n.children) > 0 { + if hit, ok = n.children[i].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + if !includeStart && !hit && start.valid && !n.cow.less(start.item, n.items[i]) { + hit = true + continue + } + hit = true + if stop.valid && !n.cow.less(n.items[i], stop.item) { + return hit, false + } + if !iter(n.items[i]) { + return hit, false + } + } + if len(n.children) > 0 { + if hit, ok = n.children[len(n.children)-1].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + case descend: + if start.valid { + index, found = n.items.find(start.item, n.cow.less) + if !found { + index = index - 1 + } + } else { + index = len(n.items) - 1 + } + for i := index; i >= 0; i-- { + if start.valid && !n.cow.less(n.items[i], start.item) { + if !includeStart || hit || n.cow.less(start.item, n.items[i]) { + continue + } + } + if len(n.children) > 0 { + if hit, ok = n.children[i+1].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + if stop.valid && !n.cow.less(stop.item, n.items[i]) { + return hit, false // continue + } + hit = true + if !iter(n.items[i]) { + return hit, false + } + } + if len(n.children) > 0 { + if hit, ok = n.children[0].iterate(dir, start, stop, includeStart, hit, iter); !ok { + return hit, false + } + } + } + return hit, true +} + +// print is used for testing/debugging purposes. +func (n *node[T]) print(w io.Writer, level int) { + fmt.Fprintf(w, "%sNODE:%v\n", strings.Repeat(" ", level), n.items) + for _, c := range n.children { + c.print(w, level+1) + } +} + +// BTreeG is a generic implementation of a B-Tree. +// +// BTreeG stores items of type T in an ordered structure, allowing easy insertion, +// removal, and iteration. +// +// Write operations are not safe for concurrent mutation by multiple +// goroutines, but Read operations are. +type BTreeG[T any] struct { + degree int + length int + root *node[T] + cow *copyOnWriteContext[T] +} + +// LessFunc[T] determines how to order a type 'T'. It should implement a strict +// ordering, and should return true if within that ordering, 'a' < 'b'. +type LessFunc[T any] func(a, b T) bool + +// copyOnWriteContext pointers determine node ownership... a tree with a write +// context equivalent to a node's write context is allowed to modify that node. +// A tree whose write context does not match a node's is not allowed to modify +// it, and must create a new, writable copy (IE: it's a Clone). +// +// When doing any write operation, we maintain the invariant that the current +// node's context is equal to the context of the tree that requested the write. +// We do this by, before we descend into any node, creating a copy with the +// correct context if the contexts don't match. +// +// Since the node we're currently visiting on any write has the requesting +// tree's context, that node is modifiable in place. Children of that node may +// not share context, but before we descend into them, we'll make a mutable +// copy. +type copyOnWriteContext[T any] struct { + freelist *FreeListG[T] + less LessFunc[T] +} + +// Clone clones the btree, lazily. 
Clone should not be called concurrently, +// but the original tree (t) and the new tree (t2) can be used concurrently +// once the Clone call completes. +// +// The internal tree structure of b is marked read-only and shared between t and +// t2. Writes to both t and t2 use copy-on-write logic, creating new nodes +// whenever one of b's original nodes would have been modified. Read operations +// should have no performance degredation. Write operations for both t and t2 +// will initially experience minor slow-downs caused by additional allocs and +// copies due to the aforementioned copy-on-write logic, but should converge to +// the original performance characteristics of the original tree. +func (t *BTreeG[T]) Clone() (t2 *BTreeG[T]) { + // Create two entirely new copy-on-write contexts. + // This operation effectively creates three trees: + // the original, shared nodes (old b.cow) + // the new b.cow nodes + // the new out.cow nodes + cow1, cow2 := *t.cow, *t.cow + out := *t + t.cow = &cow1 + out.cow = &cow2 + return &out +} + +// maxItems returns the max number of items to allow per node. +func (t *BTreeG[T]) maxItems() int { + return t.degree*2 - 1 +} + +// minItems returns the min number of items to allow per node (ignored for the +// root node). +func (t *BTreeG[T]) minItems() int { + return t.degree - 1 +} + +func (c *copyOnWriteContext[T]) newNode() (n *node[T]) { + n = c.freelist.newNode() + n.cow = c + return +} + +type freeType int + +const ( + ftFreelistFull freeType = iota // node was freed (available for GC, not stored in freelist) + ftStored // node was stored in the freelist for later use + ftNotOwned // node was ignored by COW, since it's owned by another one +) + +// freeNode frees a node within a given COW context, if it's owned by that +// context. It returns what happened to the node (see freeType const +// documentation). +func (c *copyOnWriteContext[T]) freeNode(n *node[T]) freeType { + if n.cow == c { + // clear to allow GC + n.items.truncate(0) + n.children.truncate(0) + n.cow = nil + if c.freelist.freeNode(n) { + return ftStored + } else { + return ftFreelistFull + } + } else { + return ftNotOwned + } +} + +// ReplaceOrInsert adds the given item to the tree. If an item in the tree +// already equals the given one, it is removed from the tree and returned, +// and the second return value is true. Otherwise, (zeroValue, false) +// +// nil cannot be added to the tree (will panic). +func (t *BTreeG[T]) ReplaceOrInsert(item T) (_ T, _ bool) { + if t.root == nil { + t.root = t.cow.newNode() + t.root.items = append(t.root.items, item) + t.length++ + return + } else { + t.root = t.root.mutableFor(t.cow) + if len(t.root.items) >= t.maxItems() { + item2, second := t.root.split(t.maxItems() / 2) + oldroot := t.root + t.root = t.cow.newNode() + t.root.items = append(t.root.items, item2) + t.root.children = append(t.root.children, oldroot, second) + } + } + out, outb := t.root.insert(item, t.maxItems()) + if !outb { + t.length++ + } + return out, outb +} + +// Delete removes an item equal to the passed in item from the tree, returning +// it. If no such item exists, returns (zeroValue, false). +func (t *BTreeG[T]) Delete(item T) (T, bool) { + return t.deleteItem(item, removeItem) +} + +// DeleteMin removes the smallest item in the tree and returns it. +// If no such item exists, returns (zeroValue, false). +func (t *BTreeG[T]) DeleteMin() (T, bool) { + var zero T + return t.deleteItem(zero, removeMin) +} + +// DeleteMax removes the largest item in the tree and returns it. 
+// If no such item exists, returns (zeroValue, false). +func (t *BTreeG[T]) DeleteMax() (T, bool) { + var zero T + return t.deleteItem(zero, removeMax) +} + +func (t *BTreeG[T]) deleteItem(item T, typ toRemove) (_ T, _ bool) { + if t.root == nil || len(t.root.items) == 0 { + return + } + t.root = t.root.mutableFor(t.cow) + out, outb := t.root.remove(item, t.minItems(), typ) + if len(t.root.items) == 0 && len(t.root.children) > 0 { + oldroot := t.root + t.root = t.root.children[0] + t.cow.freeNode(oldroot) + } + if outb { + t.length-- + } + return out, outb +} + +// AscendRange calls the iterator for every value in the tree within the range +// [greaterOrEqual, lessThan), until iterator returns false. +func (t *BTreeG[T]) AscendRange(greaterOrEqual, lessThan T, iterator ItemIteratorG[T]) { + if t.root == nil { + return + } + t.root.iterate(ascend, optional[T](greaterOrEqual), optional[T](lessThan), true, false, iterator) +} + +// AscendLessThan calls the iterator for every value in the tree within the range +// [first, pivot), until iterator returns false. +func (t *BTreeG[T]) AscendLessThan(pivot T, iterator ItemIteratorG[T]) { + if t.root == nil { + return + } + t.root.iterate(ascend, empty[T](), optional(pivot), false, false, iterator) +} + +// AscendGreaterOrEqual calls the iterator for every value in the tree within +// the range [pivot, last], until iterator returns false. +func (t *BTreeG[T]) AscendGreaterOrEqual(pivot T, iterator ItemIteratorG[T]) { + if t.root == nil { + return + } + t.root.iterate(ascend, optional[T](pivot), empty[T](), true, false, iterator) +} + +// Ascend calls the iterator for every value in the tree within the range +// [first, last], until iterator returns false. +func (t *BTreeG[T]) Ascend(iterator ItemIteratorG[T]) { + if t.root == nil { + return + } + t.root.iterate(ascend, empty[T](), empty[T](), false, false, iterator) +} + +// DescendRange calls the iterator for every value in the tree within the range +// [lessOrEqual, greaterThan), until iterator returns false. +func (t *BTreeG[T]) DescendRange(lessOrEqual, greaterThan T, iterator ItemIteratorG[T]) { + if t.root == nil { + return + } + t.root.iterate(descend, optional[T](lessOrEqual), optional[T](greaterThan), true, false, iterator) +} + +// DescendLessOrEqual calls the iterator for every value in the tree within the range +// [pivot, first], until iterator returns false. +func (t *BTreeG[T]) DescendLessOrEqual(pivot T, iterator ItemIteratorG[T]) { + if t.root == nil { + return + } + t.root.iterate(descend, optional[T](pivot), empty[T](), true, false, iterator) +} + +// DescendGreaterThan calls the iterator for every value in the tree within +// the range [last, pivot), until iterator returns false. +func (t *BTreeG[T]) DescendGreaterThan(pivot T, iterator ItemIteratorG[T]) { + if t.root == nil { + return + } + t.root.iterate(descend, empty[T](), optional[T](pivot), false, false, iterator) +} + +// Descend calls the iterator for every value in the tree within the range +// [last, first], until iterator returns false. +func (t *BTreeG[T]) Descend(iterator ItemIteratorG[T]) { + if t.root == nil { + return + } + t.root.iterate(descend, empty[T](), empty[T](), false, false, iterator) +} + +// Get looks for the key item in the tree, returning it. It returns +// (zeroValue, false) if unable to find that item. 
+func (t *BTreeG[T]) Get(key T) (_ T, _ bool) { + if t.root == nil { + return + } + return t.root.get(key) +} + +// Min returns the smallest item in the tree, or (zeroValue, false) if the tree is empty. +func (t *BTreeG[T]) Min() (_ T, _ bool) { + return min(t.root) +} + +// Max returns the largest item in the tree, or (zeroValue, false) if the tree is empty. +func (t *BTreeG[T]) Max() (_ T, _ bool) { + return max(t.root) +} + +// Has returns true if the given key is in the tree. +func (t *BTreeG[T]) Has(key T) bool { + _, ok := t.Get(key) + return ok +} + +// Len returns the number of items currently in the tree. +func (t *BTreeG[T]) Len() int { + return t.length +} + +// Clear removes all items from the btree. If addNodesToFreelist is true, +// t's nodes are added to its freelist as part of this call, until the freelist +// is full. Otherwise, the root node is simply dereferenced and the subtree +// left to Go's normal GC processes. +// +// This can be much faster +// than calling Delete on all elements, because that requires finding/removing +// each element in the tree and updating the tree accordingly. It also is +// somewhat faster than creating a new tree to replace the old one, because +// nodes from the old tree are reclaimed into the freelist for use by the new +// one, instead of being lost to the garbage collector. +// +// This call takes: +// O(1): when addNodesToFreelist is false, this is a single operation. +// O(1): when the freelist is already full, it breaks out immediately +// O(freelist size): when the freelist is empty and the nodes are all owned +// by this tree, nodes are added to the freelist until full. +// O(tree size): when all nodes are owned by another tree, all nodes are +// iterated over looking for nodes to add to the freelist, and due to +// ownership, none are. +func (t *BTreeG[T]) Clear(addNodesToFreelist bool) { + if t.root != nil && addNodesToFreelist { + t.root.reset(t.cow) + } + t.root, t.length = nil, 0 +} + +// reset returns a subtree to the freelist. It breaks out immediately if the +// freelist is full, since the only benefit of iterating is to fill that +// freelist up. Returns true if parent reset call should continue. +func (n *node[T]) reset(c *copyOnWriteContext[T]) bool { + for _, child := range n.children { + if !child.reset(c) { + return false + } + } + return c.freeNode(n) != ftFreelistFull +} + +// Int implements the Item interface for integers. +type Int int + +// Less returns true if int(a) < int(b). +func (a Int) Less(b Item) bool { + return a < b.(Int) +} + +// BTree is an implementation of a B-Tree. +// +// BTree stores Item instances in an ordered structure, allowing easy insertion, +// removal, and iteration. +// +// Write operations are not safe for concurrent mutation by multiple +// goroutines, but Read operations are. +type BTree BTreeG[Item] + +var itemLess LessFunc[Item] = func(a, b Item) bool { + return a.Less(b) +} + +// New creates a new B-Tree with the given degree. +// +// New(2), for example, will create a 2-3-4 tree (each node contains 1-3 items +// and 2-4 children). +func New(degree int) *BTree { + return (*BTree)(NewG[Item](degree, itemLess)) +} + +// FreeList represents a free list of btree nodes. By default each +// BTree has its own FreeList, but multiple BTrees can share the same +// FreeList. +// Two Btrees using the same freelist are safe for concurrent write access. +type FreeList FreeListG[Item] + +// NewFreeList creates a new free list. +// size is the maximum size of the returned free list. 
+func NewFreeList(size int) *FreeList { + return (*FreeList)(NewFreeListG[Item](size)) +} + +// NewWithFreeList creates a new B-Tree that uses the given node free list. +func NewWithFreeList(degree int, f *FreeList) *BTree { + return (*BTree)(NewWithFreeListG[Item](degree, itemLess, (*FreeListG[Item])(f))) +} + +// ItemIterator allows callers of Ascend* to iterate in-order over portions of +// the tree. When this function returns false, iteration will stop and the +// associated Ascend* function will immediately return. +type ItemIterator ItemIteratorG[Item] + +// Clone clones the btree, lazily. Clone should not be called concurrently, +// but the original tree (t) and the new tree (t2) can be used concurrently +// once the Clone call completes. +// +// The internal tree structure of b is marked read-only and shared between t and +// t2. Writes to both t and t2 use copy-on-write logic, creating new nodes +// whenever one of b's original nodes would have been modified. Read operations +// should have no performance degredation. Write operations for both t and t2 +// will initially experience minor slow-downs caused by additional allocs and +// copies due to the aforementioned copy-on-write logic, but should converge to +// the original performance characteristics of the original tree. +func (t *BTree) Clone() (t2 *BTree) { + return (*BTree)((*BTreeG[Item])(t).Clone()) +} + +// Delete removes an item equal to the passed in item from the tree, returning +// it. If no such item exists, returns nil. +func (t *BTree) Delete(item Item) Item { + i, _ := (*BTreeG[Item])(t).Delete(item) + return i +} + +// DeleteMax removes the largest item in the tree and returns it. +// If no such item exists, returns nil. +func (t *BTree) DeleteMax() Item { + i, _ := (*BTreeG[Item])(t).DeleteMax() + return i +} + +// DeleteMin removes the smallest item in the tree and returns it. +// If no such item exists, returns nil. +func (t *BTree) DeleteMin() Item { + i, _ := (*BTreeG[Item])(t).DeleteMin() + return i +} + +// Get looks for the key item in the tree, returning it. It returns nil if +// unable to find that item. +func (t *BTree) Get(key Item) Item { + i, _ := (*BTreeG[Item])(t).Get(key) + return i +} + +// Max returns the largest item in the tree, or nil if the tree is empty. +func (t *BTree) Max() Item { + i, _ := (*BTreeG[Item])(t).Max() + return i +} + +// Min returns the smallest item in the tree, or nil if the tree is empty. +func (t *BTree) Min() Item { + i, _ := (*BTreeG[Item])(t).Min() + return i +} + +// Has returns true if the given key is in the tree. +func (t *BTree) Has(key Item) bool { + return (*BTreeG[Item])(t).Has(key) +} + +// ReplaceOrInsert adds the given item to the tree. If an item in the tree +// already equals the given one, it is removed from the tree and returned. +// Otherwise, nil is returned. +// +// nil cannot be added to the tree (will panic). +func (t *BTree) ReplaceOrInsert(item Item) Item { + i, _ := (*BTreeG[Item])(t).ReplaceOrInsert(item) + return i +} + +// AscendRange calls the iterator for every value in the tree within the range +// [greaterOrEqual, lessThan), until iterator returns false. +func (t *BTree) AscendRange(greaterOrEqual, lessThan Item, iterator ItemIterator) { + (*BTreeG[Item])(t).AscendRange(greaterOrEqual, lessThan, (ItemIteratorG[Item])(iterator)) +} + +// AscendLessThan calls the iterator for every value in the tree within the range +// [first, pivot), until iterator returns false. 
+func (t *BTree) AscendLessThan(pivot Item, iterator ItemIterator) { + (*BTreeG[Item])(t).AscendLessThan(pivot, (ItemIteratorG[Item])(iterator)) +} + +// AscendGreaterOrEqual calls the iterator for every value in the tree within +// the range [pivot, last], until iterator returns false. +func (t *BTree) AscendGreaterOrEqual(pivot Item, iterator ItemIterator) { + (*BTreeG[Item])(t).AscendGreaterOrEqual(pivot, (ItemIteratorG[Item])(iterator)) +} + +// Ascend calls the iterator for every value in the tree within the range +// [first, last], until iterator returns false. +func (t *BTree) Ascend(iterator ItemIterator) { + (*BTreeG[Item])(t).Ascend((ItemIteratorG[Item])(iterator)) +} + +// DescendRange calls the iterator for every value in the tree within the range +// [lessOrEqual, greaterThan), until iterator returns false. +func (t *BTree) DescendRange(lessOrEqual, greaterThan Item, iterator ItemIterator) { + (*BTreeG[Item])(t).DescendRange(lessOrEqual, greaterThan, (ItemIteratorG[Item])(iterator)) +} + +// DescendLessOrEqual calls the iterator for every value in the tree within the range +// [pivot, first], until iterator returns false. +func (t *BTree) DescendLessOrEqual(pivot Item, iterator ItemIterator) { + (*BTreeG[Item])(t).DescendLessOrEqual(pivot, (ItemIteratorG[Item])(iterator)) +} + +// DescendGreaterThan calls the iterator for every value in the tree within +// the range [last, pivot), until iterator returns false. +func (t *BTree) DescendGreaterThan(pivot Item, iterator ItemIterator) { + (*BTreeG[Item])(t).DescendGreaterThan(pivot, (ItemIteratorG[Item])(iterator)) +} + +// Descend calls the iterator for every value in the tree within the range +// [last, first], until iterator returns false. +func (t *BTree) Descend(iterator ItemIterator) { + (*BTreeG[Item])(t).Descend((ItemIteratorG[Item])(iterator)) +} + +// Len returns the number of items currently in the tree. +func (t *BTree) Len() int { + return (*BTreeG[Item])(t).Len() +} + +// Clear removes all items from the btree. If addNodesToFreelist is true, +// t's nodes are added to its freelist as part of this call, until the freelist +// is full. Otherwise, the root node is simply dereferenced and the subtree +// left to Go's normal GC processes. +// +// This can be much faster +// than calling Delete on all elements, because that requires finding/removing +// each element in the tree and updating the tree accordingly. It also is +// somewhat faster than creating a new tree to replace the old one, because +// nodes from the old tree are reclaimed into the freelist for use by the new +// one, instead of being lost to the garbage collector. +// +// This call takes: +// O(1): when addNodesToFreelist is false, this is a single operation. +// O(1): when the freelist is already full, it breaks out immediately +// O(freelist size): when the freelist is empty and the nodes are all owned +// by this tree, nodes are added to the freelist until full. +// O(tree size): when all nodes are owned by another tree, all nodes are +// iterated over looking for nodes to add to the freelist, and due to +// ownership, none are. +func (t *BTree) Clear(addNodesToFreelist bool) { + (*BTreeG[Item])(t).Clear(addNodesToFreelist) +} diff --git a/vendor/golang.org/x/exp/maps/maps.go b/vendor/golang.org/x/exp/maps/maps.go deleted file mode 100644 index ecc0dabb7..000000000 --- a/vendor/golang.org/x/exp/maps/maps.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2021 The Go Authors. All rights reserved. 
-// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package maps defines various functions useful with maps of any type. -package maps - -// Keys returns the keys of the map m. -// The keys will be in an indeterminate order. -func Keys[M ~map[K]V, K comparable, V any](m M) []K { - r := make([]K, 0, len(m)) - for k := range m { - r = append(r, k) - } - return r -} - -// Values returns the values of the map m. -// The values will be in an indeterminate order. -func Values[M ~map[K]V, K comparable, V any](m M) []V { - r := make([]V, 0, len(m)) - for _, v := range m { - r = append(r, v) - } - return r -} - -// Equal reports whether two maps contain the same key/value pairs. -// Values are compared using ==. -func Equal[M1, M2 ~map[K]V, K, V comparable](m1 M1, m2 M2) bool { - if len(m1) != len(m2) { - return false - } - for k, v1 := range m1 { - if v2, ok := m2[k]; !ok || v1 != v2 { - return false - } - } - return true -} - -// EqualFunc is like Equal, but compares values using eq. -// Keys are still compared with ==. -func EqualFunc[M1 ~map[K]V1, M2 ~map[K]V2, K comparable, V1, V2 any](m1 M1, m2 M2, eq func(V1, V2) bool) bool { - if len(m1) != len(m2) { - return false - } - for k, v1 := range m1 { - if v2, ok := m2[k]; !ok || !eq(v1, v2) { - return false - } - } - return true -} - -// Clear removes all entries from m, leaving it empty. -func Clear[M ~map[K]V, K comparable, V any](m M) { - for k := range m { - delete(m, k) - } -} - -// Clone returns a copy of m. This is a shallow clone: -// the new keys and values are set using ordinary assignment. -func Clone[M ~map[K]V, K comparable, V any](m M) M { - // Preserve nil in case it matters. - if m == nil { - return nil - } - r := make(M, len(m)) - for k, v := range m { - r[k] = v - } - return r -} - -// Copy copies all key/value pairs in src adding them to dst. -// When a key in src is already present in dst, -// the value in dst will be overwritten by the value associated -// with the key in src. -func Copy[M1 ~map[K]V, M2 ~map[K]V, K comparable, V any](dst M1, src M2) { - for k, v := range src { - dst[k] = v - } -} - -// DeleteFunc deletes any key/value pairs from m for which del returns true. -func DeleteFunc[M ~map[K]V, K comparable, V any](m M, del func(K, V) bool) { - for k, v := range m { - if del(k, v) { - delete(m, k) - } - } -} diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 000000000..948a3ee63 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,135 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +// +// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks +// returning errors. +package errgroup + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. 
+type Group struct { + cancel func(error) + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := withCancelCause(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel(g.err) + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context, if the +// group was created by calling WithContext. The error will be returned by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} diff --git a/vendor/golang.org/x/sync/errgroup/go120.go b/vendor/golang.org/x/sync/errgroup/go120.go new file mode 100644 index 000000000..f93c740b6 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/go120.go @@ -0,0 +1,13 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.20 + +package errgroup + +import "context" + +func withCancelCause(parent context.Context) (context.Context, func(error)) { + return context.WithCancelCause(parent) +} diff --git a/vendor/golang.org/x/sync/errgroup/pre_go120.go b/vendor/golang.org/x/sync/errgroup/pre_go120.go new file mode 100644 index 000000000..88ce33434 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/pre_go120.go @@ -0,0 +1,14 @@ +// Copyright 2023 The Go Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !go1.20 + +package errgroup + +import "context" + +func withCancelCause(parent context.Context) (context.Context, func(error)) { + ctx, cancel := context.WithCancel(parent) + return ctx, func(error) { cancel() } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 12f47331e..77f68a4ed 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -364,8 +364,8 @@ github.com/golang/protobuf/ptypes github.com/golang/protobuf/ptypes/any github.com/golang/protobuf/ptypes/duration github.com/golang/protobuf/ptypes/timestamp -# github.com/google/btree v1.0.1 -## explicit; go 1.12 +# github.com/google/btree v1.1.3 +## explicit; go 1.18 github.com/google/btree # github.com/google/cadvisor v0.51.0 ## explicit; go 1.21 @@ -856,7 +856,6 @@ golang.org/x/crypto/ssh/internal/bcrypt_pbkdf # golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 ## explicit; go 1.20 golang.org/x/exp/constraints -golang.org/x/exp/maps golang.org/x/exp/slices # golang.org/x/net v0.34.0 ## explicit; go 1.18 @@ -879,6 +878,7 @@ golang.org/x/oauth2 golang.org/x/oauth2/internal # golang.org/x/sync v0.10.0 ## explicit; go 1.18 +golang.org/x/sync/errgroup golang.org/x/sync/singleflight # golang.org/x/sys v0.29.0 ## explicit; go 1.18 @@ -1124,7 +1124,7 @@ k8s.io/api/storage/v1 k8s.io/api/storage/v1alpha1 k8s.io/api/storage/v1beta1 k8s.io/api/storagemigration/v1alpha1 -# k8s.io/apiextensions-apiserver v0.31.1 => k8s.io/apiextensions-apiserver v0.32.1 +# k8s.io/apiextensions-apiserver v0.32.0 => k8s.io/apiextensions-apiserver v0.32.1 ## explicit; go 1.23.0 k8s.io/apiextensions-apiserver/pkg/apis/apiextensions k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1 @@ -2035,8 +2035,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client -# sigs.k8s.io/controller-runtime v0.19.4 -## explicit; go 1.22.0 +# sigs.k8s.io/controller-runtime v0.20.1 +## explicit; go 1.23.0 sigs.k8s.io/controller-runtime/pkg/cache sigs.k8s.io/controller-runtime/pkg/cache/internal sigs.k8s.io/controller-runtime/pkg/certwatcher @@ -2047,6 +2047,7 @@ sigs.k8s.io/controller-runtime/pkg/client/config sigs.k8s.io/controller-runtime/pkg/cluster sigs.k8s.io/controller-runtime/pkg/config sigs.k8s.io/controller-runtime/pkg/controller +sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue sigs.k8s.io/controller-runtime/pkg/event sigs.k8s.io/controller-runtime/pkg/handler sigs.k8s.io/controller-runtime/pkg/healthz @@ -2055,6 +2056,7 @@ sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics sigs.k8s.io/controller-runtime/pkg/internal/field/selector sigs.k8s.io/controller-runtime/pkg/internal/httpserver sigs.k8s.io/controller-runtime/pkg/internal/log +sigs.k8s.io/controller-runtime/pkg/internal/metrics sigs.k8s.io/controller-runtime/pkg/internal/recorder sigs.k8s.io/controller-runtime/pkg/internal/source sigs.k8s.io/controller-runtime/pkg/internal/syncs diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go index 406380d32..8f14bfdbf 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go @@ -19,11 +19,12 @@ package cache import ( "context" "fmt" + "maps" 
"net/http" + "slices" "sort" "time" - "golang.org/x/exp/maps" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -231,15 +232,16 @@ type Options struct { // This will be used for all object types, unless it is set in ByObject or // DefaultNamespaces. // - // Defaults to false. + // Defaults to true. DefaultEnableWatchBookmarks *bool // ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object. // If unset, this will fall through to the Default* settings. ByObject map[client.Object]ByObject - // newInformer allows overriding of NewSharedIndexInformer for testing. - newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer + // NewInformer allows overriding of NewSharedIndexInformer, for example for testing + // or if someone wants to write their own Informer. + NewInformer func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer } // ByObject offers more fine-grained control over the cache's ListWatch by object. @@ -291,7 +293,7 @@ type ByObject struct { // assume bookmarks are returned at any specific interval, nor may they // assume the server will send any BOOKMARK event during a session. // - // Defaults to false. + // Defaults to true. EnableWatchBookmarks *bool } @@ -326,7 +328,7 @@ type Config struct { // assume bookmarks are returned at any specific interval, nor may they // assume the server will send any BOOKMARK event during a session. // - // Defaults to false. + // Defaults to true. EnableWatchBookmarks *bool } @@ -430,8 +432,8 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { Transform: config.Transform, WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), - EnableWatchBookmarks: ptr.Deref(config.EnableWatchBookmarks, false), - NewInformer: opts.newInformer, + EnableWatchBookmarks: ptr.Deref(config.EnableWatchBookmarks, true), + NewInformer: opts.NewInformer, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, } @@ -467,6 +469,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { } } + opts.ByObject = maps.Clone(opts.ByObject) + opts.DefaultNamespaces = maps.Clone(opts.DefaultNamespaces) for obj, byObject := range opts.ByObject { isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper) if err != nil { @@ -478,6 +482,8 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { if isNamespaced && byObject.Namespaces == nil { byObject.Namespaces = maps.Clone(opts.DefaultNamespaces) + } else { + byObject.Namespaces = maps.Clone(byObject.Namespaces) } // Default the namespace-level configs first, because they need to use the undefaulted type-level config @@ -485,7 +491,6 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { for namespace, config := range byObject.Namespaces { // 1. Default from the undefaulted type-level config config = defaultConfig(config, byObjectToConfig(byObject)) - // 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but // might not have an entry for the current namespace. 
if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace { @@ -498,7 +503,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { if namespace == metav1.NamespaceAll { config.FieldSelector = fields.AndSelectors( appendIfNotNil( - namespaceAllSelector(maps.Keys(byObject.Namespaces)), + namespaceAllSelector(slices.Collect(maps.Keys(byObject.Namespaces))), config.FieldSelector, )..., ) @@ -529,7 +534,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { if namespace == metav1.NamespaceAll { cfg.FieldSelector = fields.AndSelectors( appendIfNotNil( - namespaceAllSelector(maps.Keys(opts.DefaultNamespaces)), + namespaceAllSelector(slices.Collect(maps.Keys(opts.DefaultNamespaces))), cfg.FieldSelector, )..., ) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go index 4db8208a6..46bd243c6 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go @@ -18,10 +18,11 @@ package cache import ( "context" + "maps" + "slices" "strings" "sync" - "golang.org/x/exp/maps" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" @@ -73,7 +74,7 @@ func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk sch } func (dbt *delegatingByGVKCache) Start(ctx context.Context) error { - allCaches := maps.Values(dbt.caches) + allCaches := slices.Collect(maps.Values(dbt.caches)) allCaches = append(allCaches, dbt.defaultCache) wg := &sync.WaitGroup{} @@ -100,7 +101,7 @@ func (dbt *delegatingByGVKCache) Start(ctx context.Context) error { func (dbt *delegatingByGVKCache) WaitForCacheSync(ctx context.Context) bool { synced := true - for _, cache := range append(maps.Values(dbt.caches), dbt.defaultCache) { + for _, cache := range append(slices.Collect(maps.Values(dbt.caches)), dbt.defaultCache) { if !cache.WaitForCacheSync(ctx) { synced = false } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go index a40382d6f..097ee7a45 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go @@ -47,7 +47,7 @@ type InformersOpts struct { Mapper meta.RESTMapper ResyncPeriod time.Duration Namespace string - NewInformer *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer + NewInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer Selector Selector Transform cache.TransformFunc UnsafeDisableDeepCopy bool @@ -59,7 +59,7 @@ type InformersOpts struct { func NewInformers(config *rest.Config, options *InformersOpts) *Informers { newInformer := cache.NewSharedIndexInformer if options.NewInformer != nil { - newInformer = *options.NewInformer + newInformer = options.NewInformer } return &Informers{ config: config, @@ -585,7 +585,7 @@ func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) wa // hammer the apiserver with list requests simultaneously. 
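Taken together, the cache changes above flip the watch-bookmark default to true and promote the informer constructor hook to an exported NewInformer field. A minimal sketch of how that could be consumed, assuming a working rest.Config and scheme; newCustomCache and the pass-through constructor are illustrative and not part of this patch.

import (
    "time"

    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/rest"
    toolscache "k8s.io/client-go/tools/cache"
    "k8s.io/utils/ptr"
    "sigs.k8s.io/controller-runtime/pkg/cache"
)

func newCustomCache(cfg *rest.Config, scheme *runtime.Scheme) (cache.Cache, error) {
    return cache.New(cfg, cache.Options{
        Scheme: scheme,
        // Bookmarks now default to true; set this explicitly only to opt out.
        DefaultEnableWatchBookmarks: ptr.To(true),
        // NewInformer is exported in v0.20 and replaces the former test-only field.
        NewInformer: func(lw toolscache.ListerWatcher, obj runtime.Object, resync time.Duration, indexers toolscache.Indexers) toolscache.SharedIndexInformer {
            return toolscache.NewSharedIndexInformer(lw, obj, resync, indexers)
        },
    })
}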
func calculateResyncPeriod(resync time.Duration) time.Duration { // the factor will fall into [0.9, 1.1) - factor := rand.Float64()/5.0 + 0.9 //nolint:gosec + factor := rand.Float64()/5.0 + 0.9 return time.Duration(float64(resync.Nanoseconds()) * factor) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go index 927be22b4..ad898617f 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/restmapper.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" + "k8s.io/utils/ptr" ) // NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic @@ -41,6 +42,7 @@ func NewDynamicRESTMapper(cfg *rest.Config, httpClient *http.Client) (meta.RESTM if err != nil { return nil, err } + return &mapper{ mapper: restmapper.NewDiscoveryRESTMapper([]*restmapper.APIGroupResources{}), client: client, @@ -53,11 +55,15 @@ func NewDynamicRESTMapper(cfg *rest.Config, httpClient *http.Client) (meta.RESTM // client for discovery information to do REST mappings. type mapper struct { mapper meta.RESTMapper - client discovery.DiscoveryInterface + client discovery.AggregatedDiscoveryInterface knownGroups map[string]*restmapper.APIGroupResources apiGroups map[string]*metav1.APIGroup + initialDiscoveryDone bool + // mutex to provide thread-safe mapper reloading. + // It protects all fields in the mapper as well as methods + // that have the `Locked` suffix. mu sync.RWMutex } @@ -159,28 +165,42 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er versions = nil } + m.mu.Lock() + defer m.mu.Unlock() // If no specific versions are set by user, we will scan all available ones for the API group. // This operation requires 2 requests: /api and /apis, but only once. For all subsequent calls // this data will be taken from cache. - if len(versions) == 0 { - apiGroup, err := m.findAPIGroupByName(groupName) + // + // We always run this once, because if the server supports aggregated discovery, this will + // load everything with two api calls which we assume is overall cheaper. + if len(versions) == 0 || !m.initialDiscoveryDone { + apiGroup, didAggregatedDiscovery, err := m.findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked(groupName) if err != nil { return err } - if apiGroup != nil { + if apiGroup != nil && len(versions) == 0 { for _, version := range apiGroup.Versions { versions = append(versions, version.Version) } } - } - m.mu.Lock() - defer m.mu.Unlock() - - // Create or fetch group resources from cache. 
- groupResources := &restmapper.APIGroupResources{ - Group: metav1.APIGroup{Name: groupName}, - VersionedResources: make(map[string][]metav1.APIResource), + // No need to do anything further if aggregatedDiscovery is supported and we did a lookup + if didAggregatedDiscovery { + failedGroups := make(map[schema.GroupVersion]error) + for _, version := range versions { + if m.knownGroups[groupName] == nil || m.knownGroups[groupName].VersionedResources[version] == nil { + failedGroups[schema.GroupVersion{Group: groupName, Version: version}] = &meta.NoResourceMatchError{ + PartialResource: schema.GroupVersionResource{ + Group: groupName, + Version: version, + }} + } + } + if len(failedGroups) > 0 { + return ptr.To(ErrResourceDiscoveryFailed(failedGroups)) + } + return nil + } } // Update information for group resources about versioned resources. @@ -194,13 +214,26 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er return fmt.Errorf("failed to get API group resources: %w", err) } - if _, ok := m.knownGroups[groupName]; ok { - groupResources = m.knownGroups[groupName] - } + m.addGroupVersionResourcesToCacheAndReloadLocked(groupVersionResources) + return nil +} +// addGroupVersionResourcesToCacheAndReloadLocked does what the name suggests. The mutex must be held when +// calling it. +func (m *mapper) addGroupVersionResourcesToCacheAndReloadLocked(gvr map[schema.GroupVersion]*metav1.APIResourceList) { // Update information for group resources about the API group by adding new versions. - // Ignore the versions that are already registered. - for groupVersion, resources := range groupVersionResources { + // Ignore the versions that are already registered + for groupVersion, resources := range gvr { + var groupResources *restmapper.APIGroupResources + if _, ok := m.knownGroups[groupVersion.Group]; ok { + groupResources = m.knownGroups[groupVersion.Group] + } else { + groupResources = &restmapper.APIGroupResources{ + Group: metav1.APIGroup{Name: groupVersion.Group}, + VersionedResources: make(map[string][]metav1.APIResource), + } + } + version := groupVersion.Version groupResources.VersionedResources[version] = resources.APIResources @@ -214,60 +247,56 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er if !found { groupResources.Group.Versions = append(groupResources.Group.Versions, metav1.GroupVersionForDiscovery{ - GroupVersion: metav1.GroupVersion{Group: groupName, Version: version}.String(), + GroupVersion: metav1.GroupVersion{Group: groupVersion.Group, Version: version}.String(), Version: version, }) } + + // Update data in the cache. + m.knownGroups[groupVersion.Group] = groupResources } - // Update data in the cache. - m.knownGroups[groupName] = groupResources - - // Finally, update the group with received information and regenerate the mapper. + // Finally, reload the mapper. updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups)) for _, agr := range m.knownGroups { updatedGroupResources = append(updatedGroupResources, agr) } m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources) - return nil } -// findAPIGroupByNameLocked returns API group by its name. -func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error) { - // Looking in the cache first. - { - m.mu.RLock() - group, ok := m.apiGroups[groupName] - m.mu.RUnlock() - if ok { - return group, nil - } +// findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked tries to find the passed apiGroup. 
+// If the server supports aggregated discovery, it will always perform that. +func (m *mapper) findAPIGroupByNameAndMaybeAggregatedDiscoveryLocked(groupName string) (_ *metav1.APIGroup, didAggregatedDiscovery bool, _ error) { + // Looking in the cache first + group, ok := m.apiGroups[groupName] + if ok { + return group, false, nil } // Update the cache if nothing was found. - apiGroups, err := m.client.ServerGroups() + apiGroups, maybeResources, _, err := m.client.GroupsAndMaybeResources() if err != nil { - return nil, fmt.Errorf("failed to get server groups: %w", err) + return nil, false, fmt.Errorf("failed to get server groups: %w", err) } if len(apiGroups.Groups) == 0 { - return nil, fmt.Errorf("received an empty API groups list") + return nil, false, fmt.Errorf("received an empty API groups list") } - m.mu.Lock() + m.initialDiscoveryDone = true + if len(maybeResources) > 0 { + didAggregatedDiscovery = true + m.addGroupVersionResourcesToCacheAndReloadLocked(maybeResources) + } for i := range apiGroups.Groups { group := &apiGroups.Groups[i] m.apiGroups[group.Name] = group } - m.mu.Unlock() // Looking in the cache again. - m.mu.RLock() - defer m.mu.RUnlock() - // Don't return an error here if the API group is not present. // The reloaded RESTMapper will take care of returning a NoMatchError. - return m.apiGroups[groupName], nil + return m.apiGroups[groupName], didAggregatedDiscovery, nil } // fetchGroupVersionResourcesLocked fetches the resources for the specified group and its versions. @@ -283,10 +312,10 @@ func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ... if apierrors.IsNotFound(err) { // If the version is not found, we remove the group from the cache // so it gets refreshed on the next call. - if m.isAPIGroupCached(groupVersion) { + if m.isAPIGroupCachedLocked(groupVersion) { delete(m.apiGroups, groupName) } - if m.isGroupVersionCached(groupVersion) { + if m.isGroupVersionCachedLocked(groupVersion) { delete(m.knownGroups, groupName) } continue @@ -308,8 +337,8 @@ func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ... return groupVersionResources, nil } -// isGroupVersionCached checks if a version for a group is cached in the known groups cache. -func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool { +// isGroupVersionCachedLocked checks if a version for a group is cached in the known groups cache. +func (m *mapper) isGroupVersionCachedLocked(gv schema.GroupVersion) bool { if cachedGroup, ok := m.knownGroups[gv.Group]; ok { _, cached := cachedGroup.VersionedResources[gv.Version] return cached @@ -318,8 +347,8 @@ func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool { return false } -// isAPIGroupCached checks if a version for a group is cached in the api groups cache. -func (m *mapper) isAPIGroupCached(gv schema.GroupVersion) bool { +// isAPIGroupCachedLocked checks if a version for a group is cached in the api groups cache. 
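The reworked RESTMapper above prefers aggregated discovery, warming its whole cache from the /api and /apis endpoints when the server supports it. A hedged sketch of exercising that path through the public constructor; lookupMapping is a hypothetical helper.

import (
    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/rest"
    "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

func lookupMapping(cfg *rest.Config) (*meta.RESTMapping, error) {
    httpClient, err := rest.HTTPClientFor(cfg)
    if err != nil {
        return nil, err
    }
    mapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
    if err != nil {
        return nil, err
    }
    // The first lookup for an unknown group triggers discovery; with aggregated
    // discovery the whole cache can be filled from the two top-level requests.
    return mapper.RESTMapping(schema.GroupKind{Group: "apps", Kind: "Deployment"}, "v1")
}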
+func (m *mapper) isAPIGroupCachedLocked(gv schema.GroupVersion) bool { cachedGroup, ok := m.apiGroups[gv.Group] if !ok { return false diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go index fe9862b81..6d8744017 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go @@ -113,11 +113,11 @@ func newClient(config *rest.Config, options Options) (*client, error) { } if config.WarningHandler == nil { - // By default, we de-duplicate and surface warnings. + // By default, we surface warnings. config.WarningHandler = log.NewKubeAPIWarningLogger( log.Log.WithName("KubeAPIWarningLogger"), log.KubeAPIWarningLoggerOptions{ - Deduplicate: true, + Deduplicate: false, }, ) } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go b/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go index 3cd745e4c..3b282fc2c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/client/interfaces.go @@ -94,16 +94,16 @@ type SubResourceClientConstructor interface { // - ServiceAccount token creation: // sa := &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // token := &authenticationv1.TokenRequest{} - // c.SubResourceClient("token").Create(ctx, sa, token) + // c.SubResource("token").Create(ctx, sa, token) // // - Pod eviction creation: // pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} - // c.SubResourceClient("eviction").Create(ctx, pod, &policyv1.Eviction{}) + // c.SubResource("eviction").Create(ctx, pod, &policyv1.Eviction{}) // // - Pod binding creation: // pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // binding := &corev1.Binding{Target: corev1.ObjectReference{Name: "my-node"}} - // c.SubResourceClient("binding").Create(ctx, pod, binding) + // c.SubResource("binding").Create(ctx, pod, binding) // // - CertificateSigningRequest approval: // csr := &certificatesv1.CertificateSigningRequest{ @@ -115,17 +115,17 @@ type SubResourceClientConstructor interface { // }}, // }, // } - // c.SubResourceClient("approval").Update(ctx, csr) + // c.SubResource("approval").Update(ctx, csr) // // - Scale retrieval: // dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // scale := &autoscalingv1.Scale{} - // c.SubResourceClient("scale").Get(ctx, dep, scale) + // c.SubResource("scale").Get(ctx, dep, scale) // // - Scale update: // dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}} // scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 2}} - // c.SubResourceClient("scale").Update(ctx, dep, client.WithSubResourceBody(scale)) + // c.SubResource("scale").Update(ctx, dep, client.WithSubResourceBody(scale)) SubResource(subResource string) SubResourceClient } @@ -193,7 +193,7 @@ type IndexerFunc func(Object) []string // FieldIndexer knows how to index over a particular "field" such that it // can later be used by a field selector. type FieldIndexer interface { - // IndexFields adds an index with the given field name on the given object type + // IndexField adds an index with the given field name on the given object type // by using the given function to extract the value for that field. 
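Because of the client change above, API warnings are no longer de-duplicated by default. A small sketch of opting back in by pre-setting the WarningHandler on the rest.Config before the client is built; newClientWithDedupedWarnings is an illustrative name.

import (
    "k8s.io/client-go/rest"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"
)

func newClientWithDedupedWarnings(cfg *rest.Config) (client.Client, error) {
    cfg = rest.CopyConfig(cfg)
    // newClient only installs its own handler when WarningHandler is nil,
    // so a pre-set handler keeps the old de-duplicating behavior.
    cfg.WarningHandler = log.NewKubeAPIWarningLogger(
        log.Log.WithName("KubeAPIWarningLogger"),
        log.KubeAPIWarningLoggerOptions{Deduplicate: true},
    )
    return client.New(cfg, client.Options{})
}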
If you want // compatibility with the Kubernetes API server, only return one key, and only use // fields that the API server supports. Otherwise, you can return multiple keys, diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go b/vendor/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go index 248893ea3..0603f4cde 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go @@ -20,7 +20,6 @@ import ( "context" "errors" "net/http" - "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" @@ -65,8 +64,8 @@ type Cluster interface { // GetRESTMapper returns a RESTMapper GetRESTMapper() meta.RESTMapper - // GetAPIReader returns a reader that will be configured to use the API server. - // This should be used sparingly and only when the client does not fit your + // GetAPIReader returns a reader that will be configured to use the API server directly. + // This should be used sparingly and only when the cached client does not fit your // use case. GetAPIReader() client.Reader @@ -88,16 +87,6 @@ type Options struct { // If none is set, it defaults to log.Log global logger. Logger logr.Logger - // SyncPeriod determines the minimum frequency at which watched resources are - // reconciled. A lower period will correct entropy more quickly, but reduce - // responsiveness to change if there are many watched resources. Change this - // value only if you know what you are doing. Defaults to 10 hours if unset. - // there will a 10 percent jitter between the SyncPeriod of all controllers - // so that all controllers will not send list requests simultaneously. - // - // Deprecated: Use Cache.SyncPeriod instead. - SyncPeriod *time.Duration - // HTTPClient is the http client that will be used to create the default // Cache and Client. If not set the rest.HTTPClientFor function will be used // to create the http client. @@ -194,9 +183,6 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { if cacheOpts.HTTPClient == nil { cacheOpts.HTTPClient = options.HTTPClient } - if cacheOpts.SyncPeriod == nil { - cacheOpts.SyncPeriod = options.SyncPeriod - } } cache, err := options.NewCache(config, cacheOpts) if err != nil { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go index 999ef07e2..0b2aa0cb7 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/config/controller.go @@ -53,4 +53,10 @@ type Controller struct { // NeedLeaderElection indicates whether the controller needs to use leader election. // Defaults to true, which means the controller will use leader election. NeedLeaderElection *bool + + // UsePriorityQueue configures the controllers queue to use the controller-runtime provided + // priority queue. + // + // Note: This flag is disabled by default until a future version. It's currently in beta. 
+ UsePriorityQueue *bool } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go index f2496236d..5c5b249ef 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go @@ -24,7 +24,9 @@ import ( "github.com/go-logr/logr" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/internal/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -189,11 +191,21 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt } if options.RateLimiter == nil { - options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]() + if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { + options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second) + } else { + options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]() + } } if options.NewQueue == nil { options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { + if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { + return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { + o.Log = mgr.GetLogger().WithValues("controller", controllerName) + o.RateLimiter = rateLimiter + }) + } return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{ Name: controllerName, }) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go new file mode 100644 index 000000000..36626646f --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/metrics.go @@ -0,0 +1,146 @@ +package priorityqueue + +import ( + "sync" + "time" + + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" +) + +// This file is mostly a copy of unexported code from +// https://github.com/kubernetes/kubernetes/blob/1d8828ce707ed9dd7a6a9756385419cce1d202ac/staging/src/k8s.io/client-go/util/workqueue/metrics.go +// +// The only two differences are the addition of mapLock in defaultQueueMetrics and converging retryMetrics into queueMetrics. + +type queueMetrics[T comparable] interface { + add(item T) + get(item T) + done(item T) + updateUnfinishedWork() + retry() +} + +func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, clock clock.Clock) queueMetrics[T] { + if len(name) == 0 { + return noMetrics[T]{} + } + return &defaultQueueMetrics[T]{ + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), + addTimes: map[T]time.Time{}, + processingStartTimes: map[T]time.Time{}, + retries: mp.NewRetriesMetric(name), + } +} + +// defaultQueueMetrics expects the caller to lock before setting any metrics. 
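Two of the changes above surface in manager configuration: cluster.Options.SyncPeriod is gone (cache.Options.SyncPeriod remains the supported knob) and config.Controller gains the beta UsePriorityQueue switch. A hedged sketch of a manager setting both; run is a placeholder function name.

import (
    "time"

    "k8s.io/utils/ptr"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/cache"
    "sigs.k8s.io/controller-runtime/pkg/config"
)

func run() error {
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        // cluster.Options.SyncPeriod was removed; set the resync period here instead.
        Cache: cache.Options{SyncPeriod: ptr.To(10 * time.Hour)},
        Controller: config.Controller{
            // Beta and off by default; opt in to the new priority queue.
            UsePriorityQueue: ptr.To(true),
        },
    })
    if err != nil {
        return err
    }
    return mgr.Start(ctrl.SetupSignalHandler())
}

Leaving UsePriorityQueue unset keeps the existing rate-limited workqueue, so the opt-in is backwards compatible.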
+type defaultQueueMetrics[T comparable] struct { + clock clock.Clock + + // current depth of a workqueue + depth workqueue.GaugeMetric + // total number of adds handled by a workqueue + adds workqueue.CounterMetric + // how long an item stays in a workqueue + latency workqueue.HistogramMetric + // how long processing an item from a workqueue takes + workDuration workqueue.HistogramMetric + + mapLock sync.RWMutex + addTimes map[T]time.Time + processingStartTimes map[T]time.Time + + // how long have current threads been working? + unfinishedWorkSeconds workqueue.SettableGaugeMetric + longestRunningProcessor workqueue.SettableGaugeMetric + + retries workqueue.CounterMetric +} + +// add is called for ready items only +func (m *defaultQueueMetrics[T]) add(item T) { + if m == nil { + return + } + + m.adds.Inc() + m.depth.Inc() + + m.mapLock.Lock() + defer m.mapLock.Unlock() + + if _, exists := m.addTimes[item]; !exists { + m.addTimes[item] = m.clock.Now() + } +} + +func (m *defaultQueueMetrics[T]) get(item T) { + if m == nil { + return + } + + m.depth.Dec() + + m.mapLock.Lock() + defer m.mapLock.Unlock() + + m.processingStartTimes[item] = m.clock.Now() + if startTime, exists := m.addTimes[item]; exists { + m.latency.Observe(m.sinceInSeconds(startTime)) + delete(m.addTimes, item) + } +} + +func (m *defaultQueueMetrics[T]) done(item T) { + if m == nil { + return + } + + m.mapLock.Lock() + defer m.mapLock.Unlock() + if startTime, exists := m.processingStartTimes[item]; exists { + m.workDuration.Observe(m.sinceInSeconds(startTime)) + delete(m.processingStartTimes, item) + } +} + +func (m *defaultQueueMetrics[T]) updateUnfinishedWork() { + m.mapLock.RLock() + defer m.mapLock.RUnlock() + // Note that a summary metric would be better for this, but prometheus + // doesn't seem to have non-hacky ways to reset the summary metrics. + var total float64 + var oldest float64 + for _, t := range m.processingStartTimes { + age := m.sinceInSeconds(t) + total += age + if age > oldest { + oldest = age + } + } + m.unfinishedWorkSeconds.Set(total) + m.longestRunningProcessor.Set(oldest) +} + +// Gets the time since the specified start in seconds. +func (m *defaultQueueMetrics[T]) sinceInSeconds(start time.Time) float64 { + return m.clock.Since(start).Seconds() +} + +func (m *defaultQueueMetrics[T]) retry() { + m.retries.Inc() +} + +type noMetrics[T any] struct{} + +func (noMetrics[T]) add(item T) {} +func (noMetrics[T]) get(item T) {} +func (noMetrics[T]) done(item T) {} +func (noMetrics[T]) updateUnfinishedWork() {} +func (noMetrics[T]) retry() {} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go new file mode 100644 index 000000000..996369f47 --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue/priorityqueue.go @@ -0,0 +1,401 @@ +package priorityqueue + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/go-logr/logr" + "github.com/google/btree" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/internal/metrics" +) + +// AddOpts describes the options for adding items to the queue. +type AddOpts struct { + After time.Duration + RateLimited bool + Priority int +} + +// PriorityQueue is a priority queue for a controller. It +// internally de-duplicates all items that are added to +// it. 
It will use the max of the passed priorities and the +// min of possible durations. +type PriorityQueue[T comparable] interface { + workqueue.TypedRateLimitingInterface[T] + AddWithOpts(o AddOpts, Items ...T) + GetWithPriority() (item T, priority int, shutdown bool) +} + +// Opts contains the options for a PriorityQueue. +type Opts[T comparable] struct { + // Ratelimiter is being used when AddRateLimited is called. Defaults to a per-item exponential backoff + // limiter with an initial delay of five milliseconds and a max delay of 1000 seconds. + RateLimiter workqueue.TypedRateLimiter[T] + MetricProvider workqueue.MetricsProvider + Log logr.Logger +} + +// Opt allows to configure a PriorityQueue. +type Opt[T comparable] func(*Opts[T]) + +// New constructs a new PriorityQueue. +func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { + opts := &Opts[T]{} + for _, f := range o { + f(opts) + } + + if opts.RateLimiter == nil { + opts.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[T](5*time.Millisecond, 1000*time.Second) + } + + if opts.MetricProvider == nil { + opts.MetricProvider = metrics.WorkqueueMetricsProvider{} + } + + pq := &priorityqueue[T]{ + log: opts.Log, + items: map[T]*item[T]{}, + queue: btree.NewG(32, less[T]), + becameReady: sets.Set[T]{}, + metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}), + // itemOrWaiterAdded indicates that an item or + // waiter was added. It must be buffered, because + // if we currently process items we can't tell + // if that included the new item/waiter. + itemOrWaiterAdded: make(chan struct{}, 1), + rateLimiter: opts.RateLimiter, + locked: sets.Set[T]{}, + done: make(chan struct{}), + get: make(chan item[T]), + now: time.Now, + tick: time.Tick, + } + + go pq.spin() + go pq.logState() + if _, ok := pq.metrics.(noMetrics[T]); !ok { + go pq.updateUnfinishedWorkLoop() + } + + return pq +} + +type priorityqueue[T comparable] struct { + log logr.Logger + // lock has to be acquired for any access any of items, queue, addedCounter + // or becameReady + lock sync.Mutex + items map[T]*item[T] + queue bTree[*item[T]] + + // addedCounter is a counter of elements added, we need it + // because unixNano is not guaranteed to be unique. + addedCounter uint64 + + // becameReady holds items that are in the queue, were added + // with non-zero after and became ready. We need it to call the + // metrics add exactly once for them. + becameReady sets.Set[T] + metrics queueMetrics[T] + + itemOrWaiterAdded chan struct{} + + rateLimiter workqueue.TypedRateLimiter[T] + + // locked contains the keys we handed out through Get() and that haven't + // yet been returned through Done(). + locked sets.Set[T] + lockedLock sync.RWMutex + + shutdown atomic.Bool + done chan struct{} + + get chan item[T] + + // waiters is the number of routines blocked in Get, we use it to determine + // if we can push items. 
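The Opts/Opt pattern above makes the queue tunable at construction time. A sketch, assuming a caller wants a gentler per-item backoff than the 5ms default; newTunedQueue and the controller name are made up.

import (
    "time"

    "github.com/go-logr/logr"
    "k8s.io/client-go/util/workqueue"
    "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func newTunedQueue(log logr.Logger) priorityqueue.PriorityQueue[reconcile.Request] {
    return priorityqueue.New("my-controller", func(o *priorityqueue.Opts[reconcile.Request]) {
        o.Log = log
        // Slower initial retry than the 5ms default, same 1000s cap.
        o.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](50*time.Millisecond, 1000*time.Second)
    })
}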
+ waiters atomic.Int64 + + // Configurable for testing + now func() time.Time + tick func(time.Duration) <-chan time.Time +} + +func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { + w.lock.Lock() + defer w.lock.Unlock() + + for _, key := range items { + if o.RateLimited { + after := w.rateLimiter.When(key) + if o.After == 0 || after < o.After { + o.After = after + } + } + + var readyAt *time.Time + if o.After > 0 { + readyAt = ptr.To(w.now().Add(o.After)) + w.metrics.retry() + } + if _, ok := w.items[key]; !ok { + item := &item[T]{ + Key: key, + AddedCounter: w.addedCounter, + Priority: o.Priority, + ReadyAt: readyAt, + } + w.items[key] = item + w.queue.ReplaceOrInsert(item) + if item.ReadyAt == nil { + w.metrics.add(key) + } + w.addedCounter++ + continue + } + + // The b-tree de-duplicates based on ordering and any change here + // will affect the order - Just delete and re-add. + item, _ := w.queue.Delete(w.items[key]) + if o.Priority > item.Priority { + item.Priority = o.Priority + } + + if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { + if readyAt == nil && !w.becameReady.Has(key) { + w.metrics.add(key) + } + item.ReadyAt = readyAt + } + + w.queue.ReplaceOrInsert(item) + } + + if len(items) > 0 { + w.notifyItemOrWaiterAdded() + } +} + +func (w *priorityqueue[T]) notifyItemOrWaiterAdded() { + select { + case w.itemOrWaiterAdded <- struct{}{}: + default: + } +} + +func (w *priorityqueue[T]) spin() { + blockForever := make(chan time.Time) + var nextReady <-chan time.Time + nextReady = blockForever + + for { + select { + case <-w.done: + return + case <-w.itemOrWaiterAdded: + case <-nextReady: + } + + nextReady = blockForever + + func() { + w.lock.Lock() + defer w.lock.Unlock() + + w.lockedLock.Lock() + defer w.lockedLock.Unlock() + + // manipulating the tree from within Ascend might lead to panics, so + // track what we want to delete and do it after we are done ascending. + var toDelete []*item[T] + w.queue.Ascend(func(item *item[T]) bool { + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { + nextReady = w.tick(readyAt) + return false + } + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key) + w.becameReady.Insert(item.Key) + } + } + + if w.waiters.Load() == 0 { + // Have to keep iterating here to ensure we update metrics + // for further items that became ready and set nextReady. 
+ return true + } + + // Item is locked, we can not hand it out + if w.locked.Has(item.Key) { + return true + } + + w.metrics.get(item.Key) + w.locked.Insert(item.Key) + w.waiters.Add(-1) + delete(w.items, item.Key) + toDelete = append(toDelete, item) + w.becameReady.Delete(item.Key) + w.get <- *item + + return true + }) + + for _, item := range toDelete { + w.queue.Delete(item) + } + }() + } +} + +func (w *priorityqueue[T]) Add(item T) { + w.AddWithOpts(AddOpts{}, item) +} + +func (w *priorityqueue[T]) AddAfter(item T, after time.Duration) { + w.AddWithOpts(AddOpts{After: after}, item) +} + +func (w *priorityqueue[T]) AddRateLimited(item T) { + w.AddWithOpts(AddOpts{RateLimited: true}, item) +} + +func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) { + w.waiters.Add(1) + + w.notifyItemOrWaiterAdded() + item := <-w.get + + return item.Key, item.Priority, w.shutdown.Load() +} + +func (w *priorityqueue[T]) Get() (item T, shutdown bool) { + key, _, shutdown := w.GetWithPriority() + return key, shutdown +} + +func (w *priorityqueue[T]) Forget(item T) { + w.rateLimiter.Forget(item) +} + +func (w *priorityqueue[T]) NumRequeues(item T) int { + return w.rateLimiter.NumRequeues(item) +} + +func (w *priorityqueue[T]) ShuttingDown() bool { + return w.shutdown.Load() +} + +func (w *priorityqueue[T]) Done(item T) { + w.lockedLock.Lock() + defer w.lockedLock.Unlock() + w.locked.Delete(item) + w.metrics.done(item) + w.notifyItemOrWaiterAdded() +} + +func (w *priorityqueue[T]) ShutDown() { + w.shutdown.Store(true) + close(w.done) +} + +// ShutDownWithDrain just calls ShutDown, as the draining +// functionality is not used by controller-runtime. +func (w *priorityqueue[T]) ShutDownWithDrain() { + w.ShutDown() +} + +// Len returns the number of items that are ready to be +// picked up. It does not include items that are not yet +// ready. +func (w *priorityqueue[T]) Len() int { + w.lock.Lock() + defer w.lock.Unlock() + + var result int + w.queue.Ascend(func(item *item[T]) bool { + if item.ReadyAt == nil || item.ReadyAt.Compare(w.now()) <= 0 { + result++ + return true + } + return false + }) + + return result +} + +func (w *priorityqueue[T]) logState() { + t := time.Tick(10 * time.Second) + for { + select { + case <-w.done: + return + case <-t: + } + + // Log level may change at runtime, so keep the + // loop going even if a given level is currently + // not enabled. 
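To make the ordering rules above concrete (duplicates collapse to the highest priority and shortest delay, and delayed items stay invisible until ready), a small stand-alone sketch; the queue name and item strings are invented.

import (
    "fmt"
    "time"

    "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
)

func demoOrdering() {
    q := priorityqueue.New[string]("demo")
    defer q.ShutDown()

    q.AddWithOpts(priorityqueue.AddOpts{Priority: 0}, "listwatch-resync")
    q.AddWithOpts(priorityqueue.AddOpts{Priority: 100}, "user-edit")
    q.AddWithOpts(priorityqueue.AddOpts{After: time.Minute}, "delayed-retry")

    // The higher-priority item is handed out first; the delayed item only
    // becomes visible (and counted by Len) once its After duration elapses.
    item, prio, _ := q.GetWithPriority()
    fmt.Println(item, prio) // expected: user-edit 100
    q.Done(item)
}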
+ if !w.log.V(5).Enabled() { + continue + } + w.lock.Lock() + items := make([]*item[T], 0, len(w.items)) + w.queue.Ascend(func(item *item[T]) bool { + items = append(items, item) + return true + }) + w.lock.Unlock() + + w.log.V(5).Info("workqueue_items", "items", items) + } +} + +func less[T comparable](a, b *item[T]) bool { + if a.ReadyAt == nil && b.ReadyAt != nil { + return true + } + if b.ReadyAt == nil && a.ReadyAt != nil { + return false + } + if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { + return a.ReadyAt.Before(*b.ReadyAt) + } + if a.Priority != b.Priority { + return a.Priority > b.Priority + } + + return a.AddedCounter < b.AddedCounter +} + +type item[T comparable] struct { + Key T `json:"key"` + AddedCounter uint64 `json:"addedCounter"` + Priority int `json:"priority"` + ReadyAt *time.Time `json:"readyAt,omitempty"` +} + +func (w *priorityqueue[T]) updateUnfinishedWorkLoop() { + t := time.Tick(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182 + for { + select { + case <-w.done: + return + case <-t: + } + w.metrics.updateUnfinishedWork() + } +} + +type bTree[T any] interface { + ReplaceOrInsert(item T) (_ T, _ bool) + Delete(item T) (T, bool) + Ascend(iterator btree.ItemIteratorG[T]) +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go index ea4bcee31..57107f20e 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/handler/eventhandler.go @@ -18,9 +18,11 @@ package handler import ( "context" + "time" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -133,3 +135,67 @@ func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedG h.GenericFunc(ctx, e, q) } } + +// LowPriority is the priority set by WithLowPriorityWhenUnchanged +const LowPriority = -100 + +// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if +// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing. +func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] { + return TypedFuncs[object, request]{ + CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { + // Due to how the handlers are factored, we have to wrap the workqueue to be able + // to inject custom behavior. 
+ u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{ + TypedRateLimitingInterface: trli, + addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { + priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) + if !isPriorityQueue { + q.Add(item) + return + } + var priority int + if isObjectUnchanged(tce) { + priority = LowPriority + } + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) + }, + }) + }, + UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) { + u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{ + TypedRateLimitingInterface: trli, + addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { + priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request]) + if !isPriorityQueue { + q.Add(item) + return + } + var priority int + if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() { + priority = LowPriority + } + priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) + }, + }) + }, + DeleteFunc: u.Delete, + GenericFunc: u.Generic, + } +} + +type workqueueWithCustomAddFunc[request comparable] struct { + workqueue.TypedRateLimitingInterface[request] + addFunc func(item request, q workqueue.TypedRateLimitingInterface[request]) +} + +func (w workqueueWithCustomAddFunc[request]) Add(item request) { + w.addFunc(item, w.TypedRateLimitingInterface) +} + +// isObjectUnchanged checks if the object in a create event is unchanged, for example because +// we got it in our initial listwatch. The heuristic it uses is to check if the object is older +// than one minute. +func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool { + return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute)) +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go index dfe407f3b..cc734dfb6 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go @@ -21,9 +21,11 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" @@ -169,43 +171,66 @@ func (c *Controller[request]) Start(ctx context.Context) error { defer utilruntime.HandleCrash() // NB(directxman12): launch the sources *before* trying to wait for the - // caches to sync so that they have a chance to register their intendeded + // caches to sync so that they have a chance to register their intended // caches. 
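When the controller runs on the priority queue, the wrapper above demotes events that stem from the initial list or a resync. A hedged builder sketch; the ConfigMap/Secret pairing and setupWatches are placeholders.

import (
    corev1 "k8s.io/api/core/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func setupWatches(mgr manager.Manager, r reconcile.Reconciler) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&corev1.ConfigMap{}).
        Watches(
            &corev1.Secret{},
            // Only lowers priority when the queue is a priorityqueue.PriorityQueue;
            // otherwise it behaves exactly like the wrapped handler.
            handler.WithLowPriorityWhenUnchanged[client.Object, reconcile.Request](&handler.EnqueueRequestForObject{}),
        ).
        Complete(r)
}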
+ errGroup := &errgroup.Group{} for _, watch := range c.startWatches { - c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch)) + log := c.LogConstructor(nil) + _, ok := watch.(interface { + String() string + }) - if err := watch.Start(ctx, c.Queue); err != nil { - return err - } - } - - // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches - c.LogConstructor(nil).Info("Starting Controller") - - for _, watch := range c.startWatches { - syncingSource, ok := watch.(source.SyncingSource) if !ok { - continue + log = log.WithValues("source", fmt.Sprintf("%T", watch)) + } else { + log = log.WithValues("source", fmt.Sprintf("%s", watch)) } - - if err := func() error { - // use a context with timeout for launching sources and syncing caches. + didStartSyncingSource := &atomic.Bool{} + errGroup.Go(func() error { + // Use a timeout for starting and syncing the source to avoid silently + // blocking startup indefinitely if it doesn't come up. sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) defer cancel() - // WaitForSync waits for a definitive timeout, and returns if there - // is an error or a timeout - if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { - err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) - c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync") - return err - } + sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out + go func() { + defer close(sourceStartErrChan) + log.Info("Starting EventSource") + if err := watch.Start(ctx, c.Queue); err != nil { + sourceStartErrChan <- err + return + } + syncingSource, ok := watch.(source.TypedSyncingSource[request]) + if !ok { + return + } + didStartSyncingSource.Store(true) + if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { + err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err) + log.Error(err, "Could not wait for Cache to sync") + sourceStartErrChan <- err + } + }() - return nil - }(); err != nil { - return err - } + select { + case err := <-sourceStartErrChan: + return err + case <-sourceStartCtx.Done(): + if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened + return <-sourceStartErrChan + } + if ctx.Err() != nil { // Don't return an error if the root context got cancelled + return nil + } + return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch) + } + }) } + if err := errGroup.Wait(); err != nil { + return err + } + + c.LogConstructor(nil).Info("Starting Controller") // All the watches have been started, we can reset the local slice. // @@ -311,7 +336,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() if !result.IsZero() { - log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler") + log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. 
The result will always be ignored if the error is non-nil and the non-nil error causes requeuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler") } log.Error(err, "Reconciler error") case result.RequeueAfter > 0: diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/metrics/workqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/metrics/workqueue.go new file mode 100644 index 000000000..86da340af --- /dev/null +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/metrics/workqueue.go @@ -0,0 +1,131 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +// This file is copied and adapted from k8s.io/component-base/metrics/prometheus/workqueue +// which registers metrics to the k8s legacy Registry. We require very +// similar functionality, but must register metrics to a different Registry. + +// Metrics subsystem and all keys used by the workqueue. +const ( + WorkQueueSubsystem = metrics.WorkQueueSubsystem + DepthKey = metrics.DepthKey + AddsKey = metrics.AddsKey + QueueLatencyKey = metrics.QueueLatencyKey + WorkDurationKey = metrics.WorkDurationKey + UnfinishedWorkKey = metrics.UnfinishedWorkKey + LongestRunningProcessorKey = metrics.LongestRunningProcessorKey + RetriesKey = metrics.RetriesKey +) + +var ( + depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: WorkQueueSubsystem, + Name: DepthKey, + Help: "Current depth of workqueue", + }, []string{"name", "controller"}) + + adds = prometheus.NewCounterVec(prometheus.CounterOpts{ + Subsystem: WorkQueueSubsystem, + Name: AddsKey, + Help: "Total number of adds handled by workqueue", + }, []string{"name", "controller"}) + + latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: WorkQueueSubsystem, + Name: QueueLatencyKey, + Help: "How long in seconds an item stays in workqueue before being requested", + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), + }, []string{"name", "controller"}) + + workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: WorkQueueSubsystem, + Name: WorkDurationKey, + Help: "How long in seconds processing an item from workqueue takes.", + Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), + }, []string{"name", "controller"}) + + unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: WorkQueueSubsystem, + Name: UnfinishedWorkKey, + Help: "How many seconds of work has been done that " + + "is in progress and hasn't been observed by work_duration. Large " + + "values indicate stuck threads. 
One can deduce the number of stuck " + + "threads by observing the rate at which this increases.", + }, []string{"name", "controller"}) + + longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: WorkQueueSubsystem, + Name: LongestRunningProcessorKey, + Help: "How many seconds has the longest running " + + "processor for workqueue been running.", + }, []string{"name", "controller"}) + + retries = prometheus.NewCounterVec(prometheus.CounterOpts{ + Subsystem: WorkQueueSubsystem, + Name: RetriesKey, + Help: "Total number of retries handled by workqueue", + }, []string{"name", "controller"}) +) + +func init() { + metrics.Registry.MustRegister(depth) + metrics.Registry.MustRegister(adds) + metrics.Registry.MustRegister(latency) + metrics.Registry.MustRegister(workDuration) + metrics.Registry.MustRegister(unfinished) + metrics.Registry.MustRegister(longestRunningProcessor) + metrics.Registry.MustRegister(retries) + + workqueue.SetProvider(WorkqueueMetricsProvider{}) +} + +type WorkqueueMetricsProvider struct{} + +func (WorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { + return depth.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { + return adds.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { + return latency.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { + return workDuration.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { + return unfinished.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { + return longestRunningProcessor.WithLabelValues(name, name) +} + +func (WorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { + return retries.WithLabelValues(name, name) +} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go index 4999edc43..2fdfbde8e 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/internal/source/kind.go @@ -52,7 +52,7 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not // sync that informer (most commonly due to RBAC issues). 
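The Prometheus collectors move into an internal package above, but they still register against the public metrics.Registry (their init runs once the controller packages are imported) and the metric-name constants in pkg/metrics stay exported. A sketch that lists the workqueue metric families from that registry; dumpWorkqueueMetricNames is an illustrative helper.

import (
    "fmt"
    "strings"

    "sigs.k8s.io/controller-runtime/pkg/metrics"
)

func dumpWorkqueueMetricNames() error {
    families, err := metrics.Registry.Gather()
    if err != nil {
        return err
    }
    for _, mf := range families {
        // e.g. workqueue_depth, workqueue_adds_total, workqueue_retries_total
        if strings.HasPrefix(mf.GetName(), metrics.WorkQueueSubsystem) {
            fmt.Println(mf.GetName())
        }
    }
    return nil
}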
ctx, ks.startCancel = context.WithCancel(ctx) - ks.startedErr = make(chan error) + ks.startedErr = make(chan error, 1) // Buffer chan to not leak goroutines if WaitForSync isn't called go func() { var ( i cache.Informer diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/server/server.go b/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/server/server.go index 5eb0c62a7..939c333f7 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/server/server.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/server/server.go @@ -275,7 +275,7 @@ func (s *defaultServer) createListener(ctx context.Context, log logr.Logger) (ne return s.options.ListenConfig.Listen(ctx, "tcp", s.options.BindAddress) } - cfg := &tls.Config{ //nolint:gosec + cfg := &tls.Config{ NextProtos: []string{"h2"}, } // fallback TLS config ready, will now mutate if passer wants full control over it diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/workqueue.go b/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/workqueue.go index 590653e70..cd7ccc773 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/workqueue.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/metrics/workqueue.go @@ -16,15 +16,6 @@ limitations under the License. package metrics -import ( - "github.com/prometheus/client_golang/prometheus" - "k8s.io/client-go/util/workqueue" -) - -// This file is copied and adapted from k8s.io/component-base/metrics/prometheus/workqueue -// which registers metrics to the k8s legacy Registry. We require very -// similar functionality, but must register metrics to a different Registry. - // Metrics subsystem and all keys used by the workqueue. const ( WorkQueueSubsystem = "workqueue" @@ -36,95 +27,3 @@ const ( LongestRunningProcessorKey = "longest_running_processor_seconds" RetriesKey = "retries_total" ) - -var ( - depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: DepthKey, - Help: "Current depth of workqueue", - }, []string{"name", "controller"}) - - adds = prometheus.NewCounterVec(prometheus.CounterOpts{ - Subsystem: WorkQueueSubsystem, - Name: AddsKey, - Help: "Total number of adds handled by workqueue", - }, []string{"name", "controller"}) - - latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Subsystem: WorkQueueSubsystem, - Name: QueueLatencyKey, - Help: "How long in seconds an item stays in workqueue before being requested", - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), - }, []string{"name", "controller"}) - - workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Subsystem: WorkQueueSubsystem, - Name: WorkDurationKey, - Help: "How long in seconds processing an item from workqueue takes.", - Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12), - }, []string{"name", "controller"}) - - unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: UnfinishedWorkKey, - Help: "How many seconds of work has been done that " + - "is in progress and hasn't been observed by work_duration. Large " + - "values indicate stuck threads. 
One can deduce the number of stuck " + - "threads by observing the rate at which this increases.", - }, []string{"name", "controller"}) - - longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: WorkQueueSubsystem, - Name: LongestRunningProcessorKey, - Help: "How many seconds has the longest running " + - "processor for workqueue been running.", - }, []string{"name", "controller"}) - - retries = prometheus.NewCounterVec(prometheus.CounterOpts{ - Subsystem: WorkQueueSubsystem, - Name: RetriesKey, - Help: "Total number of retries handled by workqueue", - }, []string{"name", "controller"}) -) - -func init() { - Registry.MustRegister(depth) - Registry.MustRegister(adds) - Registry.MustRegister(latency) - Registry.MustRegister(workDuration) - Registry.MustRegister(unfinished) - Registry.MustRegister(longestRunningProcessor) - Registry.MustRegister(retries) - - workqueue.SetProvider(workqueueMetricsProvider{}) -} - -type workqueueMetricsProvider struct{} - -func (workqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { - return depth.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { - return adds.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { - return latency.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { - return workDuration.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { - return unfinished.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { - return longestRunningProcessor.WithLabelValues(name, name) -} - -func (workqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { - return retries.WithLabelValues(name, name) -} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/predicate/predicate.go b/vendor/sigs.k8s.io/controller-runtime/pkg/predicate/predicate.go index 90918db57..ce33975f3 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/predicate/predicate.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/predicate/predicate.go @@ -173,7 +173,8 @@ func (TypedResourceVersionChangedPredicate[T]) Update(e event.TypedUpdateEvent[T // The metadata.generation field of an object is incremented by the API server when writes are made to the spec field of an object. // This allows a controller to ignore update events where the spec is unchanged, and only the metadata and/or status fields are changed. // -// For CustomResource objects the Generation is only incremented when the status subresource is enabled. +// For CustomResource objects the Generation is incremented when spec is changed, or status changed and status not modeled as subresource. +// subresource status update will not increase Generation. // // Caveats: // @@ -191,7 +192,8 @@ type GenerationChangedPredicate = TypedGenerationChangedPredicate[client.Object] // The metadata.generation field of an object is incremented by the API server when writes are made to the spec field of an object. // This allows a controller to ignore update events where the spec is unchanged, and only the metadata and/or status fields are changed. // -// For CustomResource objects the Generation is only incremented when the status subresource is enabled. 
+// For CustomResource objects the Generation is incremented when spec is changed, or status changed and status not modeled as subresource. +// subresource status update will not increase Generation. // // Caveats: // diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter.go deleted file mode 100644 index efbbf6028..000000000 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter.go +++ /dev/null @@ -1,84 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package admission - -import ( - "context" - "encoding/json" - "net/http" - - admissionv1 "k8s.io/api/admission/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" -) - -// Defaulter defines functions for setting defaults on resources. -// Deprecated: Ue CustomDefaulter instead. -type Defaulter interface { - runtime.Object - Default() -} - -// DefaultingWebhookFor creates a new Webhook for Defaulting the provided type. -// Deprecated: Use WithCustomDefaulter instead. -func DefaultingWebhookFor(scheme *runtime.Scheme, defaulter Defaulter) *Webhook { - return &Webhook{ - Handler: &mutatingHandler{defaulter: defaulter, decoder: NewDecoder(scheme)}, - } -} - -type mutatingHandler struct { - defaulter Defaulter - decoder Decoder -} - -// Handle handles admission requests. 
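The clarified GenerationChangedPredicate contract above (spec writes bump metadata.generation, subresource status writes do not) is usually consumed through the builder. A brief sketch, assuming mgr and r come from the usual manager setup.

import (
    appsv1 "k8s.io/api/apps/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func setupSpecOnlyController(mgr manager.Manager, r reconcile.Reconciler) error {
    // Reconcile only when metadata.generation changes, i.e. ignore pure
    // status-subresource updates as described in the predicate docs above.
    return ctrl.NewControllerManagedBy(mgr).
        For(&appsv1.Deployment{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
        Complete(r)
}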
-func (h *mutatingHandler) Handle(ctx context.Context, req Request) Response { - if h.decoder == nil { - panic("decoder should never be nil") - } - if h.defaulter == nil { - panic("defaulter should never be nil") - } - - // always skip when a DELETE operation received in mutation handler - // describe in https://github.com/kubernetes-sigs/controller-runtime/issues/1762 - if req.Operation == admissionv1.Delete { - return Response{AdmissionResponse: admissionv1.AdmissionResponse{ - Allowed: true, - Result: &metav1.Status{ - Code: http.StatusOK, - }, - }} - } - - // Get the object in the request - obj := h.defaulter.DeepCopyObject().(Defaulter) - if err := h.decoder.Decode(req, obj); err != nil { - return Errored(http.StatusBadRequest, err) - } - - // Default the object - obj.Default() - marshalled, err := json.Marshal(obj) - if err != nil { - return Errored(http.StatusInternalServerError, err) - } - - // Create the patch - return PatchResponseFromRaw(req.Object.Raw, marshalled) -} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter_custom.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter_custom.go index d15dec7a0..a703cbd2c 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter_custom.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/defaulter_custom.go @@ -21,11 +21,14 @@ import ( "encoding/json" "errors" "net/http" + "slices" + "gomodules.xyz/jsonpatch/v2" admissionv1 "k8s.io/api/admission/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" ) // CustomDefaulter defines functions for setting defaults on resources. @@ -33,17 +36,41 @@ type CustomDefaulter interface { Default(ctx context.Context, obj runtime.Object) error } +type defaulterOptions struct { + removeUnknownOrOmitableFields bool +} + +// DefaulterOption defines the type of a CustomDefaulter's option +type DefaulterOption func(*defaulterOptions) + +// DefaulterRemoveUnknownOrOmitableFields makes the defaulter prune fields that are in the json object retrieved by the +// webhook but not in the local go type json representation. This happens for example when the CRD in the apiserver has +// fields that our go type doesn't know about, because it's outdated, or the field has a zero value and is `omitempty`. +func DefaulterRemoveUnknownOrOmitableFields(o *defaulterOptions) { + o.removeUnknownOrOmitableFields = true +} + // WithCustomDefaulter creates a new Webhook for a CustomDefaulter interface. -func WithCustomDefaulter(scheme *runtime.Scheme, obj runtime.Object, defaulter CustomDefaulter) *Webhook { +func WithCustomDefaulter(scheme *runtime.Scheme, obj runtime.Object, defaulter CustomDefaulter, opts ...DefaulterOption) *Webhook { + options := &defaulterOptions{} + for _, o := range opts { + o(options) + } return &Webhook{ - Handler: &defaulterForType{object: obj, defaulter: defaulter, decoder: NewDecoder(scheme)}, + Handler: &defaulterForType{ + object: obj, + defaulter: defaulter, + decoder: NewDecoder(scheme), + removeUnknownOrOmitableFields: options.removeUnknownOrOmitableFields, + }, } } type defaulterForType struct { - defaulter CustomDefaulter - object runtime.Object - decoder Decoder + defaulter CustomDefaulter + object runtime.Object + decoder Decoder + removeUnknownOrOmitableFields bool } // Handle handles admission requests. 
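Editor's note: the defaulter_custom.go hunk above adds a variadic DefaulterOption parameter to WithCustomDefaulter. By default the handler now drops JSON-patch "remove" operations that exist only because the local Go type lacks (or omits via `omitempty`) fields present in the stored object; passing DefaulterRemoveUnknownOrOmitableFields keeps those removals, which appears to restore the previous pruning behaviour. A minimal sketch of registering such a webhook follows; WidgetDefaulter and newWidgetWebhook are illustrative names, not part of this patch.

```go
package example

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// WidgetDefaulter is a hypothetical CustomDefaulter for a "Widget" resource.
type WidgetDefaulter struct{}

func (d *WidgetDefaulter) Default(ctx context.Context, obj runtime.Object) error {
	// Mutate obj in place; the webhook turns the difference into a JSON patch.
	return nil
}

// newWidgetWebhook opts into pruning fields unknown to the local Go type by
// passing the new DefaulterRemoveUnknownOrOmitableFields option. Omit the
// option to keep such fields untouched (the default in this version).
func newWidgetWebhook(mgr ctrl.Manager, widget runtime.Object) *admission.Webhook {
	return admission.WithCustomDefaulter(
		mgr.GetScheme(),
		widget,
		&WidgetDefaulter{},
		admission.DefaulterRemoveUnknownOrOmitableFields,
	)
}
```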
@@ -76,6 +103,12 @@ func (h *defaulterForType) Handle(ctx context.Context, req Request) Response { return Errored(http.StatusBadRequest, err) } + // Keep a copy of the object if needed + var originalObj runtime.Object + if !h.removeUnknownOrOmitableFields { + originalObj = obj.DeepCopyObject() + } + // Default the object if err := h.defaulter.Default(ctx, obj); err != nil { var apiStatus apierrors.APIStatus @@ -90,5 +123,43 @@ func (h *defaulterForType) Handle(ctx context.Context, req Request) Response { if err != nil { return Errored(http.StatusInternalServerError, err) } - return PatchResponseFromRaw(req.Object.Raw, marshalled) + + handlerResponse := PatchResponseFromRaw(req.Object.Raw, marshalled) + if !h.removeUnknownOrOmitableFields { + handlerResponse = h.dropSchemeRemovals(handlerResponse, originalObj, req.Object.Raw) + } + return handlerResponse +} + +func (h *defaulterForType) dropSchemeRemovals(r Response, original runtime.Object, raw []byte) Response { + const opRemove = "remove" + if !r.Allowed || r.PatchType == nil { + return r + } + + // If we don't have removals in the patch. + if !slices.ContainsFunc(r.Patches, func(o jsonpatch.JsonPatchOperation) bool { return o.Operation == opRemove }) { + return r + } + + // Get the raw to original patch + marshalledOriginal, err := json.Marshal(original) + if err != nil { + return Errored(http.StatusInternalServerError, err) + } + + patchOriginal, err := jsonpatch.CreatePatch(raw, marshalledOriginal) + if err != nil { + return Errored(http.StatusInternalServerError, err) + } + removedByScheme := sets.New(slices.DeleteFunc(patchOriginal, func(p jsonpatch.JsonPatchOperation) bool { return p.Operation != opRemove })...) + + r.Patches = slices.DeleteFunc(r.Patches, func(p jsonpatch.JsonPatchOperation) bool { + return p.Operation == opRemove && removedByScheme.Has(p) + }) + + if len(r.Patches) == 0 { + r.PatchType = nil + } + return r } diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator.go deleted file mode 100644 index b28a56eef..000000000 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator.go +++ /dev/null @@ -1,127 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package admission - -import ( - "context" - "errors" - "fmt" - "net/http" - - v1 "k8s.io/api/admission/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" -) - -// Warnings represents warning messages. -type Warnings []string - -// Validator defines functions for validating an operation. -// The custom resource kind which implements this interface can validate itself. -// To validate the custom resource with another specific struct, use CustomValidator instead. -// Deprecated: Use CustomValidator instead. -type Validator interface { - runtime.Object - - // ValidateCreate validates the object on creation. - // The optional warnings will be added to the response as warning messages. 
- // Return an error if the object is invalid. - ValidateCreate() (warnings Warnings, err error) - - // ValidateUpdate validates the object on update. The oldObj is the object before the update. - // The optional warnings will be added to the response as warning messages. - // Return an error if the object is invalid. - ValidateUpdate(old runtime.Object) (warnings Warnings, err error) - - // ValidateDelete validates the object on deletion. - // The optional warnings will be added to the response as warning messages. - // Return an error if the object is invalid. - ValidateDelete() (warnings Warnings, err error) -} - -// ValidatingWebhookFor creates a new Webhook for validating the provided type. -// Deprecated: Use WithCustomValidator instead. -func ValidatingWebhookFor(scheme *runtime.Scheme, validator Validator) *Webhook { - return &Webhook{ - Handler: &validatingHandler{validator: validator, decoder: NewDecoder(scheme)}, - } -} - -type validatingHandler struct { - validator Validator - decoder Decoder -} - -// Handle handles admission requests. -func (h *validatingHandler) Handle(ctx context.Context, req Request) Response { - if h.decoder == nil { - panic("decoder should never be nil") - } - if h.validator == nil { - panic("validator should never be nil") - } - // Get the object in the request - obj := h.validator.DeepCopyObject().(Validator) - - var err error - var warnings []string - - switch req.Operation { - case v1.Connect: - // No validation for connect requests. - // TODO(vincepri): Should we validate CONNECT requests? In what cases? - case v1.Create: - if err = h.decoder.Decode(req, obj); err != nil { - return Errored(http.StatusBadRequest, err) - } - - warnings, err = obj.ValidateCreate() - case v1.Update: - oldObj := obj.DeepCopyObject() - - err = h.decoder.DecodeRaw(req.Object, obj) - if err != nil { - return Errored(http.StatusBadRequest, err) - } - err = h.decoder.DecodeRaw(req.OldObject, oldObj) - if err != nil { - return Errored(http.StatusBadRequest, err) - } - - warnings, err = obj.ValidateUpdate(oldObj) - case v1.Delete: - // In reference to PR: https://github.com/kubernetes/kubernetes/pull/76346 - // OldObject contains the object being deleted - err = h.decoder.DecodeRaw(req.OldObject, obj) - if err != nil { - return Errored(http.StatusBadRequest, err) - } - - warnings, err = obj.ValidateDelete() - default: - return Errored(http.StatusBadRequest, fmt.Errorf("unknown operation %q", req.Operation)) - } - - if err != nil { - var apiStatus apierrors.APIStatus - if errors.As(err, &apiStatus) { - return validationResponseFromStatus(false, apiStatus.Status()).WithWarnings(warnings...) - } - return Denied(err.Error()).WithWarnings(warnings...) - } - return Allowed("").WithWarnings(warnings...) -} diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator_custom.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator_custom.go index b8f194401..ef1be52a8 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator_custom.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/validator_custom.go @@ -27,6 +27,9 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// Warnings represents warning messages. +type Warnings []string + // CustomValidator defines functions for validating an operation. // The object to be validated is passed into methods as a parameter. 
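Editor's note: with the deprecated Validator interface and its self-validating handler removed above, the Warnings type now lives alongside CustomValidator in validator_custom.go, and downstream types must implement CustomValidator directly. The sketch below shows the shape of such an implementation; WidgetValidator is a hypothetical name and the method bodies are placeholders.

```go
package example

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// WidgetValidator is a hypothetical CustomValidator replacing a type that
// previously implemented the removed admission.Validator interface.
type WidgetValidator struct{}

var _ admission.CustomValidator = &WidgetValidator{}

func (v *WidgetValidator) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
	// Returning a non-nil error denies the admission request.
	return nil, nil
}

func (v *WidgetValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
	// Warnings are surfaced to the client without denying the request.
	return admission.Warnings{"example warning"}, nil
}

func (v *WidgetValidator) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
	return nil, fmt.Errorf("deletion is not allowed in this sketch")
}
```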
type CustomValidator interface { diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/alias.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/alias.go index e8439e2ea..2882e7bab 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/alias.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/alias.go @@ -23,14 +23,6 @@ import ( // define some aliases for common bits of the webhook functionality -// Defaulter defines functions for setting defaults on resources. -// Deprecated: Use CustomDefaulter instead. -type Defaulter = admission.Defaulter - -// Validator defines functions for validating an operation. -// Deprecated: Use CustomValidator instead. -type Validator = admission.Validator - // CustomDefaulter defines functions for setting defaults on resources. type CustomDefaulter = admission.CustomDefaulter diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/server.go b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/server.go index f8820e8b7..4d8ae9ec7 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/server.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/webhook/server.go @@ -190,7 +190,7 @@ func (s *DefaultServer) Start(ctx context.Context) error { log.Info("Starting webhook server") - cfg := &tls.Config{ //nolint:gosec + cfg := &tls.Config{ NextProtos: []string{"h2"}, } // fallback TLS config ready, will now mutate if passer wants full control over it @@ -272,7 +272,7 @@ func (s *DefaultServer) Start(ctx context.Context) error { // server has been started. func (s *DefaultServer) StartedChecker() healthz.Checker { config := &tls.Config{ - InsecureSkipVerify: true, //nolint:gosec // config is used to connect to our own webhook port. + InsecureSkipVerify: true, } return func(req *http.Request) error { s.mu.Lock()