mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 10:33:35 +00:00
rebase: Bump sigs.k8s.io/controller-runtime from 0.15.1 to 0.16.0
Bumps [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime) from 0.15.1 to 0.16.0. - [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases) - [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md) - [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.15.1...v0.16.0) --- updated-dependencies: - dependency-name: sigs.k8s.io/controller-runtime dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
committed by
mergify[bot]
parent
97d9f701ec
commit
a51516501c
337
vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go
generated
vendored
337
vendor/sigs.k8s.io/controller-runtime/pkg/cache/cache.go
generated
vendored
@ -22,8 +22,8 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/rest"
|
||||
toolscache "k8s.io/client-go/tools/cache"
|
||||
"k8s.io/utils/pointer"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
@ -43,14 +44,28 @@ var (
|
||||
defaultSyncPeriod = 10 * time.Hour
|
||||
)
|
||||
|
||||
// InformerGetOptions defines the behavior of how informers are retrieved.
|
||||
type InformerGetOptions internal.GetOptions
|
||||
|
||||
// InformerGetOption defines an option that alters the behavior of how informers are retrieved.
|
||||
type InformerGetOption func(*InformerGetOptions)
|
||||
|
||||
// BlockUntilSynced determines whether a get request for an informer should block
|
||||
// until the informer's cache has synced.
|
||||
func BlockUntilSynced(shouldBlock bool) InformerGetOption {
|
||||
return func(opts *InformerGetOptions) {
|
||||
opts.BlockUntilSynced = &shouldBlock
|
||||
}
|
||||
}
|
||||
|
||||
// Cache knows how to load Kubernetes objects, fetch informers to request
|
||||
// to receive events for Kubernetes objects (at a low-level),
|
||||
// and add indices to fields on the objects stored in the cache.
|
||||
type Cache interface {
|
||||
// Cache acts as a client to objects stored in the cache.
|
||||
// Reader acts as a client to objects stored in the cache.
|
||||
client.Reader
|
||||
|
||||
// Cache loads informers and adds field indices.
|
||||
// Informers loads informers and adds field indices.
|
||||
Informers
|
||||
}
|
||||
|
||||
@ -60,49 +75,53 @@ type Cache interface {
|
||||
type Informers interface {
|
||||
// GetInformer fetches or constructs an informer for the given object that corresponds to a single
|
||||
// API kind and resource.
|
||||
GetInformer(ctx context.Context, obj client.Object) (Informer, error)
|
||||
GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error)
|
||||
|
||||
// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
|
||||
// of the underlying object.
|
||||
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error)
|
||||
GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)
|
||||
|
||||
// Start runs all the informers known to this cache until the context is closed.
|
||||
// It blocks.
|
||||
Start(ctx context.Context) error
|
||||
|
||||
// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
|
||||
// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
|
||||
WaitForCacheSync(ctx context.Context) bool
|
||||
|
||||
// Informers knows how to add indices to the caches (informers) that it manages.
|
||||
// FieldIndexer adds indices to the managed informers.
|
||||
client.FieldIndexer
|
||||
}
|
||||
|
||||
// Informer - informer allows you interact with the underlying informer.
|
||||
// Informer allows you to interact with the underlying informer.
|
||||
type Informer interface {
|
||||
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
|
||||
// period. Events to a single handler are delivered sequentially, but there is no coordination
|
||||
// period. Events to a single handler are delivered sequentially, but there is no coordination
|
||||
// between different handlers.
|
||||
// It returns a registration handle for the handler that can be used to remove
|
||||
// the handler again.
|
||||
// the handler again and an error if the handler cannot be added.
|
||||
AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error)
|
||||
|
||||
// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
|
||||
// specified resync period. Events to a single handler are delivered sequentially, but there is
|
||||
// specified resync period. Events to a single handler are delivered sequentially, but there is
|
||||
// no coordination between different handlers.
|
||||
// It returns a registration handle for the handler that can be used to remove
|
||||
// the handler again and an error if the handler cannot be added.
|
||||
AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error)
|
||||
// RemoveEventHandler removes a formerly added event handler given by
|
||||
|
||||
// RemoveEventHandler removes a previously added event handler given by
|
||||
// its registration handle.
|
||||
// This function is guaranteed to be idempotent, and thread-safe.
|
||||
// This function is guaranteed to be idempotent and thread-safe.
|
||||
RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error
|
||||
// AddIndexers adds more indexers to this store. If you call this after you already have data
|
||||
|
||||
// AddIndexers adds indexers to this store. If this is called after there is already data
|
||||
// in the store, the results are undefined.
|
||||
AddIndexers(indexers toolscache.Indexers) error
|
||||
|
||||
// HasSynced return true if the informers underlying store has synced.
|
||||
HasSynced() bool
|
||||
}
|
||||
|
||||
// Options are the optional arguments for creating a new InformersMap object.
|
||||
// Options are the optional arguments for creating a new Cache object.
|
||||
type Options struct {
|
||||
// HTTPClient is the http client to use for the REST client
|
||||
HTTPClient *http.Client
|
||||
@ -140,45 +159,80 @@ type Options struct {
|
||||
// instead of `reconcile.Result{}`.
|
||||
SyncPeriod *time.Duration
|
||||
|
||||
// Namespaces restricts the cache's ListWatch to the desired namespaces
|
||||
// Default watches all namespaces
|
||||
Namespaces []string
|
||||
// ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user
|
||||
// requests, using Get() and List(), a resource the cache does not already have an informer for.
|
||||
//
|
||||
// This error is distinct from an errors.NotFound.
|
||||
//
|
||||
// Defaults to false, which means that the cache will start a new informer
|
||||
// for every new requested resource.
|
||||
ReaderFailOnMissingInformer bool
|
||||
|
||||
// DefaultLabelSelector will be used as a label selectors for all object types
|
||||
// unless they have a more specific selector set in ByObject.
|
||||
// DefaultNamespaces maps namespace names to cache configs. If set, only
|
||||
// the namespaces in here will be watched and it will by used to default
|
||||
// ByObject.Namespaces for all objects if that is nil.
|
||||
//
|
||||
// The options in the Config that are nil will be defaulted from
|
||||
// the respective Default* settings.
|
||||
DefaultNamespaces map[string]Config
|
||||
|
||||
// DefaultLabelSelector will be used as a label selector for all objects
|
||||
// unless there is already one set in ByObject or DefaultNamespaces.
|
||||
DefaultLabelSelector labels.Selector
|
||||
|
||||
// DefaultFieldSelector will be used as a field selectors for all object types
|
||||
// unless they have a more specific selector set in ByObject.
|
||||
// DefaultFieldSelector will be used as a field selector for all object types
|
||||
// unless there is already one set in ByObject or DefaultNamespaces.
|
||||
DefaultFieldSelector fields.Selector
|
||||
|
||||
// DefaultTransform will be used as transform for all object types
|
||||
// unless they have a more specific transform set in ByObject.
|
||||
// unless there is already one set in ByObject or DefaultNamespaces.
|
||||
DefaultTransform toolscache.TransformFunc
|
||||
|
||||
// ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object.
|
||||
ByObject map[client.Object]ByObject
|
||||
|
||||
// UnsafeDisableDeepCopy indicates not to deep copy objects during get or
|
||||
// list objects for EVERY object.
|
||||
// DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy
|
||||
// for everything that doesn't specify this.
|
||||
//
|
||||
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
|
||||
// otherwise you will mutate the object in the cache.
|
||||
//
|
||||
// This is a global setting for all objects, and can be overridden by the ByObject setting.
|
||||
UnsafeDisableDeepCopy *bool
|
||||
// This will be used for all object types, unless it is set in ByObject or
|
||||
// DefaultNamespaces.
|
||||
DefaultUnsafeDisableDeepCopy *bool
|
||||
|
||||
// ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object.
|
||||
// object, this will fall through to Default* settings.
|
||||
ByObject map[client.Object]ByObject
|
||||
|
||||
// newInformer allows overriding of NewSharedIndexInformer for testing.
|
||||
newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer
|
||||
}
|
||||
|
||||
// ByObject offers more fine-grained control over the cache's ListWatch by object.
|
||||
type ByObject struct {
|
||||
// Namespaces maps a namespace name to cache configs. If set, only the
|
||||
// namespaces in this map will be cached.
|
||||
//
|
||||
// Settings in the map value that are unset will be defaulted.
|
||||
// Use an empty value for the specific setting to prevent that.
|
||||
//
|
||||
// A nil map allows to default this to the cache's DefaultNamespaces setting.
|
||||
// An empty map prevents this and means that all namespaces will be cached.
|
||||
//
|
||||
// The defaulting follows the following precedence order:
|
||||
// 1. ByObject
|
||||
// 2. DefaultNamespaces[namespace]
|
||||
// 3. Default*
|
||||
//
|
||||
// This must be unset for cluster-scoped objects.
|
||||
Namespaces map[string]Config
|
||||
|
||||
// Label represents a label selector for the object.
|
||||
Label labels.Selector
|
||||
|
||||
// Field represents a field selector for the object.
|
||||
Field fields.Selector
|
||||
|
||||
// Transform is a map from objects to transformer functions which
|
||||
// get applied when objects of the transformation are about to be committed
|
||||
// to cache.
|
||||
// Transform is a transformer function for the object which gets applied
|
||||
// when objects of the transformation are about to be committed to the cache.
|
||||
//
|
||||
// This function is called both for new objects to enter the cache,
|
||||
// and for updated objects.
|
||||
@ -191,48 +245,120 @@ type ByObject struct {
|
||||
UnsafeDisableDeepCopy *bool
|
||||
}
|
||||
|
||||
// Config describes all potential options for a given watch.
|
||||
type Config struct {
|
||||
// LabelSelector specifies a label selector. A nil value allows to
|
||||
// default this.
|
||||
//
|
||||
// Set to labels.Everything() if you don't want this defaulted.
|
||||
LabelSelector labels.Selector
|
||||
|
||||
// FieldSelector specifics a field selector. A nil value allows to
|
||||
// default this.
|
||||
//
|
||||
// Set to fields.Everything() if you don't want this defaulted.
|
||||
FieldSelector fields.Selector
|
||||
|
||||
// Transform specifies a transform func. A nil value allows to default
|
||||
// this.
|
||||
//
|
||||
// Set to an empty func to prevent this:
|
||||
// func(in interface{}) (interface{}, error) { return in, nil }
|
||||
Transform toolscache.TransformFunc
|
||||
|
||||
// UnsafeDisableDeepCopy specifies if List and Get requests against the
|
||||
// cache should not DeepCopy. A nil value allows to default this.
|
||||
UnsafeDisableDeepCopy *bool
|
||||
}
|
||||
|
||||
// NewCacheFunc - Function for creating a new cache from the options and a rest config.
|
||||
type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error)
|
||||
|
||||
// New initializes and returns a new Cache.
|
||||
func New(config *rest.Config, opts Options) (Cache, error) {
|
||||
if len(opts.Namespaces) == 0 {
|
||||
opts.Namespaces = []string{metav1.NamespaceAll}
|
||||
}
|
||||
if len(opts.Namespaces) > 1 {
|
||||
return newMultiNamespaceCache(config, opts)
|
||||
}
|
||||
|
||||
opts, err := defaultOpts(config, opts)
|
||||
func New(cfg *rest.Config, opts Options) (Cache, error) {
|
||||
opts, err := defaultOpts(cfg, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
byGVK, err := convertToInformerOptsByGVK(opts.ByObject, opts.Scheme)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
newCacheFunc := newCache(cfg, opts)
|
||||
|
||||
var defaultCache Cache
|
||||
if len(opts.DefaultNamespaces) > 0 {
|
||||
defaultConfig := optionDefaultsToConfig(&opts)
|
||||
defaultCache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, opts.DefaultNamespaces, &defaultConfig)
|
||||
} else {
|
||||
defaultCache = newCacheFunc(optionDefaultsToConfig(&opts), corev1.NamespaceAll)
|
||||
}
|
||||
// Set the default selector and transform.
|
||||
byGVK[schema.GroupVersionKind{}] = internal.InformersOptsByGVK{
|
||||
Selector: internal.Selector{
|
||||
Label: opts.DefaultLabelSelector,
|
||||
Field: opts.DefaultFieldSelector,
|
||||
},
|
||||
|
||||
if len(opts.ByObject) == 0 {
|
||||
return defaultCache, nil
|
||||
}
|
||||
|
||||
delegating := &delegatingByGVKCache{
|
||||
scheme: opts.Scheme,
|
||||
caches: make(map[schema.GroupVersionKind]Cache, len(opts.ByObject)),
|
||||
defaultCache: defaultCache,
|
||||
}
|
||||
|
||||
for obj, config := range opts.ByObject {
|
||||
gvk, err := apiutil.GVKForObject(obj, opts.Scheme)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err)
|
||||
}
|
||||
var cache Cache
|
||||
if len(config.Namespaces) > 0 {
|
||||
cache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, config.Namespaces, nil)
|
||||
} else {
|
||||
cache = newCacheFunc(byObjectToConfig(config), corev1.NamespaceAll)
|
||||
}
|
||||
delegating.caches[gvk] = cache
|
||||
}
|
||||
|
||||
return delegating, nil
|
||||
}
|
||||
|
||||
func optionDefaultsToConfig(opts *Options) Config {
|
||||
return Config{
|
||||
LabelSelector: opts.DefaultLabelSelector,
|
||||
FieldSelector: opts.DefaultFieldSelector,
|
||||
Transform: opts.DefaultTransform,
|
||||
UnsafeDisableDeepCopy: opts.UnsafeDisableDeepCopy,
|
||||
UnsafeDisableDeepCopy: opts.DefaultUnsafeDisableDeepCopy,
|
||||
}
|
||||
}
|
||||
|
||||
return &informerCache{
|
||||
scheme: opts.Scheme,
|
||||
Informers: internal.NewInformers(config, &internal.InformersOpts{
|
||||
HTTPClient: opts.HTTPClient,
|
||||
Scheme: opts.Scheme,
|
||||
Mapper: opts.Mapper,
|
||||
ResyncPeriod: *opts.SyncPeriod,
|
||||
Namespace: opts.Namespaces[0],
|
||||
ByGVK: byGVK,
|
||||
}),
|
||||
}, nil
|
||||
func byObjectToConfig(byObject ByObject) Config {
|
||||
return Config{
|
||||
LabelSelector: byObject.Label,
|
||||
FieldSelector: byObject.Field,
|
||||
Transform: byObject.Transform,
|
||||
UnsafeDisableDeepCopy: byObject.UnsafeDisableDeepCopy,
|
||||
}
|
||||
}
|
||||
|
||||
type newCacheFunc func(config Config, namespace string) Cache
|
||||
|
||||
func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
|
||||
return func(config Config, namespace string) Cache {
|
||||
return &informerCache{
|
||||
scheme: opts.Scheme,
|
||||
Informers: internal.NewInformers(restConfig, &internal.InformersOpts{
|
||||
HTTPClient: opts.HTTPClient,
|
||||
Scheme: opts.Scheme,
|
||||
Mapper: opts.Mapper,
|
||||
ResyncPeriod: *opts.SyncPeriod,
|
||||
Namespace: namespace,
|
||||
Selector: internal.Selector{
|
||||
Label: config.LabelSelector,
|
||||
Field: config.FieldSelector,
|
||||
},
|
||||
Transform: config.Transform,
|
||||
UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),
|
||||
NewInformer: opts.newInformer,
|
||||
}),
|
||||
readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func defaultOpts(config *rest.Config, opts Options) (Options, error) {
|
||||
@ -241,15 +367,12 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
|
||||
config.UserAgent = rest.DefaultKubernetesUserAgent()
|
||||
}
|
||||
|
||||
logger := log.WithName("setup")
|
||||
|
||||
// Use the rest HTTP client for the provided config if unset
|
||||
if opts.HTTPClient == nil {
|
||||
var err error
|
||||
opts.HTTPClient, err = rest.HTTPClientFor(config)
|
||||
if err != nil {
|
||||
logger.Error(err, "Failed to get HTTP client")
|
||||
return opts, fmt.Errorf("could not create HTTP client from config: %w", err)
|
||||
return Options{}, fmt.Errorf("could not create HTTP client from config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -263,11 +386,54 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
|
||||
var err error
|
||||
opts.Mapper, err = apiutil.NewDiscoveryRESTMapper(config, opts.HTTPClient)
|
||||
if err != nil {
|
||||
logger.Error(err, "Failed to get API Group-Resources")
|
||||
return opts, fmt.Errorf("could not create RESTMapper from config: %w", err)
|
||||
return Options{}, fmt.Errorf("could not create RESTMapper from config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for namespace, cfg := range opts.DefaultNamespaces {
|
||||
cfg = defaultConfig(cfg, optionDefaultsToConfig(&opts))
|
||||
opts.DefaultNamespaces[namespace] = cfg
|
||||
}
|
||||
|
||||
for obj, byObject := range opts.ByObject {
|
||||
isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper)
|
||||
if err != nil {
|
||||
return opts, fmt.Errorf("failed to determine if %T is namespaced: %w", obj, err)
|
||||
}
|
||||
if !isNamespaced && byObject.Namespaces != nil {
|
||||
return opts, fmt.Errorf("type %T is not namespaced, but its ByObject.Namespaces setting is not nil", obj)
|
||||
}
|
||||
|
||||
// Default the namespace-level configs first, because they need to use the undefaulted type-level config.
|
||||
for namespace, config := range byObject.Namespaces {
|
||||
// 1. Default from the undefaulted type-level config
|
||||
config = defaultConfig(config, byObjectToConfig(byObject))
|
||||
|
||||
// 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but
|
||||
// might not have an entry for the current namespace.
|
||||
if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace {
|
||||
config = defaultConfig(config, defaultNamespaceSettings)
|
||||
}
|
||||
|
||||
// 3. Default from the global defaults
|
||||
config = defaultConfig(config, optionDefaultsToConfig(&opts))
|
||||
|
||||
byObject.Namespaces[namespace] = config
|
||||
}
|
||||
|
||||
defaultedConfig := defaultConfig(byObjectToConfig(byObject), optionDefaultsToConfig(&opts))
|
||||
byObject.Label = defaultedConfig.LabelSelector
|
||||
byObject.Field = defaultedConfig.FieldSelector
|
||||
byObject.Transform = defaultedConfig.Transform
|
||||
byObject.UnsafeDisableDeepCopy = defaultedConfig.UnsafeDisableDeepCopy
|
||||
|
||||
if byObject.Namespaces == nil {
|
||||
byObject.Namespaces = opts.DefaultNamespaces
|
||||
}
|
||||
|
||||
opts.ByObject[obj] = byObject
|
||||
}
|
||||
|
||||
// Default the resync period to 10 hours if unset
|
||||
if opts.SyncPeriod == nil {
|
||||
opts.SyncPeriod = &defaultSyncPeriod
|
||||
@ -275,24 +441,19 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
func convertToInformerOptsByGVK(in map[client.Object]ByObject, scheme *runtime.Scheme) (map[schema.GroupVersionKind]internal.InformersOptsByGVK, error) {
|
||||
out := map[schema.GroupVersionKind]internal.InformersOptsByGVK{}
|
||||
for object, byObject := range in {
|
||||
gvk, err := apiutil.GVKForObject(object, scheme)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, ok := out[gvk]; ok {
|
||||
return nil, fmt.Errorf("duplicate cache options for GVK %v, cache.Options.ByObject has multiple types with the same GroupVersionKind", gvk)
|
||||
}
|
||||
out[gvk] = internal.InformersOptsByGVK{
|
||||
Selector: internal.Selector{
|
||||
Field: byObject.Field,
|
||||
Label: byObject.Label,
|
||||
},
|
||||
Transform: byObject.Transform,
|
||||
UnsafeDisableDeepCopy: byObject.UnsafeDisableDeepCopy,
|
||||
}
|
||||
func defaultConfig(toDefault, defaultFrom Config) Config {
|
||||
if toDefault.LabelSelector == nil {
|
||||
toDefault.LabelSelector = defaultFrom.LabelSelector
|
||||
}
|
||||
return out, nil
|
||||
if toDefault.FieldSelector == nil {
|
||||
toDefault.FieldSelector = defaultFrom.FieldSelector
|
||||
}
|
||||
if toDefault.Transform == nil {
|
||||
toDefault.Transform = defaultFrom.Transform
|
||||
}
|
||||
if toDefault.UnsafeDisableDeepCopy == nil {
|
||||
toDefault.UnsafeDisableDeepCopy = defaultFrom.UnsafeDisableDeepCopy
|
||||
}
|
||||
|
||||
return toDefault
|
||||
}
|
||||
|
127
vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go
generated
vendored
Normal file
127
vendor/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go
generated
vendored
Normal file
@ -0,0 +1,127 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
)
|
||||
|
||||
// delegatingByGVKCache delegates to a type-specific cache if present
|
||||
// and uses the defaultCache otherwise.
|
||||
type delegatingByGVKCache struct {
|
||||
scheme *runtime.Scheme
|
||||
caches map[schema.GroupVersionKind]Cache
|
||||
defaultCache Cache
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
|
||||
cache, err := dbt.cacheForObject(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return cache.Get(ctx, key, obj, opts...)
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
|
||||
cache, err := dbt.cacheForObject(list)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return cache.List(ctx, list, opts...)
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
|
||||
cache, err := dbt.cacheForObject(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cache.GetInformer(ctx, obj, opts...)
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
|
||||
return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk, opts...)
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) Start(ctx context.Context) error {
|
||||
allCaches := maps.Values(dbt.caches)
|
||||
allCaches = append(allCaches, dbt.defaultCache)
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
errs := make(chan error)
|
||||
for idx := range allCaches {
|
||||
cache := allCaches[idx]
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := cache.Start(ctx); err != nil {
|
||||
errs <- err
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errs:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) WaitForCacheSync(ctx context.Context) bool {
|
||||
synced := true
|
||||
for _, cache := range append(maps.Values(dbt.caches), dbt.defaultCache) {
|
||||
if !cache.WaitForCacheSync(ctx) {
|
||||
synced = false
|
||||
}
|
||||
}
|
||||
|
||||
return synced
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
|
||||
cache, err := dbt.cacheForObject(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return cache.IndexField(ctx, obj, field, extractValue)
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) cacheForObject(o runtime.Object) (Cache, error) {
|
||||
gvk, err := apiutil.GVKForObject(o, dbt.scheme)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
|
||||
return dbt.cacheForGVK(gvk), nil
|
||||
}
|
||||
|
||||
func (dbt *delegatingByGVKCache) cacheForGVK(gvk schema.GroupVersionKind) Cache {
|
||||
if specific, hasSpecific := dbt.caches[gvk]; hasSpecific {
|
||||
return specific
|
||||
}
|
||||
|
||||
return dbt.defaultCache
|
||||
}
|
74
vendor/sigs.k8s.io/controller-runtime/pkg/cache/informer_cache.go
generated
vendored
74
vendor/sigs.k8s.io/controller-runtime/pkg/cache/informer_cache.go
generated
vendored
@ -27,6 +27,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
@ -45,11 +46,28 @@ func (*ErrCacheNotStarted) Error() string {
|
||||
return "the cache is not started, can not read objects"
|
||||
}
|
||||
|
||||
var _ error = (*ErrCacheNotStarted)(nil)
|
||||
|
||||
// ErrResourceNotCached indicates that the resource type
|
||||
// the client asked the cache for is not cached, i.e. the
|
||||
// corresponding informer does not exist yet.
|
||||
type ErrResourceNotCached struct {
|
||||
GVK schema.GroupVersionKind
|
||||
}
|
||||
|
||||
// Error returns the error
|
||||
func (r ErrResourceNotCached) Error() string {
|
||||
return fmt.Sprintf("%s is not cached", r.GVK.String())
|
||||
}
|
||||
|
||||
var _ error = (*ErrResourceNotCached)(nil)
|
||||
|
||||
// informerCache is a Kubernetes Object cache populated from internal.Informers.
|
||||
// informerCache wraps internal.Informers.
|
||||
type informerCache struct {
|
||||
scheme *runtime.Scheme
|
||||
*internal.Informers
|
||||
readerFailOnMissingInformer bool
|
||||
}
|
||||
|
||||
// Get implements Reader.
|
||||
@ -59,7 +77,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
|
||||
return err
|
||||
}
|
||||
|
||||
started, cache, err := ic.Informers.Get(ctx, gvk, out)
|
||||
started, cache, err := ic.getInformerForKind(ctx, gvk, out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -67,7 +85,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie
|
||||
if !started {
|
||||
return &ErrCacheNotStarted{}
|
||||
}
|
||||
return cache.Reader.Get(ctx, key, out)
|
||||
return cache.Reader.Get(ctx, key, out, opts...)
|
||||
}
|
||||
|
||||
// List implements Reader.
|
||||
@ -77,7 +95,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts .
|
||||
return err
|
||||
}
|
||||
|
||||
started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj)
|
||||
started, cache, err := ic.getInformerForKind(ctx, *gvk, cacheTypeObj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -123,33 +141,53 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
|
||||
return &gvk, cacheTypeObj, nil
|
||||
}
|
||||
|
||||
// GetInformerForKind returns the informer for the GroupVersionKind.
|
||||
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
|
||||
func applyGetOptions(opts ...InformerGetOption) *internal.GetOptions {
|
||||
cfg := &InformerGetOptions{}
|
||||
for _, opt := range opts {
|
||||
opt(cfg)
|
||||
}
|
||||
return (*internal.GetOptions)(cfg)
|
||||
}
|
||||
|
||||
// GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started.
|
||||
func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
|
||||
// Map the gvk to an object
|
||||
obj, err := ic.scheme.New(gvk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, i, err := ic.Informers.Get(ctx, gvk, obj)
|
||||
_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return i.Informer, err
|
||||
return i.Informer, nil
|
||||
}
|
||||
|
||||
// GetInformer returns the informer for the obj.
|
||||
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
|
||||
// GetInformer returns the informer for the obj. If no informer exists, one will be started.
|
||||
func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
|
||||
gvk, err := apiutil.GVKForObject(obj, ic.scheme)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, i, err := ic.Informers.Get(ctx, gvk, obj)
|
||||
_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return i.Informer, err
|
||||
return i.Informer, nil
|
||||
}
|
||||
|
||||
func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *internal.Cache, error) {
|
||||
if ic.readerFailOnMissingInformer {
|
||||
cache, started, ok := ic.Informers.Peek(gvk, obj)
|
||||
if !ok {
|
||||
return false, nil, &ErrResourceNotCached{GVK: gvk}
|
||||
}
|
||||
return started, cache, nil
|
||||
}
|
||||
|
||||
return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
|
||||
}
|
||||
|
||||
// NeedLeaderElection implements the LeaderElectionRunnable interface
|
||||
@ -158,11 +196,11 @@ func (ic *informerCache) NeedLeaderElection() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// IndexField adds an indexer to the underlying cache, using extraction function to get
|
||||
// value(s) from the given field. This index can then be used by passing a field selector
|
||||
// IndexField adds an indexer to the underlying informer, using extractValue function to get
|
||||
// value(s) from the given field. This index can then be used by passing a field selector
|
||||
// to List. For one-to-one compatibility with "normal" field selectors, only return one value.
|
||||
// The values may be anything. They will automatically be prefixed with the namespace of the
|
||||
// given object, if present. The objects passed are guaranteed to be objects of the correct type.
|
||||
// The values may be anything. They will automatically be prefixed with the namespace of the
|
||||
// given object, if present. The objects passed are guaranteed to be objects of the correct type.
|
||||
func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
|
||||
informer, err := ic.GetInformer(ctx, obj)
|
||||
if err != nil {
|
||||
@ -171,7 +209,7 @@ func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, fiel
|
||||
return indexByField(informer, field, extractValue)
|
||||
}
|
||||
|
||||
func indexByField(indexer Informer, field string, extractor client.IndexerFunc) error {
|
||||
func indexByField(informer Informer, field string, extractValue client.IndexerFunc) error {
|
||||
indexFunc := func(objRaw interface{}) ([]string, error) {
|
||||
// TODO(directxman12): check if this is the correct type?
|
||||
obj, isObj := objRaw.(client.Object)
|
||||
@ -184,7 +222,7 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)
|
||||
}
|
||||
ns := meta.GetNamespace()
|
||||
|
||||
rawVals := extractor(obj)
|
||||
rawVals := extractValue(obj)
|
||||
var vals []string
|
||||
if ns == "" {
|
||||
// if we're not doubling the keys for the namespaced case, just create a new slice with same length
|
||||
@ -207,5 +245,5 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)
|
||||
return vals, nil
|
||||
}
|
||||
|
||||
return indexer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
|
||||
return informer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
|
||||
}
|
||||
|
18
vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go
generated
vendored
18
vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go
generated
vendored
@ -53,7 +53,7 @@ type CacheReader struct {
|
||||
}
|
||||
|
||||
// Get checks the indexer for the object and writes a copy of it if found.
|
||||
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error {
|
||||
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error {
|
||||
if c.scopeName == apimeta.RESTScopeNameRoot {
|
||||
key.Namespace = ""
|
||||
}
|
||||
@ -67,9 +67,9 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob
|
||||
|
||||
// Not found, return an error
|
||||
if !exists {
|
||||
// Resource gets transformed into Kind in the error anyway, so this is fine
|
||||
return apierrors.NewNotFound(schema.GroupResource{
|
||||
Group: c.groupVersionKind.Group,
|
||||
Group: c.groupVersionKind.Group,
|
||||
// Resource gets set as Kind in the error so this is fine
|
||||
Resource: c.groupVersionKind.Kind,
|
||||
}, key.Name)
|
||||
}
|
||||
@ -111,6 +111,10 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
|
||||
listOpts := client.ListOptions{}
|
||||
listOpts.ApplyOptions(opts)
|
||||
|
||||
if listOpts.Continue != "" {
|
||||
return fmt.Errorf("continue list option is not supported by the cache")
|
||||
}
|
||||
|
||||
switch {
|
||||
case listOpts.FieldSelector != nil:
|
||||
// TODO(directxman12): support more complicated field selectors by
|
||||
@ -119,8 +123,8 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
|
||||
if !requiresExact {
|
||||
return fmt.Errorf("non-exact field matches are not supported by the cache")
|
||||
}
|
||||
// list all objects by the field selector. If this is namespaced and we have one, ask for the
|
||||
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
|
||||
// list all objects by the field selector. If this is namespaced and we have one, ask for the
|
||||
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
|
||||
// namespace.
|
||||
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
|
||||
case listOpts.Namespace != "":
|
||||
@ -175,7 +179,7 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
|
||||
}
|
||||
|
||||
// objectKeyToStorageKey converts an object key to store key.
|
||||
// It's akin to MetaNamespaceKeyFunc. It's separate from
|
||||
// It's akin to MetaNamespaceKeyFunc. It's separate from
|
||||
// String to allow keeping the key format easily in sync with
|
||||
// MetaNamespaceKeyFunc.
|
||||
func objectKeyToStoreKey(k client.ObjectKey) string {
|
||||
@ -191,7 +195,7 @@ func FieldIndexName(field string) string {
|
||||
return "field:" + field
|
||||
}
|
||||
|
||||
// noNamespaceNamespace is used as the "namespace" when we want to list across all namespaces.
|
||||
// allNamespacesNamespace is used as the "namespace" when we want to list across all namespaces.
|
||||
const allNamespacesNamespace = "__all_namespaces"
|
||||
|
||||
// KeyToNamespacedKey prefixes the given index key with a namespace
|
||||
|
124
vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go
generated
vendored
124
vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go
generated
vendored
@ -40,24 +40,23 @@ import (
|
||||
|
||||
// InformersOpts configures an InformerMap.
|
||||
type InformersOpts struct {
|
||||
HTTPClient *http.Client
|
||||
Scheme *runtime.Scheme
|
||||
Mapper meta.RESTMapper
|
||||
ResyncPeriod time.Duration
|
||||
Namespace string
|
||||
ByGVK map[schema.GroupVersionKind]InformersOptsByGVK
|
||||
}
|
||||
|
||||
// InformersOptsByGVK configured additional by group version kind (or object)
|
||||
// in an InformerMap.
|
||||
type InformersOptsByGVK struct {
|
||||
HTTPClient *http.Client
|
||||
Scheme *runtime.Scheme
|
||||
Mapper meta.RESTMapper
|
||||
ResyncPeriod time.Duration
|
||||
Namespace string
|
||||
NewInformer *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
|
||||
Selector Selector
|
||||
Transform cache.TransformFunc
|
||||
UnsafeDisableDeepCopy *bool
|
||||
UnsafeDisableDeepCopy bool
|
||||
}
|
||||
|
||||
// NewInformers creates a new InformersMap that can create informers under the hood.
|
||||
func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
|
||||
newInformer := cache.NewSharedIndexInformer
|
||||
if options.NewInformer != nil {
|
||||
newInformer = *options.NewInformer
|
||||
}
|
||||
return &Informers{
|
||||
config: config,
|
||||
httpClient: options.HTTPClient,
|
||||
@ -68,12 +67,15 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
|
||||
Unstructured: make(map[schema.GroupVersionKind]*Cache),
|
||||
Metadata: make(map[schema.GroupVersionKind]*Cache),
|
||||
},
|
||||
codecs: serializer.NewCodecFactory(options.Scheme),
|
||||
paramCodec: runtime.NewParameterCodec(options.Scheme),
|
||||
resync: options.ResyncPeriod,
|
||||
startWait: make(chan struct{}),
|
||||
namespace: options.Namespace,
|
||||
byGVK: options.ByGVK,
|
||||
codecs: serializer.NewCodecFactory(options.Scheme),
|
||||
paramCodec: runtime.NewParameterCodec(options.Scheme),
|
||||
resync: options.ResyncPeriod,
|
||||
startWait: make(chan struct{}),
|
||||
namespace: options.Namespace,
|
||||
selector: options.Selector,
|
||||
transform: options.Transform,
|
||||
unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy,
|
||||
newInformer: newInformer,
|
||||
}
|
||||
}
|
||||
|
||||
@ -92,6 +94,13 @@ type tracker struct {
|
||||
Metadata map[schema.GroupVersionKind]*Cache
|
||||
}
|
||||
|
||||
// GetOptions provides configuration to customize the behavior when
|
||||
// getting an informer.
|
||||
type GetOptions struct {
|
||||
// BlockUntilSynced controls if the informer retrieval will block until the informer is synced. Defaults to `true`.
|
||||
BlockUntilSynced *bool
|
||||
}
|
||||
|
||||
// Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
|
||||
// It uses a standard parameter codec constructed based on the given generated Scheme.
|
||||
type Informers struct {
|
||||
@ -144,49 +153,15 @@ type Informers struct {
|
||||
// default or empty string means all namespaces
|
||||
namespace string
|
||||
|
||||
byGVK map[schema.GroupVersionKind]InformersOptsByGVK
|
||||
selector Selector
|
||||
transform cache.TransformFunc
|
||||
unsafeDisableDeepCopy bool
|
||||
|
||||
// NewInformer allows overriding of the shared index informer constructor for testing.
|
||||
newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
|
||||
}
|
||||
|
||||
func (ip *Informers) getSelector(gvk schema.GroupVersionKind) Selector {
|
||||
if ip.byGVK == nil {
|
||||
return Selector{}
|
||||
}
|
||||
if res, ok := ip.byGVK[gvk]; ok {
|
||||
return res.Selector
|
||||
}
|
||||
if res, ok := ip.byGVK[schema.GroupVersionKind{}]; ok {
|
||||
return res.Selector
|
||||
}
|
||||
return Selector{}
|
||||
}
|
||||
|
||||
func (ip *Informers) getTransform(gvk schema.GroupVersionKind) cache.TransformFunc {
|
||||
if ip.byGVK == nil {
|
||||
return nil
|
||||
}
|
||||
if res, ok := ip.byGVK[gvk]; ok {
|
||||
return res.Transform
|
||||
}
|
||||
if res, ok := ip.byGVK[schema.GroupVersionKind{}]; ok {
|
||||
return res.Transform
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ip *Informers) getDisableDeepCopy(gvk schema.GroupVersionKind) bool {
|
||||
if ip.byGVK == nil {
|
||||
return false
|
||||
}
|
||||
if res, ok := ip.byGVK[gvk]; ok && res.UnsafeDisableDeepCopy != nil {
|
||||
return *res.UnsafeDisableDeepCopy
|
||||
}
|
||||
if res, ok := ip.byGVK[schema.GroupVersionKind{}]; ok && res.UnsafeDisableDeepCopy != nil {
|
||||
return *res.UnsafeDisableDeepCopy
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
|
||||
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
|
||||
// It doesn't return start because it can't return an error, and it's not a runnable directly.
|
||||
func (ip *Informers) Start(ctx context.Context) error {
|
||||
func() {
|
||||
@ -271,18 +246,19 @@ func (ip *Informers) WaitForCacheSync(ctx context.Context) bool {
|
||||
return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...)
|
||||
}
|
||||
|
||||
func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
|
||||
// Peek attempts to get the informer for the GVK, but does not start one if one does not exist.
|
||||
func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
|
||||
ip.mu.RLock()
|
||||
defer ip.mu.RUnlock()
|
||||
i, ok := ip.informersByType(obj)[gvk]
|
||||
return i, ip.started, ok
|
||||
}
|
||||
|
||||
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
|
||||
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
|
||||
// the Informer from the map.
|
||||
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) {
|
||||
func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) {
|
||||
// Return the informer if it is found
|
||||
i, started, ok := ip.get(gvk, obj)
|
||||
i, started, ok := ip.Peek(gvk, obj)
|
||||
if !ok {
|
||||
var err error
|
||||
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
|
||||
@ -290,7 +266,12 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r
|
||||
}
|
||||
}
|
||||
|
||||
if started && !i.Informer.HasSynced() {
|
||||
shouldBlock := true
|
||||
if opts.BlockUntilSynced != nil {
|
||||
shouldBlock = *opts.BlockUntilSynced
|
||||
}
|
||||
|
||||
if shouldBlock && started && !i.Informer.HasSynced() {
|
||||
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
|
||||
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
|
||||
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
|
||||
@ -311,11 +292,12 @@ func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersion
|
||||
}
|
||||
}
|
||||
|
||||
// addInformerToMap either returns an existing informer or creates a new informer, adds it to the map and returns it.
|
||||
func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*Cache, bool, error) {
|
||||
ip.mu.Lock()
|
||||
defer ip.mu.Unlock()
|
||||
|
||||
// Check the cache to see if we already have an Informer. If we do, return the Informer.
|
||||
// Check the cache to see if we already have an Informer. If we do, return the Informer.
|
||||
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
|
||||
// so neither returned early, but the first one created it.
|
||||
if i, ok := ip.informersByType(obj)[gvk]; ok {
|
||||
@ -327,13 +309,13 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
sharedIndexInformer := cache.NewSharedIndexInformer(&cache.ListWatch{
|
||||
sharedIndexInformer := ip.newInformer(&cache.ListWatch{
|
||||
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
|
||||
ip.getSelector(gvk).ApplyToList(&opts)
|
||||
ip.selector.ApplyToList(&opts)
|
||||
return listWatcher.ListFunc(opts)
|
||||
},
|
||||
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
|
||||
ip.getSelector(gvk).ApplyToList(&opts)
|
||||
ip.selector.ApplyToList(&opts)
|
||||
opts.Watch = true // Watch needs to be set to true separately
|
||||
return listWatcher.WatchFunc(opts)
|
||||
},
|
||||
@ -342,7 +324,7 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
|
||||
})
|
||||
|
||||
// Check to see if there is a transformer for this gvk
|
||||
if err := sharedIndexInformer.SetTransform(ip.getTransform(gvk)); err != nil {
|
||||
if err := sharedIndexInformer.SetTransform(ip.transform); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
@ -358,7 +340,7 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O
|
||||
indexer: sharedIndexInformer.GetIndexer(),
|
||||
groupVersionKind: gvk,
|
||||
scopeName: mapping.Scope.Name(),
|
||||
disableDeepCopy: ip.getDisableDeepCopy(gvk),
|
||||
disableDeepCopy: ip.unsafeDisableDeepCopy,
|
||||
},
|
||||
}
|
||||
ip.informersByType(obj)[gvk] = i
|
||||
@ -382,7 +364,7 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob
|
||||
// Figure out if the GVK we're dealing with is global, or namespace scoped.
|
||||
var namespace string
|
||||
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
|
||||
namespace = restrictNamespaceBySelector(ip.namespace, ip.getSelector(gvk))
|
||||
namespace = restrictNamespaceBySelector(ip.namespace, ip.selector)
|
||||
}
|
||||
|
||||
switch obj.(type) {
|
||||
|
55
vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/transformers.go
generated
vendored
55
vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/transformers.go
generated
vendored
@ -1,55 +0,0 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
)
|
||||
|
||||
// TransformFuncByGVK provides access to the correct transform function for
|
||||
// any given GVK.
|
||||
type TransformFuncByGVK interface {
|
||||
Set(runtime.Object, *runtime.Scheme, cache.TransformFunc) error
|
||||
Get(schema.GroupVersionKind) cache.TransformFunc
|
||||
SetDefault(transformer cache.TransformFunc)
|
||||
}
|
||||
|
||||
type transformFuncByGVK struct {
|
||||
defaultTransform cache.TransformFunc
|
||||
transformers map[schema.GroupVersionKind]cache.TransformFunc
|
||||
}
|
||||
|
||||
// TransformFuncByGVKFromMap creates a TransformFuncByGVK from a map that
|
||||
// maps GVKs to TransformFuncs.
|
||||
func TransformFuncByGVKFromMap(in map[schema.GroupVersionKind]cache.TransformFunc) TransformFuncByGVK {
|
||||
byGVK := &transformFuncByGVK{}
|
||||
if defaultFunc, hasDefault := in[schema.GroupVersionKind{}]; hasDefault {
|
||||
byGVK.defaultTransform = defaultFunc
|
||||
}
|
||||
delete(in, schema.GroupVersionKind{})
|
||||
byGVK.transformers = in
|
||||
return byGVK
|
||||
}
|
||||
|
||||
func (t *transformFuncByGVK) SetDefault(transformer cache.TransformFunc) {
|
||||
t.defaultTransform = transformer
|
||||
}
|
||||
|
||||
func (t *transformFuncByGVK) Set(obj runtime.Object, scheme *runtime.Scheme, transformer cache.TransformFunc) error {
|
||||
gvk, err := apiutil.GVKForObject(obj, scheme)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.transformers[gvk] = transformer
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t transformFuncByGVK) Get(gvk schema.GroupVersionKind) cache.TransformFunc {
|
||||
if val, ok := t.transformers[gvk]; ok {
|
||||
return val
|
||||
}
|
||||
return t.defaultTransform
|
||||
}
|
160
vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go
generated
vendored
160
vendor/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go
generated
vendored
@ -25,8 +25,8 @@ import (
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/rest"
|
||||
toolscache "k8s.io/client-go/tools/cache"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
)
|
||||
@ -34,49 +34,31 @@ import (
|
||||
// a new global namespaced cache to handle cluster scoped resources.
|
||||
const globalCache = "_cluster-scope"
|
||||
|
||||
// MultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache.
|
||||
// This will scope the cache to a list of namespaces. Listing for all namespaces
|
||||
// will list for all the namespaces that this knows about. By default this will create
|
||||
// a global cache for cluster scoped resource. Note that this is not intended
|
||||
// to be used for excluding namespaces, this is better done via a Predicate. Also note that
|
||||
// you may face performance issues when using this with a high number of namespaces.
|
||||
//
|
||||
// Deprecated: Use cache.Options.Namespaces instead.
|
||||
func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc {
|
||||
return func(config *rest.Config, opts Options) (Cache, error) {
|
||||
opts.Namespaces = namespaces
|
||||
return newMultiNamespaceCache(config, opts)
|
||||
}
|
||||
}
|
||||
|
||||
func newMultiNamespaceCache(config *rest.Config, opts Options) (Cache, error) {
|
||||
if len(opts.Namespaces) < 2 {
|
||||
return nil, fmt.Errorf("must specify more than one namespace to use multi-namespace cache")
|
||||
}
|
||||
opts, err := defaultOpts(config, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func newMultiNamespaceCache(
|
||||
newCache newCacheFunc,
|
||||
scheme *runtime.Scheme,
|
||||
restMapper apimeta.RESTMapper,
|
||||
namespaces map[string]Config,
|
||||
globalConfig *Config, // may be nil in which case no cache for cluster-scoped objects will be created
|
||||
) Cache {
|
||||
// Create every namespace cache.
|
||||
caches := map[string]Cache{}
|
||||
for _, ns := range opts.Namespaces {
|
||||
opts.Namespaces = []string{ns}
|
||||
c, err := New(config, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
caches[ns] = c
|
||||
for namespace, config := range namespaces {
|
||||
caches[namespace] = newCache(config, namespace)
|
||||
}
|
||||
|
||||
// Create a cache for cluster scoped resources.
|
||||
opts.Namespaces = []string{}
|
||||
gCache, err := New(config, opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating global cache: %w", err)
|
||||
// Create a cache for cluster scoped resources if requested
|
||||
var clusterCache Cache
|
||||
if globalConfig != nil {
|
||||
clusterCache = newCache(*globalConfig, corev1.NamespaceAll)
|
||||
}
|
||||
|
||||
return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme, RESTMapper: opts.Mapper, clusterCache: gCache}, nil
|
||||
return &multiNamespaceCache{
|
||||
namespaceToCache: caches,
|
||||
Scheme: scheme,
|
||||
RESTMapper: restMapper,
|
||||
clusterCache: clusterCache,
|
||||
}
|
||||
}
|
||||
|
||||
// multiNamespaceCache knows how to handle multiple namespaced caches
|
||||
@ -84,90 +66,96 @@ func newMultiNamespaceCache(config *rest.Config, opts Options) (Cache, error) {
|
||||
// operator to a list of namespaces instead of watching every namespace
|
||||
// in the cluster.
|
||||
type multiNamespaceCache struct {
|
||||
namespaceToCache map[string]Cache
|
||||
Scheme *runtime.Scheme
|
||||
RESTMapper apimeta.RESTMapper
|
||||
namespaceToCache map[string]Cache
|
||||
clusterCache Cache
|
||||
}
|
||||
|
||||
var _ Cache = &multiNamespaceCache{}
|
||||
|
||||
// Methods for multiNamespaceCache to conform to the Informers interface.
|
||||
func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
|
||||
informers := map[string]Informer{}
|
||||
|
||||
// If the object is clusterscoped, get the informer from clusterCache,
|
||||
func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
|
||||
// If the object is cluster scoped, get the informer from clusterCache,
|
||||
// if not use the namespaced caches.
|
||||
isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !isNamespaced {
|
||||
clusterCacheInf, err := c.clusterCache.GetInformer(ctx, obj)
|
||||
clusterCacheInformer, err := c.clusterCache.GetInformer(ctx, obj, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
informers[globalCache] = clusterCacheInf
|
||||
|
||||
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
|
||||
return &multiNamespaceInformer{
|
||||
namespaceToInformer: map[string]Informer{
|
||||
globalCache: clusterCacheInformer,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
namespaceToInformer := map[string]Informer{}
|
||||
for ns, cache := range c.namespaceToCache {
|
||||
informer, err := cache.GetInformer(ctx, obj)
|
||||
informer, err := cache.GetInformer(ctx, obj, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
informers[ns] = informer
|
||||
namespaceToInformer[ns] = informer
|
||||
}
|
||||
|
||||
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
|
||||
return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
|
||||
}
|
||||
|
||||
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
|
||||
informers := map[string]Informer{}
|
||||
|
||||
// If the object is clusterscoped, get the informer from clusterCache,
|
||||
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
|
||||
// If the object is cluster scoped, get the informer from clusterCache,
|
||||
// if not use the namespaced caches.
|
||||
isNamespaced, err := apiutil.IsGVKNamespaced(gvk, c.RESTMapper)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !isNamespaced {
|
||||
clusterCacheInf, err := c.clusterCache.GetInformerForKind(ctx, gvk)
|
||||
clusterCacheInformer, err := c.clusterCache.GetInformerForKind(ctx, gvk, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
informers[globalCache] = clusterCacheInf
|
||||
|
||||
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
|
||||
return &multiNamespaceInformer{
|
||||
namespaceToInformer: map[string]Informer{
|
||||
globalCache: clusterCacheInformer,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
namespaceToInformer := map[string]Informer{}
|
||||
for ns, cache := range c.namespaceToCache {
|
||||
informer, err := cache.GetInformerForKind(ctx, gvk)
|
||||
informer, err := cache.GetInformerForKind(ctx, gvk, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
informers[ns] = informer
|
||||
namespaceToInformer[ns] = informer
|
||||
}
|
||||
|
||||
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
|
||||
return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
|
||||
}
|
||||
|
||||
func (c *multiNamespaceCache) Start(ctx context.Context) error {
|
||||
// start global cache
|
||||
go func() {
|
||||
err := c.clusterCache.Start(ctx)
|
||||
if err != nil {
|
||||
log.Error(err, "cluster scoped cache failed to start")
|
||||
}
|
||||
}()
|
||||
if c.clusterCache != nil {
|
||||
go func() {
|
||||
err := c.clusterCache.Start(ctx)
|
||||
if err != nil {
|
||||
log.Error(err, "cluster scoped cache failed to start")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// start namespaced caches
|
||||
for ns, cache := range c.namespaceToCache {
|
||||
go func(ns string, cache Cache) {
|
||||
err := cache.Start(ctx)
|
||||
if err != nil {
|
||||
log.Error(err, "multinamespace cache failed to start namespaced informer", "namespace", ns)
|
||||
if err := cache.Start(ctx); err != nil {
|
||||
log.Error(err, "multi-namespace cache failed to start namespaced informer", "namespace", ns)
|
||||
}
|
||||
}(ns, cache)
|
||||
}
|
||||
@ -179,13 +167,13 @@ func (c *multiNamespaceCache) Start(ctx context.Context) error {
|
||||
func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
|
||||
synced := true
|
||||
for _, cache := range c.namespaceToCache {
|
||||
if s := cache.WaitForCacheSync(ctx); !s {
|
||||
synced = s
|
||||
if !cache.WaitForCacheSync(ctx) {
|
||||
synced = false
|
||||
}
|
||||
}
|
||||
|
||||
// check if cluster scoped cache has synced
|
||||
if !c.clusterCache.WaitForCacheSync(ctx) {
|
||||
if c.clusterCache != nil && !c.clusterCache.WaitForCacheSync(ctx) {
|
||||
synced = false
|
||||
}
|
||||
return synced
|
||||
@ -224,7 +212,7 @@ func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj
|
||||
if !ok {
|
||||
return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", key)
|
||||
}
|
||||
return cache.Get(ctx, key, obj)
|
||||
return cache.Get(ctx, key, obj, opts...)
|
||||
}
|
||||
|
||||
// List multi namespace cache will get all the objects in the namespaces that the cache is watching if asked for all namespaces.
|
||||
@ -245,7 +233,7 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
|
||||
if listOpts.Namespace != corev1.NamespaceAll {
|
||||
cache, ok := c.namespaceToCache[listOpts.Namespace]
|
||||
if !ok {
|
||||
return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", listOpts.Namespace)
|
||||
return fmt.Errorf("unable to list: %v because of unknown namespace for the cache", listOpts.Namespace)
|
||||
}
|
||||
return cache.List(ctx, list, opts...)
|
||||
}
|
||||
@ -278,12 +266,14 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,
|
||||
return fmt.Errorf("object: %T must be a list type", list)
|
||||
}
|
||||
allItems = append(allItems, items...)
|
||||
|
||||
// The last list call should have the most correct resource version.
|
||||
resourceVersion = accessor.GetResourceVersion()
|
||||
if limitSet {
|
||||
// decrement Limit by the number of items
|
||||
// fetched from the current namespace.
|
||||
listOpts.Limit -= int64(len(items))
|
||||
|
||||
// if a Limit was set and the number of
|
||||
// items read has reached this set limit,
|
||||
// then stop reading.
|
||||
@ -325,9 +315,12 @@ func (h handlerRegistration) HasSynced() bool {
|
||||
|
||||
var _ Informer = &multiNamespaceInformer{}
|
||||
|
||||
// AddEventHandler adds the handler to each namespaced informer.
|
||||
// AddEventHandler adds the handler to each informer.
|
||||
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) {
|
||||
handles := handlerRegistration{handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer))}
|
||||
handles := handlerRegistration{
|
||||
handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)),
|
||||
}
|
||||
|
||||
for ns, informer := range i.namespaceToInformer {
|
||||
registration, err := informer.AddEventHandler(handler)
|
||||
if err != nil {
|
||||
@ -335,12 +328,16 @@ func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEven
|
||||
}
|
||||
handles.handles[ns] = registration
|
||||
}
|
||||
|
||||
return handles, nil
|
||||
}
|
||||
|
||||
// AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer.
|
||||
func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) {
|
||||
handles := handlerRegistration{handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer))}
|
||||
handles := handlerRegistration{
|
||||
handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)),
|
||||
}
|
||||
|
||||
for ns, informer := range i.namespaceToInformer {
|
||||
registration, err := informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
|
||||
if err != nil {
|
||||
@ -348,14 +345,15 @@ func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolsca
|
||||
}
|
||||
handles.handles[ns] = registration
|
||||
}
|
||||
|
||||
return handles, nil
|
||||
}
|
||||
|
||||
// RemoveEventHandler removes a formerly added event handler given by its registration handle.
|
||||
// RemoveEventHandler removes a previously added event handler given by its registration handle.
|
||||
func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHandlerRegistration) error {
|
||||
handles, ok := h.(handlerRegistration)
|
||||
if !ok {
|
||||
return fmt.Errorf("it is not the registration returned by multiNamespaceInformer")
|
||||
return fmt.Errorf("registration is not a registration returned by multiNamespaceInformer")
|
||||
}
|
||||
for ns, informer := range i.namespaceToInformer {
|
||||
registration, ok := handles.handles[ns]
|
||||
@ -369,7 +367,7 @@ func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHa
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddIndexers adds the indexer for each namespaced informer.
|
||||
// AddIndexers adds the indexers to each informer.
|
||||
func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error {
|
||||
for _, informer := range i.namespaceToInformer {
|
||||
err := informer.AddIndexers(indexers)
|
||||
@ -380,11 +378,11 @@ func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasSynced checks if each namespaced informer has synced.
|
||||
// HasSynced checks if each informer has synced.
|
||||
func (i *multiNamespaceInformer) HasSynced() bool {
|
||||
for _, informer := range i.namespaceToInformer {
|
||||
if ok := informer.HasSynced(); !ok {
|
||||
return ok
|
||||
if !informer.HasSynced() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
8
vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go
generated
vendored
8
vendor/sigs.k8s.io/controller-runtime/pkg/client/client.go
generated
vendored
@ -77,10 +77,12 @@ type CacheOptions struct {
|
||||
// Reader is a cache-backed reader that will be used to read objects from the cache.
|
||||
// +required
|
||||
Reader Reader
|
||||
// DisableFor is a list of objects that should not be read from the cache.
|
||||
// DisableFor is a list of objects that should never be read from the cache.
|
||||
// Objects configured here always result in a live lookup.
|
||||
DisableFor []Object
|
||||
// Unstructured is a flag that indicates whether the cache-backed client should
|
||||
// read unstructured objects or lists from the cache.
|
||||
// If false, unstructured objects will always result in a live lookup.
|
||||
Unstructured bool
|
||||
}
|
||||
|
||||
@ -342,9 +344,11 @@ func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...Get
|
||||
if isUncached, err := c.shouldBypassCache(obj); err != nil {
|
||||
return err
|
||||
} else if !isUncached {
|
||||
// Attempt to get from the cache.
|
||||
return c.cache.Get(ctx, key, obj, opts...)
|
||||
}
|
||||
|
||||
// Perform a live lookup.
|
||||
switch obj.(type) {
|
||||
case runtime.Unstructured:
|
||||
return c.unstructuredClient.Get(ctx, key, obj, opts...)
|
||||
@ -362,9 +366,11 @@ func (c *client) List(ctx context.Context, obj ObjectList, opts ...ListOption) e
|
||||
if isUncached, err := c.shouldBypassCache(obj); err != nil {
|
||||
return err
|
||||
} else if !isUncached {
|
||||
// Attempt to get from the cache.
|
||||
return c.cache.List(ctx, obj, opts...)
|
||||
}
|
||||
|
||||
// Perform a live lookup.
|
||||
switch x := obj.(type) {
|
||||
case runtime.Unstructured:
|
||||
return c.unstructuredClient.List(ctx, obj, opts...)
|
||||
|
42
vendor/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go
generated
vendored
42
vendor/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go
generated
vendored
@ -28,12 +28,11 @@ import (
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/utils/pointer"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
|
||||
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
|
||||
)
|
||||
|
||||
@ -95,18 +94,10 @@ type Options struct {
|
||||
// value only if you know what you are doing. Defaults to 10 hours if unset.
|
||||
// there will a 10 percent jitter between the SyncPeriod of all controllers
|
||||
// so that all controllers will not send list requests simultaneously.
|
||||
//
|
||||
// Deprecated: Use Cache.SyncPeriod instead.
|
||||
SyncPeriod *time.Duration
|
||||
|
||||
// Namespace if specified restricts the manager's cache to watch objects in
|
||||
// the desired namespace Defaults to all namespaces
|
||||
//
|
||||
// Note: If a namespace is specified, controllers can still Watch for a
|
||||
// cluster-scoped resource (e.g Node). For namespaced resources the cache
|
||||
// will only hold objects from the desired namespace.
|
||||
//
|
||||
// Deprecated: Use Cache.Namespaces instead.
|
||||
Namespace string
|
||||
|
||||
// HTTPClient is the http client that will be used to create the default
|
||||
// Cache and Client. If not set the rest.HTTPClientFor function will be used
|
||||
// to create the http client.
|
||||
@ -141,18 +132,6 @@ type Options struct {
|
||||
// Only use a custom NewClient if you know what you are doing.
|
||||
NewClient client.NewClientFunc
|
||||
|
||||
// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
|
||||
// for the given objects.
|
||||
//
|
||||
// Deprecated: Use Client.Cache.DisableFor instead.
|
||||
ClientDisableCacheFor []client.Object
|
||||
|
||||
// DryRunClient specifies whether the client should be configured to enforce
|
||||
// dryRun mode.
|
||||
//
|
||||
// Deprecated: Use Client.DryRun instead.
|
||||
DryRunClient bool
|
||||
|
||||
// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
|
||||
// Use this to customize the event correlator and spam filter
|
||||
//
|
||||
@ -218,9 +197,6 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
|
||||
if cacheOpts.SyncPeriod == nil {
|
||||
cacheOpts.SyncPeriod = options.SyncPeriod
|
||||
}
|
||||
if len(cacheOpts.Namespaces) == 0 && options.Namespace != "" {
|
||||
cacheOpts.Namespaces = []string{options.Namespace}
|
||||
}
|
||||
}
|
||||
cache, err := options.NewCache(config, cacheOpts)
|
||||
if err != nil {
|
||||
@ -247,16 +223,6 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
|
||||
if clientOpts.Cache.Reader == nil {
|
||||
clientOpts.Cache.Reader = cache
|
||||
}
|
||||
|
||||
// For backward compatibility, the ClientDisableCacheFor option should
|
||||
// be appended to the DisableFor option in the client.
|
||||
clientOpts.Cache.DisableFor = append(clientOpts.Cache.DisableFor, options.ClientDisableCacheFor...)
|
||||
|
||||
if clientOpts.DryRun == nil && options.DryRunClient {
|
||||
// For backward compatibility, the DryRunClient (if set) option should override
|
||||
// the DryRun option in the client (if unset).
|
||||
clientOpts.DryRun = pointer.Bool(true)
|
||||
}
|
||||
}
|
||||
clientWriter, err := options.NewClient(config, clientOpts)
|
||||
if err != nil {
|
||||
|
1
vendor/sigs.k8s.io/controller-runtime/pkg/config/v1alpha1/zz_generated.deepcopy.go
generated
vendored
1
vendor/sigs.k8s.io/controller-runtime/pkg/config/v1alpha1/zz_generated.deepcopy.go
generated
vendored
@ -1,5 +1,4 @@
|
||||
//go:build !ignore_autogenerated
|
||||
// +build !ignore_autogenerated
|
||||
|
||||
// Code generated by controller-gen. DO NOT EDIT.
|
||||
|
||||
|
4
vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go
generated
vendored
4
vendor/sigs.k8s.io/controller-runtime/pkg/controller/controller.go
generated
vendored
@ -159,7 +159,9 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
|
||||
return &controller.Controller{
|
||||
Do: options.Reconciler,
|
||||
MakeQueue: func() workqueue.RateLimitingInterface {
|
||||
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
|
||||
return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{
|
||||
Name: name,
|
||||
})
|
||||
},
|
||||
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
|
||||
CacheSyncTimeout: options.CacheSyncTimeout,
|
||||
|
8
vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go
generated
vendored
8
vendor/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go
generated
vendored
@ -28,6 +28,7 @@ import (
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/log"
|
||||
@ -311,6 +312,7 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
|
||||
|
||||
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
|
||||
// resource to be synced.
|
||||
log.V(5).Info("Reconciling")
|
||||
result, err := c.Reconcile(ctx, req)
|
||||
switch {
|
||||
case err != nil:
|
||||
@ -321,8 +323,12 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
|
||||
}
|
||||
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
|
||||
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
|
||||
if !result.IsZero() {
|
||||
log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
|
||||
}
|
||||
log.Error(err, "Reconciler error")
|
||||
case result.RequeueAfter > 0:
|
||||
log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))
|
||||
// The result.RequeueAfter request will be lost, if it is returned
|
||||
// along with a non-nil error. But this is intended as
|
||||
// We need to drive to stable reconcile loops before queuing due
|
||||
@ -331,9 +337,11 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
|
||||
c.Queue.AddAfter(req, result.RequeueAfter)
|
||||
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
|
||||
case result.Requeue:
|
||||
log.V(5).Info("Reconcile done, requeueing")
|
||||
c.Queue.AddRateLimited(req)
|
||||
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
|
||||
default:
|
||||
log.V(5).Info("Reconcile successful")
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get queued again until another change happens.
|
||||
c.Queue.Forget(obj)
|
||||
|
3
vendor/sigs.k8s.io/controller-runtime/pkg/log/deleg.go
generated
vendored
3
vendor/sigs.k8s.io/controller-runtime/pkg/log/deleg.go
generated
vendored
@ -188,6 +188,9 @@ func (l *delegatingLogSink) WithValues(tags ...interface{}) logr.LogSink {
|
||||
// provided, instead of the temporary initial one, if this method
|
||||
// has not been previously called.
|
||||
func (l *delegatingLogSink) Fulfill(actual logr.LogSink) {
|
||||
if actual == nil {
|
||||
actual = NullLogSink{}
|
||||
}
|
||||
if l.promise != nil {
|
||||
l.promise.Fulfill(actual)
|
||||
}
|
||||
|
11
vendor/sigs.k8s.io/controller-runtime/pkg/log/log.go
generated
vendored
11
vendor/sigs.k8s.io/controller-runtime/pkg/log/log.go
generated
vendored
@ -34,6 +34,7 @@ limitations under the License.
|
||||
package log
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
@ -56,7 +57,15 @@ func eventuallyFulfillRoot() {
|
||||
}
|
||||
if time.Since(rootLogCreated).Seconds() >= 30 {
|
||||
if logFullfilled.CompareAndSwap(false, true) {
|
||||
fmt.Fprintf(os.Stderr, "[controller-runtime] log.SetLogger(...) was never called, logs will not be displayed:\n%s", debug.Stack())
|
||||
stack := debug.Stack()
|
||||
stackLines := bytes.Count(stack, []byte{'\n'})
|
||||
sep := []byte{'\n', '\t', '>', ' ', ' '}
|
||||
|
||||
fmt.Fprintf(os.Stderr,
|
||||
"[controller-runtime] log.SetLogger(...) was never called; logs will not be displayed.\nDetected at:%s%s", sep,
|
||||
// prefix every line, so it's clear this is a stack trace related to the above message
|
||||
bytes.Replace(stack, []byte{'\n'}, sep, stackLines-1),
|
||||
)
|
||||
SetLogger(logr.New(NullLogSink{}))
|
||||
}
|
||||
}
|
||||
|
126
vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go
generated
vendored
126
vendor/sigs.k8s.io/controller-runtime/pkg/manager/internal.go
generated
vendored
@ -28,7 +28,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
kerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
@ -44,7 +43,7 @@ import (
|
||||
"sigs.k8s.io/controller-runtime/pkg/healthz"
|
||||
"sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
|
||||
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/metrics"
|
||||
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook"
|
||||
)
|
||||
|
||||
@ -57,7 +56,6 @@ const (
|
||||
|
||||
defaultReadinessEndpoint = "/readyz"
|
||||
defaultLivenessEndpoint = "/healthz"
|
||||
defaultMetricsEndpoint = "/metrics"
|
||||
)
|
||||
|
||||
var _ Runnable = &controllerManager{}
|
||||
@ -84,11 +82,8 @@ type controllerManager struct {
|
||||
// on shutdown
|
||||
leaderElectionReleaseOnCancel bool
|
||||
|
||||
// metricsListener is used to serve prometheus metrics
|
||||
metricsListener net.Listener
|
||||
|
||||
// metricsExtraHandlers contains extra handlers to register on http server that serves metrics.
|
||||
metricsExtraHandlers map[string]http.Handler
|
||||
// metricsServer is used to serve prometheus metrics
|
||||
metricsServer metricsserver.Server
|
||||
|
||||
// healthProbeListener is used to serve liveness probe
|
||||
healthProbeListener net.Listener
|
||||
@ -184,28 +179,6 @@ func (cm *controllerManager) add(r Runnable) error {
|
||||
return cm.runnables.Add(r)
|
||||
}
|
||||
|
||||
// AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics.
|
||||
func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
|
||||
cm.Lock()
|
||||
defer cm.Unlock()
|
||||
|
||||
if cm.started {
|
||||
return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created")
|
||||
}
|
||||
|
||||
if path == defaultMetricsEndpoint {
|
||||
return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
|
||||
}
|
||||
|
||||
if _, found := cm.metricsExtraHandlers[path]; found {
|
||||
return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
|
||||
}
|
||||
|
||||
cm.metricsExtraHandlers[path] = handler
|
||||
cm.logger.V(2).Info("Registering metrics http server extra handler", "path", path)
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddHealthzCheck allows you to add Healthz checker.
|
||||
func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
|
||||
cm.Lock()
|
||||
@ -296,31 +269,10 @@ func (cm *controllerManager) GetControllerOptions() config.Controller {
|
||||
return cm.controllerConfig
|
||||
}
|
||||
|
||||
func (cm *controllerManager) addMetricsServer() error {
|
||||
func (cm *controllerManager) addHealthProbeServer() error {
|
||||
mux := http.NewServeMux()
|
||||
srv := httpserver.New(mux)
|
||||
|
||||
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
|
||||
ErrorHandling: promhttp.HTTPErrorOnError,
|
||||
})
|
||||
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
|
||||
mux.Handle(defaultMetricsEndpoint, handler)
|
||||
for path, extraHandler := range cm.metricsExtraHandlers {
|
||||
mux.Handle(path, extraHandler)
|
||||
}
|
||||
|
||||
return cm.add(&server{
|
||||
Kind: "metrics",
|
||||
Log: cm.logger.WithValues("path", defaultMetricsEndpoint),
|
||||
Server: srv,
|
||||
Listener: cm.metricsListener,
|
||||
})
|
||||
}
|
||||
|
||||
func (cm *controllerManager) serveHealthProbes() {
|
||||
mux := http.NewServeMux()
|
||||
server := httpserver.New(mux)
|
||||
|
||||
if cm.readyzHandler != nil {
|
||||
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
|
||||
// Append '/' suffix to handle subpaths
|
||||
@ -332,7 +284,12 @@ func (cm *controllerManager) serveHealthProbes() {
|
||||
mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
|
||||
}
|
||||
|
||||
go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener)
|
||||
return cm.add(&server{
|
||||
Kind: "health probe",
|
||||
Log: cm.logger,
|
||||
Server: srv,
|
||||
Listener: cm.healthProbeListener,
|
||||
})
|
||||
}
|
||||
|
||||
func (cm *controllerManager) addPprofServer() error {
|
||||
@ -353,42 +310,6 @@ func (cm *controllerManager) addPprofServer() error {
|
||||
})
|
||||
}
|
||||
|
||||
func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
|
||||
log = log.WithValues("kind", kind, "addr", ln.Addr())
|
||||
|
||||
go func() {
|
||||
log.Info("Starting server")
|
||||
if err := server.Serve(ln); err != nil {
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
return
|
||||
}
|
||||
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
|
||||
// There might be cases where connections are still open and we try to shutdown
|
||||
// but not having enough time to close the connection causes an error in Serve
|
||||
//
|
||||
// In that case we want to avoid returning an error to the main error channel.
|
||||
log.Error(err, "error on Serve after stop has been engaged")
|
||||
return
|
||||
}
|
||||
cm.errChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
// Shutdown the server when stop is closed.
|
||||
<-cm.internalProceduresStop
|
||||
if err := server.Shutdown(cm.shutdownCtx); err != nil {
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
// Avoid logging context related errors.
|
||||
return
|
||||
}
|
||||
if atomic.LoadInt64(cm.stopProcedureEngaged) > 0 {
|
||||
cm.logger.Error(err, "error on Shutdown after stop has been engaged")
|
||||
return
|
||||
}
|
||||
cm.errChan <- err
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the manager and waits indefinitely.
|
||||
// There is only two ways to have start return:
|
||||
// An error has occurred during in one of the internal operations,
|
||||
@ -441,15 +362,19 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
|
||||
// Metrics should be served whether the controller is leader or not.
|
||||
// (If we don't serve metrics for non-leaders, prometheus will still scrape
|
||||
// the pod but will get a connection refused).
|
||||
if cm.metricsListener != nil {
|
||||
if err := cm.addMetricsServer(); err != nil {
|
||||
if cm.metricsServer != nil {
|
||||
// Note: We are adding the metrics server directly to HTTPServers here as matching on the
|
||||
// metricsserver.Server interface in cm.runnables.Add would be very brittle.
|
||||
if err := cm.runnables.HTTPServers.Add(cm.metricsServer, nil); err != nil {
|
||||
return fmt.Errorf("failed to add metrics server: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Serve health probes.
|
||||
if cm.healthProbeListener != nil {
|
||||
cm.serveHealthProbes()
|
||||
if err := cm.addHealthProbeServer(); err != nil {
|
||||
return fmt.Errorf("failed to add health probe server: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add pprof server
|
||||
@ -459,7 +384,17 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// First start any webhook servers, which includes conversion, validation, and defaulting
|
||||
// First start any internal HTTP servers, which includes health probes, metrics and profiling if enabled.
|
||||
//
|
||||
// WARNING: Internal HTTP servers MUST start before any cache is populated, otherwise it would block
|
||||
// conversion webhooks to be ready for serving which make the cache never get ready.
|
||||
if err := cm.runnables.HTTPServers.Start(cm.internalCtx); err != nil {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start HTTP servers: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start any webhook servers, which includes conversion, validation, and defaulting
|
||||
// webhooks that are registered.
|
||||
//
|
||||
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
|
||||
@ -591,10 +526,13 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
|
||||
cm.logger.Info("Stopping and waiting for caches")
|
||||
cm.runnables.Caches.StopAndWait(cm.shutdownCtx)
|
||||
|
||||
// Webhooks should come last, as they might be still serving some requests.
|
||||
// Webhooks and internal HTTP servers should come last, as they might be still serving some requests.
|
||||
cm.logger.Info("Stopping and waiting for webhooks")
|
||||
cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx)
|
||||
|
||||
cm.logger.Info("Stopping and waiting for HTTP servers")
|
||||
cm.runnables.HTTPServers.StopAndWait(cm.shutdownCtx)
|
||||
|
||||
// Proceed to close the manager and overall shutdown context.
|
||||
cm.logger.Info("Wait completed, proceeding to shutdown the manager")
|
||||
shutdownCancel()
|
||||
|
162
vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go
generated
vendored
162
vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go
generated
vendored
@ -18,7 +18,6 @@ package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
@ -34,6 +33,7 @@ import (
|
||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/utils/pointer"
|
||||
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
@ -44,7 +44,6 @@ import (
|
||||
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
"sigs.k8s.io/controller-runtime/pkg/metrics"
|
||||
"sigs.k8s.io/controller-runtime/pkg/recorder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook"
|
||||
)
|
||||
@ -66,13 +65,6 @@ type Manager interface {
|
||||
// election was configured.
|
||||
Elected() <-chan struct{}
|
||||
|
||||
// AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
|
||||
// Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be
|
||||
// sensitive and shouldn't be exposed publicly.
|
||||
// If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as
|
||||
// Runnable to the manager via Add method.
|
||||
AddMetricsExtraHandler(path string, handler http.Handler) error
|
||||
|
||||
// AddHealthzCheck allows you to add Healthz checker
|
||||
AddHealthzCheck(name string, check healthz.Checker) error
|
||||
|
||||
@ -141,35 +133,6 @@ type Options struct {
|
||||
// Only use a custom NewClient if you know what you are doing.
|
||||
NewClient client.NewClientFunc
|
||||
|
||||
// SyncPeriod determines the minimum frequency at which watched resources are
|
||||
// reconciled. A lower period will correct entropy more quickly, but reduce
|
||||
// responsiveness to change if there are many watched resources. Change this
|
||||
// value only if you know what you are doing. Defaults to 10 hours if unset.
|
||||
// there will a 10 percent jitter between the SyncPeriod of all controllers
|
||||
// so that all controllers will not send list requests simultaneously.
|
||||
//
|
||||
// This applies to all controllers.
|
||||
//
|
||||
// A period sync happens for two reasons:
|
||||
// 1. To insure against a bug in the controller that causes an object to not
|
||||
// be requeued, when it otherwise should be requeued.
|
||||
// 2. To insure against an unknown bug in controller-runtime, or its dependencies,
|
||||
// that causes an object to not be requeued, when it otherwise should be
|
||||
// requeued, or to be removed from the queue, when it otherwise should not
|
||||
// be removed.
|
||||
//
|
||||
// If you want
|
||||
// 1. to insure against missed watch events, or
|
||||
// 2. to poll services that cannot be watched,
|
||||
// then we recommend that, instead of changing the default period, the
|
||||
// controller requeue, with a constant duration `t`, whenever the controller
|
||||
// is "done" with an object, and would otherwise not requeue it, i.e., we
|
||||
// recommend the `Reconcile` function return `reconcile.Result{RequeueAfter: t}`,
|
||||
// instead of `reconcile.Result{}`.
|
||||
//
|
||||
// Deprecated: Use Cache.SyncPeriod instead.
|
||||
SyncPeriod *time.Duration
|
||||
|
||||
// Logger is the logger that should be used by this manager.
|
||||
// If none is set, it defaults to log.Log global logger.
|
||||
Logger logr.Logger
|
||||
@ -240,27 +203,17 @@ type Options struct {
|
||||
// wait to force acquire leadership. This is measured against time of
|
||||
// last observed ack. Default is 15 seconds.
|
||||
LeaseDuration *time.Duration
|
||||
|
||||
// RenewDeadline is the duration that the acting controlplane will retry
|
||||
// refreshing leadership before giving up. Default is 10 seconds.
|
||||
RenewDeadline *time.Duration
|
||||
|
||||
// RetryPeriod is the duration the LeaderElector clients should wait
|
||||
// between tries of actions. Default is 2 seconds.
|
||||
RetryPeriod *time.Duration
|
||||
|
||||
// Namespace, if specified, restricts the manager's cache to watch objects in
|
||||
// the desired namespace. Defaults to all namespaces.
|
||||
//
|
||||
// Note: If a namespace is specified, controllers can still Watch for a
|
||||
// cluster-scoped resource (e.g Node). For namespaced resources, the cache
|
||||
// will only hold objects from the desired namespace.
|
||||
//
|
||||
// Deprecated: Use Cache.Namespaces instead.
|
||||
Namespace string
|
||||
|
||||
// MetricsBindAddress is the TCP address that the controller should bind to
|
||||
// for serving prometheus metrics.
|
||||
// It can be set to "0" to disable the metrics serving.
|
||||
MetricsBindAddress string
|
||||
// Metrics are the metricsserver.Options that will be used to create the metricsserver.Server.
|
||||
Metrics metricsserver.Options
|
||||
|
||||
// HealthProbeBindAddress is the TCP address that the controller should bind to
|
||||
// for serving health probes
|
||||
@ -280,34 +233,9 @@ type Options struct {
|
||||
// before exposing it to public.
|
||||
PprofBindAddress string
|
||||
|
||||
// Port is the port that the webhook server serves at.
|
||||
// It is used to set webhook.Server.Port if WebhookServer is not set.
|
||||
//
|
||||
// Deprecated: Use WebhookServer instead. A WebhookServer can be created via webhook.NewServer.
|
||||
Port int
|
||||
// Host is the hostname that the webhook server binds to.
|
||||
// It is used to set webhook.Server.Host if WebhookServer is not set.
|
||||
//
|
||||
// Deprecated: Use WebhookServer instead. A WebhookServer can be created via webhook.NewServer.
|
||||
Host string
|
||||
|
||||
// CertDir is the directory that contains the server key and certificate.
|
||||
// If not set, webhook server would look up the server key and certificate in
|
||||
// {TempDir}/k8s-webhook-server/serving-certs. The server key and certificate
|
||||
// must be named tls.key and tls.crt, respectively.
|
||||
// It is used to set webhook.Server.CertDir if WebhookServer is not set.
|
||||
//
|
||||
// Deprecated: Use WebhookServer instead. A WebhookServer can be created via webhook.NewServer.
|
||||
CertDir string
|
||||
|
||||
// TLSOpts is used to allow configuring the TLS config used for the webhook server.
|
||||
//
|
||||
// Deprecated: Use WebhookServer instead. A WebhookServer can be created via webhook.NewServer.
|
||||
TLSOpts []func(*tls.Config)
|
||||
|
||||
// WebhookServer is an externally configured webhook.Server. By default,
|
||||
// a Manager will create a default server using Port, Host, and CertDir;
|
||||
// if this is set, the Manager will use this server instead.
|
||||
// a Manager will create a server via webhook.NewServer with default settings.
|
||||
// If this is set, the Manager will use this server instead.
|
||||
WebhookServer webhook.Server
|
||||
|
||||
// BaseContext is the function that provides Context values to Runnables
|
||||
@ -315,18 +243,6 @@ type Options struct {
|
||||
// will receive a new Background Context instead.
|
||||
BaseContext BaseContextFunc
|
||||
|
||||
// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
|
||||
// for the given objects.
|
||||
//
|
||||
// Deprecated: Use Client.Cache.DisableCacheFor instead.
|
||||
ClientDisableCacheFor []client.Object
|
||||
|
||||
// DryRunClient specifies whether the client should be configured to enforce
|
||||
// dryRun mode.
|
||||
//
|
||||
// Deprecated: Use Client.DryRun instead.
|
||||
DryRunClient bool
|
||||
|
||||
// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
|
||||
// Use this to customize the event correlator and spam filter
|
||||
//
|
||||
@ -354,7 +270,7 @@ type Options struct {
|
||||
// Dependency injection for testing
|
||||
newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
|
||||
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
|
||||
newMetricsListener func(addr string) (net.Listener, error)
|
||||
newMetricsServer func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error)
|
||||
newHealthProbeListener func(addr string) (net.Listener, error)
|
||||
newPprofListener func(addr string) (net.Listener, error)
|
||||
}
|
||||
@ -391,6 +307,9 @@ type LeaderElectionRunnable interface {
|
||||
}
|
||||
|
||||
// New returns a new Manager for creating Controllers.
|
||||
// Note that if ContentType in the given config is not set, "application/vnd.kubernetes.protobuf"
|
||||
// will be used for all built-in resources of Kubernetes, and "application/json" is for other types
|
||||
// including all CRD resources.
|
||||
func New(config *rest.Config, options Options) (Manager, error) {
|
||||
if config == nil {
|
||||
return nil, errors.New("must specify Config")
|
||||
@ -402,15 +321,11 @@ func New(config *rest.Config, options Options) (Manager, error) {
|
||||
clusterOptions.Scheme = options.Scheme
|
||||
clusterOptions.MapperProvider = options.MapperProvider
|
||||
clusterOptions.Logger = options.Logger
|
||||
clusterOptions.SyncPeriod = options.SyncPeriod
|
||||
clusterOptions.NewCache = options.NewCache
|
||||
clusterOptions.NewClient = options.NewClient
|
||||
clusterOptions.Cache = options.Cache
|
||||
clusterOptions.Client = options.Client
|
||||
clusterOptions.Namespace = options.Namespace //nolint:staticcheck
|
||||
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor //nolint:staticcheck
|
||||
clusterOptions.DryRunClient = options.DryRunClient //nolint:staticcheck
|
||||
clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
|
||||
clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -459,16 +374,12 @@ func New(config *rest.Config, options Options) (Manager, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// Create the metrics listener. This will throw an error if the metrics bind
|
||||
// address is invalid or already in use.
|
||||
metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
|
||||
// Create the metrics server.
|
||||
metricsServer, err := options.newMetricsServer(options.Metrics, config, cluster.GetHTTPClient())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// By default we have no extra endpoints to expose on metrics http server.
|
||||
metricsExtraHandlers := make(map[string]http.Handler)
|
||||
|
||||
// Create health probes listener. This will throw an error if the bind
|
||||
// address is invalid or already in use.
|
||||
healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
|
||||
@ -493,8 +404,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
|
||||
errChan: errChan,
|
||||
recorderProvider: recorderProvider,
|
||||
resourceLock: resourceLock,
|
||||
metricsListener: metricsListener,
|
||||
metricsExtraHandlers: metricsExtraHandlers,
|
||||
metricsServer: metricsServer,
|
||||
controllerConfig: options.Controller,
|
||||
logger: options.Logger,
|
||||
elected: make(chan struct{}),
|
||||
@ -532,16 +442,16 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options,
|
||||
|
||||
o = o.setLeaderElectionConfig(newObj)
|
||||
|
||||
if o.SyncPeriod == nil && newObj.SyncPeriod != nil {
|
||||
o.SyncPeriod = &newObj.SyncPeriod.Duration
|
||||
if o.Cache.SyncPeriod == nil && newObj.SyncPeriod != nil {
|
||||
o.Cache.SyncPeriod = &newObj.SyncPeriod.Duration
|
||||
}
|
||||
|
||||
if o.Namespace == "" && newObj.CacheNamespace != "" {
|
||||
o.Namespace = newObj.CacheNamespace
|
||||
if len(o.Cache.DefaultNamespaces) == 0 && newObj.CacheNamespace != "" {
|
||||
o.Cache.DefaultNamespaces = map[string]cache.Config{newObj.CacheNamespace: {}}
|
||||
}
|
||||
|
||||
if o.MetricsBindAddress == "" && newObj.Metrics.BindAddress != "" {
|
||||
o.MetricsBindAddress = newObj.Metrics.BindAddress
|
||||
if o.Metrics.BindAddress == "" && newObj.Metrics.BindAddress != "" {
|
||||
o.Metrics.BindAddress = newObj.Metrics.BindAddress
|
||||
}
|
||||
|
||||
if o.HealthProbeBindAddress == "" && newObj.Health.HealthProbeBindAddress != "" {
|
||||
@ -556,20 +466,15 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options,
|
||||
o.LivenessEndpointName = newObj.Health.LivenessEndpointName
|
||||
}
|
||||
|
||||
if o.Port == 0 && newObj.Webhook.Port != nil {
|
||||
o.Port = *newObj.Webhook.Port
|
||||
}
|
||||
if o.Host == "" && newObj.Webhook.Host != "" {
|
||||
o.Host = newObj.Webhook.Host
|
||||
}
|
||||
if o.CertDir == "" && newObj.Webhook.CertDir != "" {
|
||||
o.CertDir = newObj.Webhook.CertDir
|
||||
}
|
||||
if o.WebhookServer == nil {
|
||||
port := 0
|
||||
if newObj.Webhook.Port != nil {
|
||||
port = *newObj.Webhook.Port
|
||||
}
|
||||
o.WebhookServer = webhook.NewServer(webhook.Options{
|
||||
Port: o.Port,
|
||||
Host: o.Host,
|
||||
CertDir: o.CertDir,
|
||||
Port: port,
|
||||
Host: newObj.Webhook.Host,
|
||||
CertDir: newObj.Webhook.CertDir,
|
||||
})
|
||||
}
|
||||
|
||||
@ -697,8 +602,8 @@ func setOptionsDefaults(options Options) Options {
|
||||
}
|
||||
}
|
||||
|
||||
if options.newMetricsListener == nil {
|
||||
options.newMetricsListener = metrics.NewListener
|
||||
if options.newMetricsServer == nil {
|
||||
options.newMetricsServer = metricsserver.NewServer
|
||||
}
|
||||
leaseDuration, renewDeadline, retryPeriod := defaultLeaseDuration, defaultRenewDeadline, defaultRetryPeriod
|
||||
if options.LeaseDuration == nil {
|
||||
@ -743,12 +648,7 @@ func setOptionsDefaults(options Options) Options {
|
||||
}
|
||||
|
||||
if options.WebhookServer == nil {
|
||||
options.WebhookServer = webhook.NewServer(webhook.Options{
|
||||
Host: options.Host,
|
||||
Port: options.Port,
|
||||
CertDir: options.CertDir,
|
||||
TLSOpts: options.TLSOpts,
|
||||
})
|
||||
options.WebhookServer = webhook.NewServer(webhook.Options{})
|
||||
}
|
||||
|
||||
return options
|
||||
|
4
vendor/sigs.k8s.io/controller-runtime/pkg/manager/runnable_group.go
generated
vendored
4
vendor/sigs.k8s.io/controller-runtime/pkg/manager/runnable_group.go
generated
vendored
@ -28,6 +28,7 @@ type runnableCheck func(ctx context.Context) bool
|
||||
// runnables handles all the runnables for a manager by grouping them accordingly to their
|
||||
// type (webhooks, caches etc.).
|
||||
type runnables struct {
|
||||
HTTPServers *runnableGroup
|
||||
Webhooks *runnableGroup
|
||||
Caches *runnableGroup
|
||||
LeaderElection *runnableGroup
|
||||
@ -37,6 +38,7 @@ type runnables struct {
|
||||
// newRunnables creates a new runnables object.
|
||||
func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
|
||||
return &runnables{
|
||||
HTTPServers: newRunnableGroup(baseContext, errChan),
|
||||
Webhooks: newRunnableGroup(baseContext, errChan),
|
||||
Caches: newRunnableGroup(baseContext, errChan),
|
||||
LeaderElection: newRunnableGroup(baseContext, errChan),
|
||||
@ -52,6 +54,8 @@ func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
|
||||
// The runnables added after Start are started directly.
|
||||
func (r *runnables) Add(fn Runnable) error {
|
||||
switch runnable := fn.(type) {
|
||||
case *server:
|
||||
return r.HTTPServers.Add(fn, nil)
|
||||
case hasCache:
|
||||
return r.Caches.Add(fn, func(ctx context.Context) bool {
|
||||
return runnable.GetCache().WaitForCacheSync(ctx)
|
||||
|
52
vendor/sigs.k8s.io/controller-runtime/pkg/metrics/listener.go
generated
vendored
52
vendor/sigs.k8s.io/controller-runtime/pkg/metrics/listener.go
generated
vendored
@ -1,52 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
|
||||
)
|
||||
|
||||
var log = logf.RuntimeLog.WithName("metrics")
|
||||
|
||||
// DefaultBindAddress sets the default bind address for the metrics listener
|
||||
// The metrics is on by default.
|
||||
var DefaultBindAddress = ":8080"
|
||||
|
||||
// NewListener creates a new TCP listener bound to the given address.
|
||||
func NewListener(addr string) (net.Listener, error) {
|
||||
if addr == "" {
|
||||
// If the metrics bind address is empty, default to ":8080"
|
||||
addr = DefaultBindAddress
|
||||
}
|
||||
|
||||
// Add a case to disable metrics altogether
|
||||
if addr == "0" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
log.Info("Metrics server is starting to listen", "addr", addr)
|
||||
ln, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error listening on %s: %w", addr, err)
|
||||
log.Error(er, "metrics server failed to listen. You may want to disable the metrics server or use another port if it is due to conflicts")
|
||||
return nil, er
|
||||
}
|
||||
return ln, nil
|
||||
}
|
26
vendor/sigs.k8s.io/controller-runtime/pkg/metrics/server/doc.go
generated
vendored
Normal file
26
vendor/sigs.k8s.io/controller-runtime/pkg/metrics/server/doc.go
generated
vendored
Normal file
@ -0,0 +1,26 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
Package server provides the metrics server implementation.
|
||||
*/
|
||||
package server
|
||||
|
||||
import (
|
||||
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
|
||||
)
|
||||
|
||||
var log = logf.RuntimeLog.WithName("metrics")
|
312
vendor/sigs.k8s.io/controller-runtime/pkg/metrics/server/server.go
generated
vendored
Normal file
312
vendor/sigs.k8s.io/controller-runtime/pkg/metrics/server/server.go
generated
vendored
Normal file
@ -0,0 +1,312 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"k8s.io/client-go/rest"
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
|
||||
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
|
||||
"sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
|
||||
"sigs.k8s.io/controller-runtime/pkg/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMetricsEndpoint = "/metrics"
|
||||
)
|
||||
|
||||
// DefaultBindAddress is the default bind address for the metrics server.
|
||||
var DefaultBindAddress = ":8080"
|
||||
|
||||
// Server is a server that serves metrics.
|
||||
type Server interface {
|
||||
// NeedLeaderElection implements the LeaderElectionRunnable interface, which indicates
|
||||
// the metrics server doesn't need leader election.
|
||||
NeedLeaderElection() bool
|
||||
|
||||
// Start runs the server.
|
||||
// It will install the metrics related resources depending on the server configuration.
|
||||
Start(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Options are all available options for the metrics.Server
|
||||
type Options struct {
|
||||
// SecureServing enables serving metrics via https.
|
||||
// Per default metrics will be served via http.
|
||||
SecureServing bool
|
||||
|
||||
// BindAddress is the bind address for the metrics server.
|
||||
// It will be defaulted to ":8080" if unspecified.
|
||||
// Set this to "0" to disable the metrics server.
|
||||
BindAddress string
|
||||
|
||||
// ExtraHandlers contains a map of handlers (by path) which will be added to the metrics server.
|
||||
// This might be useful to register diagnostic endpoints e.g. pprof.
|
||||
// Note that pprof endpoints are meant to be sensitive and shouldn't be exposed publicly.
|
||||
// If the simple path -> handler mapping offered here is not enough, a new http
|
||||
// server/listener should be added as Runnable to the manager via the Add method.
|
||||
ExtraHandlers map[string]http.Handler
|
||||
|
||||
// FilterProvider provides a filter which is a func that is added around
|
||||
// the metrics and the extra handlers on the metrics server.
|
||||
// This can be e.g. used to enforce authentication and authorization on the handlers
|
||||
// endpoint by setting this field to filters.WithAuthenticationAndAuthorization.
|
||||
FilterProvider func(c *rest.Config, httpClient *http.Client) (Filter, error)
|
||||
|
||||
// CertDir is the directory that contains the server key and certificate. Defaults to
|
||||
// <temp-dir>/k8s-metrics-server/serving-certs.
|
||||
//
|
||||
// Note: This option is only used when TLSOpts does not set GetCertificate.
|
||||
// Note: If certificate or key doesn't exist a self-signed certificate will be used.
|
||||
CertDir string
|
||||
|
||||
// CertName is the server certificate name. Defaults to tls.crt.
|
||||
//
|
||||
// Note: This option is only used when TLSOpts does not set GetCertificate.
|
||||
// Note: If certificate or key doesn't exist a self-signed certificate will be used.
|
||||
CertName string
|
||||
|
||||
// KeyName is the server key name. Defaults to tls.key.
|
||||
//
|
||||
// Note: This option is only used when TLSOpts does not set GetCertificate.
|
||||
// Note: If certificate or key doesn't exist a self-signed certificate will be used.
|
||||
KeyName string
|
||||
|
||||
// TLSOpts is used to allow configuring the TLS config used for the server.
|
||||
// This also allows providing a certificate via GetCertificate.
|
||||
TLSOpts []func(*tls.Config)
|
||||
}
|
||||
|
||||
// Filter is a func that is added around metrics and extra handlers on the metrics server.
|
||||
type Filter func(log logr.Logger, handler http.Handler) (http.Handler, error)
|
||||
|
||||
// NewServer constructs a new metrics.Server from the provided options.
|
||||
func NewServer(o Options, config *rest.Config, httpClient *http.Client) (Server, error) {
|
||||
o.setDefaults()
|
||||
|
||||
// Skip server creation if metrics are disabled.
|
||||
if o.BindAddress == "0" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Validate that ExtraHandlers is not overwriting the default /metrics endpoint.
|
||||
if o.ExtraHandlers != nil {
|
||||
if _, ok := o.ExtraHandlers[defaultMetricsEndpoint]; ok {
|
||||
return nil, fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
|
||||
}
|
||||
}
|
||||
|
||||
// Create the metrics filter if a FilterProvider is set.
|
||||
var metricsFilter Filter
|
||||
if o.FilterProvider != nil {
|
||||
var err error
|
||||
metricsFilter, err = o.FilterProvider(config, httpClient)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("filter provider failed to create filter for the metrics server: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return &defaultServer{
|
||||
metricsFilter: metricsFilter,
|
||||
options: o,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// defaultServer is the default implementation used for Server.
|
||||
type defaultServer struct {
|
||||
options Options
|
||||
|
||||
// metricsFilter is a filter which is added around
|
||||
// the metrics and the extra handlers on the metrics server.
|
||||
metricsFilter Filter
|
||||
|
||||
// mu protects access to the bindAddr field.
|
||||
mu sync.RWMutex
|
||||
|
||||
// bindAddr is used to store the bindAddr after the listener has been created.
|
||||
// This is used during testing to figure out the port that has been chosen randomly.
|
||||
bindAddr string
|
||||
}
|
||||
|
||||
// setDefaults does defaulting for the Server.
|
||||
func (o *Options) setDefaults() {
|
||||
if o.BindAddress == "" {
|
||||
o.BindAddress = DefaultBindAddress
|
||||
}
|
||||
|
||||
if len(o.CertDir) == 0 {
|
||||
o.CertDir = filepath.Join(os.TempDir(), "k8s-metrics-server", "serving-certs")
|
||||
}
|
||||
|
||||
if len(o.CertName) == 0 {
|
||||
o.CertName = "tls.crt"
|
||||
}
|
||||
|
||||
if len(o.KeyName) == 0 {
|
||||
o.KeyName = "tls.key"
|
||||
}
|
||||
}
|
||||
|
||||
// NeedLeaderElection implements the LeaderElectionRunnable interface, which indicates
|
||||
// the metrics server doesn't need leader election.
|
||||
func (*defaultServer) NeedLeaderElection() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Start runs the server.
|
||||
// It will install the metrics related resources depend on the server configuration.
|
||||
func (s *defaultServer) Start(ctx context.Context) error {
|
||||
log.Info("Starting metrics server")
|
||||
|
||||
listener, err := s.createListener(ctx, log)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start metrics server: failed to create listener: %w", err)
|
||||
}
|
||||
// Storing bindAddr here so we can retrieve it during testing via GetBindAddr.
|
||||
s.mu.Lock()
|
||||
s.bindAddr = listener.Addr().String()
|
||||
s.mu.Unlock()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
|
||||
ErrorHandling: promhttp.HTTPErrorOnError,
|
||||
})
|
||||
if s.metricsFilter != nil {
|
||||
log := log.WithValues("path", defaultMetricsEndpoint)
|
||||
var err error
|
||||
handler, err = s.metricsFilter(log, handler)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start metrics server: failed to add metrics filter: %w", err)
|
||||
}
|
||||
}
|
||||
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
|
||||
mux.Handle(defaultMetricsEndpoint, handler)
|
||||
|
||||
for path, extraHandler := range s.options.ExtraHandlers {
|
||||
if s.metricsFilter != nil {
|
||||
log := log.WithValues("path", path)
|
||||
var err error
|
||||
extraHandler, err = s.metricsFilter(log, extraHandler)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start metrics server: failed to add metrics filter to extra handler for path %s: %w", path, err)
|
||||
}
|
||||
}
|
||||
mux.Handle(path, extraHandler)
|
||||
}
|
||||
|
||||
log.Info("Serving metrics server", "bindAddress", s.options.BindAddress, "secure", s.options.SecureServing)
|
||||
|
||||
srv := httpserver.New(mux)
|
||||
|
||||
idleConnsClosed := make(chan struct{})
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
log.Info("Shutting down metrics server with timeout of 1 minute")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
||||
defer cancel()
|
||||
if err := srv.Shutdown(ctx); err != nil {
|
||||
// Error from closing listeners, or context timeout
|
||||
log.Error(err, "error shutting down the HTTP server")
|
||||
}
|
||||
close(idleConnsClosed)
|
||||
}()
|
||||
|
||||
if err := srv.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||
return err
|
||||
}
|
||||
|
||||
<-idleConnsClosed
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *defaultServer) createListener(ctx context.Context, log logr.Logger) (net.Listener, error) {
|
||||
if !s.options.SecureServing {
|
||||
return net.Listen("tcp", s.options.BindAddress)
|
||||
}
|
||||
|
||||
cfg := &tls.Config{ //nolint:gosec
|
||||
NextProtos: []string{"h2"},
|
||||
}
|
||||
// fallback TLS config ready, will now mutate if passer wants full control over it
|
||||
for _, op := range s.options.TLSOpts {
|
||||
op(cfg)
|
||||
}
|
||||
|
||||
if cfg.GetCertificate == nil {
|
||||
certPath := filepath.Join(s.options.CertDir, s.options.CertName)
|
||||
keyPath := filepath.Join(s.options.CertDir, s.options.KeyName)
|
||||
|
||||
_, certErr := os.Stat(certPath)
|
||||
certExists := !os.IsNotExist(certErr)
|
||||
_, keyErr := os.Stat(keyPath)
|
||||
keyExists := !os.IsNotExist(keyErr)
|
||||
if certExists && keyExists {
|
||||
// Create the certificate watcher and
|
||||
// set the config's GetCertificate on the TLSConfig
|
||||
certWatcher, err := certwatcher.New(certPath, keyPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg.GetCertificate = certWatcher.GetCertificate
|
||||
|
||||
go func() {
|
||||
if err := certWatcher.Start(ctx); err != nil {
|
||||
log.Error(err, "certificate watcher error")
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// If cfg.GetCertificate is still nil, i.e. we didn't configure a cert watcher, fallback to a self-signed certificate.
|
||||
if cfg.GetCertificate == nil {
|
||||
// Note: Using self-signed certificates here should be good enough. It's just important that we
|
||||
// encrypt the communication. For example kube-controller-manager also uses a self-signed certificate
|
||||
// for the metrics endpoint per default.
|
||||
cert, key, err := certutil.GenerateSelfSignedCertKeyWithFixtures("localhost", []net.IP{{127, 0, 0, 1}}, nil, "")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate self-signed certificate for metrics server: %w", err)
|
||||
}
|
||||
|
||||
keyPair, err := tls.X509KeyPair(cert, key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create self-signed key pair for metrics server: %w", err)
|
||||
}
|
||||
cfg.Certificates = []tls.Certificate{keyPair}
|
||||
}
|
||||
|
||||
return tls.Listen("tcp", s.options.BindAddress, cfg)
|
||||
}
|
||||
|
||||
func (s *defaultServer) GetBindAddr() string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.bindAddr
|
||||
}
|
16
vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go
generated
vendored
16
vendor/sigs.k8s.io/controller-runtime/pkg/reconcile/reconcile.go
generated
vendored
@ -89,8 +89,16 @@ instead the reconcile function observes this when reading the cluster state and
|
||||
*/
|
||||
type Reconciler interface {
|
||||
// Reconcile performs a full reconciliation for the object referred to by the Request.
|
||||
// The Controller will requeue the Request to be processed again if an error is non-nil or
|
||||
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
|
||||
//
|
||||
// If the returned error is non-nil, the Result is ignored and the request will be
|
||||
// requeued using exponential backoff. The only exception is if the error is a
|
||||
// TerminalError in which case no requeuing happens.
|
||||
//
|
||||
// If the error is nil and the returned Result has a non-zero result.RequeueAfter, the request
|
||||
// will be requeued after the specified duration.
|
||||
//
|
||||
// If the error is nil and result.RequeueAfter is zero and result.Reque is true, the request
|
||||
// will be requeued using exponential backoff.
|
||||
Reconcile(context.Context, Request) (Result, error)
|
||||
}
|
||||
|
||||
@ -112,11 +120,15 @@ type terminalError struct {
|
||||
err error
|
||||
}
|
||||
|
||||
// This function will return nil if te.err is nil.
|
||||
func (te *terminalError) Unwrap() error {
|
||||
return te.err
|
||||
}
|
||||
|
||||
func (te *terminalError) Error() string {
|
||||
if te.err == nil {
|
||||
return "nil terminal error"
|
||||
}
|
||||
return "terminal error: " + te.err.Error()
|
||||
}
|
||||
|
||||
|
1
vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/decode.go
generated
vendored
1
vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/decode.go
generated
vendored
@ -71,6 +71,7 @@ func (d *Decoder) DecodeRaw(rawObj runtime.RawExtension, into runtime.Object) er
|
||||
return err
|
||||
}
|
||||
unstructuredInto.SetUnstructuredContent(object)
|
||||
return nil
|
||||
}
|
||||
|
||||
deserializer := d.codecs.UniversalDeserializer()
|
||||
|
6
vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/http.go
generated
vendored
6
vendor/sigs.k8s.io/controller-runtime/pkg/webhook/admission/http.go
generated
vendored
@ -93,7 +93,7 @@ func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
wh.writeResponse(w, reviewResponse)
|
||||
return
|
||||
}
|
||||
wh.getLogger(&req).V(4).Info("received request")
|
||||
wh.getLogger(&req).V(5).Info("received request")
|
||||
|
||||
reviewResponse = wh.Handle(ctx, req)
|
||||
wh.writeResponseTyped(w, reviewResponse, actualAdmRevGVK)
|
||||
@ -136,11 +136,11 @@ func (wh *Webhook) writeAdmissionResponse(w io.Writer, ar v1.AdmissionReview) {
|
||||
}
|
||||
} else {
|
||||
res := ar.Response
|
||||
if log := wh.getLogger(nil); log.V(4).Enabled() {
|
||||
if log := wh.getLogger(nil); log.V(5).Enabled() {
|
||||
if res.Result != nil {
|
||||
log = log.WithValues("code", res.Result.Code, "reason", res.Result.Reason, "message", res.Result.Message)
|
||||
}
|
||||
log.V(4).Info("wrote response", "requestID", res.UID, "allowed", res.Allowed)
|
||||
log.V(5).Info("wrote response", "requestID", res.UID, "allowed", res.Allowed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
47
vendor/sigs.k8s.io/controller-runtime/pkg/webhook/server.go
generated
vendored
47
vendor/sigs.k8s.io/controller-runtime/pkg/webhook/server.go
generated
vendored
@ -77,37 +77,33 @@ type Options struct {
|
||||
// It will be defaulted to 9443 if unspecified.
|
||||
Port int
|
||||
|
||||
// CertDir is the directory that contains the server key and certificate. The
|
||||
// server key and certificate.
|
||||
// CertDir is the directory that contains the server key and certificate. Defaults to
|
||||
// <temp-dir>/k8s-webhook-server/serving-certs.
|
||||
CertDir string
|
||||
|
||||
// CertName is the server certificate name. Defaults to tls.crt.
|
||||
//
|
||||
// Note: This option should only be set when TLSOpts does not override GetCertificate.
|
||||
// Note: This option is only used when TLSOpts does not set GetCertificate.
|
||||
CertName string
|
||||
|
||||
// KeyName is the server key name. Defaults to tls.key.
|
||||
//
|
||||
// Note: This option should only be set when TLSOpts does not override GetCertificate.
|
||||
// Note: This option is only used when TLSOpts does not set GetCertificate.
|
||||
KeyName string
|
||||
|
||||
// ClientCAName is the CA certificate name which server used to verify remote(client)'s certificate.
|
||||
// Defaults to "", which means server does not verify client's certificate.
|
||||
ClientCAName string
|
||||
|
||||
// TLSVersion is the minimum version of TLS supported. Accepts
|
||||
// "", "1.0", "1.1", "1.2" and "1.3" only ("" is equivalent to "1.0" for backwards compatibility)
|
||||
// Deprecated: Use TLSOpts instead.
|
||||
TLSMinVersion string
|
||||
|
||||
// TLSOpts is used to allow configuring the TLS config used for the server
|
||||
// TLSOpts is used to allow configuring the TLS config used for the server.
|
||||
// This also allows providing a certificate via GetCertificate.
|
||||
TLSOpts []func(*tls.Config)
|
||||
|
||||
// WebhookMux is the multiplexer that handles different webhooks.
|
||||
WebhookMux *http.ServeMux
|
||||
}
|
||||
|
||||
// NewServer constructs a new Server from the provided options.
|
||||
// NewServer constructs a new webhook.Server from the provided options.
|
||||
func NewServer(o Options) Server {
|
||||
return &DefaultServer{
|
||||
Options: o,
|
||||
@ -187,42 +183,15 @@ func (s *DefaultServer) Register(path string, hook http.Handler) {
|
||||
regLog.Info("Registering webhook")
|
||||
}
|
||||
|
||||
// tlsVersion converts from human-readable TLS version (for example "1.1")
|
||||
// to the values accepted by tls.Config (for example 0x301).
|
||||
func tlsVersion(version string) (uint16, error) {
|
||||
switch version {
|
||||
// default is previous behaviour
|
||||
case "":
|
||||
return tls.VersionTLS10, nil
|
||||
case "1.0":
|
||||
return tls.VersionTLS10, nil
|
||||
case "1.1":
|
||||
return tls.VersionTLS11, nil
|
||||
case "1.2":
|
||||
return tls.VersionTLS12, nil
|
||||
case "1.3":
|
||||
return tls.VersionTLS13, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("invalid TLSMinVersion %v: expects 1.0, 1.1, 1.2, 1.3 or empty", version)
|
||||
}
|
||||
}
|
||||
|
||||
// Start runs the server.
|
||||
// It will install the webhook related resources depend on the server configuration.
|
||||
func (s *DefaultServer) Start(ctx context.Context) error {
|
||||
s.defaultingOnce.Do(s.setDefaults)
|
||||
|
||||
baseHookLog := log.WithName("webhooks")
|
||||
baseHookLog.Info("Starting webhook server")
|
||||
|
||||
tlsMinVersion, err := tlsVersion(s.Options.TLSMinVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info("Starting webhook server")
|
||||
|
||||
cfg := &tls.Config{ //nolint:gosec
|
||||
NextProtos: []string{"h2"},
|
||||
MinVersion: tlsMinVersion,
|
||||
}
|
||||
// fallback TLS config ready, will now mutate if passer wants full control over it
|
||||
for _, op := range s.Options.TLSOpts {
|
||||
|
Reference in New Issue
Block a user