2020-10-21 05:49:41 +00:00
|
|
|
/*
|
|
|
|
Copyright 2019 The Kubernetes Authors.
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package cache
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
|
|
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
|
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
|
|
"k8s.io/client-go/rest"
|
|
|
|
toolscache "k8s.io/client-go/tools/cache"
|
|
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
2021-06-25 05:02:01 +00:00
|
|
|
"sigs.k8s.io/controller-runtime/pkg/internal/objectutil"
|
2020-10-21 05:49:41 +00:00
|
|
|
)
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
// NewCacheFunc is the signature of a constructor that builds a Cache from a
// rest config and a set of cache Options.
type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error)
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
// globalCache is the map key under which the informer for cluster-scoped
// resources is stored when a multiNamespaceInformer is built from the cluster
// cache instead of the per-namespace caches.
const globalCache = "_cluster-scope"
|
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
// MultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache.
|
|
|
|
// This will scope the cache to a list of namespaces. Listing for all namespaces
|
2021-06-25 05:02:01 +00:00
|
|
|
// will list for all the namespaces that this knows about. By default this will create
|
|
|
|
// a global cache for cluster scoped resource. Note that this is not intended
|
2020-10-21 05:49:41 +00:00
|
|
|
// to be used for excluding namespaces, this is better done via a Predicate. Also note that
|
|
|
|
// you may face performance issues when using this with a high number of namespaces.
|
|
|
|
func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc {
|
|
|
|
return func(config *rest.Config, opts Options) (Cache, error) {
|
|
|
|
opts, err := defaultOpts(config, opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
caches := map[string]Cache{}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
|
|
|
// create a cache for cluster scoped resources
|
|
|
|
gCache, err := New(config, opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error creating global cache %v", err)
|
|
|
|
}
|
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
for _, ns := range namespaces {
|
|
|
|
opts.Namespace = ns
|
|
|
|
c, err := New(config, opts)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
caches[ns] = c
|
|
|
|
}
|
2021-06-25 05:02:01 +00:00
|
|
|
return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme, RESTMapper: opts.Mapper, clusterCache: gCache}, nil
|
2020-10-21 05:49:41 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// multiNamespaceCache knows how to handle multiple namespaced caches.
// Use this feature when scoping permissions for your operator to a list of
// namespaces instead of watching every namespace in the cluster.
type multiNamespaceCache struct {
	// namespaceToCache holds one cache per namespace, keyed by namespace name.
	namespaceToCache map[string]Cache

	// Scheme is passed to objectutil when deciding whether an object or kind
	// is namespaced.
	Scheme *runtime.Scheme

	// RESTMapper is passed to objectutil together with Scheme for the same
	// namespaced/cluster-scoped decision.
	RESTMapper apimeta.RESTMapper

	// clusterCache serves every request for objects that are not namespaced.
	clusterCache Cache
}
|
|
|
|
|
|
|
|
// Compile-time assertion that *multiNamespaceCache satisfies Cache.
var _ Cache = &multiNamespaceCache{}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
// Methods for multiNamespaceCache to conform to the Informers interface.
|
|
|
|
func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) {
|
2020-10-21 05:49:41 +00:00
|
|
|
informers := map[string]Informer{}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
|
|
|
// If the object is clusterscoped, get the informer from clusterCache,
|
|
|
|
// if not use the namespaced caches.
|
|
|
|
isNamespaced, err := objectutil.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if !isNamespaced {
|
|
|
|
clusterCacheInf, err := c.clusterCache.GetInformer(ctx, obj)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
informers[globalCache] = clusterCacheInf
|
|
|
|
|
|
|
|
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
|
|
|
|
}
|
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
for ns, cache := range c.namespaceToCache {
|
|
|
|
informer, err := cache.GetInformer(ctx, obj)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
informers[ns] = informer
|
|
|
|
}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) {
|
|
|
|
informers := map[string]Informer{}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
|
|
|
// If the object is clusterscoped, get the informer from clusterCache,
|
|
|
|
// if not use the namespaced caches.
|
|
|
|
isNamespaced, err := objectutil.IsAPINamespacedWithGVK(gvk, c.Scheme, c.RESTMapper)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if !isNamespaced {
|
|
|
|
clusterCacheInf, err := c.clusterCache.GetInformerForKind(ctx, gvk)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
informers[globalCache] = clusterCacheInf
|
|
|
|
|
|
|
|
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
|
|
|
|
}
|
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
for ns, cache := range c.namespaceToCache {
|
|
|
|
informer, err := cache.GetInformerForKind(ctx, gvk)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
informers[ns] = informer
|
|
|
|
}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
return &multiNamespaceInformer{namespaceToInformer: informers}, nil
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
func (c *multiNamespaceCache) Start(ctx context.Context) error {
|
|
|
|
// start global cache
|
|
|
|
go func() {
|
|
|
|
err := c.clusterCache.Start(ctx)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err, "cluster scoped cache failed to start")
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
// start namespaced caches
|
2020-10-21 05:49:41 +00:00
|
|
|
for ns, cache := range c.namespaceToCache {
|
|
|
|
go func(ns string, cache Cache) {
|
2021-06-25 05:02:01 +00:00
|
|
|
err := cache.Start(ctx)
|
2020-10-21 05:49:41 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Error(err, "multinamespace cache failed to start namespaced informer", "namespace", ns)
|
|
|
|
}
|
|
|
|
}(ns, cache)
|
|
|
|
}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
|
|
|
<-ctx.Done()
|
2020-10-21 05:49:41 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
|
2020-10-21 05:49:41 +00:00
|
|
|
synced := true
|
|
|
|
for _, cache := range c.namespaceToCache {
|
2021-06-25 05:02:01 +00:00
|
|
|
if s := cache.WaitForCacheSync(ctx); !s {
|
2020-10-21 05:49:41 +00:00
|
|
|
synced = s
|
|
|
|
}
|
|
|
|
}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
|
|
|
// check if cluster scoped cache has synced
|
|
|
|
if !c.clusterCache.WaitForCacheSync(ctx) {
|
|
|
|
synced = false
|
|
|
|
}
|
2020-10-21 05:49:41 +00:00
|
|
|
return synced
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
|
|
|
|
isNamespaced, err := objectutil.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
|
|
|
|
if err != nil {
|
|
|
|
return nil //nolint:nilerr
|
|
|
|
}
|
|
|
|
|
|
|
|
if !isNamespaced {
|
|
|
|
return c.clusterCache.IndexField(ctx, obj, field, extractValue)
|
|
|
|
}
|
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
for _, cache := range c.namespaceToCache {
|
|
|
|
if err := cache.IndexField(ctx, obj, field, extractValue); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error {
|
|
|
|
isNamespaced, err := objectutil.IsAPINamespaced(obj, c.Scheme, c.RESTMapper)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if !isNamespaced {
|
|
|
|
// Look into the global cache to fetch the object
|
|
|
|
return c.clusterCache.Get(ctx, key, obj)
|
|
|
|
}
|
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
cache, ok := c.namespaceToCache[key.Namespace]
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", key)
|
|
|
|
}
|
|
|
|
return cache.Get(ctx, key, obj)
|
|
|
|
}
|
|
|
|
|
|
|
|
// List multi namespace cache will get all the objects in the namespaces that the cache is watching if asked for all namespaces.
|
2021-06-25 05:02:01 +00:00
|
|
|
func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
|
2020-10-21 05:49:41 +00:00
|
|
|
listOpts := client.ListOptions{}
|
|
|
|
listOpts.ApplyOptions(opts)
|
2021-06-25 05:02:01 +00:00
|
|
|
|
|
|
|
isNamespaced, err := objectutil.IsAPINamespaced(list, c.Scheme, c.RESTMapper)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if !isNamespaced {
|
|
|
|
// Look at the global cache to get the objects with the specified GVK
|
|
|
|
return c.clusterCache.List(ctx, list, opts...)
|
|
|
|
}
|
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
if listOpts.Namespace != corev1.NamespaceAll {
|
|
|
|
cache, ok := c.namespaceToCache[listOpts.Namespace]
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", listOpts.Namespace)
|
|
|
|
}
|
|
|
|
return cache.List(ctx, list, opts...)
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
listAccessor, err := apimeta.ListAccessor(list)
|
2020-10-21 05:49:41 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
allItems, err := apimeta.ExtractList(list)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-06-25 05:02:01 +00:00
|
|
|
|
|
|
|
limitSet := listOpts.Limit > 0
|
|
|
|
|
2020-10-21 05:49:41 +00:00
|
|
|
var resourceVersion string
|
|
|
|
for _, cache := range c.namespaceToCache {
|
2021-06-25 05:02:01 +00:00
|
|
|
listObj := list.DeepCopyObject().(client.ObjectList)
|
|
|
|
err = cache.List(ctx, listObj, &listOpts)
|
2020-10-21 05:49:41 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
items, err := apimeta.ExtractList(listObj)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-06-25 05:02:01 +00:00
|
|
|
accessor, err := apimeta.ListAccessor(listObj)
|
2020-10-21 05:49:41 +00:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("object: %T must be a list type", list)
|
|
|
|
}
|
|
|
|
allItems = append(allItems, items...)
|
|
|
|
// The last list call should have the most correct resource version.
|
|
|
|
resourceVersion = accessor.GetResourceVersion()
|
2021-06-25 05:02:01 +00:00
|
|
|
if limitSet {
|
|
|
|
// decrement Limit by the number of items
|
|
|
|
// fetched from the current namespace.
|
|
|
|
listOpts.Limit -= int64(len(items))
|
|
|
|
// if a Limit was set and the number of
|
|
|
|
// items read has reached this set limit,
|
|
|
|
// then stop reading.
|
|
|
|
if listOpts.Limit == 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
2020-10-21 05:49:41 +00:00
|
|
|
}
|
|
|
|
listAccessor.SetResourceVersion(resourceVersion)
|
|
|
|
|
|
|
|
return apimeta.SetList(list, allItems)
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
// multiNamespaceInformer knows how to handle interacting with the underlying
// informer across multiple namespaces. Every method fans out to all informers
// held in namespaceToInformer.
type multiNamespaceInformer struct {
	// namespaceToInformer holds the wrapped informers. Keys are namespace
	// names, or globalCache for the informer of a cluster-scoped resource.
	namespaceToInformer map[string]Informer
}
|
|
|
|
|
|
|
|
// Compile-time assertion that *multiNamespaceInformer satisfies Informer.
var _ Informer = &multiNamespaceInformer{}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
// AddEventHandler adds the handler to each namespaced informer.
|
2020-10-21 05:49:41 +00:00
|
|
|
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {
|
|
|
|
for _, informer := range i.namespaceToInformer {
|
|
|
|
informer.AddEventHandler(handler)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
// AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer.
|
2020-10-21 05:49:41 +00:00
|
|
|
func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) {
|
|
|
|
for _, informer := range i.namespaceToInformer {
|
|
|
|
informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
// AddIndexers adds the indexer for each namespaced informer.
|
2020-10-21 05:49:41 +00:00
|
|
|
func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error {
|
|
|
|
for _, informer := range i.namespaceToInformer {
|
|
|
|
err := informer.AddIndexers(indexers)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-06-25 05:02:01 +00:00
|
|
|
// HasSynced checks if each namespaced informer has synced.
|
2020-10-21 05:49:41 +00:00
|
|
|
func (i *multiNamespaceInformer) HasSynced() bool {
|
|
|
|
for _, informer := range i.namespaceToInformer {
|
|
|
|
if ok := informer.HasSynced(); !ok {
|
|
|
|
return ok
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|