mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-05 19:49:28 +00:00
9eaa55506f
This commit updates controller-runtime to v0.9.2 and makes changes in persistentvolume.go to add context to various functions and function calls made here instead of context.TODO(). Signed-off-by: Rakshith R <rar@redhat.com>
286 lines
8.6 KiB
Go
286 lines
8.6 KiB
Go
/*
|
|
Copyright 2019 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package apiutil
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
|
|
"golang.org/x/time/rate"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/restmapper"
|
|
)
|
|
|
|
// dynamicRESTMapper is a RESTMapper that dynamically discovers resource
|
|
// types at runtime.
|
|
type dynamicRESTMapper struct {
|
|
mu sync.RWMutex // protects the following fields
|
|
staticMapper meta.RESTMapper
|
|
limiter *rate.Limiter
|
|
newMapper func() (meta.RESTMapper, error)
|
|
|
|
lazy bool
|
|
// Used for lazy init.
|
|
initOnce sync.Once
|
|
}
|
|
|
|
// DynamicRESTMapperOption is a functional option on the dynamicRESTMapper.
|
|
type DynamicRESTMapperOption func(*dynamicRESTMapper) error
|
|
|
|
// WithLimiter sets the RESTMapper's underlying limiter to lim.
|
|
func WithLimiter(lim *rate.Limiter) DynamicRESTMapperOption {
|
|
return func(drm *dynamicRESTMapper) error {
|
|
drm.limiter = lim
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithLazyDiscovery prevents the RESTMapper from discovering REST mappings
|
|
// until an API call is made.
|
|
var WithLazyDiscovery DynamicRESTMapperOption = func(drm *dynamicRESTMapper) error {
|
|
drm.lazy = true
|
|
return nil
|
|
}
|
|
|
|
// WithCustomMapper supports setting a custom RESTMapper refresher instead of
|
|
// the default method, which uses a discovery client.
|
|
//
|
|
// This exists mainly for testing, but can be useful if you need tighter control
|
|
// over how discovery is performed, which discovery endpoints are queried, etc.
|
|
func WithCustomMapper(newMapper func() (meta.RESTMapper, error)) DynamicRESTMapperOption {
|
|
return func(drm *dynamicRESTMapper) error {
|
|
drm.newMapper = newMapper
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic
|
|
// RESTMapper dynamically discovers resource types at runtime. opts
|
|
// configure the RESTMapper.
|
|
func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (meta.RESTMapper, error) {
|
|
client, err := discovery.NewDiscoveryClientForConfig(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
drm := &dynamicRESTMapper{
|
|
limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
|
|
newMapper: func() (meta.RESTMapper, error) {
|
|
groupResources, err := restmapper.GetAPIGroupResources(client)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return restmapper.NewDiscoveryRESTMapper(groupResources), nil
|
|
},
|
|
}
|
|
for _, opt := range opts {
|
|
if err = opt(drm); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if !drm.lazy {
|
|
if err := drm.setStaticMapper(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return drm, nil
|
|
}
|
|
|
|
var (
|
|
// defaultRefilRate is the default rate at which potential calls are
|
|
// added back to the "bucket" of allowed calls.
|
|
defaultRefillRate = 5
|
|
// defaultLimitSize is the default starting/max number of potential calls
|
|
// per second. Once a call is used, it's added back to the bucket at a rate
|
|
// of defaultRefillRate per second.
|
|
defaultLimitSize = 5
|
|
)
|
|
|
|
// setStaticMapper sets drm's staticMapper by querying its client, regardless
|
|
// of reload backoff.
|
|
func (drm *dynamicRESTMapper) setStaticMapper() error {
|
|
newMapper, err := drm.newMapper()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
drm.staticMapper = newMapper
|
|
return nil
|
|
}
|
|
|
|
// init initializes drm only once if drm is lazy.
|
|
func (drm *dynamicRESTMapper) init() (err error) {
|
|
drm.initOnce.Do(func() {
|
|
if drm.lazy {
|
|
err = drm.setStaticMapper()
|
|
}
|
|
})
|
|
return err
|
|
}
|
|
|
|
// checkAndReload attempts to call the given callback, which is assumed to be dependent
|
|
// on the data in the restmapper.
|
|
//
|
|
// If the callback returns an error that matches the given error, it will attempt to reload
|
|
// the RESTMapper's data and re-call the callback once that's occurred.
|
|
// If the callback returns any other error, the function will return immediately regardless.
|
|
//
|
|
// It will take care of ensuring that reloads are rate-limited and that extraneous calls
|
|
// aren't made. If a reload would exceed the limiters rate, it returns the error return by
|
|
// the callback.
|
|
// It's thread-safe, and worries about thread-safety for the callback (so the callback does
|
|
// not need to attempt to lock the restmapper).
|
|
func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsReload func() error) error {
|
|
// first, check the common path -- data is fresh enough
|
|
// (use an IIFE for the lock's defer)
|
|
err := func() error {
|
|
drm.mu.RLock()
|
|
defer drm.mu.RUnlock()
|
|
|
|
return checkNeedsReload()
|
|
}()
|
|
|
|
// NB(directxman12): `Is` and `As` have a confusing relationship --
|
|
// `Is` is like `== or does this implement .Is`, whereas `As` says
|
|
// `can I type-assert into`
|
|
needsReload := errors.As(err, &needsReloadErr)
|
|
if !needsReload {
|
|
return err
|
|
}
|
|
|
|
// if the data wasn't fresh, we'll need to try and update it, so grab the lock...
|
|
drm.mu.Lock()
|
|
defer drm.mu.Unlock()
|
|
|
|
// ... and double-check that we didn't reload in the meantime
|
|
err = checkNeedsReload()
|
|
needsReload = errors.As(err, &needsReloadErr)
|
|
if !needsReload {
|
|
return err
|
|
}
|
|
|
|
// we're still stale, so grab a rate-limit token if we can...
|
|
if !drm.limiter.Allow() {
|
|
// return error from static mapper here, we have refreshed often enough (exceeding rate of provided limiter)
|
|
// so that client's can handle this the same way as a "normal" NoResourceMatchError / NoKindMatchError
|
|
return err
|
|
}
|
|
|
|
// ...reload...
|
|
if err := drm.setStaticMapper(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// ...and return the results of the closure regardless
|
|
return checkNeedsReload()
|
|
}
|
|
|
|
// TODO: wrap reload errors on NoKindMatchError with go 1.13 errors.
|
|
|
|
func (drm *dynamicRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
|
|
if err := drm.init(); err != nil {
|
|
return schema.GroupVersionKind{}, err
|
|
}
|
|
var gvk schema.GroupVersionKind
|
|
err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {
|
|
var err error
|
|
gvk, err = drm.staticMapper.KindFor(resource)
|
|
return err
|
|
})
|
|
return gvk, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
|
|
if err := drm.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
var gvks []schema.GroupVersionKind
|
|
err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {
|
|
var err error
|
|
gvks, err = drm.staticMapper.KindsFor(resource)
|
|
return err
|
|
})
|
|
return gvks, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
|
|
if err := drm.init(); err != nil {
|
|
return schema.GroupVersionResource{}, err
|
|
}
|
|
|
|
var gvr schema.GroupVersionResource
|
|
err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {
|
|
var err error
|
|
gvr, err = drm.staticMapper.ResourceFor(input)
|
|
return err
|
|
})
|
|
return gvr, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
|
|
if err := drm.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
var gvrs []schema.GroupVersionResource
|
|
err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {
|
|
var err error
|
|
gvrs, err = drm.staticMapper.ResourcesFor(input)
|
|
return err
|
|
})
|
|
return gvrs, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
|
|
if err := drm.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
var mapping *meta.RESTMapping
|
|
err := drm.checkAndReload(&meta.NoKindMatchError{}, func() error {
|
|
var err error
|
|
mapping, err = drm.staticMapper.RESTMapping(gk, versions...)
|
|
return err
|
|
})
|
|
return mapping, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
|
|
if err := drm.init(); err != nil {
|
|
return nil, err
|
|
}
|
|
var mappings []*meta.RESTMapping
|
|
err := drm.checkAndReload(&meta.NoKindMatchError{}, func() error {
|
|
var err error
|
|
mappings, err = drm.staticMapper.RESTMappings(gk, versions...)
|
|
return err
|
|
})
|
|
return mappings, err
|
|
}
|
|
|
|
func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, error) {
|
|
if err := drm.init(); err != nil {
|
|
return "", err
|
|
}
|
|
var singular string
|
|
err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {
|
|
var err error
|
|
singular, err = drm.staticMapper.ResourceSingularizer(resource)
|
|
return err
|
|
})
|
|
return singular, err
|
|
}
|