mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-03 02:29:29 +00:00
325 lines
13 KiB
Go
325 lines
13 KiB
Go
/*
|
|
Copyright 2017 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package kubeletconfig
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
|
|
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/validation"
|
|
|
|
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
|
|
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
|
|
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
|
|
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
|
|
utilpanic "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/panic"
|
|
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
|
|
)
|
|
|
|
const (
|
|
storeDir = "store"
|
|
// TODO(mtaufen): We may expose this in a future API, but for the time being we use an internal default,
|
|
// because it is not especially clear where this should live in the API.
|
|
configTrialDuration = 10 * time.Minute
|
|
)
|
|
|
|
// TransformFunc edits the KubeletConfiguration in-place, and returns an
|
|
// error if any of the transformations failed.
|
|
type TransformFunc func(kc *kubeletconfig.KubeletConfiguration) error
|
|
|
|
// Controller manages syncing dynamic Kubelet configurations
|
|
// For more information, see the proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/dynamic-kubelet-configuration.md
|
|
type Controller struct {
|
|
// transform applies an arbitrary transformation to config after loading, and before validation.
|
|
// This can be used, for example, to include config from flags before the controller's validation step.
|
|
// If transform returns an error, loadConfig will fail, and an InternalError will be reported.
|
|
// Be wary if using this function as an extension point, in most cases the controller should
|
|
// probably just be natively extended to do what you need. Injecting flag precedence transformations
|
|
// is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
|
|
// controller's tree (pkg/) is not.
|
|
transform TransformFunc
|
|
|
|
// pendingConfigSource; write to this channel to indicate that the config source needs to be synced from the API server
|
|
pendingConfigSource chan bool
|
|
|
|
// configStatus manages the status we report on the Node object
|
|
configStatus status.NodeConfigStatus
|
|
|
|
// nodeInformer is the informer that watches the Node object
|
|
nodeInformer cache.SharedInformer
|
|
|
|
// remoteConfigSourceInformer is the informer that watches the assigned config source
|
|
remoteConfigSourceInformer cache.SharedInformer
|
|
|
|
// checkpointStore persists config source checkpoints to a storage layer
|
|
checkpointStore store.Store
|
|
}
|
|
|
|
// NewController constructs a new Controller object and returns it. The dynamicConfigDir
|
|
// path must be absolute. transform applies an arbitrary transformation to config after loading, and before validation.
|
|
// This can be used, for example, to include config from flags before the controller's validation step.
|
|
// If transform returns an error, loadConfig will fail, and an InternalError will be reported.
|
|
// Be wary if using this function as an extension point, in most cases the controller should
|
|
// probably just be natively extended to do what you need. Injecting flag precedence transformations
|
|
// is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
|
|
// controller's tree (pkg/) is not.
|
|
func NewController(dynamicConfigDir string, transform TransformFunc) *Controller {
|
|
return &Controller{
|
|
transform: transform,
|
|
// channels must have capacity at least 1, since we signal with non-blocking writes
|
|
pendingConfigSource: make(chan bool, 1),
|
|
configStatus: status.NewNodeConfigStatus(),
|
|
checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, storeDir)),
|
|
}
|
|
}
|
|
|
|
// Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
|
|
// or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
|
|
// If the pre-existing local configuration should be used, Bootstrap returns a nil config.
|
|
func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
|
|
utillog.Infof("starting controller")
|
|
|
|
// ensure the filesystem is initialized
|
|
if err := cc.initializeDynamicConfigDir(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// determine assigned source and set status
|
|
assignedSource, err := cc.checkpointStore.Assigned()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if assignedSource != nil {
|
|
cc.configStatus.SetAssigned(assignedSource.NodeConfigSource())
|
|
}
|
|
|
|
// determine last-known-good source and set status
|
|
lastKnownGoodSource, err := cc.checkpointStore.LastKnownGood()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if lastKnownGoodSource != nil {
|
|
cc.configStatus.SetLastKnownGood(lastKnownGoodSource.NodeConfigSource())
|
|
}
|
|
|
|
// if the assigned source is nil, return nil to indicate local config
|
|
if assignedSource == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
// attempt to load assigned config
|
|
assignedConfig, reason, err := cc.loadConfig(assignedSource)
|
|
if err == nil {
|
|
// update the active source to the non-nil assigned source
|
|
cc.configStatus.SetActive(assignedSource.NodeConfigSource())
|
|
|
|
// update the last-known-good config if necessary, and start a timer that
|
|
// periodically checks whether the last-known good needs to be updated
|
|
// we only do this when the assigned config loads and passes validation
|
|
// wait.Forever will call the func once before starting the timer
|
|
go wait.Forever(func() { cc.checkTrial(configTrialDuration) }, 10*time.Second)
|
|
|
|
return assignedConfig, nil
|
|
} // Assert: the assigned config failed to load or validate
|
|
|
|
// TODO(mtaufen): consider re-attempting download when a load/verify/parse/validate
|
|
// error happens outside trial period, we already made it past the trial so it's probably filesystem corruption
|
|
// or something else scary
|
|
|
|
// log the reason and error details for the failure to load the assigned config
|
|
utillog.Errorf(fmt.Sprintf("%s, error: %v", reason, err))
|
|
|
|
// set status to indicate the failure with the assigned config
|
|
cc.configStatus.SetError(reason)
|
|
|
|
// if the last-known-good source is nil, return nil to indicate local config
|
|
if lastKnownGoodSource == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
// attempt to load the last-known-good config
|
|
lastKnownGoodConfig, _, err := cc.loadConfig(lastKnownGoodSource)
|
|
if err != nil {
|
|
// we failed to load the last-known-good, so something is really messed up and we just return the error
|
|
return nil, err
|
|
}
|
|
|
|
// set status to indicate the active source is the non-nil last-known-good source
|
|
cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource())
|
|
return lastKnownGoodConfig, nil
|
|
}
|
|
|
|
// StartSync tells the controller to start the goroutines that sync status/config to/from the API server.
|
|
// The clients must be non-nil, and the nodeName must be non-empty.
|
|
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error {
|
|
const errFmt = "cannot start Kubelet config sync: %s"
|
|
if client == nil {
|
|
return fmt.Errorf(errFmt, "nil client")
|
|
}
|
|
if eventClient == nil {
|
|
return fmt.Errorf(errFmt, "nil event client")
|
|
}
|
|
if nodeName == "" {
|
|
return fmt.Errorf(errFmt, "empty nodeName")
|
|
}
|
|
|
|
// Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet,
|
|
// we use HandlePanic to manually call the panic handlers and then crash.
|
|
// We have a better chance of recovering normal operation if we just restart the Kubelet in the event
|
|
// of a Go runtime error.
|
|
// NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run!
|
|
// This was EVIL to debug (difficult to see missing `()`).
|
|
// The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion.
|
|
|
|
// status sync worker
|
|
statusSyncLoopFunc := utilpanic.HandlePanic(func() {
|
|
utillog.Infof("starting status sync loop")
|
|
wait.JitterUntil(func() {
|
|
cc.configStatus.Sync(client, nodeName)
|
|
}, 10*time.Second, 0.2, true, wait.NeverStop)
|
|
})
|
|
// remote config source informer, if we have a remote source to watch
|
|
assignedSource, err := cc.checkpointStore.Assigned()
|
|
if err != nil {
|
|
return fmt.Errorf(errFmt, err)
|
|
} else if assignedSource == nil {
|
|
utillog.Infof("local source is assigned, will not start remote config source informer")
|
|
} else {
|
|
cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{
|
|
AddFunc: cc.onAddRemoteConfigSourceEvent,
|
|
UpdateFunc: cc.onUpdateRemoteConfigSourceEvent,
|
|
DeleteFunc: cc.onDeleteRemoteConfigSourceEvent,
|
|
},
|
|
)
|
|
}
|
|
remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() {
|
|
if cc.remoteConfigSourceInformer != nil {
|
|
utillog.Infof("starting remote config source informer")
|
|
cc.remoteConfigSourceInformer.Run(wait.NeverStop)
|
|
}
|
|
})
|
|
// node informer
|
|
cc.nodeInformer = newSharedNodeInformer(client, nodeName,
|
|
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
|
|
nodeInformerFunc := utilpanic.HandlePanic(func() {
|
|
utillog.Infof("starting Node informer")
|
|
cc.nodeInformer.Run(wait.NeverStop)
|
|
})
|
|
// config sync worker
|
|
configSyncLoopFunc := utilpanic.HandlePanic(func() {
|
|
utillog.Infof("starting Kubelet config sync loop")
|
|
wait.JitterUntil(func() {
|
|
cc.syncConfigSource(client, eventClient, nodeName)
|
|
}, 10*time.Second, 0.2, true, wait.NeverStop)
|
|
})
|
|
|
|
go statusSyncLoopFunc()
|
|
go remoteConfigSourceInformerFunc()
|
|
go nodeInformerFunc()
|
|
go configSyncLoopFunc()
|
|
return nil
|
|
}
|
|
|
|
// loadConfig loads Kubelet config from a checkpoint
|
|
// It returns the loaded configuration or a clean failure reason (for status reporting) and an error.
|
|
func (cc *Controller) loadConfig(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, string, error) {
|
|
// load KubeletConfiguration from checkpoint
|
|
kc, err := cc.checkpointStore.Load(source)
|
|
if err != nil {
|
|
return nil, status.LoadError, err
|
|
}
|
|
// apply any required transformations to the KubeletConfiguration
|
|
if cc.transform != nil {
|
|
if err := cc.transform(kc); err != nil {
|
|
return nil, status.InternalError, err
|
|
}
|
|
}
|
|
// validate the result
|
|
if err := validation.ValidateKubeletConfiguration(kc); err != nil {
|
|
return nil, status.ValidateError, err
|
|
}
|
|
return kc, "", nil
|
|
}
|
|
|
|
// initializeDynamicConfigDir makes sure that the storage layers for various controller components are set up correctly
|
|
func (cc *Controller) initializeDynamicConfigDir() error {
|
|
utillog.Infof("ensuring filesystem is set up correctly")
|
|
// initializeDynamicConfigDir local checkpoint storage location
|
|
return cc.checkpointStore.Initialize()
|
|
}
|
|
|
|
// checkTrial checks whether the trial duration has passed, and updates the last-known-good config if necessary
|
|
func (cc *Controller) checkTrial(duration time.Duration) {
|
|
// when the trial period is over, the assigned config becomes the last-known-good
|
|
if trial, err := cc.inTrial(duration); err != nil {
|
|
utillog.Errorf("failed to check trial period for assigned config, error: %v", err)
|
|
} else if !trial {
|
|
if err := cc.graduateAssignedToLastKnownGood(); err != nil {
|
|
utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// inTrial returns true if the time elapsed since the last modification of the assigned config does not exceed `trialDur`, false otherwise
|
|
func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
|
|
now := time.Now()
|
|
t, err := cc.checkpointStore.AssignedModified()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if now.Sub(t) <= trialDur {
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
|
|
// to the same value as the assigned config maintained by the checkpointStore
|
|
func (cc *Controller) graduateAssignedToLastKnownGood() error {
|
|
// get assigned
|
|
assigned, err := cc.checkpointStore.Assigned()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// get last-known-good
|
|
lastKnownGood, err := cc.checkpointStore.LastKnownGood()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// if the sources are equal, no need to change
|
|
if assigned == lastKnownGood ||
|
|
assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned, lastKnownGood) {
|
|
return nil
|
|
}
|
|
// update last-known-good
|
|
err = cc.checkpointStore.SetLastKnownGood(assigned)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// update the status to reflect the new last-known-good config
|
|
cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource())
|
|
utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion())
|
|
return nil
|
|
}
|