mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-09 05:19:29 +00:00
7d9ab0ba8a
Bumps [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime) from 0.14.2 to 0.14.4. - [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases) - [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/master/RELEASE.md) - [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.14.2...v0.14.4) --- updated-dependencies: - dependency-name: sigs.k8s.io/controller-runtime dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com>
302 lines
8.7 KiB
Go
302 lines
8.7 KiB
Go
/*
|
|
Copyright 2019 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package apiutil
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"golang.org/x/time/rate"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/restmapper"
|
|
)
|
|
|
|
// dynamicRESTMapper is a RESTMapper that dynamically discovers resource
|
|
// types at runtime.
|
|
type dynamicRESTMapper struct {
|
|
mu sync.RWMutex // protects the following fields
|
|
staticMapper meta.RESTMapper
|
|
limiter *rate.Limiter
|
|
newMapper func() (meta.RESTMapper, error)
|
|
|
|
lazy bool
|
|
// Used for lazy init.
|
|
inited uint32
|
|
initMtx sync.Mutex
|
|
|
|
useLazyRestmapper bool
|
|
}
|
|
|
|
// DynamicRESTMapperOption is a functional option on the dynamicRESTMapper.
|
|
type DynamicRESTMapperOption func(*dynamicRESTMapper) error
|
|
|
|
// WithLimiter sets the RESTMapper's underlying limiter to lim.
|
|
func WithLimiter(lim *rate.Limiter) DynamicRESTMapperOption {
|
|
return func(drm *dynamicRESTMapper) error {
|
|
drm.limiter = lim
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithLazyDiscovery prevents the RESTMapper from discovering REST mappings
|
|
// until an API call is made.
|
|
var WithLazyDiscovery DynamicRESTMapperOption = func(drm *dynamicRESTMapper) error {
|
|
drm.lazy = true
|
|
return nil
|
|
}
|
|
|
|
// WithExperimentalLazyMapper enables experimental more advanced Lazy Restmapping mechanism.
|
|
var WithExperimentalLazyMapper DynamicRESTMapperOption = func(drm *dynamicRESTMapper) error {
|
|
drm.useLazyRestmapper = true
|
|
return nil
|
|
}
|
|
|
|
// WithCustomMapper supports setting a custom RESTMapper refresher instead of
|
|
// the default method, which uses a discovery client.
|
|
//
|
|
// This exists mainly for testing, but can be useful if you need tighter control
|
|
// over how discovery is performed, which discovery endpoints are queried, etc.
|
|
func WithCustomMapper(newMapper func() (meta.RESTMapper, error)) DynamicRESTMapperOption {
|
|
return func(drm *dynamicRESTMapper) error {
|
|
drm.newMapper = newMapper
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic
|
|
// RESTMapper dynamically discovers resource types at runtime. opts
|
|
// configure the RESTMapper.
|
|
func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (meta.RESTMapper, error) {
|
|
client, err := discovery.NewDiscoveryClientForConfig(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
drm := &dynamicRESTMapper{
|
|
limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
|
|
newMapper: func() (meta.RESTMapper, error) {
|
|
groupResources, err := restmapper.GetAPIGroupResources(client)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return restmapper.NewDiscoveryRESTMapper(groupResources), nil
|
|
},
|
|
}
|
|
for _, opt := range opts {
|
|
if err = opt(drm); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if drm.useLazyRestmapper {
|
|
return newLazyRESTMapperWithClient(client)
|
|
}
|
|
if !drm.lazy {
|
|
if err := drm.setStaticMapper(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return drm, nil
|
|
}
|
|
|
|
var (
|
|
// defaultRefilRate is the default rate at which potential calls are
|
|
// added back to the "bucket" of allowed calls.
|
|
defaultRefillRate = 5
|
|
// defaultLimitSize is the default starting/max number of potential calls
|
|
// per second. Once a call is used, it's added back to the bucket at a rate
|
|
// of defaultRefillRate per second.
|
|
defaultLimitSize = 5
|
|
)
|
|
|
|
// setStaticMapper sets drm's staticMapper by querying its client, regardless
|
|
// of reload backoff.
|
|
func (drm *dynamicRESTMapper) setStaticMapper() error {
|
|
newMapper, err := drm.newMapper()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
drm.staticMapper = newMapper
|
|
return nil
|
|
}
|
|
|
|
// init initializes drm only once if drm is lazy.
|
|
func (drm *dynamicRESTMapper) init() (err error) {
|
|
// skip init if drm is not lazy or has initialized
|
|
if !drm.lazy || atomic.LoadUint32(&drm.inited) != 0 {
|
|
return nil
|
|
}
|
|
|
|
drm.initMtx.Lock()
|
|
defer drm.initMtx.Unlock()
|
|
if drm.inited == 0 {
|
|
if err = drm.setStaticMapper(); err == nil {
|
|
atomic.StoreUint32(&drm.inited, 1)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// checkAndReload attempts to call the given callback, which is assumed to be dependent
|
|
// on the data in the restmapper.
|
|
//
|
|
// If the callback returns an error matching meta.IsNoMatchErr, it will attempt to reload
|
|
// the RESTMapper's data and re-call the callback once that's occurred.
|
|
// If the callback returns any other error, the function will return immediately regardless.
|
|
//
|
|
// It will take care of ensuring that reloads are rate-limited and that extraneous calls
|
|
// aren't made. If a reload would exceed the limiters rate, it returns the error return by
|
|
// the callback.
|
|
// It's thread-safe, and worries about thread-safety for the callback (so the callback does
|
|
// not need to attempt to lock the restmapper).
|
|
func (drm *dynamicRESTMapper) checkAndReload(checkNeedsReload func() error) error {
|
|
// first, check the common path -- data is fresh enough
|
|
// (use an IIFE for the lock's defer)
|
|
err := func() error {
|
|
drm.mu.RLock()
|
|
defer drm.mu.RUnlock()
|
|
|
|
return checkNeedsReload()
|
|
}()
|
|
|
|
needsReload := meta.IsNoMatchError(err)
|
|
if !needsReload {
|
|
return err
|
|
}
|
|
|
|
// if the data wasn't fresh, we'll need to try and update it, so grab the lock...
|
|
drm.mu.Lock()
|
|
defer drm.mu.Unlock()
|
|
|
|
// ... and double-check that we didn't reload in the meantime
|
|
err = checkNeedsReload()
|
|
needsReload = meta.IsNoMatchError(err)
|
|
if !needsReload {
|
|
return err
|
|
}
|
|
|
|
// we're still stale, so grab a rate-limit token if we can...
|
|
if !drm.limiter.Allow() {
|
|
// return error from static mapper here, we have refreshed often enough (exceeding rate of provided limiter)
|
|
// so that client's can handle this the same way as a "normal" NoResourceMatchError / NoKindMatchError
|
|
return err
|
|
}
|
|
|
|
// ...reload...
|
|
if err := drm.setStaticMapper(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// ...and return the results of the closure regardless
|
|
return checkNeedsReload()
|
|
}
|
|
|
|
// TODO: wrap reload errors on NoKindMatchError with go 1.13 errors.
|
|
|
|
func (drm *dynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
|
|
if err := drm.init(); err != nil {
|
|
return schema.GroupVersionKind{}, err
|
|
}
|
|
var gvk schema.GroupVersionKind
|
|
err := drm.checkAndReload(func() error {
|
|
var err error
|
|
gvk, err = drm.staticMapper.KindFor(resource)
|
|
return err
|
|
})
|
|
return gvk, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
|
|
if err := drm.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
var gvks []schema.GroupVersionKind
|
|
err := drm.checkAndReload(func() error {
|
|
var err error
|
|
gvks, err = drm.staticMapper.KindsFor(resource)
|
|
return err
|
|
})
|
|
return gvks, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
|
|
if err := drm.init(); err != nil {
|
|
return schema.GroupVersionResource{}, err
|
|
}
|
|
|
|
var gvr schema.GroupVersionResource
|
|
err := drm.checkAndReload(func() error {
|
|
var err error
|
|
gvr, err = drm.staticMapper.ResourceFor(input)
|
|
return err
|
|
})
|
|
return gvr, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
|
|
if err := drm.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
var gvrs []schema.GroupVersionResource
|
|
err := drm.checkAndReload(func() error {
|
|
var err error
|
|
gvrs, err = drm.staticMapper.ResourcesFor(input)
|
|
return err
|
|
})
|
|
return gvrs, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
|
|
if err := drm.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
var mapping *meta.RESTMapping
|
|
err := drm.checkAndReload(func() error {
|
|
var err error
|
|
mapping, err = drm.staticMapper.RESTMapping(gk, versions...)
|
|
return err
|
|
})
|
|
return mapping, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
|
|
if err := drm.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
var mappings []*meta.RESTMapping
|
|
err := drm.checkAndReload(func() error {
|
|
var err error
|
|
mappings, err = drm.staticMapper.RESTMappings(gk, versions...)
|
|
return err
|
|
})
|
|
return mappings, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, error) {
|
|
if err := drm.init(); err != nil {
|
|
return "", err
|
|
}
|
|
var singular string
|
|
err := drm.checkAndReload(func() error {
|
|
var err error
|
|
singular, err = drm.staticMapper.ResourceSingularizer(resource)
|
|
return err
|
|
})
|
|
return singular, err
|
|
}
|