mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-06 03:49:30 +00:00
f080b9e0c9
Signed-off-by: Niels de Vos <ndevos@ibm.com>
1019 lines
42 KiB
Go
1019 lines
42 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package server
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
gpath "path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
systemd "github.com/coreos/go-systemd/v22/daemon"
|
|
|
|
"golang.org/x/time/rate"
|
|
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
|
"k8s.io/apimachinery/pkg/util/managedfields"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
|
|
"k8s.io/apimachinery/pkg/version"
|
|
"k8s.io/apiserver/pkg/admission"
|
|
"k8s.io/apiserver/pkg/audit"
|
|
"k8s.io/apiserver/pkg/authorization/authorizer"
|
|
genericapi "k8s.io/apiserver/pkg/endpoints"
|
|
"k8s.io/apiserver/pkg/endpoints/discovery"
|
|
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
|
|
"k8s.io/apiserver/pkg/features"
|
|
"k8s.io/apiserver/pkg/registry/rest"
|
|
"k8s.io/apiserver/pkg/server/healthz"
|
|
"k8s.io/apiserver/pkg/server/routes"
|
|
"k8s.io/apiserver/pkg/storageversion"
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
restclient "k8s.io/client-go/rest"
|
|
"k8s.io/klog/v2"
|
|
openapibuilder3 "k8s.io/kube-openapi/pkg/builder3"
|
|
openapicommon "k8s.io/kube-openapi/pkg/common"
|
|
"k8s.io/kube-openapi/pkg/handler"
|
|
"k8s.io/kube-openapi/pkg/handler3"
|
|
openapiutil "k8s.io/kube-openapi/pkg/util"
|
|
"k8s.io/kube-openapi/pkg/validation/spec"
|
|
"k8s.io/utils/clock"
|
|
)
|
|
|
|
// APIGroupInfo holds the information about a single API group: the versions it
// serves, the storage backing each resource, and the scheme and codecs used for it.
type APIGroupInfo struct {
	// PrioritizedVersions lists the group versions of this group. installAPIResources
	// walks this list in order and skips any version that has no resources in
	// VersionedResourcesStorageMap.
	PrioritizedVersions []schema.GroupVersion
	// Info about the resources in this group. It's a map from version to resource to the storage.
	VersionedResourcesStorageMap map[string]map[string]rest.Storage
	// OptionsExternalVersion controls the APIVersion used for common objects in the
	// schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
	// define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
	// If nil, defaults to groupMeta.GroupVersion.
	// TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
	OptionsExternalVersion *schema.GroupVersion
	// MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
	// common API implementations like ListOptions. Future changes will allow this to vary by group
	// version (for when the inevitable meta/v2 group emerges).
	MetaGroupVersion *schema.GroupVersion

	// Scheme includes all of the types used by this group and how to convert between them (or
	// to convert objects from outside of this group that are accepted in this API).
	// TODO: replace with interfaces
	Scheme *runtime.Scheme
	// NegotiatedSerializer controls how this group encodes and decodes data.
	NegotiatedSerializer runtime.NegotiatedSerializer
	// ParameterCodec performs conversions for query parameters passed to API calls.
	ParameterCodec runtime.ParameterCodec

	// StaticOpenAPISpec is the spec derived from the definitions of all resources installed together.
	// It is set during InstallAPIGroups, InstallAPIGroup, and InstallLegacyAPIGroup.
	StaticOpenAPISpec map[string]*spec.Schema
}
|
|
|
|
func (a *APIGroupInfo) destroyStorage() {
|
|
for _, stores := range a.VersionedResourcesStorageMap {
|
|
for _, store := range stores {
|
|
store.Destroy()
|
|
}
|
|
}
|
|
}
|
|
|
|
// GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
	// discoveryAddresses is used to build cluster IPs for discovery.
	discoveryAddresses discovery.Addresses

	// LoopbackClientConfig is a config for a privileged loopback connection to the API server.
	LoopbackClientConfig *restclient.Config

	// minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler.
	minRequestTimeout time.Duration

	// ShutdownTimeout is the timeout used for server shutdown. This specifies the timeout before server
	// gracefully shutdown returns.
	ShutdownTimeout time.Duration

	// legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
	// to InstallLegacyAPIGroup.
	legacyAPIGroupPrefixes sets.String

	// admissionControl is used to build the RESTStorage that backs an API Group.
	admissionControl admission.Interface

	// SecureServingInfo holds configuration of the TLS server.
	SecureServingInfo *SecureServingInfo

	// ExternalAddress is the address (hostname or IP and port) that should be used in
	// external (public internet) URLs for this GenericAPIServer.
	ExternalAddress string

	// Serializer controls how common API objects not in a group/version prefix are serialized for this server.
	// Individual APIGroups may define their own serializers.
	Serializer runtime.NegotiatedSerializer

	// "Outputs"
	// Handler holds the handlers being used by this API server.
	Handler *APIServerHandler

	// UnprotectedDebugSocket is used to serve pprof information in a unix-domain socket. This socket is
	// not protected by authentication/authorization.
	UnprotectedDebugSocket *routes.DebugSocket

	// listedPathProvider is a lister which provides the set of paths to show at /
	listedPathProvider routes.ListedPathProvider

	// DiscoveryGroupManager serves /apis in an unaggregated form.
	DiscoveryGroupManager discovery.GroupManager

	// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
	AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager

	// AggregatedLegacyDiscoveryGroupManager serves /api in an aggregated form.
	AggregatedLegacyDiscoveryGroupManager discoveryendpoint.ResourceManager

	// Enable swagger and/or OpenAPI if these configs are non-nil.
	openAPIConfig *openapicommon.Config

	// Enable swagger and/or OpenAPI V3 if these configs are non-nil.
	openAPIV3Config *openapicommon.OpenAPIV3Config

	// skipOpenAPIInstallation indicates not to install the OpenAPI handler
	// during PrepareRun.
	// Set this to true when the specific API Server has its own OpenAPI handler
	// (e.g. kube-aggregator)
	skipOpenAPIInstallation bool

	// OpenAPIVersionedService controls the /openapi/v2 endpoint, and can be used to update the served spec.
	// It is set during PrepareRun if `openAPIConfig` is non-nil unless `skipOpenAPIInstallation` is true.
	OpenAPIVersionedService *handler.OpenAPIService

	// OpenAPIV3VersionedService controls the /openapi/v3 endpoint and can be used to update the served spec.
	// It is set during PrepareRun if `openAPIV3Config` is non-nil unless `skipOpenAPIInstallation` is true.
	OpenAPIV3VersionedService *handler3.OpenAPIService

	// StaticOpenAPISpec is the spec derived from the restful container endpoints.
	// It is set during PrepareRun, together with OpenAPIVersionedService.
	StaticOpenAPISpec *spec.Swagger

	// PostStartHooks are each called after the server has started listening, in a separate go func for each
	// with no guarantee of ordering between them. The map key is a name used for error reporting.
	// It may kill the process with a panic if it wishes to by returning an error.
	postStartHookLock      sync.Mutex
	postStartHooks         map[string]postStartHookEntry
	postStartHooksCalled   bool
	disabledPostStartHooks sets.String

	preShutdownHookLock    sync.Mutex
	preShutdownHooks       map[string]preShutdownHookEntry
	preShutdownHooksCalled bool

	// healthz checks
	healthzLock            sync.Mutex
	healthzChecks          []healthz.HealthChecker
	healthzChecksInstalled bool
	// livez checks
	livezLock            sync.Mutex
	livezChecks          []healthz.HealthChecker
	livezChecksInstalled bool
	// readyz checks
	readyzLock            sync.Mutex
	readyzChecks          []healthz.HealthChecker
	readyzChecksInstalled bool
	livezGracePeriod      time.Duration
	livezClock            clock.Clock

	// auditing. The backend is started before the server starts listening.
	AuditBackend audit.Backend

	// Authorizer determines whether a user is allowed to make a certain request. The Handler does a preliminary
	// authorization check using the request URI but it may be necessary to make additional checks, such as in
	// the create-on-update case.
	Authorizer authorizer.Authorizer

	// EquivalentResourceRegistry provides information about resources equivalent to a given resource,
	// and the kind associated with a given resource. As resources are installed, they are registered here.
	EquivalentResourceRegistry runtime.EquivalentResourceRegistry

	// delegationTarget is the next delegate in the chain. This is never nil.
	delegationTarget DelegationTarget

	// NonLongRunningRequestWaitGroup allows you to wait for all chain
	// handlers associated with non long-running requests
	// to complete while the server is shutting down.
	NonLongRunningRequestWaitGroup *utilwaitgroup.SafeWaitGroup
	// WatchRequestWaitGroup allows us to wait for all chain
	// handlers associated with active watch requests to
	// complete while the server is shutting down.
	WatchRequestWaitGroup *utilwaitgroup.RateLimitedSafeWaitGroup

	// ShutdownDelayDuration allows to block shutdown for some time, e.g. until endpoints pointing to this API server
	// have converged on all nodes. During this time, the API server keeps serving, /healthz will return 200,
	// but /readyz will return failure.
	ShutdownDelayDuration time.Duration

	// The limit on the request body size that would be accepted and decoded in a write request.
	// 0 means no limit.
	maxRequestBodyBytes int64

	// APIServerID is the ID of this API server.
	APIServerID string

	// StorageVersionManager holds the storage versions of the API resources installed by this server.
	StorageVersionManager storageversion.Manager

	// Version will enable the /version endpoint if non-nil.
	Version *version.Info

	// lifecycleSignals provides access to the various signals that happen during the life cycle of the apiserver.
	lifecycleSignals lifecycleSignals

	// destroyFns contains a list of functions that should be called on shutdown to clean up resources.
	// Entries are appended by RegisterDestroyFunc and invoked in order by Destroy.
	destroyFns []func()

	// muxAndDiscoveryCompleteSignals holds signals that indicate all known HTTP paths have been registered.
	// it exists primarily to avoid returning a 404 response when a resource actually exists but we haven't installed the path to a handler.
	// it is exposed for easier composition of the individual servers.
	// the primary users of this field are the WithMuxCompleteProtection filter and the NotFoundHandler
	muxAndDiscoveryCompleteSignals map[string]<-chan struct{}

	// ShutdownSendRetryAfter dictates when to initiate shutdown of the HTTP
	// Server during the graceful termination of the apiserver. If true, we wait
	// for non longrunning requests in flight to be drained and then initiate a
	// shutdown of the HTTP Server. If false, we initiate a shutdown of the HTTP
	// Server as soon as ShutdownDelayDuration has elapsed.
	// If enabled, after ShutdownDelayDuration elapses, any incoming request is
	// rejected with a 429 status code and a 'Retry-After' response.
	ShutdownSendRetryAfter bool

	// ShutdownWatchTerminationGracePeriod, if set to a positive value,
	// is the maximum duration the apiserver will wait for all active
	// watch request(s) to drain.
	// Once this grace period elapses, the apiserver will no longer
	// wait for any active watch request(s) in flight to drain, it will
	// proceed to the next step in the graceful server shutdown process.
	// If set to a positive value, the apiserver will keep track of the
	// number of active watch request(s) in flight and during shutdown
	// it will wait, at most, for the specified duration and allow these
	// active watch requests to drain with some rate limiting in effect.
	// The default is zero, which implies the apiserver will not keep
	// track of active watch request(s) in flight and will not wait
	// for them to drain, this maintains backward compatibility.
	// This grace period is orthogonal to other grace periods, and
	// it is not overridden by any other grace period.
	ShutdownWatchTerminationGracePeriod time.Duration
}
|
|
|
|
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
// as expected.
// In this file both *GenericAPIServer and emptyDelegate provide these methods.
type DelegationTarget interface {
	// UnprotectedHandler returns a handler that is NOT protected by a normal chain.
	UnprotectedHandler() http.Handler

	// PostStartHooks returns the post-start hooks that need to be combined.
	PostStartHooks() map[string]postStartHookEntry

	// PreShutdownHooks returns the pre-stop hooks that need to be combined.
	PreShutdownHooks() map[string]preShutdownHookEntry

	// HealthzChecks returns the healthz checks that need to be combined.
	HealthzChecks() []healthz.HealthChecker

	// ListedPaths returns the paths for supporting an index.
	ListedPaths() []string

	// NextDelegate returns the next delegationTarget in the chain of delegations.
	// The emptyDelegate implementation returns nil.
	NextDelegate() DelegationTarget

	// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
	PrepareRun() preparedGenericAPIServer

	// MuxAndDiscoveryCompleteSignals exposes registered signals that indicate if all known HTTP paths have been installed.
	MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{}

	// Destroy cleans up its resources on shutdown.
	// Destroy has to be implemented in thread-safe way and be prepared
	// for being called more than once.
	Destroy()
}
|
|
|
|
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
|
|
// when we delegate, we need the server we're delegating to choose whether or not to use gorestful
|
|
return s.Handler.Director
|
|
}
|
|
func (s *GenericAPIServer) PostStartHooks() map[string]postStartHookEntry {
|
|
return s.postStartHooks
|
|
}
|
|
func (s *GenericAPIServer) PreShutdownHooks() map[string]preShutdownHookEntry {
|
|
return s.preShutdownHooks
|
|
}
|
|
func (s *GenericAPIServer) HealthzChecks() []healthz.HealthChecker {
|
|
return s.healthzChecks
|
|
}
|
|
func (s *GenericAPIServer) ListedPaths() []string {
|
|
return s.listedPathProvider.ListedPaths()
|
|
}
|
|
|
|
func (s *GenericAPIServer) NextDelegate() DelegationTarget {
|
|
return s.delegationTarget
|
|
}
|
|
|
|
// RegisterMuxAndDiscoveryCompleteSignal registers the given signal that will be used to determine if all known
|
|
// HTTP paths have been registered. It is okay to call this method after instantiating the generic server but before running.
|
|
func (s *GenericAPIServer) RegisterMuxAndDiscoveryCompleteSignal(signalName string, signal <-chan struct{}) error {
|
|
if _, exists := s.muxAndDiscoveryCompleteSignals[signalName]; exists {
|
|
return fmt.Errorf("%s already registered", signalName)
|
|
}
|
|
s.muxAndDiscoveryCompleteSignals[signalName] = signal
|
|
return nil
|
|
}
|
|
|
|
func (s *GenericAPIServer) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} {
|
|
return s.muxAndDiscoveryCompleteSignals
|
|
}
|
|
|
|
// RegisterDestroyFunc registers a function that will be called during Destroy().
|
|
// The function have to be idempotent and prepared to be called more than once.
|
|
func (s *GenericAPIServer) RegisterDestroyFunc(destroyFn func()) {
|
|
s.destroyFns = append(s.destroyFns, destroyFn)
|
|
}
|
|
|
|
// Destroy cleans up all its and its delegation target resources on shutdown.
|
|
// It starts with destroying its own resources and later proceeds with
|
|
// its delegation target.
|
|
func (s *GenericAPIServer) Destroy() {
|
|
for _, destroyFn := range s.destroyFns {
|
|
destroyFn()
|
|
}
|
|
if s.delegationTarget != nil {
|
|
s.delegationTarget.Destroy()
|
|
}
|
|
}
|
|
|
|
type emptyDelegate struct {
|
|
// handler is called at the end of the delegation chain
|
|
// when a request has been made against an unregistered HTTP path the individual servers will simply pass it through until it reaches the handler.
|
|
handler http.Handler
|
|
}
|
|
|
|
func NewEmptyDelegate() DelegationTarget {
|
|
return emptyDelegate{}
|
|
}
|
|
|
|
// NewEmptyDelegateWithCustomHandler allows for registering a custom handler usually for special handling of 404 requests
|
|
func NewEmptyDelegateWithCustomHandler(handler http.Handler) DelegationTarget {
|
|
return emptyDelegate{handler}
|
|
}
|
|
|
|
func (s emptyDelegate) UnprotectedHandler() http.Handler {
|
|
return s.handler
|
|
}
|
|
func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry {
|
|
return map[string]postStartHookEntry{}
|
|
}
|
|
func (s emptyDelegate) PreShutdownHooks() map[string]preShutdownHookEntry {
|
|
return map[string]preShutdownHookEntry{}
|
|
}
|
|
func (s emptyDelegate) HealthzChecks() []healthz.HealthChecker {
|
|
return []healthz.HealthChecker{}
|
|
}
|
|
func (s emptyDelegate) ListedPaths() []string {
|
|
return []string{}
|
|
}
|
|
func (s emptyDelegate) NextDelegate() DelegationTarget {
|
|
return nil
|
|
}
|
|
func (s emptyDelegate) PrepareRun() preparedGenericAPIServer {
|
|
return preparedGenericAPIServer{nil}
|
|
}
|
|
func (s emptyDelegate) MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{} {
|
|
return map[string]<-chan struct{}{}
|
|
}
|
|
func (s emptyDelegate) Destroy() {
|
|
}
|
|
|
|
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
// It embeds *GenericAPIServer, so the server's fields and methods are reachable
// directly on the wrapper; emptyDelegate.PrepareRun returns one with a nil pointer.
type preparedGenericAPIServer struct {
	*GenericAPIServer
}
|
|
|
|
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
|
|
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
|
|
s.delegationTarget.PrepareRun()
|
|
|
|
if s.openAPIConfig != nil && !s.skipOpenAPIInstallation {
|
|
s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
|
|
Config: s.openAPIConfig,
|
|
}.InstallV2(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
|
|
}
|
|
|
|
if s.openAPIV3Config != nil && !s.skipOpenAPIInstallation {
|
|
s.OpenAPIV3VersionedService = routes.OpenAPI{
|
|
V3Config: s.openAPIV3Config,
|
|
}.InstallV3(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
|
|
}
|
|
|
|
s.installHealthz()
|
|
s.installLivez()
|
|
|
|
// as soon as shutdown is initiated, readiness should start failing
|
|
readinessStopCh := s.lifecycleSignals.ShutdownInitiated.Signaled()
|
|
err := s.addReadyzShutdownCheck(readinessStopCh)
|
|
if err != nil {
|
|
klog.Errorf("Failed to install readyz shutdown check %s", err)
|
|
}
|
|
s.installReadyz()
|
|
|
|
return preparedGenericAPIServer{s}
|
|
}
|
|
|
|
// Run spawns the secure http server. It only returns if stopCh is closed
|
|
// or the secure port cannot be listened on initially.
|
|
// This is the diagram of what channels/signals are dependent on each other:
|
|
//
|
|
// | stopCh
|
|
// | |
|
|
// | ---------------------------------------------------------
|
|
// | | |
|
|
// | ShutdownInitiated (shutdownInitiatedCh) |
|
|
// | | |
|
|
// | (ShutdownDelayDuration) (PreShutdownHooks)
|
|
// | | |
|
|
// | AfterShutdownDelayDuration (delayedStopCh) PreShutdownHooksStopped (preShutdownHooksHasStoppedCh)
|
|
// | | |
|
|
// | |-------------------------------------------------------|
|
|
// | |
|
|
// | |
|
|
// | NotAcceptingNewRequest (notAcceptingNewRequestCh)
|
|
// | |
|
|
// | |
|
|
// | |----------------------------------------------------------------------------------|
|
|
// | | | | |
|
|
// | [without [with | |
|
|
// | ShutdownSendRetryAfter] ShutdownSendRetryAfter] | |
|
|
// | | | | |
|
|
// | | ---------------| |
|
|
// | | | |
|
|
// | | |----------------|-----------------------| |
|
|
// | | | | |
|
|
// | | (NonLongRunningRequestWaitGroup::Wait) (WatchRequestWaitGroup::Wait) |
|
|
// | | | | |
|
|
// | | |------------------|---------------------| |
|
|
// | | | |
|
|
// | | InFlightRequestsDrained (drainedCh) |
|
|
// | | | |
|
|
// | |-------------------|---------------------|----------------------------------------|
|
|
// | | |
|
|
// | stopHttpServerCh (AuditBackend::Shutdown())
|
|
// | |
|
|
// | listenerStoppedCh
|
|
// | |
|
|
// | HTTPServerStoppedListening (httpServerStoppedListeningCh)
|
|
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
|
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
|
|
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
|
|
|
|
// Clean up resources on shutdown.
|
|
defer s.Destroy()
|
|
|
|
// If UDS profiling is enabled, start a local http server listening on that socket
|
|
if s.UnprotectedDebugSocket != nil {
|
|
go func() {
|
|
defer utilruntime.HandleCrash()
|
|
klog.Error(s.UnprotectedDebugSocket.Run(stopCh))
|
|
}()
|
|
}
|
|
|
|
// spawn a new goroutine for closing the MuxAndDiscoveryComplete signal
|
|
// registration happens during construction of the generic api server
|
|
// the last server in the chain aggregates signals from the previous instances
|
|
go func() {
|
|
for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() {
|
|
select {
|
|
case <-muxAndDiscoveryCompletedSignal:
|
|
continue
|
|
case <-stopCh:
|
|
klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
|
return
|
|
}
|
|
}
|
|
s.lifecycleSignals.MuxAndDiscoveryComplete.Signal()
|
|
klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
|
|
}()
|
|
|
|
go func() {
|
|
defer delayedStopCh.Signal()
|
|
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
|
|
|
|
<-stopCh
|
|
|
|
// As soon as shutdown is initiated, /readyz should start returning failure.
|
|
// This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red
|
|
// and stop sending traffic to this server.
|
|
shutdownInitiatedCh.Signal()
|
|
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
|
|
|
|
time.Sleep(s.ShutdownDelayDuration)
|
|
}()
|
|
|
|
// close socket after delayed stopCh
|
|
shutdownTimeout := s.ShutdownTimeout
|
|
if s.ShutdownSendRetryAfter {
|
|
// when this mode is enabled, we do the following:
|
|
// - the server will continue to listen until all existing requests in flight
|
|
// (not including active long running requests) have been drained.
|
|
// - once drained, http Server Shutdown is invoked with a timeout of 2s,
|
|
// net/http waits for 1s for the peer to respond to a GO_AWAY frame, so
|
|
// we should wait for a minimum of 2s
|
|
shutdownTimeout = 2 * time.Second
|
|
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "shutdownTimeout", shutdownTimeout)
|
|
}
|
|
|
|
notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
|
|
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
|
|
stopHttpServerCh := make(chan struct{})
|
|
go func() {
|
|
defer close(stopHttpServerCh)
|
|
|
|
timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
|
|
if s.ShutdownSendRetryAfter {
|
|
timeToStopHttpServerCh = drainedCh.Signaled()
|
|
}
|
|
|
|
<-timeToStopHttpServerCh
|
|
}()
|
|
|
|
// Start the audit backend before any request comes in. This means we must call Backend.Run
|
|
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
|
|
// AuditBackend.Run will stop as soon as all in-flight requests are drained.
|
|
if s.AuditBackend != nil {
|
|
if err := s.AuditBackend.Run(drainedCh.Signaled()); err != nil {
|
|
return fmt.Errorf("failed to run the audit backend: %v", err)
|
|
}
|
|
}
|
|
|
|
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
|
|
go func() {
|
|
<-listenerStoppedCh
|
|
httpServerStoppedListeningCh.Signal()
|
|
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
|
|
}()
|
|
|
|
// we don't accept new request as soon as both ShutdownDelayDuration has
|
|
// elapsed and preshutdown hooks have completed.
|
|
preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
|
|
go func() {
|
|
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
|
|
defer notAcceptingNewRequestCh.Signal()
|
|
|
|
// wait for the delayed stopCh before closing the handler chain
|
|
<-delayedStopCh.Signaled()
|
|
|
|
// Additionally wait for preshutdown hooks to also be finished, as some of them need
|
|
// to send API calls to clean up after themselves (e.g. lease reconcilers removing
|
|
// itself from the active servers).
|
|
<-preShutdownHooksHasStoppedCh.Signaled()
|
|
}()
|
|
|
|
// wait for all in-flight non-long running requests to finish
|
|
nonLongRunningRequestDrainedCh := make(chan struct{})
|
|
go func() {
|
|
defer close(nonLongRunningRequestDrainedCh)
|
|
defer klog.V(1).Info("[graceful-termination] in-flight non long-running request(s) have drained")
|
|
|
|
// wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
|
|
<-notAcceptingNewRequestCh.Signaled()
|
|
|
|
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
|
|
// once NonLongRunningRequestWaitGroup.Wait is invoked, the apiserver is
|
|
// expected to reject any incoming request with a {503, Retry-After}
|
|
// response via the WithWaitGroup filter. On the contrary, we observe
|
|
// that incoming request(s) get a 'connection refused' error, this is
|
|
// because, at this point, we have called 'Server.Shutdown' and
|
|
// net/http server has stopped listening. This causes incoming
|
|
// request to get a 'connection refused' error.
|
|
// On the other hand, if 'ShutdownSendRetryAfter' is enabled incoming
|
|
// requests will be rejected with a {429, Retry-After} since
|
|
// 'Server.Shutdown' will be invoked only after in-flight requests
|
|
// have been drained.
|
|
// TODO: can we consolidate these two modes of graceful termination?
|
|
s.NonLongRunningRequestWaitGroup.Wait()
|
|
}()
|
|
|
|
// wait for all in-flight watches to finish
|
|
activeWatchesDrainedCh := make(chan struct{})
|
|
go func() {
|
|
defer close(activeWatchesDrainedCh)
|
|
|
|
<-notAcceptingNewRequestCh.Signaled()
|
|
if s.ShutdownWatchTerminationGracePeriod <= time.Duration(0) {
|
|
klog.V(1).InfoS("[graceful-termination] not going to wait for active watch request(s) to drain")
|
|
return
|
|
}
|
|
|
|
// Wait for all active watches to finish
|
|
grace := s.ShutdownWatchTerminationGracePeriod
|
|
activeBefore, activeAfter, err := s.WatchRequestWaitGroup.Wait(func(count int) (utilwaitgroup.RateLimiter, context.Context, context.CancelFunc) {
|
|
qps := float64(count) / grace.Seconds()
|
|
// TODO: we don't want the QPS (max requests drained per second) to
|
|
// get below a certain floor value, since we want the server to
|
|
// drain the active watch requests as soon as possible.
|
|
// For now, it's hard coded to 200, and it is subject to change
|
|
// based on the result from the scale testing.
|
|
if qps < 200 {
|
|
qps = 200
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), grace)
|
|
// We don't expect more than one token to be consumed
|
|
// in a single Wait call, so setting burst to 1.
|
|
return rate.NewLimiter(rate.Limit(qps), 1), ctx, cancel
|
|
})
|
|
klog.V(1).InfoS("[graceful-termination] active watch request(s) have drained",
|
|
"duration", grace, "activeWatchesBefore", activeBefore, "activeWatchesAfter", activeAfter, "error", err)
|
|
}()
|
|
|
|
go func() {
|
|
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
|
|
defer drainedCh.Signal()
|
|
|
|
<-nonLongRunningRequestDrainedCh
|
|
<-activeWatchesDrainedCh
|
|
}()
|
|
|
|
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
|
|
<-stopCh
|
|
|
|
// run shutdown hooks directly. This includes deregistering from
|
|
// the kubernetes endpoint in case of kube-apiserver.
|
|
func() {
|
|
defer func() {
|
|
preShutdownHooksHasStoppedCh.Signal()
|
|
klog.V(1).InfoS("[graceful-termination] pre-shutdown hooks completed", "name", preShutdownHooksHasStoppedCh.Name())
|
|
}()
|
|
err = s.RunPreShutdownHooks()
|
|
}()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Wait for all requests in flight to drain, bounded by the RequestTimeout variable.
|
|
<-drainedCh.Signaled()
|
|
|
|
if s.AuditBackend != nil {
|
|
s.AuditBackend.Shutdown()
|
|
klog.V(1).InfoS("[graceful-termination] audit backend shutdown completed")
|
|
}
|
|
|
|
// wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished.
|
|
<-listenerStoppedCh
|
|
<-stoppedCh
|
|
|
|
klog.V(1).Info("[graceful-termination] apiserver is exiting")
|
|
return nil
|
|
}
|
|
|
|
// NonBlockingRun spawns the secure http server. An error is
|
|
// returned if the secure port cannot be listened on.
|
|
// The returned channel is closed when the (asynchronous) termination is finished.
|
|
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
|
|
// Use an internal stop channel to allow cleanup of the listeners on error.
|
|
internalStopCh := make(chan struct{})
|
|
var stoppedCh <-chan struct{}
|
|
var listenerStoppedCh <-chan struct{}
|
|
if s.SecureServingInfo != nil && s.Handler != nil {
|
|
var err error
|
|
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
|
|
if err != nil {
|
|
close(internalStopCh)
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
// Now that listener have bound successfully, it is the
|
|
// responsibility of the caller to close the provided channel to
|
|
// ensure cleanup.
|
|
go func() {
|
|
<-stopCh
|
|
close(internalStopCh)
|
|
}()
|
|
|
|
s.RunPostStartHooks(stopCh)
|
|
|
|
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
|
|
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
|
|
}
|
|
|
|
return stoppedCh, listenerStoppedCh, nil
|
|
}
|
|
|
|
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
|
|
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, typeConverter managedfields.TypeConverter) error {
|
|
var resourceInfos []*storageversion.ResourceInfo
|
|
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
|
|
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
|
|
klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
|
|
continue
|
|
}
|
|
|
|
apiGroupVersion, err := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if apiGroupInfo.OptionsExternalVersion != nil {
|
|
apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
|
|
}
|
|
apiGroupVersion.TypeConverter = typeConverter
|
|
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
|
|
|
|
discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
|
|
}
|
|
resourceInfos = append(resourceInfos, r...)
|
|
|
|
if utilfeature.DefaultFeatureGate.Enabled(features.AggregatedDiscoveryEndpoint) {
|
|
// Aggregated discovery only aggregates resources under /apis
|
|
if apiPrefix == APIGroupPrefix {
|
|
s.AggregatedDiscoveryGroupManager.AddGroupVersion(
|
|
groupVersion.Group,
|
|
apidiscoveryv2beta1.APIVersionDiscovery{
|
|
Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent,
|
|
Version: groupVersion.Version,
|
|
Resources: discoveryAPIResources,
|
|
},
|
|
)
|
|
} else {
|
|
// There is only one group version for legacy resources, priority can be defaulted to 0.
|
|
s.AggregatedLegacyDiscoveryGroupManager.AddGroupVersion(
|
|
groupVersion.Group,
|
|
apidiscoveryv2beta1.APIVersionDiscovery{
|
|
Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent,
|
|
Version: groupVersion.Version,
|
|
Resources: discoveryAPIResources,
|
|
},
|
|
)
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
s.RegisterDestroyFunc(apiGroupInfo.destroyStorage)
|
|
|
|
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
|
|
utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
|
|
// API installation happens before we start listening on the handlers,
|
|
// therefore it is safe to register ResourceInfos here. The handler will block
|
|
// write requests until the storage versions of the targeting resources are updated.
|
|
s.StorageVersionManager.AddResourceInfo(resourceInfos...)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// InstallLegacyAPIGroup exposes the given legacy api group in the API.
|
|
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
|
|
// underlying storage will be destroyed on this servers shutdown.
|
|
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
|
|
if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
|
|
return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
|
|
}
|
|
|
|
openAPIModels, err := s.getOpenAPIModels(apiPrefix, apiGroupInfo)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to get openapi models: %v", err)
|
|
}
|
|
|
|
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Install the version handler.
|
|
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
|
|
legacyRootAPIHandler := discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix)
|
|
if utilfeature.DefaultFeatureGate.Enabled(features.AggregatedDiscoveryEndpoint) {
|
|
wrapped := discoveryendpoint.WrapAggregatedDiscoveryToHandler(legacyRootAPIHandler, s.AggregatedLegacyDiscoveryGroupManager)
|
|
s.Handler.GoRestfulContainer.Add(wrapped.GenerateWebService("/api", metav1.APIVersions{}))
|
|
} else {
|
|
s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// InstallAPIGroups exposes given api groups in the API.
|
|
// The <apiGroupInfos> passed into this function shouldn't be used elsewhere as the
|
|
// underlying storage will be destroyed on this servers shutdown.
|
|
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
|
|
for _, apiGroupInfo := range apiGroupInfos {
|
|
if len(apiGroupInfo.PrioritizedVersions) == 0 {
|
|
return fmt.Errorf("no version priority set for %#v", *apiGroupInfo)
|
|
}
|
|
// Do not register empty group or empty version. Doing so claims /apis/ for the wrong entity to be returned.
|
|
// Catching these here places the error much closer to its origin
|
|
if len(apiGroupInfo.PrioritizedVersions[0].Group) == 0 {
|
|
return fmt.Errorf("cannot register handler with an empty group for %#v", *apiGroupInfo)
|
|
}
|
|
if len(apiGroupInfo.PrioritizedVersions[0].Version) == 0 {
|
|
return fmt.Errorf("cannot register handler with an empty version for %#v", *apiGroupInfo)
|
|
}
|
|
}
|
|
|
|
openAPIModels, err := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to get openapi models: %v", err)
|
|
}
|
|
|
|
for _, apiGroupInfo := range apiGroupInfos {
|
|
if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
|
|
return fmt.Errorf("unable to install api resources: %v", err)
|
|
}
|
|
|
|
// setup discovery
|
|
// Install the version handler.
|
|
// Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
|
|
apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
|
|
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
|
|
// Check the config to make sure that we elide versions that don't have any resources
|
|
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
|
|
continue
|
|
}
|
|
apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
|
|
GroupVersion: groupVersion.String(),
|
|
Version: groupVersion.Version,
|
|
})
|
|
}
|
|
preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
|
|
GroupVersion: apiGroupInfo.PrioritizedVersions[0].String(),
|
|
Version: apiGroupInfo.PrioritizedVersions[0].Version,
|
|
}
|
|
apiGroup := metav1.APIGroup{
|
|
Name: apiGroupInfo.PrioritizedVersions[0].Group,
|
|
Versions: apiVersionsForDiscovery,
|
|
PreferredVersion: preferredVersionForDiscovery,
|
|
}
|
|
|
|
s.DiscoveryGroupManager.AddGroup(apiGroup)
|
|
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// InstallAPIGroup exposes the given api group in the API.
// It forwards the single group to InstallAPIGroups and returns its error.
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
// underlying storage will be destroyed on this server's shutdown.
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
	return s.InstallAPIGroups(apiGroupInfo)
}
|
|
|
|
func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) (*genericapi.APIGroupVersion, error) {
|
|
storage := make(map[string]rest.Storage)
|
|
for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
|
|
if strings.ToLower(k) != k {
|
|
return nil, fmt.Errorf("resource names must be lowercase only, not %q", k)
|
|
}
|
|
storage[k] = v
|
|
}
|
|
version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
|
|
version.Root = apiPrefix
|
|
version.Storage = storage
|
|
return version, nil
|
|
}
|
|
|
|
func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion) *genericapi.APIGroupVersion {
|
|
|
|
allServedVersionsByResource := map[string][]string{}
|
|
for version, resourcesInVersion := range apiGroupInfo.VersionedResourcesStorageMap {
|
|
for resource := range resourcesInVersion {
|
|
if len(groupVersion.Group) == 0 {
|
|
allServedVersionsByResource[resource] = append(allServedVersionsByResource[resource], version)
|
|
} else {
|
|
allServedVersionsByResource[resource] = append(allServedVersionsByResource[resource], fmt.Sprintf("%s/%s", groupVersion.Group, version))
|
|
}
|
|
}
|
|
}
|
|
|
|
return &genericapi.APIGroupVersion{
|
|
GroupVersion: groupVersion,
|
|
AllServedVersionsByResource: allServedVersionsByResource,
|
|
MetaGroupVersion: apiGroupInfo.MetaGroupVersion,
|
|
|
|
ParameterCodec: apiGroupInfo.ParameterCodec,
|
|
Serializer: apiGroupInfo.NegotiatedSerializer,
|
|
Creater: apiGroupInfo.Scheme,
|
|
Convertor: apiGroupInfo.Scheme,
|
|
ConvertabilityChecker: apiGroupInfo.Scheme,
|
|
UnsafeConvertor: runtime.UnsafeObjectConvertor(apiGroupInfo.Scheme),
|
|
Defaulter: apiGroupInfo.Scheme,
|
|
Typer: apiGroupInfo.Scheme,
|
|
Namer: runtime.Namer(meta.NewAccessor()),
|
|
|
|
EquivalentResourceRegistry: s.EquivalentResourceRegistry,
|
|
|
|
Admit: s.admissionControl,
|
|
MinRequestTimeout: s.minRequestTimeout,
|
|
Authorizer: s.Authorizer,
|
|
}
|
|
}
|
|
|
|
// NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values
|
|
// exposed for easier composition from other packages
|
|
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
|
|
return APIGroupInfo{
|
|
PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
|
|
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
|
|
// TODO unhardcode this. It was hardcoded before, but we need to re-evaluate
|
|
OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
|
|
Scheme: scheme,
|
|
ParameterCodec: parameterCodec,
|
|
NegotiatedSerializer: codecs,
|
|
}
|
|
}
|
|
|
|
// getOpenAPIModels is a private method for getting the OpenAPI models
|
|
func (s *GenericAPIServer) getOpenAPIModels(apiPrefix string, apiGroupInfos ...*APIGroupInfo) (managedfields.TypeConverter, error) {
|
|
if s.openAPIV3Config == nil {
|
|
// SSA is GA and requires OpenAPI config to be set
|
|
// to create models.
|
|
return nil, errors.New("OpenAPIV3 config must not be nil")
|
|
}
|
|
pathsToIgnore := openapiutil.NewTrie(s.openAPIV3Config.IgnorePrefixes)
|
|
resourceNames := make([]string, 0)
|
|
for _, apiGroupInfo := range apiGroupInfos {
|
|
groupResources, err := getResourceNamesForGroup(apiPrefix, apiGroupInfo, pathsToIgnore)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
resourceNames = append(resourceNames, groupResources...)
|
|
}
|
|
|
|
// Build the openapi definitions for those resources and convert it to proto models
|
|
openAPISpec, err := openapibuilder3.BuildOpenAPIDefinitionsForResources(s.openAPIV3Config, resourceNames...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, apiGroupInfo := range apiGroupInfos {
|
|
apiGroupInfo.StaticOpenAPISpec = openAPISpec
|
|
}
|
|
|
|
typeConverter, err := managedfields.NewTypeConverter(openAPISpec, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return typeConverter, nil
|
|
}
|
|
|
|
// getResourceNamesForGroup is a private method for getting the canonical names for each resource to build in an api group
|
|
func getResourceNamesForGroup(apiPrefix string, apiGroupInfo *APIGroupInfo, pathsToIgnore openapiutil.Trie) ([]string, error) {
|
|
// Get the canonical names of every resource we need to build in this api group
|
|
resourceNames := make([]string, 0)
|
|
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
|
|
for resource, storage := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
|
|
path := gpath.Join(apiPrefix, groupVersion.Group, groupVersion.Version, resource)
|
|
if !pathsToIgnore.HasPrefix(path) {
|
|
kind, err := genericapi.GetResourceKind(groupVersion, storage, apiGroupInfo.Scheme)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sampleObject, err := apiGroupInfo.Scheme.New(kind)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
name := openapiutil.GetCanonicalTypeName(sampleObject)
|
|
resourceNames = append(resourceNames, name)
|
|
}
|
|
}
|
|
}
|
|
|
|
return resourceNames, nil
|
|
}
|