mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-04 11:09:29 +00:00
296 lines
9.6 KiB
Go
296 lines
9.6 KiB
Go
|
/*
|
||
|
Copyright 2016 The Kubernetes Authors.
|
||
|
|
||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
you may not use this file except in compliance with the License.
|
||
|
You may obtain a copy of the License at
|
||
|
|
||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||
|
|
||
|
Unless required by applicable law or agreed to in writing, software
|
||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
See the License for the specific language governing permissions and
|
||
|
limitations under the License.
|
||
|
*/
|
||
|
|
||
|
package discovery
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"io/ioutil"
|
||
|
"net/http"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/googleapis/gnostic/OpenAPIv2"
|
||
|
"k8s.io/klog"
|
||
|
|
||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||
|
"k8s.io/apimachinery/pkg/runtime"
|
||
|
"k8s.io/apimachinery/pkg/version"
|
||
|
"k8s.io/client-go/kubernetes/scheme"
|
||
|
restclient "k8s.io/client-go/rest"
|
||
|
)
|
||
|
|
||
|
// CachedDiscoveryClient implements the functions that discovery server-supported API groups,
|
||
|
// versions and resources.
|
||
|
type CachedDiscoveryClient struct {
|
||
|
delegate DiscoveryInterface
|
||
|
|
||
|
// cacheDirectory is the directory where discovery docs are held. It must be unique per host:port combination to work well.
|
||
|
cacheDirectory string
|
||
|
|
||
|
// ttl is how long the cache should be considered valid
|
||
|
ttl time.Duration
|
||
|
|
||
|
// mutex protects the variables below
|
||
|
mutex sync.Mutex
|
||
|
|
||
|
// ourFiles are all filenames of cache files created by this process
|
||
|
ourFiles map[string]struct{}
|
||
|
// invalidated is true if all cache files should be ignored that are not ours (e.g. after Invalidate() was called)
|
||
|
invalidated bool
|
||
|
// fresh is true if all used cache files were ours
|
||
|
fresh bool
|
||
|
}
|
||
|
|
||
|
// Compile-time check that *CachedDiscoveryClient satisfies CachedDiscoveryInterface.
var _ CachedDiscoveryInterface = (*CachedDiscoveryClient)(nil)
|
||
|
|
||
|
// ServerResourcesForGroupVersion returns the supported resources for a group and version.
|
||
|
func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
|
||
|
filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
|
||
|
cachedBytes, err := d.getCachedFile(filename)
|
||
|
// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
|
||
|
if err == nil {
|
||
|
cachedResources := &metav1.APIResourceList{}
|
||
|
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
|
||
|
klog.V(10).Infof("returning cached discovery info from %v", filename)
|
||
|
return cachedResources, nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
|
||
|
if err != nil {
|
||
|
klog.V(3).Infof("skipped caching discovery info due to %v", err)
|
||
|
return liveResources, err
|
||
|
}
|
||
|
if liveResources == nil || len(liveResources.APIResources) == 0 {
|
||
|
klog.V(3).Infof("skipped caching discovery info, no resources found")
|
||
|
return liveResources, err
|
||
|
}
|
||
|
|
||
|
if err := d.writeCachedFile(filename, liveResources); err != nil {
|
||
|
klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
|
||
|
}
|
||
|
|
||
|
return liveResources, nil
|
||
|
}
|
||
|
|
||
|
// ServerResources returns the supported resources for all groups and versions.
|
||
|
func (d *CachedDiscoveryClient) ServerResources() ([]*metav1.APIResourceList, error) {
|
||
|
return ServerResources(d)
|
||
|
}
|
||
|
|
||
|
// ServerGroups returns the supported groups, with information like supported versions and the
|
||
|
// preferred version.
|
||
|
func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
|
||
|
filename := filepath.Join(d.cacheDirectory, "servergroups.json")
|
||
|
cachedBytes, err := d.getCachedFile(filename)
|
||
|
// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
|
||
|
if err == nil {
|
||
|
cachedGroups := &metav1.APIGroupList{}
|
||
|
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
|
||
|
klog.V(10).Infof("returning cached discovery info from %v", filename)
|
||
|
return cachedGroups, nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
liveGroups, err := d.delegate.ServerGroups()
|
||
|
if err != nil {
|
||
|
klog.V(3).Infof("skipped caching discovery info due to %v", err)
|
||
|
return liveGroups, err
|
||
|
}
|
||
|
if liveGroups == nil || len(liveGroups.Groups) == 0 {
|
||
|
klog.V(3).Infof("skipped caching discovery info, no groups found")
|
||
|
return liveGroups, err
|
||
|
}
|
||
|
|
||
|
if err := d.writeCachedFile(filename, liveGroups); err != nil {
|
||
|
klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
|
||
|
}
|
||
|
|
||
|
return liveGroups, nil
|
||
|
}
|
||
|
|
||
|
func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
|
||
|
// after invalidation ignore cache files not created by this process
|
||
|
d.mutex.Lock()
|
||
|
_, ourFile := d.ourFiles[filename]
|
||
|
if d.invalidated && !ourFile {
|
||
|
d.mutex.Unlock()
|
||
|
return nil, errors.New("cache invalidated")
|
||
|
}
|
||
|
d.mutex.Unlock()
|
||
|
|
||
|
file, err := os.Open(filename)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
defer file.Close()
|
||
|
|
||
|
fileInfo, err := file.Stat()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
|
||
|
return nil, errors.New("cache expired")
|
||
|
}
|
||
|
|
||
|
// the cache is present and its valid. Try to read and use it.
|
||
|
cachedBytes, err := ioutil.ReadAll(file)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
d.mutex.Lock()
|
||
|
defer d.mutex.Unlock()
|
||
|
d.fresh = d.fresh && ourFile
|
||
|
|
||
|
return cachedBytes, nil
|
||
|
}
|
||
|
|
||
|
func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
|
||
|
if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
bytes, err := runtime.Encode(scheme.Codecs.LegacyCodec(), obj)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
f, err := ioutil.TempFile(filepath.Dir(filename), filepath.Base(filename)+".")
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer os.Remove(f.Name())
|
||
|
_, err = f.Write(bytes)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = os.Chmod(f.Name(), 0755)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
name := f.Name()
|
||
|
err = f.Close()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// atomic rename
|
||
|
d.mutex.Lock()
|
||
|
defer d.mutex.Unlock()
|
||
|
err = os.Rename(name, filename)
|
||
|
if err == nil {
|
||
|
d.ourFiles[filename] = struct{}{}
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// RESTClient returns a RESTClient that is used to communicate with API server
|
||
|
// by this client implementation.
|
||
|
func (d *CachedDiscoveryClient) RESTClient() restclient.Interface {
|
||
|
return d.delegate.RESTClient()
|
||
|
}
|
||
|
|
||
|
// ServerPreferredResources returns the supported resources with the version preferred by the
|
||
|
// server.
|
||
|
func (d *CachedDiscoveryClient) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
|
||
|
return ServerPreferredResources(d)
|
||
|
}
|
||
|
|
||
|
// ServerPreferredNamespacedResources returns the supported namespaced resources with the
|
||
|
// version preferred by the server.
|
||
|
func (d *CachedDiscoveryClient) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
|
||
|
return ServerPreferredNamespacedResources(d)
|
||
|
}
|
||
|
|
||
|
// ServerVersion retrieves and parses the server's version (git version).
|
||
|
func (d *CachedDiscoveryClient) ServerVersion() (*version.Info, error) {
|
||
|
return d.delegate.ServerVersion()
|
||
|
}
|
||
|
|
||
|
// OpenAPISchema retrieves and parses the swagger API schema the server supports.
|
||
|
func (d *CachedDiscoveryClient) OpenAPISchema() (*openapi_v2.Document, error) {
|
||
|
return d.delegate.OpenAPISchema()
|
||
|
}
|
||
|
|
||
|
// Fresh is supposed to tell the caller whether or not to retry if the cache
|
||
|
// fails to find something (false = retry, true = no need to retry).
|
||
|
func (d *CachedDiscoveryClient) Fresh() bool {
|
||
|
d.mutex.Lock()
|
||
|
defer d.mutex.Unlock()
|
||
|
|
||
|
return d.fresh
|
||
|
}
|
||
|
|
||
|
// Invalidate enforces that no cached data is used in the future that is older than the current time.
|
||
|
func (d *CachedDiscoveryClient) Invalidate() {
|
||
|
d.mutex.Lock()
|
||
|
defer d.mutex.Unlock()
|
||
|
|
||
|
d.ourFiles = map[string]struct{}{}
|
||
|
d.fresh = true
|
||
|
d.invalidated = true
|
||
|
}
|
||
|
|
||
|
// NewCachedDiscoveryClientForConfig creates a new DiscoveryClient for the given config, and wraps
|
||
|
// the created client in a CachedDiscoveryClient. The provided configuration is updated with a
|
||
|
// custom transport that understands cache responses.
|
||
|
// We receive two distinct cache directories for now, in order to preserve old behavior
|
||
|
// which makes use of the --cache-dir flag value for storing cache data from the CacheRoundTripper,
|
||
|
// and makes use of the hardcoded destination (~/.kube/cache/discovery/...) for storing
|
||
|
// CachedDiscoveryClient cache data. If httpCacheDir is empty, the restconfig's transport will not
|
||
|
// be updated with a roundtripper that understands cache responses.
|
||
|
// If discoveryCacheDir is empty, cached server resource data will be looked up in the current directory.
|
||
|
// TODO(juanvallejo): the value of "--cache-dir" should be honored. Consolidate discoveryCacheDir with httpCacheDir
|
||
|
// so that server resources and http-cache data are stored in the same location, provided via config flags.
|
||
|
func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
|
||
|
if len(httpCacheDir) > 0 {
|
||
|
// update the given restconfig with a custom roundtripper that
|
||
|
// understands how to handle cache responses.
|
||
|
wt := config.WrapTransport
|
||
|
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
|
||
|
if wt != nil {
|
||
|
rt = wt(rt)
|
||
|
}
|
||
|
return newCacheRoundTripper(httpCacheDir, rt)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
discoveryClient, err := NewDiscoveryClientForConfig(config)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return newCachedDiscoveryClient(discoveryClient, discoveryCacheDir, ttl), nil
|
||
|
}
|
||
|
|
||
|
// NewCachedDiscoveryClient creates a new DiscoveryClient. cacheDirectory is the directory where discovery docs are held. It must be unique per host:port combination to work well.
|
||
|
func newCachedDiscoveryClient(delegate DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
|
||
|
return &CachedDiscoveryClient{
|
||
|
delegate: delegate,
|
||
|
cacheDirectory: cacheDirectory,
|
||
|
ttl: ttl,
|
||
|
ourFiles: map[string]struct{}{},
|
||
|
fresh: true,
|
||
|
}
|
||
|
}
|