rebase: add controller runtime dependency

this commits add the controller runtime
and its dependency to the vendor.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2020-10-21 11:19:41 +05:30
committed by mergify[bot]
parent 14700b89d1
commit 5af3fe5deb
101 changed files with 11946 additions and 230 deletions

View File

@ -0,0 +1,185 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"context"
"fmt"
"reflect"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// CacheReader is a client.Reader
var _ client.Reader = &CacheReader{}
// CacheReader wraps a cache.Index to implement the client.CacheReader interface for a single type
type CacheReader struct {
// indexer is the underlying indexer wrapped by this cache.
indexer cache.Indexer
// groupVersionKind is the group-version-kind of the resource.
groupVersionKind schema.GroupVersionKind
}
// Get checks the indexer for the object and writes a copy of it if found
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out runtime.Object) error {
storeKey := objectKeyToStoreKey(key)
// Lookup the object from the indexer cache
obj, exists, err := c.indexer.GetByKey(storeKey)
if err != nil {
return err
}
// Not found, return an error
if !exists {
// Resource gets transformed into Kind in the error anyway, so this is fine
return errors.NewNotFound(schema.GroupResource{
Group: c.groupVersionKind.Group,
Resource: c.groupVersionKind.Kind,
}, key.Name)
}
// Verify the result is a runtime.Object
if _, isObj := obj.(runtime.Object); !isObj {
// This should never happen
return fmt.Errorf("cache contained %T, which is not an Object", obj)
}
// deep copy to avoid mutating cache
// TODO(directxman12): revisit the decision to always deepcopy
obj = obj.(runtime.Object).DeepCopyObject()
// Copy the value of the item in the cache to the returned value
// TODO(directxman12): this is a terrible hack, pls fix (we should have deepcopyinto)
outVal := reflect.ValueOf(out)
objVal := reflect.ValueOf(obj)
if !objVal.Type().AssignableTo(outVal.Type()) {
return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
}
reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
return nil
}
// List lists items out of the indexer and writes them to out
func (c *CacheReader) List(_ context.Context, out runtime.Object, opts ...client.ListOption) error {
var objs []interface{}
var err error
listOpts := client.ListOptions{}
listOpts.ApplyOptions(opts)
if listOpts.FieldSelector != nil {
// TODO(directxman12): support more complicated field selectors by
// combining multiple indices, GetIndexers, etc
field, val, requiresExact := requiresExactMatch(listOpts.FieldSelector)
if !requiresExact {
return fmt.Errorf("non-exact field matches are not supported by the cache")
}
// list all objects by the field selector. If this is namespaced and we have one, ask for the
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
// namespace.
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
} else if listOpts.Namespace != "" {
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
} else {
objs = c.indexer.List()
}
if err != nil {
return err
}
var labelSel labels.Selector
if listOpts.LabelSelector != nil {
labelSel = listOpts.LabelSelector
}
runtimeObjs := make([]runtime.Object, 0, len(objs))
for _, item := range objs {
obj, isObj := item.(runtime.Object)
if !isObj {
return fmt.Errorf("cache contained %T, which is not an Object", obj)
}
meta, err := apimeta.Accessor(obj)
if err != nil {
return err
}
if labelSel != nil {
lbls := labels.Set(meta.GetLabels())
if !labelSel.Matches(lbls) {
continue
}
}
outObj := obj.DeepCopyObject()
outObj.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
runtimeObjs = append(runtimeObjs, outObj)
}
return apimeta.SetList(out, runtimeObjs)
}
// objectKeyToStorageKey converts an object key to store key.
// It's akin to MetaNamespaceKeyFunc. It's separate from
// String to allow keeping the key format easily in sync with
// MetaNamespaceKeyFunc.
func objectKeyToStoreKey(k client.ObjectKey) string {
if k.Namespace == "" {
return k.Name
}
return k.Namespace + "/" + k.Name
}
// requiresExactMatch checks if the given field selector is of the form `k=v` or `k==v`.
func requiresExactMatch(sel fields.Selector) (field, val string, required bool) {
reqs := sel.Requirements()
if len(reqs) != 1 {
return "", "", false
}
req := reqs[0]
if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals {
return "", "", false
}
return req.Field, req.Value, true
}
// FieldIndexName constructs the name of the index over the given field,
// for use with an indexer.
func FieldIndexName(field string) string {
return "field:" + field
}
// noNamespaceNamespace is used as the "namespace" when we want to list across all namespaces
const allNamespacesNamespace = "__all_namespaces"
// KeyToNamespacedKey prefixes the given index key with a namespace
// for use in field selector indexes.
func KeyToNamespacedKey(ns string, baseKey string) string {
if ns != "" {
return ns + "/" + baseKey
}
return allNamespacesNamespace + "/" + baseKey
}

View File

@ -0,0 +1,103 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"context"
"time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type InformersMap struct {
// we abstract over the details of structured vs unstructured with the specificInformerMaps
structured *specificInformersMap
unstructured *specificInformersMap
// Scheme maps runtime.Objects to GroupVersionKinds
Scheme *runtime.Scheme
}
// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
Scheme: scheme,
}
}
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
func (m *InformersMap) Start(stop <-chan struct{}) error {
go m.structured.Start(stop)
go m.unstructured.Start(stop)
<-stop
return nil
}
// WaitForCacheSync waits until all the caches have been started and synced.
func (m *InformersMap) WaitForCacheSync(stop <-chan struct{}) bool {
syncedFuncs := append([]cache.InformerSynced(nil), m.structured.HasSyncedFuncs()...)
syncedFuncs = append(syncedFuncs, m.unstructured.HasSyncedFuncs()...)
if !m.structured.waitForStarted(stop) {
return false
}
if !m.unstructured.waitForStarted(stop) {
return false
}
return cache.WaitForCacheSync(stop, syncedFuncs...)
}
// Get will create a new Informer and add it to the map of InformersMap if none exists. Returns
// the Informer from the map.
func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
isUnstructured = isUnstructured || isUnstructuredList
if isUnstructured {
return m.unstructured.Get(ctx, gvk, obj)
}
return m.structured.Get(ctx, gvk, obj)
}
// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
}
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
}

