mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-30 18:20:19 +00:00
966 lines
35 KiB
Go
966 lines
35 KiB
Go
/*
|
|
Copyright 2015 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
// Package app makes it easy to create a kubelet server for various contexts.
|
|
package app
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/golang/glog"
|
|
"github.com/spf13/cobra"
|
|
"github.com/spf13/pflag"
|
|
|
|
"k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/apiserver/pkg/server/healthz"
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
"k8s.io/apiserver/pkg/util/flag"
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
restclient "k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/client-go/tools/record"
|
|
certutil "k8s.io/client-go/util/cert"
|
|
"k8s.io/client-go/util/certificate"
|
|
"k8s.io/kubernetes/cmd/kubelet/app/options"
|
|
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
|
api "k8s.io/kubernetes/pkg/apis/core"
|
|
"k8s.io/kubernetes/pkg/capabilities"
|
|
"k8s.io/kubernetes/pkg/client/chaosclient"
|
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
|
"k8s.io/kubernetes/pkg/credentialprovider"
|
|
"k8s.io/kubernetes/pkg/features"
|
|
"k8s.io/kubernetes/pkg/kubelet"
|
|
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
|
|
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
|
|
kubeletconfigv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1alpha1"
|
|
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
|
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
|
|
"k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm"
|
|
"k8s.io/kubernetes/pkg/kubelet/config"
|
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
|
"k8s.io/kubernetes/pkg/kubelet/dockershim"
|
|
dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
|
|
"k8s.io/kubernetes/pkg/kubelet/eviction"
|
|
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
|
|
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
|
|
"k8s.io/kubernetes/pkg/kubelet/server"
|
|
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
|
"k8s.io/kubernetes/pkg/util/configz"
|
|
"k8s.io/kubernetes/pkg/util/flock"
|
|
kubeio "k8s.io/kubernetes/pkg/util/io"
|
|
"k8s.io/kubernetes/pkg/util/mount"
|
|
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
|
"k8s.io/kubernetes/pkg/util/oom"
|
|
"k8s.io/kubernetes/pkg/util/rlimit"
|
|
"k8s.io/kubernetes/pkg/version"
|
|
)
|
|
|
|
// componentKubelet is the kubelet's component name. It is used as the cobra
// command name and as the Component of the event source the kubelet records
// events under.
const componentKubelet = "kubelet"
|
|
|
|
// NewKubeletCommand creates a *cobra.Command object with default parameters
|
|
func NewKubeletCommand() *cobra.Command {
|
|
// ignore the error, as this is just for generating docs and the like
|
|
s, _ := options.NewKubeletServer()
|
|
s.AddFlags(pflag.CommandLine)
|
|
cmd := &cobra.Command{
|
|
Use: componentKubelet,
|
|
Long: `The kubelet is the primary "node agent" that runs on each
|
|
node. The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
|
|
that describes a pod. The kubelet takes a set of PodSpecs that are provided through
|
|
various mechanisms (primarily through the apiserver) and ensures that the containers
|
|
described in those PodSpecs are running and healthy. The kubelet doesn't manage
|
|
containers which were not created by Kubernetes.
|
|
|
|
Other than from an PodSpec from the apiserver, there are three ways that a container
|
|
manifest can be provided to the Kubelet.
|
|
|
|
File: Path passed as a flag on the command line. Files under this path will be monitored
|
|
periodically for updates. The monitoring period is 20s by default and is configurable
|
|
via a flag.
|
|
|
|
HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
|
|
is checked every 20 seconds (also configurable with a flag).
|
|
|
|
HTTP server: The kubelet can also listen for HTTP and respond to a simple API
|
|
(underspec'd currently) to submit a new manifest.`,
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
},
|
|
}
|
|
|
|
return cmd
|
|
}
|
|
|
|
// UnsecuredDependencies returns a Dependencies suitable for being run, or an error if the server setup
|
|
// is not valid. It will not start any background processes, and does not include authentication/authorization
|
|
func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, error) {
|
|
// Initialize the TLS Options
|
|
tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mounter := mount.New(s.ExperimentalMounterPath)
|
|
var writer kubeio.Writer = &kubeio.StdWriter{}
|
|
if s.Containerized {
|
|
glog.V(2).Info("Running kubelet in containerized mode (experimental)")
|
|
mounter = mount.NewNsenterMounter()
|
|
writer = &kubeio.NsenterWriter{}
|
|
}
|
|
|
|
var dockerClientConfig *dockershim.ClientConfig
|
|
if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
|
|
dockerClientConfig = &dockershim.ClientConfig{
|
|
DockerEndpoint: s.DockerEndpoint,
|
|
RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
|
|
ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
|
|
}
|
|
}
|
|
|
|
return &kubelet.Dependencies{
|
|
Auth: nil, // default does not enforce auth[nz]
|
|
CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
|
|
Cloud: nil, // cloud provider might start background processes
|
|
ContainerManager: nil,
|
|
DockerClientConfig: dockerClientConfig,
|
|
KubeClient: nil,
|
|
HeartbeatClient: nil,
|
|
ExternalKubeClient: nil,
|
|
EventClient: nil,
|
|
Mounter: mounter,
|
|
NetworkPlugins: ProbeNetworkPlugins(s.CNIConfDir, s.CNIBinDir),
|
|
OOMAdjuster: oom.NewOOMAdjuster(),
|
|
OSInterface: kubecontainer.RealOS{},
|
|
Writer: writer,
|
|
VolumePlugins: ProbeVolumePlugins(),
|
|
DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir),
|
|
TLSOptions: tlsOptions}, nil
|
|
}
|
|
|
|
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
|
|
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
|
|
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
|
|
// not be generated.
|
|
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
|
|
// To help debugging, immediately log version
|
|
glog.Infof("Version: %+v", version.Get())
|
|
if err := run(s, kubeDeps); err != nil {
|
|
return fmt.Errorf("failed to run Kubelet: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkPermissions reports an error when the current process is not running
// as uid 0, and nil otherwise.
func checkPermissions() error {
	uid := os.Getuid()
	if uid == 0 {
		// TODO: Check if kubelet is running in the `initial` user namespace.
		// http://man7.org/linux/man-pages/man7/user_namespaces.7.html
		return nil
	}
	return fmt.Errorf("Kubelet needs to run as uid `0`. It is being run as %d", uid)
}
|
|
|
|
func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfiguration) error {
|
|
scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
versioned := kubeletconfigv1alpha1.KubeletConfiguration{}
|
|
if err := scheme.Convert(kc, &versioned, nil); err != nil {
|
|
return err
|
|
}
|
|
cz.Set(versioned)
|
|
return nil
|
|
}
|
|
|
|
func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
|
|
cz, err := configz.New("kubeletconfig")
|
|
if err != nil {
|
|
glog.Errorf("unable to register configz: %s", err)
|
|
return err
|
|
}
|
|
if err := setConfigz(cz, kc); err != nil {
|
|
glog.Errorf("unable to register config: %s", err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
|
|
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
|
|
if kubeDeps.Recorder != nil {
|
|
return
|
|
}
|
|
eventBroadcaster := record.NewBroadcaster()
|
|
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
|
|
eventBroadcaster.StartLogging(glog.V(3).Infof)
|
|
if kubeDeps.EventClient != nil {
|
|
glog.V(4).Infof("Sending events to api server.")
|
|
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
|
|
} else {
|
|
glog.Warning("No api server defined - no events will be sent to API server.")
|
|
}
|
|
}
|
|
|
|
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
|
|
// Set global feature gates based on the value on the initial KubeletServer
|
|
err = utilfeature.DefaultFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
|
|
if err := options.ValidateKubeletServer(s); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Obtain Kubelet Lock File
|
|
if s.ExitOnLockContention && s.LockFilePath == "" {
|
|
return errors.New("cannot exit on lock file contention: no lock file specified")
|
|
}
|
|
done := make(chan struct{})
|
|
if s.LockFilePath != "" {
|
|
glog.Infof("acquiring file lock on %q", s.LockFilePath)
|
|
if err := flock.Acquire(s.LockFilePath); err != nil {
|
|
return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
|
|
}
|
|
if s.ExitOnLockContention {
|
|
glog.Infof("watching for inotify events for: %v", s.LockFilePath)
|
|
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Register current configuration with /configz endpoint
|
|
err = initConfigz(&s.KubeletConfiguration)
|
|
if err != nil {
|
|
glog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
|
|
}
|
|
|
|
// About to get clients and such, detect standaloneMode
|
|
standaloneMode := true
|
|
switch {
|
|
case s.RequireKubeConfig == true:
|
|
standaloneMode = false
|
|
glog.Warningf("--require-kubeconfig is deprecated. Set --kubeconfig without using --require-kubeconfig.")
|
|
case s.KubeConfig.Provided():
|
|
standaloneMode = false
|
|
}
|
|
|
|
if kubeDeps == nil {
|
|
kubeDeps, err = UnsecuredDependencies(s)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if s.CloudProvider == kubeletconfigv1alpha1.AutoDetectCloudProvider {
|
|
glog.Warning("--cloud-provider=auto-detect is deprecated. The desired cloud provider should be set explicitly")
|
|
}
|
|
|
|
if kubeDeps.Cloud == nil {
|
|
if !cloudprovider.IsExternal(s.CloudProvider) && s.CloudProvider != kubeletconfigv1alpha1.AutoDetectCloudProvider {
|
|
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cloud == nil {
|
|
glog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
|
|
} else {
|
|
glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
|
|
}
|
|
kubeDeps.Cloud = cloud
|
|
}
|
|
}
|
|
|
|
nodeName, err := getNodeName(kubeDeps.Cloud, nodeutil.GetHostname(s.HostnameOverride))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if s.BootstrapKubeconfig != "" {
|
|
if err := bootstrap.LoadClientCert(s.KubeConfig.Value(), s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// if in standalone mode, indicate as much by setting all clients to nil
|
|
if standaloneMode {
|
|
kubeDeps.KubeClient = nil
|
|
kubeDeps.ExternalKubeClient = nil
|
|
kubeDeps.EventClient = nil
|
|
kubeDeps.HeartbeatClient = nil
|
|
glog.Warningf("standalone mode, no API client")
|
|
} else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil {
|
|
// initialize clients if not standalone mode and any of the clients are not provided
|
|
var kubeClient clientset.Interface
|
|
var eventClient v1core.EventsGetter
|
|
var heartbeatClient v1core.CoreV1Interface
|
|
var externalKubeClient clientset.Interface
|
|
|
|
clientConfig, err := CreateAPIServerClientConfig(s)
|
|
|
|
var clientCertificateManager certificate.Manager
|
|
if err == nil {
|
|
if s.RotateCertificates && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletClientCertificate) {
|
|
clientCertificateManager, err = kubeletcertificate.NewKubeletClientCertificateManager(s.CertDirectory, nodeName, clientConfig.CertData, clientConfig.KeyData, clientConfig.CertFile, clientConfig.KeyFile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// we set exitIfExpired to true because we use this client configuration to request new certs - if we are unable
|
|
// to request new certs, we will be unable to continue normal operation
|
|
if err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, true); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
kubeClient, err = clientset.NewForConfig(clientConfig)
|
|
if err != nil {
|
|
glog.Warningf("New kubeClient from clientConfig error: %v", err)
|
|
} else if kubeClient.CertificatesV1beta1() != nil && clientCertificateManager != nil {
|
|
glog.V(2).Info("Starting client certificate rotation.")
|
|
clientCertificateManager.SetCertificateSigningRequestClient(kubeClient.CertificatesV1beta1().CertificateSigningRequests())
|
|
clientCertificateManager.Start()
|
|
}
|
|
externalKubeClient, err = clientset.NewForConfig(clientConfig)
|
|
if err != nil {
|
|
glog.Warningf("New kubeClient from clientConfig error: %v", err)
|
|
}
|
|
|
|
// make a separate client for events
|
|
eventClientConfig := *clientConfig
|
|
eventClientConfig.QPS = float32(s.EventRecordQPS)
|
|
eventClientConfig.Burst = int(s.EventBurst)
|
|
eventClient, err = v1core.NewForConfig(&eventClientConfig)
|
|
if err != nil {
|
|
glog.Warningf("Failed to create API Server client for Events: %v", err)
|
|
}
|
|
|
|
// make a separate client for heartbeat with throttling disabled and a timeout attached
|
|
heartbeatClientConfig := *clientConfig
|
|
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
|
|
heartbeatClientConfig.QPS = float32(-1)
|
|
heartbeatClient, err = v1core.NewForConfig(&heartbeatClientConfig)
|
|
if err != nil {
|
|
glog.Warningf("Failed to create API Server client for heartbeat: %v", err)
|
|
}
|
|
} else {
|
|
switch {
|
|
case s.RequireKubeConfig:
|
|
return fmt.Errorf("invalid kubeconfig: %v", err)
|
|
case s.KubeConfig.Provided():
|
|
glog.Warningf("invalid kubeconfig: %v", err)
|
|
}
|
|
}
|
|
|
|
kubeDeps.KubeClient = kubeClient
|
|
kubeDeps.ExternalKubeClient = externalKubeClient
|
|
if heartbeatClient != nil {
|
|
kubeDeps.HeartbeatClient = heartbeatClient
|
|
}
|
|
if eventClient != nil {
|
|
kubeDeps.EventClient = eventClient
|
|
}
|
|
}
|
|
|
|
// Alpha Dynamic Configuration Implementation;
|
|
// if the kubelet config controller is available, inject the latest to start the config and status sync loops
|
|
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
|
|
kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, string(nodeName))
|
|
}
|
|
|
|
if kubeDeps.Auth == nil {
|
|
auth, err := BuildAuth(nodeName, kubeDeps.ExternalKubeClient, s.KubeletConfiguration)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
kubeDeps.Auth = auth
|
|
}
|
|
|
|
if kubeDeps.CAdvisorInterface == nil {
|
|
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
|
|
kubeDeps.CAdvisorInterface, err = cadvisor.New(s.Address, uint(s.CAdvisorPort), imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Setup event recorder if required.
|
|
makeEventRecorder(kubeDeps, nodeName)
|
|
|
|
if kubeDeps.ContainerManager == nil {
|
|
if s.CgroupsPerQOS && s.CgroupRoot == "" {
|
|
glog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
|
|
s.CgroupRoot = "/"
|
|
}
|
|
kubeReserved, err := parseResourceList(s.KubeReserved)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
systemReserved, err := parseResourceList(s.SystemReserved)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var hardEvictionThresholds []evictionapi.Threshold
|
|
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
|
|
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
|
|
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
experimentalQOSReserved, err := cm.ParseQOSReserved(s.ExperimentalQOSReserved)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
|
|
|
|
kubeDeps.ContainerManager, err = cm.NewContainerManager(
|
|
kubeDeps.Mounter,
|
|
kubeDeps.CAdvisorInterface,
|
|
cm.NodeConfig{
|
|
RuntimeCgroupsName: s.RuntimeCgroups,
|
|
SystemCgroupsName: s.SystemCgroups,
|
|
KubeletCgroupsName: s.KubeletCgroups,
|
|
ContainerRuntime: s.ContainerRuntime,
|
|
CgroupsPerQOS: s.CgroupsPerQOS,
|
|
CgroupRoot: s.CgroupRoot,
|
|
CgroupDriver: s.CgroupDriver,
|
|
KubeletRootDir: s.RootDirectory,
|
|
ProtectKernelDefaults: s.ProtectKernelDefaults,
|
|
NodeAllocatableConfig: cm.NodeAllocatableConfig{
|
|
KubeReservedCgroupName: s.KubeReservedCgroup,
|
|
SystemReservedCgroupName: s.SystemReservedCgroup,
|
|
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
|
|
KubeReserved: kubeReserved,
|
|
SystemReserved: systemReserved,
|
|
HardEvictionThresholds: hardEvictionThresholds,
|
|
},
|
|
ExperimentalQOSReserved: *experimentalQOSReserved,
|
|
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
|
|
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
|
|
},
|
|
s.FailSwapOn,
|
|
devicePluginEnabled,
|
|
kubeDeps.Recorder)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := checkPermissions(); err != nil {
|
|
glog.Error(err)
|
|
}
|
|
|
|
utilruntime.ReallyCrash = s.ReallyCrashForTesting
|
|
|
|
rand.Seed(time.Now().UTC().UnixNano())
|
|
|
|
// TODO(vmarmol): Do this through container config.
|
|
oomAdjuster := kubeDeps.OOMAdjuster
|
|
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
|
|
glog.Warning(err)
|
|
}
|
|
|
|
if err := RunKubelet(&s.KubeletFlags, &s.KubeletConfiguration, kubeDeps, s.RunOnce); err != nil {
|
|
return err
|
|
}
|
|
|
|
if s.HealthzPort > 0 {
|
|
healthz.DefaultHealthz()
|
|
go wait.Until(func() {
|
|
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
|
|
if err != nil {
|
|
glog.Errorf("Starting health server failed: %v", err)
|
|
}
|
|
}, 5*time.Second, wait.NeverStop)
|
|
}
|
|
|
|
if s.RunOnce {
|
|
return nil
|
|
}
|
|
|
|
<-done
|
|
return nil
|
|
}
|
|
|
|
// getNodeName returns the node name according to the cloud provider
|
|
// if cloud provider is specified. Otherwise, returns the hostname of the node.
|
|
func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName, error) {
|
|
if cloud == nil {
|
|
return types.NodeName(hostname), nil
|
|
}
|
|
|
|
instances, ok := cloud.Instances()
|
|
if !ok {
|
|
return "", fmt.Errorf("failed to get instances from cloud provider")
|
|
}
|
|
|
|
nodeName, err := instances.CurrentNodeName(hostname)
|
|
if err != nil {
|
|
return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
|
|
}
|
|
|
|
glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
|
|
|
|
return nodeName, nil
|
|
}
|
|
|
|
// InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
|
|
// certificate and key file are generated. Returns a configured server.TLSOptions object.
|
|
func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletConfiguration) (*server.TLSOptions, error) {
|
|
if !utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) && kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
|
|
kc.TLSCertFile = path.Join(kf.CertDirectory, "kubelet.crt")
|
|
kc.TLSPrivateKeyFile = path.Join(kf.CertDirectory, "kubelet.key")
|
|
|
|
canReadCertAndKey, err := certutil.CanReadCertAndKey(kc.TLSCertFile, kc.TLSPrivateKeyFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !canReadCertAndKey {
|
|
cert, key, err := certutil.GenerateSelfSignedCertKey(nodeutil.GetHostname(kf.HostnameOverride), nil, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
|
|
}
|
|
|
|
if err := certutil.WriteCert(kc.TLSCertFile, cert); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := certutil.WriteKey(kc.TLSPrivateKeyFile, key); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
glog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
|
|
}
|
|
}
|
|
tlsOptions := &server.TLSOptions{
|
|
Config: &tls.Config{
|
|
// Can't use SSLv3 because of POODLE and BEAST
|
|
// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
|
|
// Can't use TLSv1.1 because of RC4 cipher usage
|
|
MinVersion: tls.VersionTLS12,
|
|
},
|
|
CertFile: kc.TLSCertFile,
|
|
KeyFile: kc.TLSPrivateKeyFile,
|
|
}
|
|
|
|
if len(kc.Authentication.X509.ClientCAFile) > 0 {
|
|
clientCAs, err := certutil.NewPool(kc.Authentication.X509.ClientCAFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to load client CA file %s: %v", kc.Authentication.X509.ClientCAFile, err)
|
|
}
|
|
// Specify allowed CAs for client certificates
|
|
tlsOptions.Config.ClientCAs = clientCAs
|
|
// Populate PeerCertificates in requests, but don't reject connections without verified certificates
|
|
tlsOptions.Config.ClientAuth = tls.RequestClientCert
|
|
}
|
|
|
|
return tlsOptions, nil
|
|
}
|
|
|
|
func kubeconfigClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
|
|
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
|
|
&clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig.Value()},
|
|
&clientcmd.ConfigOverrides{},
|
|
).ClientConfig()
|
|
}
|
|
|
|
// createClientConfig creates a client configuration from the command line arguments.
|
|
// If --kubeconfig is explicitly set, it will be used. If it is not set but
|
|
// --require-kubeconfig=true, we attempt to load the default kubeconfig file.
|
|
func createClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
|
|
// If --kubeconfig was not provided, it will have a default path set in cmd/kubelet/app/options/options.go.
|
|
// We only use that default path when --require-kubeconfig=true. The default path is temporary until --require-kubeconfig is removed.
|
|
// TODO(#41161:v1.10.0): Remove the default kubeconfig path and --require-kubeconfig.
|
|
if s.BootstrapKubeconfig != "" || s.KubeConfig.Provided() || s.RequireKubeConfig == true {
|
|
return kubeconfigClientConfig(s)
|
|
} else {
|
|
return nil, fmt.Errorf("createClientConfig called in standalone mode")
|
|
}
|
|
}
|
|
|
|
// CreateAPIServerClientConfig generates a client.Config from command line flags
|
|
// via createClientConfig and then injects chaos into the configuration via addChaosToClientConfig.
|
|
// This func is exported to support integration with third party kubelet extensions (e.g. kubernetes-mesos).
|
|
func CreateAPIServerClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
|
|
clientConfig, err := createClientConfig(s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clientConfig.ContentType = s.ContentType
|
|
// Override kubeconfig qps/burst settings from flags
|
|
clientConfig.QPS = float32(s.KubeAPIQPS)
|
|
clientConfig.Burst = int(s.KubeAPIBurst)
|
|
|
|
addChaosToClientConfig(s, clientConfig)
|
|
return clientConfig, nil
|
|
}
|
|
|
|
// addChaosToClientConfig injects random errors into client connections if configured.
|
|
func addChaosToClientConfig(s *options.KubeletServer, config *restclient.Config) {
|
|
if s.ChaosChance != 0.0 {
|
|
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
|
|
seed := chaosclient.NewSeed(1)
|
|
// TODO: introduce a standard chaos package with more tunables - this is just a proof of concept
|
|
// TODO: introduce random latency and stalls
|
|
return chaosclient.NewChaosRoundTripper(rt, chaosclient.LogChaos, seed.P(s.ChaosChance, chaosclient.ErrSimulatedConnectionResetByPeer))
|
|
}
|
|
}
|
|
}
|
|
|
|
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
|
|
// 1 Integration tests
|
|
// 2 Kubelet binary
|
|
// 3 Standalone 'kubernetes' binary
|
|
// Eventually, #2 will be replaced with instances of #3
|
|
func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, runOnce bool) error {
|
|
hostname := nodeutil.GetHostname(kubeFlags.HostnameOverride)
|
|
// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
|
|
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Setup event recorder if required.
|
|
makeEventRecorder(kubeDeps, nodeName)
|
|
|
|
// TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
|
|
// so that I could remove the associated fields from KubeletConfiginternal. I would
|
|
// prefer this to be done as part of an independent validation step on the
|
|
// KubeletConfiguration. But as far as I can tell, we don't have an explicit
|
|
// place for validation of the KubeletConfiguration yet.
|
|
hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
privilegedSources := capabilities.PrivilegedSources{
|
|
HostNetworkSources: hostNetworkSources,
|
|
HostPIDSources: hostPIDSources,
|
|
HostIPCSources: hostIPCSources,
|
|
}
|
|
capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)
|
|
|
|
credentialprovider.SetPreferredDockercfgPath(kubeFlags.RootDirectory)
|
|
glog.V(2).Infof("Using root directory: %v", kubeFlags.RootDirectory)
|
|
|
|
builder := kubeDeps.Builder
|
|
if builder == nil {
|
|
builder = CreateAndInitKubelet
|
|
}
|
|
if kubeDeps.OSInterface == nil {
|
|
kubeDeps.OSInterface = kubecontainer.RealOS{}
|
|
}
|
|
|
|
k, err := builder(kubeCfg,
|
|
kubeDeps,
|
|
&kubeFlags.ContainerRuntimeOptions,
|
|
kubeFlags.ContainerRuntime,
|
|
kubeFlags.RuntimeCgroups,
|
|
kubeFlags.HostnameOverride,
|
|
kubeFlags.NodeIP,
|
|
kubeFlags.ProviderID,
|
|
kubeFlags.CloudProvider,
|
|
kubeFlags.CertDirectory,
|
|
kubeFlags.RootDirectory,
|
|
kubeFlags.RegisterNode,
|
|
kubeFlags.RegisterWithTaints,
|
|
kubeFlags.AllowedUnsafeSysctls,
|
|
kubeFlags.Containerized,
|
|
kubeFlags.RemoteRuntimeEndpoint,
|
|
kubeFlags.RemoteImageEndpoint,
|
|
kubeFlags.ExperimentalMounterPath,
|
|
kubeFlags.ExperimentalKernelMemcgNotification,
|
|
kubeFlags.ExperimentalCheckNodeCapabilitiesBeforeMount,
|
|
kubeFlags.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
|
|
kubeFlags.MinimumGCAge,
|
|
kubeFlags.MaxPerPodContainerCount,
|
|
kubeFlags.MaxContainerCount,
|
|
kubeFlags.MasterServiceNamespace,
|
|
kubeFlags.RegisterSchedulable,
|
|
kubeFlags.NonMasqueradeCIDR,
|
|
kubeFlags.KeepTerminatedPodVolumes,
|
|
kubeFlags.NodeLabels,
|
|
kubeFlags.SeccompProfileRoot,
|
|
kubeFlags.BootstrapCheckpointPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create kubelet: %v", err)
|
|
}
|
|
|
|
// NewMainKubelet should have set up a pod source config if one didn't exist
|
|
// when the builder was run. This is just a precaution.
|
|
if kubeDeps.PodConfig == nil {
|
|
return fmt.Errorf("failed to create kubelet, pod source config was nil")
|
|
}
|
|
podCfg := kubeDeps.PodConfig
|
|
|
|
rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))
|
|
|
|
// process pods and exit.
|
|
if runOnce {
|
|
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
|
|
return fmt.Errorf("runonce failed: %v", err)
|
|
}
|
|
glog.Infof("Started kubelet as runonce")
|
|
} else {
|
|
startKubelet(k, podCfg, kubeCfg, kubeDeps)
|
|
glog.Infof("Started kubelet")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies) {
|
|
// start the kubelet
|
|
go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)
|
|
|
|
// start the kubelet server
|
|
if kubeCfg.EnableServer {
|
|
go wait.Until(func() {
|
|
k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
|
|
}, 0, wait.NeverStop)
|
|
}
|
|
if kubeCfg.ReadOnlyPort > 0 {
|
|
go wait.Until(func() {
|
|
k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
|
|
}, 0, wait.NeverStop)
|
|
}
|
|
}
|
|
|
|
func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|
kubeDeps *kubelet.Dependencies,
|
|
crOptions *config.ContainerRuntimeOptions,
|
|
containerRuntime string,
|
|
runtimeCgroups string,
|
|
hostnameOverride string,
|
|
nodeIP string,
|
|
providerID string,
|
|
cloudProvider string,
|
|
certDirectory string,
|
|
rootDirectory string,
|
|
registerNode bool,
|
|
registerWithTaints []api.Taint,
|
|
allowedUnsafeSysctls []string,
|
|
containerized bool,
|
|
remoteRuntimeEndpoint string,
|
|
remoteImageEndpoint string,
|
|
experimentalMounterPath string,
|
|
experimentalKernelMemcgNotification bool,
|
|
experimentalCheckNodeCapabilitiesBeforeMount bool,
|
|
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
|
|
minimumGCAge metav1.Duration,
|
|
maxPerPodContainerCount int32,
|
|
maxContainerCount int32,
|
|
masterServiceNamespace string,
|
|
registerSchedulable bool,
|
|
nonMasqueradeCIDR string,
|
|
keepTerminatedPodVolumes bool,
|
|
nodeLabels map[string]string,
|
|
seccompProfileRoot string,
|
|
bootstrapCheckpointPath string) (k kubelet.Bootstrap, err error) {
|
|
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
|
|
// up into "per source" synchronizations
|
|
|
|
k, err = kubelet.NewMainKubelet(kubeCfg,
|
|
kubeDeps,
|
|
crOptions,
|
|
containerRuntime,
|
|
runtimeCgroups,
|
|
hostnameOverride,
|
|
nodeIP,
|
|
providerID,
|
|
cloudProvider,
|
|
certDirectory,
|
|
rootDirectory,
|
|
registerNode,
|
|
registerWithTaints,
|
|
allowedUnsafeSysctls,
|
|
containerized,
|
|
remoteRuntimeEndpoint,
|
|
remoteImageEndpoint,
|
|
experimentalMounterPath,
|
|
experimentalKernelMemcgNotification,
|
|
experimentalCheckNodeCapabilitiesBeforeMount,
|
|
experimentalNodeAllocatableIgnoreEvictionThreshold,
|
|
minimumGCAge,
|
|
maxPerPodContainerCount,
|
|
maxContainerCount,
|
|
masterServiceNamespace,
|
|
registerSchedulable,
|
|
nonMasqueradeCIDR,
|
|
keepTerminatedPodVolumes,
|
|
nodeLabels,
|
|
seccompProfileRoot,
|
|
bootstrapCheckpointPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
k.BirthCry()
|
|
|
|
k.StartGarbageCollection()
|
|
|
|
return k, nil
|
|
}
|
|
|
|
// parseResourceList parses the given configuration map into an API
|
|
// ResourceList or returns an error.
|
|
func parseResourceList(m map[string]string) (v1.ResourceList, error) {
|
|
if len(m) == 0 {
|
|
return nil, nil
|
|
}
|
|
rl := make(v1.ResourceList)
|
|
for k, v := range m {
|
|
switch v1.ResourceName(k) {
|
|
// CPU, memory and local storage resources are supported.
|
|
case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceEphemeralStorage:
|
|
q, err := resource.ParseQuantity(v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if q.Sign() == -1 {
|
|
return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
|
|
}
|
|
rl[v1.ResourceName(k)] = q
|
|
default:
|
|
return nil, fmt.Errorf("cannot reserve %q resource", k)
|
|
}
|
|
}
|
|
return rl, nil
|
|
}
|
|
|
|
// BootstrapKubeletConfigController constructs and bootstrap a configuration controller
|
|
func BootstrapKubeletConfigController(defaultConfig *kubeletconfiginternal.KubeletConfiguration,
|
|
initConfigDirFlag flag.StringFlag,
|
|
dynamicConfigDirFlag flag.StringFlag) (*kubeletconfiginternal.KubeletConfiguration, *kubeletconfig.Controller, error) {
|
|
var err error
|
|
// Alpha Dynamic Configuration Implementation; this section only loads config from disk, it does not contact the API server
|
|
// compute absolute paths based on current working dir
|
|
initConfigDir := ""
|
|
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletConfigFile) && initConfigDirFlag.Provided() {
|
|
initConfigDir, err = filepath.Abs(initConfigDirFlag.Value())
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to get absolute path for --init-config-dir")
|
|
}
|
|
}
|
|
dynamicConfigDir := ""
|
|
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && dynamicConfigDirFlag.Provided() {
|
|
dynamicConfigDir, err = filepath.Abs(dynamicConfigDirFlag.Value())
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir")
|
|
}
|
|
}
|
|
|
|
// get the latest KubeletConfiguration checkpoint from disk, or load the init or default config if no valid checkpoints exist
|
|
kubeletConfigController, err := kubeletconfig.NewController(defaultConfig, initConfigDir, dynamicConfigDir)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to construct controller, error: %v", err)
|
|
}
|
|
kubeletConfig, err := kubeletConfigController.Bootstrap()
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
|
|
}
|
|
return kubeletConfig, kubeletConfigController, nil
|
|
}
|
|
|
|
// RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose
|
|
// TODO(random-liu): Move this to a separate binary.
|
|
func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration) error {
|
|
r := &f.ContainerRuntimeOptions
|
|
|
|
// Initialize docker client configuration.
|
|
dockerClientConfig := &dockershim.ClientConfig{
|
|
DockerEndpoint: r.DockerEndpoint,
|
|
RuntimeRequestTimeout: c.RuntimeRequestTimeout.Duration,
|
|
ImagePullProgressDeadline: r.ImagePullProgressDeadline.Duration,
|
|
}
|
|
|
|
// Initialize network plugin settings.
|
|
nh := &kubelet.NoOpLegacyHost{}
|
|
pluginSettings := dockershim.NetworkPluginSettings{
|
|
HairpinMode: kubeletconfiginternal.HairpinMode(c.HairpinMode),
|
|
NonMasqueradeCIDR: f.NonMasqueradeCIDR,
|
|
PluginName: r.NetworkPluginName,
|
|
PluginConfDir: r.CNIConfDir,
|
|
PluginBinDir: r.CNIBinDir,
|
|
MTU: int(r.NetworkPluginMTU),
|
|
LegacyRuntimeHost: nh,
|
|
}
|
|
|
|
// Initialize streaming configuration. (Not using TLS now)
|
|
streamingConfig := &streaming.Config{
|
|
// Use a relative redirect (no scheme or host).
|
|
BaseURL: &url.URL{Path: "/cri/"},
|
|
StreamIdleTimeout: c.StreamingConnectionIdleTimeout.Duration,
|
|
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
|
|
SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
|
|
SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
|
|
}
|
|
|
|
ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings,
|
|
f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, r.DockerDisableSharedPID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := ds.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
|
|
server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
|
|
if err := server.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Start the streaming server
|
|
addr := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
|
|
return http.ListenAndServe(addr, ds)
|
|
}
|