ceph-csi/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/dynamicrestmapper.go

302 lines
8.7 KiB
Go
Raw Normal View History

/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiutil
import (
"sync"
"sync/atomic"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
)
// dynamicRESTMapper is a RESTMapper that dynamically discovers resource
// types at runtime.
type dynamicRESTMapper struct {
mu sync.RWMutex // protects the following fields
staticMapper meta.RESTMapper
limiter *rate.Limiter
newMapper func() (meta.RESTMapper, error)
lazy bool
// Used for lazy init.
inited uint32
initMtx sync.Mutex
useLazyRestmapper bool
}
// DynamicRESTMapperOption is a functional option on the dynamicRESTMapper.
type DynamicRESTMapperOption func(*dynamicRESTMapper) error
// WithLimiter sets the RESTMapper's underlying limiter to lim.
func WithLimiter(lim *rate.Limiter) DynamicRESTMapperOption {
return func(drm *dynamicRESTMapper) error {
drm.limiter = lim
return nil
}
}
// WithLazyDiscovery prevents the RESTMapper from discovering REST mappings
// until an API call is made.
var WithLazyDiscovery DynamicRESTMapperOption = func(drm *dynamicRESTMapper) error {
drm.lazy = true
return nil
}
// WithExperimentalLazyMapper enables experimental more advanced Lazy Restmapping mechanism.
var WithExperimentalLazyMapper DynamicRESTMapperOption = func(drm *dynamicRESTMapper) error {
drm.useLazyRestmapper = true
return nil
}
// WithCustomMapper supports setting a custom RESTMapper refresher instead of
// the default method, which uses a discovery client.
//
// This exists mainly for testing, but can be useful if you need tighter control
// over how discovery is performed, which discovery endpoints are queried, etc.
func WithCustomMapper(newMapper func() (meta.RESTMapper, error)) DynamicRESTMapperOption {
return func(drm *dynamicRESTMapper) error {
drm.newMapper = newMapper
return nil
}
}
// NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic
// RESTMapper dynamically discovers resource types at runtime. opts
// configure the RESTMapper.
func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (meta.RESTMapper, error) {
client, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, err
}
drm := &dynamicRESTMapper{
limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
newMapper: func() (meta.RESTMapper, error) {
groupResources, err := restmapper.GetAPIGroupResources(client)
if err != nil {
return nil, err
}
return restmapper.NewDiscoveryRESTMapper(groupResources), nil
},
}
for _, opt := range opts {
if err = opt(drm); err != nil {
return nil, err
}
}
if drm.useLazyRestmapper {
return newLazyRESTMapperWithClient(client)
}
if !drm.lazy {
if err := drm.setStaticMapper(); err != nil {
return nil, err
}
}
return drm, nil
}
var (
// defaultRefilRate is the default rate at which potential calls are
// added back to the "bucket" of allowed calls.
defaultRefillRate = 5
// defaultLimitSize is the default starting/max number of potential calls
// per second. Once a call is used, it's added back to the bucket at a rate
// of defaultRefillRate per second.
defaultLimitSize = 5
)
// setStaticMapper sets drm's staticMapper by querying its client, regardless
// of reload backoff.
func (drm *dynamicRESTMapper) setStaticMapper() error {
newMapper, err := drm.newMapper()
if err != nil {
return err
}
drm.staticMapper = newMapper
return nil
}
// init initializes drm only once if drm is lazy.
func (drm *dynamicRESTMapper) init() (err error) {
// skip init if drm is not lazy or has initialized
if !drm.lazy || atomic.LoadUint32(&drm.inited) != 0 {
return nil
}
drm.initMtx.Lock()
defer drm.initMtx.Unlock()
if drm.inited == 0 {
if err = drm.setStaticMapper(); err == nil {
atomic.StoreUint32(&drm.inited, 1)
}
}
return err
}
// checkAndReload attempts to call the given callback, which is assumed to be dependent
// on the data in the restmapper.
//
// If the callback returns an error matching meta.IsNoMatchErr, it will attempt to reload
// the RESTMapper's data and re-call the callback once that's occurred.
// If the callback returns any other error, the function will return immediately regardless.
//
// It will take care of ensuring that reloads are rate-limited and that extraneous calls
// aren't made. If a reload would exceed the limiters rate, it returns the error return by
// the callback.
// It's thread-safe, and worries about thread-safety for the callback (so the callback does
// not need to attempt to lock the restmapper).
func (drm *dynamicRESTMapper) checkAndReload(checkNeedsReload func() error) error {
// first, check the common path -- data is fresh enough
// (use an IIFE for the lock's defer)
err := func() error {
drm.mu.RLock()
defer drm.mu.RUnlock()
return checkNeedsReload()
}()
needsReload := meta.IsNoMatchError(err)
if !needsReload {
return err
}
// if the data wasn't fresh, we'll need to try and update it, so grab the lock...
drm.mu.Lock()
defer drm.mu.Unlock()
// ... and double-check that we didn't reload in the meantime
err = checkNeedsReload()
needsReload = meta.IsNoMatchError(err)
if !needsReload {
return err
}
// we're still stale, so grab a rate-limit token if we can...
if !drm.limiter.Allow() {
// return error from static mapper here, we have refreshed often enough (exceeding rate of provided limiter)
// so that client's can handle this the same way as a "normal" NoResourceMatchError / NoKindMatchError
return err
}
// ...reload...
if err := drm.setStaticMapper(); err != nil {
return err
}
// ...and return the results of the closure regardless
return checkNeedsReload()
}
// TODO: wrap reload errors on NoKindMatchError with go 1.13 errors.
func (drm *dynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
if err := drm.init(); err != nil {
return schema.GroupVersionKind{}, err
}
var gvk schema.GroupVersionKind
err := drm.checkAndReload(func() error {
var err error
gvk, err = drm.staticMapper.KindFor(resource)
return err
})
return gvk, err
}
func (drm *dynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
if err := drm.init(); err != nil {
return nil, err
}
var gvks []schema.GroupVersionKind
err := drm.checkAndReload(func() error {
var err error
gvks, err = drm.staticMapper.KindsFor(resource)
return err
})
return gvks, err
}
func (drm *dynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
if err := drm.init(); err != nil {
return schema.GroupVersionResource{}, err
}
var gvr schema.GroupVersionResource
err := drm.checkAndReload(func() error {
var err error
gvr, err = drm.staticMapper.ResourceFor(input)
return err
})
return gvr, err
}
func (drm *dynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
if err := drm.init(); err != nil {
return nil, err
}
var gvrs []schema.GroupVersionResource
err := drm.checkAndReload(func() error {
var err error
gvrs, err = drm.staticMapper.ResourcesFor(input)
return err
})
return gvrs, err
}
func (drm *dynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
if err := drm.init(); err != nil {
return nil, err
}
var mapping *meta.RESTMapping
err := drm.checkAndReload(func() error {
var err error
mapping, err = drm.staticMapper.RESTMapping(gk, versions...)
return err
})
return mapping, err
}
func (drm *dynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
if err := drm.init(); err != nil {
return nil, err
}
var mappings []*meta.RESTMapping
err := drm.checkAndReload(func() error {
var err error
mappings, err = drm.staticMapper.RESTMappings(gk, versions...)
return err
})
return mappings, err
}
func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, error) {
if err := drm.init(); err != nil {
return "", err
}
var singular string
err := drm.checkAndReload(func() error {
var err error
singular, err = drm.staticMapper.ResourceSingularizer(resource)
return err
})
return singular, err
}