View File

@ -0,0 +1,313 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
// clientListWatcherFunc knows how to create a ListWatcher
type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
// newSpecificInformersMap returns a new specificInformersMap (like
// the generical InformersMap, except that it doesn't implement WaitForCacheSync).
func newSpecificInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
}
return ip
}
// MapEntry contains the cached data for an Informer
type MapEntry struct {
// Informer is the cached informer
Informer cache.SharedIndexInformer
// CacheReader wraps Informer and implements the CacheReader interface for a single type
Reader CacheReader
}
// specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type specificInformersMap struct {
// Scheme maps runtime.Objects to GroupVersionKinds
Scheme *runtime.Scheme
// config is used to talk to the apiserver
config *rest.Config
// mapper maps GroupVersionKinds to Resources
mapper meta.RESTMapper
// informersByGVK is the cache of informers keyed by groupVersionKind
informersByGVK map[schema.GroupVersionKind]*MapEntry
// codecs is used to create a new REST client
codecs serializer.CodecFactory
// paramCodec is used by list and watch
paramCodec runtime.ParameterCodec
// stop is the stop channel to stop informers
stop <-chan struct{}
// resync is the base frequency the informers are resynced
// a 10 percent jitter will be added to the resync period between informers
// so that all informers will not send list requests simultaneously.
resync time.Duration
// mu guards access to the map
mu sync.RWMutex
// start is true if the informers have been started
started bool
// startWait is a channel that is closed after the
// informer has been started.
startWait chan struct{}
// createClient knows how to create a client and a list object,
// and allows for abstracting over the particulars of structured vs
// unstructured objects.
createListWatcher createListWatcherFunc
// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string
}
// Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
// It doesn't return start because it can't return an error, and it's not a runnable directly.
func (ip *specificInformersMap) Start(stop <-chan struct{}) {
func() {
ip.mu.Lock()
defer ip.mu.Unlock()
// Set the stop channel so it can be passed to informers that are added later
ip.stop = stop
// Start each informer
for _, informer := range ip.informersByGVK {
go informer.Informer.Run(stop)
}
// Set started to true so we immediately start any informers added later.
ip.started = true
close(ip.startWait)
}()
<-stop
}
func (ip *specificInformersMap) waitForStarted(stop <-chan struct{}) bool {
select {
case <-ip.startWait:
return true
case <-stop:
return false
}
}
// HasSyncedFuncs returns all the HasSynced functions for the informers in this map.
func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
ip.mu.RLock()
defer ip.mu.RUnlock()
syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
for _, informer := range ip.informersByGVK {
syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
}
return syncedFuncs
}
// Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
// the Informer from the map.
func (ip *specificInformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *MapEntry, error) {
// Return the informer if it is found
i, started, ok := func() (*MapEntry, bool, bool) {
ip.mu.RLock()
defer ip.mu.RUnlock()
i, ok := ip.informersByGVK[gvk]
return i, ip.started, ok
}()
if !ok {
var err error
if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
return started, nil, err
}
}
if started && !i.Informer.HasSynced() {
// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
}
}
return started, i, nil
}
func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, bool, error) {
ip.mu.Lock()
defer ip.mu.Unlock()
// Check the cache to see if we already have an Informer. If we do, return the Informer.
// This is for the case where 2 routines tried to get the informer when it wasn't in the map
// so neither returned early, but the first one created it.
if i, ok := ip.informersByGVK[gvk]; ok {
return i, ip.started, nil
}
// Create a NewSharedIndexInformer and add it to the map.
var lw *cache.ListWatch
lw, err := ip.createListWatcher(gvk, ip)
if err != nil {
return nil, false, err
}
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
})
i := &MapEntry{
Informer: ni,
Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
}
ip.informersByGVK[gvk] = i
// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
go i.Informer.Run(ip.stop)
}
return i, ip.started, nil
}
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
if err != nil {
return nil, err
}
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
listObj, err := ip.Scheme.New(listGVK)
if err != nil {
return nil, err
}
// TODO: the functions that make use of this ListWatch should be adapted to
// pass in their own contexts instead of relying on this fixed one here.
ctx := context.TODO()
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
},
}, nil
}
func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(ip.config)
if err != nil {
return nil, err
}
// TODO: the functions that make use of this ListWatch should be adapted to
// pass in their own contexts instead of relying on this fixed one here.
ctx := context.TODO()
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return dynamicClient.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(ctx, opts)
}
return dynamicClient.Resource(mapping.Resource).Watch(ctx, opts)
},
}, nil
}
// resyncPeriod returns a function which generates a duration each time it is
// invoked; this is so that multiple controllers don't get into lock-step and all
// hammer the apiserver with list requests simultaneously.
func resyncPeriod(resync time.Duration) func() time.Duration {
return func() time.Duration {
// the factor will fall into [0.9, 1.1)
factor := rand.Float64()/5.0 + 0.9
return time.Duration(float64(resync.Nanoseconds()) * factor)
}
}