vendor update for CSI 0.3.0

This commit is contained in:
gman
2018-07-18 16:47:22 +02:00
parent 6f484f92fc
commit 8ea659f0d5
6810 changed files with 438061 additions and 193861 deletions

View File

@ -19,12 +19,12 @@ go_library(
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint/store:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/kubeletconfig/util/equal:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/kubelet/kubeletconfig/util/panic:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -9,48 +9,44 @@ load(
go_test(
name = "go_default_test",
srcs = [
"checkpoint_test.go",
"configmap_test.go",
"download_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"checkpoint.go",
"configmap.go",
"download.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -1,72 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
)
// Checkpoint represents a local copy of a config source (payload) object
type Checkpoint interface {
// UID returns the UID of the config source object behind the Checkpoint
UID() string
// Parse extracts the KubeletConfiguration from the checkpoint, applies defaults, and converts to the internal type
Parse() (*kubeletconfig.KubeletConfiguration, error)
// Encode returns a []byte representation of the config source object behind the Checkpoint
Encode() ([]byte, error)
// object returns the underlying checkpointed object. If you want to compare sources for equality, use EqualCheckpoints,
// which compares the underlying checkpointed objects for semantic API equality.
object() interface{}
}
// DecodeCheckpoint is a helper for using the apimachinery to decode serialized checkpoints
func DecodeCheckpoint(data []byte) (Checkpoint, error) {
// decode the checkpoint
obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), data)
if err != nil {
return nil, fmt.Errorf("failed to decode, error: %v", err)
}
// TODO(mtaufen): for now we assume we are trying to load a ConfigMap checkpoint, may need to extend this if we allow other checkpoint types
// convert it to the external ConfigMap type, so we're consistently working with the external type outside of the on-disk representation
cm := &apiv1.ConfigMap{}
err = legacyscheme.Scheme.Convert(obj, cm, nil)
if err != nil {
return nil, fmt.Errorf("failed to convert decoded object into a v1 ConfigMap, error: %v", err)
}
return NewConfigMapCheckpoint(cm)
}
// EqualCheckpoints compares two Checkpoints for equality, if their underlying objects are equal, so are the Checkpoints
func EqualCheckpoints(a, b Checkpoint) bool {
if a != nil && b != nil {
return apiequality.Semantic.DeepEqual(a.object(), b.object())
}
if a == nil && b == nil {
return true
}
return false
}

View File

@ -1,89 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package checkpoint
import (
"testing"
"github.com/davecgh/go-spew/spew"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestDecodeCheckpoint(t *testing.T) {
// generate correct Checkpoint for v1/ConfigMap test case
cm, err := NewConfigMapCheckpoint(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: types.UID("uid")}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// generate unsupported object encoding for unsupported type test case
unsupported := newUnsupportedEncoded(t)
// test cases
cases := []struct {
desc string
data []byte
expect Checkpoint // expect a deeply-equal Checkpoint to be returned from Decode
err string // expect error to contain this substring
}{
// v1/ConfigMap
{"v1/ConfigMap", []byte(`{"apiVersion": "v1","kind": "ConfigMap","metadata": {"uid": "uid"}}`), cm, ""},
// malformed
{"malformed", []byte("malformed"), nil, "failed to decode"},
// no UID
{"no UID", []byte(`{"apiVersion": "v1","kind": "ConfigMap"}`), nil, "ConfigMap must have a UID"},
// well-formed, but unsupported type
{"well-formed, but unsupported encoded type", unsupported, nil, "failed to convert"},
}
for _, c := range cases {
cpt, err := DecodeCheckpoint(c.data)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// Unfortunately reflect.DeepEqual treats nil data structures as != empty data structures, so
// we have to settle for semantic equality of the underlying checkpointed API objects.
// If additional fields are added to the object that implements the Checkpoint interface,
// they should be added to a named sub-object to facilitate a DeepEquals comparison
// of the extra fields.
// decoded checkpoint should match expected checkpoint
if !apiequality.Semantic.DeepEqual(cpt.object(), c.expect.object()) {
t.Errorf("case %q, expect checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(cpt))
}
}
}
// newUnsupportedEncoded returns an encoding of an object that does not have a Checkpoint implementation
func newUnsupportedEncoded(t *testing.T) []byte {
encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName)
if err != nil {
t.Fatalf("could not create an encoder, error: %v", err)
}
unsupported := &apiv1.Node{}
data, err := runtime.Encode(encoder, unsupported)
if err != nil {
t.Fatalf("could not encode object, error: %v", err)
}
return data
}

View File

@ -20,75 +20,42 @@ import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
)
const configMapConfigKey = "kubelet"
// configMapCheckpoint implements Checkpoint, backed by a v1/ConfigMap config source object
type configMapCheckpoint struct {
kubeletCodecs *serializer.CodecFactory // codecs for the KubeletConfiguration
configMap *apiv1.ConfigMap
// configMapPayload implements Payload, backed by a v1/ConfigMap config source object
type configMapPayload struct {
cm *apiv1.ConfigMap
}
// NewConfigMapCheckpoint returns a Checkpoint backed by `cm`. `cm` must be non-nil
// and have a non-empty ObjectMeta.UID, or an error will be returned.
func NewConfigMapCheckpoint(cm *apiv1.ConfigMap) (Checkpoint, error) {
var _ Payload = (*configMapPayload)(nil)
// NewConfigMapPayload constructs a Payload backed by a ConfigMap, which must have a non-empty UID
func NewConfigMapPayload(cm *apiv1.ConfigMap) (Payload, error) {
if cm == nil {
return nil, fmt.Errorf("ConfigMap must be non-nil to be treated as a Checkpoint")
} else if len(cm.ObjectMeta.UID) == 0 {
return nil, fmt.Errorf("ConfigMap must have a UID to be treated as a Checkpoint")
return nil, fmt.Errorf("ConfigMap must be non-nil")
} else if cm.ObjectMeta.UID == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty UID")
} else if cm.ObjectMeta.ResourceVersion == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty ResourceVersion")
}
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
return &configMapCheckpoint{kubeletCodecs, cm}, nil
return &configMapPayload{cm}, nil
}
// UID returns the UID of a configMapCheckpoint
func (c *configMapCheckpoint) UID() string {
return string(c.configMap.UID)
func (p *configMapPayload) UID() string {
return string(p.cm.UID)
}
// Parse extracts the KubeletConfiguration from v1/ConfigMap checkpoints, applies defaults, and converts to the internal type
func (c *configMapCheckpoint) Parse() (*kubeletconfig.KubeletConfiguration, error) {
const emptyCfgErr = "config was empty, but some parameters are required"
if len(c.configMap.Data) == 0 {
return nil, fmt.Errorf(emptyCfgErr)
}
config, ok := c.configMap.Data[configMapConfigKey]
if !ok {
return nil, fmt.Errorf("key %q not found in ConfigMap", configMapConfigKey)
} else if len(config) == 0 {
return nil, fmt.Errorf(emptyCfgErr)
}
return utilcodec.DecodeKubeletConfiguration(c.kubeletCodecs, []byte(config))
func (p *configMapPayload) ResourceVersion() string {
return p.cm.ResourceVersion
}
// Encode encodes a configMapCheckpoint
func (c *configMapCheckpoint) Encode() ([]byte, error) {
cm := c.configMap
encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName)
if err != nil {
return nil, err
}
data, err := runtime.Encode(encoder, cm)
if err != nil {
return nil, err
}
return data, nil
func (p *configMapPayload) Files() map[string]string {
return p.cm.Data
}
func (c *configMapCheckpoint) object() interface{} {
return c.configMap
func (p *configMapPayload) object() interface{} {
return p.cm
}

View File

@ -17,7 +17,7 @@ limitations under the License.
package checkpoint
import (
"fmt"
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
@ -25,213 +25,124 @@ import (
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestNewConfigMapCheckpoint(t *testing.T) {
func TestNewConfigMapPayload(t *testing.T) {
cases := []struct {
desc string
cm *apiv1.ConfigMap
err string
}{
{"nil v1/ConfigMap", nil, "must be non-nil"},
{"empty v1/ConfigMap", &apiv1.ConfigMap{}, "must have a UID"},
{"populated v1/ConfigMap",
&apiv1.ConfigMap{
{
desc: "nil",
cm: nil,
err: "ConfigMap must be non-nil",
},
{
desc: "missing uid",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
ResourceVersion: "rv",
},
},
err: "ConfigMap must have a non-empty UID",
},
{
desc: "missing resourceVersion",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: types.UID("uid"),
UID: "uid",
},
},
err: "ConfigMap must have a non-empty ResourceVersion",
},
{
desc: "populated v1/ConfigMap",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
ResourceVersion: "rv",
},
Data: map[string]string{
"key1": "value1",
"key2": "value2",
},
}, ""},
}
for _, c := range cases {
cpt, err := NewConfigMapCheckpoint(c.cm)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(cpt.object(), c.cm) {
t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.cm), spew.Sdump(cpt))
}
}
}
func TestConfigMapCheckpointUID(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []string{"", "uid", "376dfb73-56db-11e7-a01e-42010a800002"}
for _, uidIn := range cases {
cpt := &configMapCheckpoint{
kubeletCodecs,
&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uidIn)},
},
}
// UID method should return the correct value of the UID
uidOut := cpt.UID()
if uidIn != uidOut {
t.Errorf("expect UID() to return %q, but got %q", uidIn, uidOut)
}
err: "",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(c.cm)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.cm, payload.object()) {
t.Errorf("expect %s but got %s", spew.Sdump(c.cm), spew.Sdump(payload))
}
})
}
}
func TestConfigMapCheckpointParse(t *testing.T) {
kubeletScheme, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
func TestConfigMapPayloadUID(t *testing.T) {
const expect = "uid"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect, ResourceVersion: "rv"}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatalf("error constructing payload: %v", err)
}
uid := payload.UID()
if expect != uid {
t.Errorf("expect %q, but got %q", expect, uid)
}
}
// get the built-in default configuration
external := &kubeletconfigv1beta1.KubeletConfiguration{}
kubeletScheme.Default(external)
defaultConfig := &kubeletconfig.KubeletConfiguration{}
err = kubeletScheme.Convert(external, defaultConfig, nil)
func TestConfigMapPayloadResourceVersion(t *testing.T) {
const expect = "rv"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: expect}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatalf("error constructing payload: %v", err)
}
resourceVersion := payload.ResourceVersion()
if expect != resourceVersion {
t.Errorf("expect %q, but got %q", expect, resourceVersion)
}
}
func TestConfigMapPayloadFiles(t *testing.T) {
cases := []struct {
desc string
cm *apiv1.ConfigMap
expect *kubeletconfig.KubeletConfiguration
err string
data map[string]string
expect map[string]string
}{
{"empty data", &apiv1.ConfigMap{}, nil, "config was empty"},
// missing kubelet key
{"missing kubelet key", &apiv1.ConfigMap{Data: map[string]string{
"bogus": "stuff"}}, nil, fmt.Sprintf("key %q not found", configMapConfigKey)},
// invalid format
{"invalid yaml", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": "*"}}, nil, "failed to decode"},
{"invalid json", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": "{*"}}, nil, "failed to decode"},
// invalid object
{"missing kind", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, nil, "failed to decode"},
{"missing version", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"kind":"KubeletConfiguration"}`}}, nil, "failed to decode"},
{"unregistered kind", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"kind":"BogusKind","apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, nil, "failed to decode"},
{"unregistered version", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"kind":"KubeletConfiguration","apiVersion":"bogusversion"}`}}, nil, "failed to decode"},
// empty object with correct kind and version should result in the defaults for that kind and version
{"default from yaml", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `kind: KubeletConfiguration
apiVersion: kubelet.config.k8s.io/v1beta1`}}, defaultConfig, ""},
{"default from json", &apiv1.ConfigMap{Data: map[string]string{
"kubelet": `{"kind":"KubeletConfiguration","apiVersion":"kubelet.config.k8s.io/v1beta1"}`}}, defaultConfig, ""},
{"nil", nil, nil},
{"empty", map[string]string{}, map[string]string{}},
{"populated",
map[string]string{
"foo": "1",
"bar": "2",
},
map[string]string{
"foo": "1",
"bar": "2",
}},
}
for _, c := range cases {
cpt := &configMapCheckpoint{kubeletCodecs, c.cm}
kc, err := cpt.Parse()
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// we expect the parsed configuration to match what we described in the ConfigMap
if !apiequality.Semantic.DeepEqual(c.expect, kc) {
t.Errorf("case %q, expect config %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(kc))
}
}
}
func TestConfigMapCheckpointEncode(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// only one case, based on output from the existing encoder, and since
// this is hard to test (key order isn't guaranteed), we should probably
// just stick to this test case and mostly rely on the round-trip test.
cases := []struct {
desc string
cpt *configMapCheckpoint
expect string
}{
// we expect Checkpoints to be encoded as a json representation of the underlying API object
{"one-key",
&configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: "one-key"},
Data: map[string]string{"one": ""}}},
`{"kind":"ConfigMap","apiVersion":"v1","metadata":{"name":"one-key","creationTimestamp":null},"data":{"one":""}}
`},
}
for _, c := range cases {
data, err := c.cpt.Encode()
// we don't expect any errors from encoding
if utiltest.SkipRest(t, c.desc, err, "") {
continue
}
if string(data) != c.expect {
t.Errorf("case %q, expect encoding %q but got %q", c.desc, c.expect, string(data))
}
}
}
func TestConfigMapCheckpointRoundTrip(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
cpt *configMapCheckpoint
decodeErr string
}{
// empty data
{"empty data",
&configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "empty-data-sha256-e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
UID: "uid",
},
Data: map[string]string{}}},
""},
// two keys
{"two keys",
&configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "two-keys-sha256-2bff03d6249c8a9dc9a1436d087c124741361ccfac6615b81b67afcff5c42431",
UID: "uid",
},
Data: map[string]string{"one": "", "two": "2"}}},
""},
// missing uid
{"missing uid",
&configMapCheckpoint{kubeletCodecs, &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "two-keys-sha256-2bff03d6249c8a9dc9a1436d087c124741361ccfac6615b81b67afcff5c42431",
UID: "",
},
Data: map[string]string{"one": "", "two": "2"}}},
"must have a UID"},
}
for _, c := range cases {
// we don't expect any errors from encoding
data, err := c.cpt.Encode()
if utiltest.SkipRest(t, c.desc, err, "") {
continue
}
after, err := DecodeCheckpoint(data)
if utiltest.SkipRest(t, c.desc, err, c.decodeErr) {
continue
}
if !apiequality.Semantic.DeepEqual(c.cpt.object(), after.object()) {
t.Errorf("case %q, expect round-trip result %s but got %s", c.desc, spew.Sdump(c.cpt), spew.Sdump(after))
}
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: "rv"}, Data: c.data})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
files := payload.Files()
if !reflect.DeepEqual(c.expect, files) {
t.Errorf("expected %v, but got %v", c.expect, files)
}
})
}
}

View File

@ -18,89 +18,130 @@ package checkpoint
import (
"fmt"
"math/rand"
"time"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/client-go/tools/cache"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utilcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
// RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
type RemoteConfigSource interface {
// UID returns the UID of the remote config source object
// Payload represents a local copy of a config source (payload) object
type Payload interface {
// UID returns a globally unique (space and time) identifier for the payload.
// The return value is guaranteed non-empty.
UID() string
// APIPath returns the API path to the remote resource, e.g. its SelfLink
APIPath() string
// Download downloads the remote config source object returns a Checkpoint backed by the object,
// or a sanitized failure reason and error if the download fails
Download(client clientset.Interface) (Checkpoint, string, error)
// Encode returns a []byte representation of the object behind the RemoteConfigSource
Encode() ([]byte, error)
// object returns the underlying source object. If you want to compare sources for equality, use EqualRemoteConfigSources,
// which compares the underlying source objects for semantic API equality.
// ResourceVersion returns a resource version for the payload.
// The return value is guaranteed non-empty.
ResourceVersion() string
// Files returns a map of filenames to file contents.
Files() map[string]string
// object returns the underlying checkpointed object.
object() interface{}
}
// NewRemoteConfigSource constructs a RemoteConfigSource from a v1/NodeConfigSource object, or returns
// a sanitized failure reason and an error if the `source` is blatantly invalid.
// RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
type RemoteConfigSource interface {
// KubeletFilename returns the name of the Kubelet config file as it should appear in the keys of Payload.Files()
KubeletFilename() string
// APIPath returns the API path to the remote resource, e.g. its SelfLink
APIPath() string
// UID returns the globally unique identifier for the most recently downloaded payload targeted by the source.
UID() string
// ResourceVersion returns the resource version of the most recently downloaded payload targeted by the source.
ResourceVersion() string
// Download downloads the remote config source's target object and returns a Payload backed by the object,
// or a sanitized failure reason and error if the download fails.
// Download takes an optional store as an argument. If provided, Download will check this store for the
// target object prior to contacting the API server.
// Download updates the local UID and ResourceVersion tracked by this source, based on the downloaded payload.
Download(client clientset.Interface, store cache.Store) (Payload, string, error)
// Informer returns an informer that can be used to detect changes to the remote config source
Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer
// Encode returns a []byte representation of the object behind the RemoteConfigSource
Encode() ([]byte, error)
// NodeConfigSource returns a copy of the underlying apiv1.NodeConfigSource object.
// All RemoteConfigSources are expected to be backed by a NodeConfigSource,
// though the convenience methods on the interface will target the source
// type that was detected in a call to NewRemoteConfigSource.
NodeConfigSource() *apiv1.NodeConfigSource
}
// NewRemoteConfigSource constructs a RemoteConfigSource from a v1/NodeConfigSource object
// You should only call this with a non-nil config source.
// Note that the API server validates Node.Spec.ConfigSource.
func NewRemoteConfigSource(source *apiv1.NodeConfigSource) (RemoteConfigSource, string, error) {
// exactly one subfield of the config source must be non-nil, toady ConfigMapRef is the only reference
if source.ConfigMapRef == nil {
return nil, status.FailSyncReasonAllNilSubfields, fmt.Errorf("%s, NodeConfigSource was: %#v", status.FailSyncReasonAllNilSubfields, source)
// NOTE: Even though the API server validates the config, we check whether all *known* fields are
// nil here, so that if a new API server allows a new config source type, old clients can send
// an error message rather than crashing due to a nil pointer dereference.
// Exactly one reference subfield of the config source must be non-nil.
// Currently ConfigMap is the only reference subfield.
if source.ConfigMap == nil {
return nil, status.AllNilSubfieldsError, fmt.Errorf("%s, NodeConfigSource was: %#v", status.AllNilSubfieldsError, source)
}
// validate the NodeConfigSource:
// at this point we know we're using the ConfigMapRef subfield
ref := source.ConfigMapRef
// name, namespace, and UID must all be non-empty for ConfigMapRef
if ref.Name == "" || ref.Namespace == "" || string(ref.UID) == "" {
return nil, status.FailSyncReasonPartialObjectReference, fmt.Errorf("%s, ObjectReference was: %#v", status.FailSyncReasonPartialObjectReference, ref)
}
return &remoteConfigMap{source}, "", nil
}
// DecodeRemoteConfigSource is a helper for using the apimachinery to decode serialized RemoteConfigSources;
// e.g. the objects stored in the .cur and .lkg files by checkpoint/store/fsstore.go
// e.g. the metadata stored by checkpoint/store/fsstore.go
func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) {
// decode the remote config source
obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), data)
_, codecs, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
obj, err := runtime.Decode(codecs.UniversalDecoder(), data)
if err != nil {
return nil, fmt.Errorf("failed to decode, error: %v", err)
}
// for now we assume we are trying to load an apiv1.NodeConfigSource,
// for now we assume we are trying to load an kubeletconfigv1beta1.SerializedNodeConfigSource,
// this may need to be extended if e.g. a new version of the api is born
// convert it to the external NodeConfigSource type, so we're consistently working with the external type outside of the on-disk representation
cs := &apiv1.NodeConfigSource{}
err = legacyscheme.Scheme.Convert(obj, cs, nil)
if err != nil {
return nil, fmt.Errorf("failed to convert decoded object into a v1 NodeConfigSource, error: %v", err)
cs, ok := obj.(*kubeletconfiginternal.SerializedNodeConfigSource)
if !ok {
return nil, fmt.Errorf("failed to cast decoded remote config source to *k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig.SerializedNodeConfigSource")
}
source, _, err := NewRemoteConfigSource(cs)
return source, err
// we use the v1.NodeConfigSource type on internal and external, so no need to convert to external here
source, _, err := NewRemoteConfigSource(&cs.Source)
if err != nil {
return nil, err
}
return source, nil
}
// EqualRemoteConfigSources is a helper for comparing remote config sources by
// comparing the underlying API objects for semantic equality.
func EqualRemoteConfigSources(a, b RemoteConfigSource) bool {
if a != nil && b != nil {
return apiequality.Semantic.DeepEqual(a.object(), b.object())
return apiequality.Semantic.DeepEqual(a.NodeConfigSource(), b.NodeConfigSource())
}
if a == nil && b == nil {
return true
}
return false
return a == b
}
// remoteConfigMap implements RemoteConfigSource for v1/ConfigMap config sources
@ -108,58 +149,125 @@ type remoteConfigMap struct {
source *apiv1.NodeConfigSource
}
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMapRef.UID)
var _ RemoteConfigSource = (*remoteConfigMap)(nil)
func (r *remoteConfigMap) KubeletFilename() string {
return r.source.ConfigMap.KubeletConfigKey
}
const configMapAPIPathFmt = "/api/v1/namespaces/%s/configmaps/%s"
func (r *remoteConfigMap) APIPath() string {
ref := r.source.ConfigMapRef
ref := r.source.ConfigMap
return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
}
func (r *remoteConfigMap) Download(client clientset.Interface) (Checkpoint, string, error) {
var reason string
uid := string(r.source.ConfigMapRef.UID)
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMap.UID)
}
utillog.Infof("attempting to download ConfigMap with UID %q", uid)
func (r *remoteConfigMap) ResourceVersion() string {
return r.source.ConfigMap.ResourceVersion
}
// get the ConfigMap via namespace/name, there doesn't seem to be a way to get it by UID
cm, err := client.CoreV1().ConfigMaps(r.source.ConfigMapRef.Namespace).Get(r.source.ConfigMapRef.Name, metav1.GetOptions{})
func (r *remoteConfigMap) Download(client clientset.Interface, store cache.Store) (Payload, string, error) {
var (
cm *apiv1.ConfigMap
err error
)
// check the in-memory store for the ConfigMap, so we can skip unnecessary downloads
if store != nil {
utillog.Infof("checking in-memory store for %s", r.APIPath())
cm, err = getConfigMapFromStore(store, r.source.ConfigMap.Namespace, r.source.ConfigMap.Name)
if err != nil {
// just log the error, we'll attempt a direct download instead
utillog.Errorf("failed to check in-memory store for %s, error: %v", r.APIPath(), err)
} else if cm != nil {
utillog.Infof("found %s in in-memory store, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
} else {
utillog.Infof("did not find %s in in-memory store", r.APIPath())
}
}
// if we didn't find the ConfigMap in the in-memory store, download it from the API server
if cm == nil {
utillog.Infof("attempting to download %s", r.APIPath())
cm, err = client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{})
if err != nil {
return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err)
}
utillog.Infof("successfully downloaded %s, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
} // Assert: Now we have a non-nil ConfigMap
// construct Payload from the ConfigMap
payload, err := NewConfigMapPayload(cm)
if err != nil {
reason = fmt.Sprintf(status.FailSyncReasonDownloadFmt, r.APIPath())
return nil, reason, fmt.Errorf("%s, error: %v", reason, err)
// We only expect an error here if ObjectMeta is lacking UID or ResourceVersion. This should
// never happen on objects in the informer's store, or objects downloaded from the API server
// directly, so we report InternalError.
return nil, status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
// update internal UID and ResourceVersion based on latest ConfigMap
r.source.ConfigMap.UID = cm.UID
r.source.ConfigMap.ResourceVersion = cm.ResourceVersion
return payload, "", nil
}
func (r *remoteConfigMap) Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer {
// select ConfigMap by name
fieldselector := fields.OneTermEqualSelector("metadata.name", r.source.ConfigMap.Name)
// add some randomness to resync period, which can help avoid controllers falling into lock-step
minResyncPeriod := 15 * time.Minute
factor := rand.Float64() + 1
resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (kuberuntime.Object, error) {
return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).List(metav1.ListOptions{
FieldSelector: fieldselector.String(),
})
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Watch(metav1.ListOptions{
FieldSelector: fieldselector.String(),
ResourceVersion: options.ResourceVersion,
})
},
}
// ensure that UID matches the UID on the reference, the ObjectReference must be unambiguous
if r.source.ConfigMapRef.UID != cm.UID {
reason = fmt.Sprintf(status.FailSyncReasonUIDMismatchFmt, r.source.ConfigMapRef.UID, r.APIPath(), cm.UID)
return nil, reason, fmt.Errorf(reason)
}
informer := cache.NewSharedInformer(lw, &apiv1.ConfigMap{}, resyncPeriod)
informer.AddEventHandler(handler)
checkpoint, err := NewConfigMapCheckpoint(cm)
if err != nil {
reason = fmt.Sprintf("invalid downloaded object")
return nil, reason, fmt.Errorf("%s, error: %v", reason, err)
}
utillog.Infof("successfully downloaded ConfigMap with UID %q", uid)
return checkpoint, "", nil
return informer
}
func (r *remoteConfigMap) Encode() ([]byte, error) {
encoder, err := utilcodec.NewJSONEncoder(apiv1.GroupName)
encoder, err := utilcodec.NewKubeletconfigYAMLEncoder(kubeletconfigv1beta1.SchemeGroupVersion)
if err != nil {
return nil, err
}
data, err := runtime.Encode(encoder, r.source)
data, err := runtime.Encode(encoder, &kubeletconfigv1beta1.SerializedNodeConfigSource{Source: *r.source})
if err != nil {
return nil, err
}
return data, nil
}
func (r *remoteConfigMap) object() interface{} {
return r.source
func (r *remoteConfigMap) NodeConfigSource() *apiv1.NodeConfigSource {
return r.source.DeepCopy()
}
func getConfigMapFromStore(store cache.Store, namespace, name string) (*apiv1.ConfigMap, error) {
key := fmt.Sprintf("%s/%s", namespace, name)
obj, ok, err := store.GetByKey(key)
if err != nil || !ok {
return nil, err
}
cm, ok := obj.(*apiv1.ConfigMap)
if !ok {
err := fmt.Errorf("failed to cast object %s from informer's store to ConfigMap", key)
utillog.Errorf(err.Error())
return nil, err
}
return cm, nil
}

View File

@ -25,9 +25,9 @@ import (
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
"k8s.io/client-go/tools/cache"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
@ -38,118 +38,205 @@ func TestNewRemoteConfigSource(t *testing.T) {
expect RemoteConfigSource
err string
}{
// all NodeConfigSource subfields nil
{"all NodeConfigSource subfields nil",
&apiv1.NodeConfigSource{}, nil, "exactly one subfield must be non-nil"},
{"ConfigMapRef: empty name, namespace, and UID",
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{}}, nil, "invalid ObjectReference"},
// ConfigMapRef: empty name and namespace
{"ConfigMapRef: empty name and namespace",
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{UID: "uid"}}, nil, "invalid ObjectReference"},
// ConfigMapRef: empty name and UID
{"ConfigMapRef: empty name and UID",
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Namespace: "namespace"}}, nil, "invalid ObjectReference"},
// ConfigMapRef: empty namespace and UID
{"ConfigMapRef: empty namespace and UID",
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name"}}, nil, "invalid ObjectReference"},
// ConfigMapRef: empty UID
{"ConfigMapRef: empty namespace and UID",
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace"}}, nil, "invalid ObjectReference"},
// ConfigMapRef: empty namespace
{"ConfigMapRef: empty namespace and UID",
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", UID: "uid"}}, nil, "invalid ObjectReference"},
// ConfigMapRef: empty name
{"ConfigMapRef: empty namespace and UID",
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Namespace: "namespace", UID: "uid"}}, nil, "invalid ObjectReference"},
// ConfigMapRef: valid reference
{"ConfigMapRef: valid reference",
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}},
&remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}}, ""},
{
desc: "all NodeConfigSource subfields nil",
source: &apiv1.NodeConfigSource{},
expect: nil,
err: "exactly one subfield must be non-nil",
},
{
desc: "ConfigMap: valid reference",
source: &apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
expect: &remoteConfigMap{&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}}},
err: "",
},
}
for _, c := range cases {
src, _, err := NewRemoteConfigSource(c.source)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.expect.object(), src.object()) {
t.Errorf("case %q, expect RemoteConfigSource %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(src))
}
t.Run(c.desc, func(t *testing.T) {
source, _, err := NewRemoteConfigSource(c.source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// underlying object should match the object passed in
if !apiequality.Semantic.DeepEqual(c.expect.NodeConfigSource(), source.NodeConfigSource()) {
t.Errorf("case %q, expect RemoteConfigSource %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestRemoteConfigMapUID(t *testing.T) {
cases := []string{"", "uid", "376dfb73-56db-11e7-a01e-42010a800002"}
for _, uidIn := range cases {
cpt := &remoteConfigMap{
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: types.UID(uidIn)}},
}
// UID method should return the correct value of the UID
uidOut := cpt.UID()
if uidIn != uidOut {
t.Errorf("expect UID() to return %q, but got %q", uidIn, uidOut)
}
const expect = "uid"
source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: expect,
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
uid := source.UID()
if expect != uid {
t.Errorf("expect %q, but got %q", expect, uid)
}
}
func TestRemoteConfigMapAPIPath(t *testing.T) {
name := "name"
namespace := "namespace"
cpt := &remoteConfigMap{
&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: name, Namespace: namespace, UID: ""}},
const (
name = "name"
namespace = "namespace"
)
source, _, err := NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: name,
Namespace: namespace,
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
}
expect := fmt.Sprintf(configMapAPIPathFmt, cpt.source.ConfigMapRef.Namespace, cpt.source.ConfigMapRef.Name)
// APIPath() method should return the correct path to the referenced resource
path := cpt.APIPath()
expect := fmt.Sprintf(configMapAPIPathFmt, namespace, name)
path := source.APIPath()
if expect != path {
t.Errorf("expect APIPath() to return %q, but got %q", expect, namespace)
t.Errorf("expect %q, but got %q", expect, path)
}
}
func TestRemoteConfigMapDownload(t *testing.T) {
_, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cm := &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "namespace",
UID: "uid",
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
}}
client := fakeclient.NewSimpleClientset(cm)
source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
KubeletConfigKey: "kubelet",
}}
expectPayload, err := NewConfigMapPayload(cm)
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
missingStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
hasStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
if err := hasStore.Add(cm); err != nil {
t.Fatalf("unexpected error constructing hasStore")
}
missingClient := fakeclient.NewSimpleClientset()
hasClient := fakeclient.NewSimpleClientset(cm)
cases := []struct {
desc string
source RemoteConfigSource
expect Checkpoint
client clientset.Interface
store cache.Store
err string
}{
// object doesn't exist
{"object doesn't exist",
&remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "bogus", Namespace: "namespace", UID: "bogus"}}},
nil, "not found"},
// UID of downloaded object doesn't match UID of referent found via namespace/name
{"UID is incorrect for namespace/name",
&remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "bogus"}}},
nil, "does not match"},
// successful download
{"object exists and reference is correct",
&remoteConfigMap{&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}}},
&configMapCheckpoint{kubeletCodecs, cm}, ""},
{
desc: "nil store, object does not exist in API server",
client: missingClient,
err: "not found",
},
{
desc: "nil store, object exists in API server",
client: hasClient,
},
{
desc: "object exists in store and API server",
store: hasStore,
client: hasClient,
},
{
desc: "object exists in store, but does not exist in API server",
store: hasStore,
client: missingClient,
},
{
desc: "object does not exist in store, but exists in API server",
store: missingStore,
client: hasClient,
},
{
desc: "object does not exist in store or API server",
client: missingClient,
store: missingStore,
err: "not found",
},
}
for _, c := range cases {
cpt, _, err := c.source.Download(client)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
// "downloaded" object should match the expected
if !apiequality.Semantic.DeepEqual(c.expect.object(), cpt.object()) {
t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(cpt))
}
t.Run(c.desc, func(t *testing.T) {
// deep copy so we can always check the UID/ResourceVersion are set after Download
s, _, err := NewRemoteConfigSource(source.DeepCopy())
if err != nil {
t.Fatalf("error constructing remote config source %v", err)
}
// attempt download
p, _, err := s.Download(c.client, c.store)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// downloaded object should match the expected
if !apiequality.Semantic.DeepEqual(expectPayload.object(), p.object()) {
t.Errorf("expect Checkpoint %s but got %s", spew.Sdump(expectPayload), spew.Sdump(p))
}
// source UID and ResourceVersion should be updated by Download
if p.UID() != s.UID() {
t.Errorf("expect UID to be updated by Download to match payload: %s, but got source UID: %s", p.UID(), s.UID())
}
if p.ResourceVersion() != s.ResourceVersion() {
t.Errorf("expect ResourceVersion to be updated by Download to match payload: %s, but got source ResourceVersion: %s", p.ResourceVersion(), s.ResourceVersion())
}
})
}
}
func TestEqualRemoteConfigSources(t *testing.T) {
cases := []struct {
desc string
a RemoteConfigSource
b RemoteConfigSource
expect bool
}{
{"both nil", nil, nil, true},
{"a nil", nil, &remoteConfigMap{}, false},
{"b nil", &remoteConfigMap{}, nil, false},
{"neither nil, equal", &remoteConfigMap{}, &remoteConfigMap{}, true},
{
desc: "neither nil, not equal",
a: &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{Name: "a"}}},
b: &remoteConfigMap{&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{KubeletConfigKey: "kubelet"}}},
expect: false,
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
if EqualRemoteConfigSources(c.a, c.b) != c.expect {
t.Errorf("expected EqualRemoteConfigSources to return %t, but got %t", c.expect, !c.expect)
}
})
}
}

View File

@ -14,7 +14,11 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1beta1:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
@ -34,7 +38,9 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store",
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/configfiles:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/util/filesystem:go_default_library",

View File

@ -20,52 +20,51 @@ import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
// so far only implements Current(), LastKnownGood(), SetCurrent(), and SetLastKnownGood()
// so far only implements Assigned(), LastKnownGood(), SetAssigned(), and SetLastKnownGood()
type fakeStore struct {
current checkpoint.RemoteConfigSource
assigned checkpoint.RemoteConfigSource
lastKnownGood checkpoint.RemoteConfigSource
}
var _ Store = (*fakeStore)(nil)
func (s *fakeStore) Initialize() error {
return fmt.Errorf("Initialize method not supported")
}
func (s *fakeStore) Exists(uid string) (bool, error) {
func (s *fakeStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
return false, fmt.Errorf("Exists method not supported")
}
func (s *fakeStore) Save(c checkpoint.Checkpoint) error {
func (s *fakeStore) Save(c checkpoint.Payload) error {
return fmt.Errorf("Save method not supported")
}
func (s *fakeStore) Load(uid string) (checkpoint.Checkpoint, error) {
func (s *fakeStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
return nil, fmt.Errorf("Load method not supported")
}
func (s *fakeStore) CurrentModified() (time.Time, error) {
return time.Time{}, fmt.Errorf("CurrentModified method not supported")
func (s *fakeStore) AssignedModified() (time.Time, error) {
return time.Time{}, fmt.Errorf("AssignedModified method not supported")
}
func (s *fakeStore) Current() (checkpoint.RemoteConfigSource, error) {
return s.current, nil
func (s *fakeStore) Assigned() (checkpoint.RemoteConfigSource, error) {
return s.assigned, nil
}
func (s *fakeStore) LastKnownGood() (checkpoint.RemoteConfigSource, error) {
return s.lastKnownGood, nil
}
func (s *fakeStore) SetCurrent(source checkpoint.RemoteConfigSource) error {
s.current = source
func (s *fakeStore) SetAssigned(source checkpoint.RemoteConfigSource) error {
s.assigned = source
return nil
}
func (s *fakeStore) SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error) {
return setCurrentUpdated(s, source)
}
func (s *fakeStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error {
s.lastKnownGood = source
return nil

View File

@ -21,82 +21,112 @@ import (
"path/filepath"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
curFile = ".cur"
lkgFile = ".lkg"
metaDir = "meta"
assignedFile = "assigned"
lastKnownGoodFile = "last-known-good"
checkpointsDir = "checkpoints"
)
// fsStore is for tracking checkpoints in the local filesystem, implements Store
type fsStore struct {
// fs is the filesystem to use for storage operations; can be mocked for testing
fs utilfs.Filesystem
// checkpointsDir is the absolute path to the storage directory for fsStore
checkpointsDir string
// dir is the absolute path to the storage directory for fsStore
dir string
}
// NewFsStore returns a Store that saves its data in `checkpointsDir`
func NewFsStore(fs utilfs.Filesystem, checkpointsDir string) Store {
var _ Store = (*fsStore)(nil)
// NewFsStore returns a Store that saves its data in dir
func NewFsStore(fs utilfs.Filesystem, dir string) Store {
return &fsStore{
fs: fs,
checkpointsDir: checkpointsDir,
fs: fs,
dir: dir,
}
}
func (s *fsStore) Initialize() error {
utillog.Infof("initializing config checkpoints directory %q", s.checkpointsDir)
if err := utilfiles.EnsureDir(s.fs, s.checkpointsDir); err != nil {
utillog.Infof("initializing config checkpoints directory %q", s.dir)
// ensure top-level dir for store
if err := utilfiles.EnsureDir(s.fs, s.dir); err != nil {
return err
}
if err := utilfiles.EnsureFile(s.fs, filepath.Join(s.checkpointsDir, curFile)); err != nil {
// ensure metadata directory and reference files (tracks assigned and lkg configs)
if err := utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, metaDir)); err != nil {
return err
}
return utilfiles.EnsureFile(s.fs, filepath.Join(s.checkpointsDir, lkgFile))
if err := utilfiles.EnsureFile(s.fs, s.metaPath(assignedFile)); err != nil {
return err
}
if err := utilfiles.EnsureFile(s.fs, s.metaPath(lastKnownGoodFile)); err != nil {
return err
}
// ensure checkpoints directory (saves unpacked payloads in subdirectories named after payload UID)
return utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, checkpointsDir))
}
func (s *fsStore) Exists(uid string) (bool, error) {
ok, err := utilfiles.FileExists(s.fs, filepath.Join(s.checkpointsDir, uid))
func (s *fsStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
const errfmt = "failed to determine whether checkpoint exists for source %s, UID: %s, ResourceVersion: %s exists, error: %v"
if len(source.UID()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty UID is ambiguous")
}
if len(source.ResourceVersion()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty ResourceVersion is ambiguous")
}
// we check whether the directory was created for the resource
ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(source.UID(), source.ResourceVersion()))
if err != nil {
return false, fmt.Errorf("failed to determine whether checkpoint %q exists, error: %v", uid, err)
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), err)
}
return ok, nil
}
func (s *fsStore) Save(c checkpoint.Checkpoint) error {
// encode the checkpoint
data, err := c.Encode()
if err != nil {
func (s *fsStore) Save(payload checkpoint.Payload) error {
// Note: Payload interface guarantees UID() and ResourceVersion() to be non-empty
path := s.checkpointPath(payload.UID(), payload.ResourceVersion())
// ensure the parent dir (checkpoints/uid) exists, since ReplaceDir requires the parent of the replacee
// to exist, and we checkpoint as checkpoints/uid/resourceVersion/files-from-configmap
if err := utilfiles.EnsureDir(s.fs, filepath.Dir(path)); err != nil {
return err
}
// save the file
return utilfiles.ReplaceFile(s.fs, filepath.Join(s.checkpointsDir, c.UID()), data)
// save the checkpoint's files in the appropriate checkpoint dir
return utilfiles.ReplaceDir(s.fs, path, payload.Files())
}
func (s *fsStore) Load(uid string) (checkpoint.Checkpoint, error) {
filePath := filepath.Join(s.checkpointsDir, uid)
utillog.Infof("loading configuration from %q", filePath)
// load the file
data, err := s.fs.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read checkpoint file %q, error: %v", filePath, err)
func (s *fsStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
sourceFmt := fmt.Sprintf("%s, UID: %s, ResourceVersion: %s", source.APIPath(), source.UID(), source.ResourceVersion())
// check if a checkpoint exists for the source
if ok, err := s.Exists(source); err != nil {
return nil, err
} else if !ok {
return nil, fmt.Errorf("no checkpoint for source %s", sourceFmt)
}
// decode it
c, err := checkpoint.DecodeCheckpoint(data)
// load the kubelet config file
utillog.Infof("loading Kubelet configuration checkpoint for source %s", sourceFmt)
loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID(), source.ResourceVersion()), source.KubeletFilename()))
if err != nil {
return nil, fmt.Errorf("failed to decode checkpoint file %q, error: %v", filePath, err)
return nil, err
}
return c, nil
kc, err := loader.Load()
if err != nil {
return nil, err
}
return kc, nil
}
func (s *fsStore) CurrentModified() (time.Time, error) {
path := filepath.Join(s.checkpointsDir, curFile)
func (s *fsStore) AssignedModified() (time.Time, error) {
path := s.metaPath(assignedFile)
info, err := s.fs.Stat(path)
if err != nil {
return time.Time{}, fmt.Errorf("failed to stat %q while checking modification time, error: %v", path, err)
@ -104,35 +134,36 @@ func (s *fsStore) CurrentModified() (time.Time, error) {
return info.ModTime(), nil
}
func (s *fsStore) Current() (checkpoint.RemoteConfigSource, error) {
return s.sourceFromFile(curFile)
func (s *fsStore) Assigned() (checkpoint.RemoteConfigSource, error) {
return readRemoteConfigSource(s.fs, s.metaPath(assignedFile))
}
func (s *fsStore) LastKnownGood() (checkpoint.RemoteConfigSource, error) {
return s.sourceFromFile(lkgFile)
return readRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile))
}
func (s *fsStore) SetCurrent(source checkpoint.RemoteConfigSource) error {
return s.setSourceFile(curFile, source)
}
func (s *fsStore) SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error) {
return setCurrentUpdated(s, source)
func (s *fsStore) SetAssigned(source checkpoint.RemoteConfigSource) error {
return writeRemoteConfigSource(s.fs, s.metaPath(assignedFile), source)
}
func (s *fsStore) SetLastKnownGood(source checkpoint.RemoteConfigSource) error {
return s.setSourceFile(lkgFile, source)
return writeRemoteConfigSource(s.fs, s.metaPath(lastKnownGoodFile), source)
}
func (s *fsStore) Reset() (bool, error) {
return reset(s)
}
// sourceFromFile returns the RemoteConfigSource stored in the file at `s.checkpointsDir/relPath`,
// or nil if the file is empty
func (s *fsStore) sourceFromFile(relPath string) (checkpoint.RemoteConfigSource, error) {
path := filepath.Join(s.checkpointsDir, relPath)
data, err := s.fs.ReadFile(path)
func (s *fsStore) checkpointPath(uid, resourceVersion string) string {
return filepath.Join(s.dir, checkpointsDir, uid, resourceVersion)
}
func (s *fsStore) metaPath(name string) string {
return filepath.Join(s.dir, metaDir, name)
}
func readRemoteConfigSource(fs utilfs.Filesystem, path string) (checkpoint.RemoteConfigSource, error) {
data, err := fs.ReadFile(path)
if err != nil {
return nil, err
} else if len(data) == 0 {
@ -141,17 +172,23 @@ func (s *fsStore) sourceFromFile(relPath string) (checkpoint.RemoteConfigSource,
return checkpoint.DecodeRemoteConfigSource(data)
}
// set source file replaces the file at `s.checkpointsDir/relPath` with a file containing `source`
func (s *fsStore) setSourceFile(relPath string, source checkpoint.RemoteConfigSource) error {
path := filepath.Join(s.checkpointsDir, relPath)
func writeRemoteConfigSource(fs utilfs.Filesystem, path string, source checkpoint.RemoteConfigSource) error {
// if nil, reset the file
if source == nil {
return utilfiles.ReplaceFile(s.fs, path, []byte{})
return utilfiles.ReplaceFile(fs, path, []byte{})
}
// check that UID and ResourceVersion are non-empty,
// error to save reference if the checkpoint can't be fully resolved
if source.UID() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty UID is ambiguous")
}
if source.ResourceVersion() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty ResourceVersion is ambiguous")
}
// encode the source and save it to the file
data, err := source.Encode()
if err != nil {
return err
}
return utilfiles.ReplaceFile(s.fs, path, data)
return utilfiles.ReplaceFile(fs, path, data)
}

File diff suppressed because it is too large Load Diff

View File

@ -20,65 +20,51 @@ import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)
// Store saves checkpoints and information about which is the current and last-known-good checkpoint to a storage layer
// Store saves checkpoints and information about which is the assigned and last-known-good checkpoint to a storage layer
type Store interface {
// Initialize sets up the storage layer
Initialize() error
// Exists returns true if a checkpoint with `uid` exists in the store, false otherwise
Exists(uid string) (bool, error)
// Save saves the checkpoint to the storage layer
Save(c checkpoint.Checkpoint) error
// Load loads the checkpoint with UID `uid` from the storage layer, or returns an error if the checkpoint does not exist
Load(uid string) (checkpoint.Checkpoint, error)
// CurrentModified returns the last time that the current UID was set
CurrentModified() (time.Time, error)
// Current returns the source that points to the current checkpoint, or nil if no current checkpoint is set
Current() (checkpoint.RemoteConfigSource, error)
// Exists returns true if the object referenced by `source` has been checkpointed.
// The source must be unambiguous - e.g. if referencing an API object it must specify both uid and resourceVersion.
Exists(source checkpoint.RemoteConfigSource) (bool, error)
// Save Kubelet config payloads to the storage layer. It must be possible to unmarshal the payload to a KubeletConfiguration.
// The following payload types are supported:
// - k8s.io/api/core/v1.ConfigMap
Save(c checkpoint.Payload) error
// Load loads the KubeletConfiguration from the checkpoint referenced by `source`.
Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error)
// AssignedModified returns the last time that the assigned checkpoint was set
AssignedModified() (time.Time, error)
// Assigned returns the source that points to the checkpoint currently assigned to the Kubelet, or nil if no assigned checkpoint is set
Assigned() (checkpoint.RemoteConfigSource, error)
// LastKnownGood returns the source that points to the last-known-good checkpoint, or nil if no last-known-good checkpoint is set
LastKnownGood() (checkpoint.RemoteConfigSource, error)
// SetCurrent saves the source that points to the current checkpoint, set to nil to unset
SetCurrent(source checkpoint.RemoteConfigSource) error
// SetCurrentUpdated is similar to SetCurrent, but also returns whether the current checkpoint changed as a result
SetCurrentUpdated(source checkpoint.RemoteConfigSource) (bool, error)
// SetAssigned saves the source that points to the assigned checkpoint, set to nil to unset
SetAssigned(source checkpoint.RemoteConfigSource) error
// SetLastKnownGood saves the source that points to the last-known-good checkpoint, set to nil to unset
SetLastKnownGood(source checkpoint.RemoteConfigSource) error
// Reset unsets the current and last-known-good UIDs and returns whether the current UID was unset as a result of the reset
// Reset unsets the assigned and last-known-good checkpoints and returns whether the assigned checkpoint was unset as a result of the reset
Reset() (bool, error)
}
// reset is a helper for implementing Reset, which can be implemented in terms of Store methods
func reset(s Store) (bool, error) {
assigned, err := s.Assigned()
if err != nil {
return false, err
}
if err := s.SetLastKnownGood(nil); err != nil {
return false, fmt.Errorf("failed to reset last-known-good UID in checkpoint store, error: %v", err)
}
updated, err := s.SetCurrentUpdated(nil)
if err != nil {
return false, fmt.Errorf("failed to reset current UID in checkpoint store, error: %v", err)
if err := s.SetAssigned(nil); err != nil {
return false, fmt.Errorf("failed to reset assigned UID in checkpoint store, error: %v", err)
}
return updated, nil
}
// setCurrentUpdated is a helper for implementing SetCurrentUpdated, which can be implemented in terms of Store methods
func setCurrentUpdated(s Store, source checkpoint.RemoteConfigSource) (bool, error) {
cur, err := s.Current()
if err != nil {
return false, err
}
// if both are nil, no need to update
if cur == nil && source == nil {
return false, nil
}
// if UIDs match, no need to update
if (source != nil && cur != nil) && cur.UID() == source.UID() {
return false, nil
}
// update the source
if err := s.SetCurrent(source); err != nil {
return false, err
}
return true, nil
return assigned != nil, nil
}

View File

@ -26,11 +26,21 @@ import (
)
func TestReset(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}})
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "other-name", Namespace: "namespace", UID: "other-uid"}})
otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "other-name",
Namespace: "namespace",
UID: "other-uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -38,59 +48,24 @@ func TestReset(t *testing.T) {
s *fakeStore
updated bool
}{
{&fakeStore{current: nil, lastKnownGood: nil}, false},
{&fakeStore{current: source, lastKnownGood: nil}, true},
{&fakeStore{current: nil, lastKnownGood: source}, false},
{&fakeStore{current: source, lastKnownGood: source}, true},
{&fakeStore{current: source, lastKnownGood: otherSource}, true},
{&fakeStore{current: otherSource, lastKnownGood: source}, true},
{&fakeStore{assigned: nil, lastKnownGood: nil}, false},
{&fakeStore{assigned: source, lastKnownGood: nil}, true},
{&fakeStore{assigned: nil, lastKnownGood: source}, false},
{&fakeStore{assigned: source, lastKnownGood: source}, true},
{&fakeStore{assigned: source, lastKnownGood: otherSource}, true},
{&fakeStore{assigned: otherSource, lastKnownGood: source}, true},
}
for _, c := range cases {
updated, err := reset(c.s)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if c.s.current != nil || c.s.lastKnownGood != nil {
t.Errorf("case %q, expect nil for current and last-known-good checkpoints, but still have %q and %q, respectively",
spew.Sdump(c.s), c.s.current, c.s.lastKnownGood)
if c.s.assigned != nil || c.s.lastKnownGood != nil {
t.Errorf("case %q, expect nil for assigned and last-known-good checkpoints, but still have %q and %q, respectively",
spew.Sdump(c.s), c.s.assigned, c.s.lastKnownGood)
}
if c.updated != updated {
t.Errorf("case %q, expect reset to return %t, but got %t", spew.Sdump(c.s), c.updated, updated)
}
}
}
func TestSetCurrentUpdated(t *testing.T) {
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "name", Namespace: "namespace", UID: "uid"}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
otherSource, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: "other-name", Namespace: "namespace", UID: "other-uid"}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
s *fakeStore
newCurrent checkpoint.RemoteConfigSource
updated bool
}{
{&fakeStore{current: nil}, nil, false},
{&fakeStore{current: nil}, source, true},
{&fakeStore{current: source}, source, false},
{&fakeStore{current: source}, nil, true},
{&fakeStore{current: source}, otherSource, true},
}
for _, c := range cases {
current := c.s.current
updated, err := setCurrentUpdated(c.s, c.newCurrent)
if err != nil {
t.Fatalf("case %q -> %q, unexpected error: %v", current, c.newCurrent, err)
}
if c.newCurrent != c.s.current {
t.Errorf("case %q -> %q, expect current UID to be %q, but got %q", current, c.newCurrent, c.newCurrent, c.s.current)
}
if c.updated != updated {
t.Errorf("case %q -> %q, expect setCurrentUpdated to return %t, but got %t", current, c.newCurrent, c.updated, updated)
}
}
}

View File

@ -37,10 +37,10 @@ import (
const (
// KubeletConfigChangedEventReason identifies an event as a change of Kubelet configuration
KubeletConfigChangedEventReason = "KubeletConfigChanged"
// EventMessageFmt is the message format for Kubelet config change events
EventMessageFmt = "Kubelet will restart to use: %s"
// LocalConfigMessage is the text to apply to EventMessageFmt when the Kubelet has been configured to use its local config (init or defaults)
LocalConfigMessage = "local config"
// LocalEventMessage is sent when the Kubelet restarts to use local config
LocalEventMessage = "Kubelet restarting to use local config"
// RemoteEventMessageFmt is sent when the Kubelet restarts to use a remote config
RemoteEventMessageFmt = "Kubelet restarting to use %s, UID: %s, ResourceVersion: %s, KubeletConfigKey: %s"
)
// pokeConfiSourceWorker tells the worker thread that syncs config sources that work needs to be done
@ -69,124 +69,144 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, eventClient v
}
}()
node, err := latestNode(cc.informer.GetStore(), nodeName)
// get the latest Node.Spec.ConfigSource from the informer
source, err := latestNodeConfigSource(cc.nodeInformer.GetStore(), nodeName)
if err != nil {
cc.configOk.SetFailSyncCondition(status.FailSyncReasonInformer)
syncerr = fmt.Errorf("%s, error: %v", status.FailSyncReasonInformer, err)
cc.configStatus.SetErrorOverride(fmt.Sprintf(status.SyncErrorFmt, status.InternalError))
syncerr = fmt.Errorf("%s, error: %v", status.InternalError, err)
return
}
// check the Node and download any new config
if updated, cur, reason, err := cc.doSyncConfigSource(client, node.Spec.ConfigSource); err != nil {
cc.configOk.SetFailSyncCondition(reason)
// a nil source simply means we reset to local defaults
if source == nil {
utillog.Infof("Node.Spec.ConfigSource is empty, will reset assigned and last-known-good to defaults")
if updated, reason, err := cc.resetConfig(); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
restartForNewConfig(eventClient, nodeName, nil)
}
return
}
// a non-nil source means we should attempt to download the config, and checkpoint it if necessary
utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary")
// TODO(mtaufen): It would be nice if we could check the payload's metadata before (re)downloading the whole payload
// we at least try pulling the latest configmap out of the local informer store.
// construct the interface that can dynamically dispatch the correct Download, etc. methods for the given source type
remote, reason, err := checkpoint.NewRemoteConfigSource(source)
if err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// "download" source, either from informer's in-memory store or directly from the API server, if the informer doesn't have a copy
payload, reason, err := cc.downloadConfigPayload(client, remote)
if err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// save a checkpoint for the payload, if one does not already exist
if reason, err := cc.saveConfigCheckpoint(remote, payload); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// update the local, persistent record of assigned config
if updated, reason, err := cc.setAssignedConfig(remote); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
path := LocalConfigMessage
if cur != nil {
path = cur.APIPath()
}
// we directly log and send the event, instead of using the event recorder,
// because the event recorder won't flush its queue before we exit (we'd lose the event)
event := eventf(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, EventMessageFmt, path)
glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil {
utillog.Errorf("failed to send event, error: %v", err)
}
os.Exit(0)
restartForNewConfig(eventClient, nodeName, remote)
}
// If we get here:
// - there is no need to restart to update the current config
// - there is no need to restart to use new config
// - there was no error trying to sync configuration
// - if, previously, there was an error trying to sync configuration, we need to clear that error from the condition
cc.configOk.ClearFailSyncCondition()
// - if, previously, there was an error trying to sync configuration, we need to clear that error from the status
cc.configStatus.SetErrorOverride("")
}
// doSyncConfigSource checkpoints and sets the store's current config to the new config or resets config,
// depending on the `source`, and returns whether the current config in the checkpoint store was updated as a result
func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *apiv1.NodeConfigSource) (bool, checkpoint.RemoteConfigSource, string, error) {
if source == nil {
utillog.Infof("Node.Spec.ConfigSource is empty, will reset current and last-known-good to defaults")
updated, reason, err := cc.resetConfig()
if err != nil {
return false, nil, reason, err
}
return updated, nil, "", nil
// Note: source has up-to-date uid and resourceVersion after calling downloadConfigPayload.
func (cc *Controller) downloadConfigPayload(client clientset.Interface, source checkpoint.RemoteConfigSource) (checkpoint.Payload, string, error) {
var store cache.Store
if cc.remoteConfigSourceInformer != nil {
store = cc.remoteConfigSourceInformer.GetStore()
}
// if the NodeConfigSource is non-nil, download the config
utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary")
remote, reason, err := checkpoint.NewRemoteConfigSource(source)
if err != nil {
return false, nil, reason, err
}
reason, err = cc.checkpointConfigSource(client, remote)
if err != nil {
return false, nil, reason, err
}
updated, reason, err := cc.setCurrentConfig(remote)
if err != nil {
return false, nil, reason, err
}
return updated, remote, "", nil
return source.Download(client, store)
}
// checkpointConfigSource downloads and checkpoints the object referred to by `source` if the checkpoint does not already exist,
// if a failure occurs, returns a sanitized failure reason and an error
func (cc *Controller) checkpointConfigSource(client clientset.Interface, source checkpoint.RemoteConfigSource) (string, error) {
uid := source.UID()
// if the checkpoint already exists, skip downloading
if ok, err := cc.checkpointStore.Exists(uid); err != nil {
reason := fmt.Sprintf(status.FailSyncReasonCheckpointExistenceFmt, source.APIPath(), uid)
return reason, fmt.Errorf("%s, error: %v", reason, err)
} else if ok {
utillog.Infof("checkpoint already exists for object with UID %q, skipping download", uid)
func (cc *Controller) saveConfigCheckpoint(source checkpoint.RemoteConfigSource, payload checkpoint.Payload) (string, error) {
ok, err := cc.checkpointStore.Exists(source)
if err != nil {
return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
if ok {
utillog.Infof("checkpoint already exists for %s, UID: %s, ResourceVersion: %s", source.APIPath(), payload.UID(), payload.ResourceVersion())
return "", nil
}
// download
checkpoint, reason, err := source.Download(client)
if err != nil {
return reason, fmt.Errorf("%s, error: %v", reason, err)
if err := cc.checkpointStore.Save(payload); err != nil {
return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
// save
err = cc.checkpointStore.Save(checkpoint)
if err != nil {
reason := fmt.Sprintf(status.FailSyncReasonSaveCheckpointFmt, source.APIPath(), checkpoint.UID())
return reason, fmt.Errorf("%s, error: %v", reason, err)
}
return "", nil
}
// setCurrentConfig updates UID of the current checkpoint in the checkpoint store to `uid` and returns whether the
// current UID changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) setCurrentConfig(source checkpoint.RemoteConfigSource) (bool, string, error) {
updated, err := cc.checkpointStore.SetCurrentUpdated(source)
// setAssignedConfig updates the assigned checkpoint config in the store.
// Returns whether the assigned config changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) setAssignedConfig(source checkpoint.RemoteConfigSource) (bool, string, error) {
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
if source == nil {
return false, status.FailSyncReasonSetCurrentLocal, err
}
return false, fmt.Sprintf(status.FailSyncReasonSetCurrentUIDFmt, source.APIPath(), source.UID()), err
return false, status.InternalError, err
}
return updated, "", nil
if err := cc.checkpointStore.SetAssigned(source); err != nil {
return false, status.InternalError, err
}
return !checkpoint.EqualRemoteConfigSources(assigned, source), "", nil
}
// resetConfig resets the current and last-known-good checkpoints in the checkpoint store to their default values and
// returns whether the current checkpoint changed as a result, or a sanitized failure reason and an error.
// resetConfig resets the assigned and last-known-good checkpoints in the checkpoint store to their default values and
// returns whether the assigned checkpoint changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) resetConfig() (bool, string, error) {
updated, err := cc.checkpointStore.Reset()
if err != nil {
return false, status.FailSyncReasonReset, err
return false, status.InternalError, err
}
return updated, "", nil
}
// latestNode returns the most recent Node with `nodeName` from `store`
func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) {
// restartForNewConfig presumes the Kubelet is managed by a babysitter, e.g. systemd
// It will send an event before exiting.
func restartForNewConfig(eventClient v1core.EventsGetter, nodeName string, source checkpoint.RemoteConfigSource) {
message := LocalEventMessage
if source != nil {
message = fmt.Sprintf(RemoteEventMessageFmt, source.APIPath(), source.UID(), source.ResourceVersion(), source.KubeletFilename())
}
// we directly log and send the event, instead of using the event recorder,
// because the event recorder won't flush its queue before we exit (we'd lose the event)
event := makeEvent(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, message)
glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil {
utillog.Errorf("failed to send event, error: %v", err)
}
utillog.Infof(message)
os.Exit(0)
}
// latestNodeConfigSource returns a copy of the most recent NodeConfigSource from the Node with `nodeName` in `store`
func latestNodeConfigSource(store cache.Store, nodeName string) (*apiv1.NodeConfigSource, error) {
obj, ok, err := store.GetByKey(nodeName)
if err != nil {
err := fmt.Errorf("failed to retrieve Node %q from informer's store, error: %v", nodeName, err)
@ -203,13 +223,11 @@ func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) {
utillog.Errorf(err.Error())
return nil, err
}
return node, nil
}
// eventf constructs and returns an event containing a formatted message
// similar to k8s.io/client-go/tools/record/event.go
func eventf(nodeName, eventType, reason, messageFmt string, args ...interface{}) *apiv1.Event {
return makeEvent(nodeName, eventType, reason, fmt.Sprintf(messageFmt, args...))
// Copy the source, so anyone who modifies it after here doesn't mess up the informer's store!
// This was previously the cause of a bug that made the Kubelet frequently resync config; Download updated
// the UID and ResourceVersion on the NodeConfigSource, but the pointer was still drilling all the way
// into the informer's copy!
return node.Spec.ConfigSource.DeepCopy(), nil
}
// makeEvent constructs an event

View File

@ -21,7 +21,7 @@ import (
"path/filepath"
"time"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -38,67 +38,101 @@ import (
)
const (
checkpointsDir = "checkpoints"
storeDir = "store"
// TODO(mtaufen): We may expose this in a future API, but for the time being we use an internal default,
// because it is not especially clear where this should live in the API.
configTrialDuration = 10 * time.Minute
)
// TransformFunc edits the KubeletConfiguration in-place, and returns an
// error if any of the transformations failed.
type TransformFunc func(kc *kubeletconfig.KubeletConfiguration) error
// Controller manages syncing dynamic Kubelet configurations
// For more information, see the proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/dynamic-kubelet-configuration.md
type Controller struct {
// defaultConfig is the configuration to use if no initConfig is provided
defaultConfig *kubeletconfig.KubeletConfiguration
// transform applies an arbitrary transformation to config after loading, and before validation.
// This can be used, for example, to include config from flags before the controller's validation step.
// If transform returns an error, loadConfig will fail, and an InternalError will be reported.
// Be wary if using this function as an extension point, in most cases the controller should
// probably just be natively extended to do what you need. Injecting flag precedence transformations
// is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
// controller's tree (pkg/) is not.
transform TransformFunc
// pendingConfigSource; write to this channel to indicate that the config source needs to be synced from the API server
pendingConfigSource chan bool
// configOk manages the KubeletConfigOk condition that is reported in Node.Status.Conditions
configOk status.ConfigOkCondition
// configStatus manages the status we report on the Node object
configStatus status.NodeConfigStatus
// informer is the informer that watches the Node object
informer cache.SharedInformer
// nodeInformer is the informer that watches the Node object
nodeInformer cache.SharedInformer
// remoteConfigSourceInformer is the informer that watches the assigned config source
remoteConfigSourceInformer cache.SharedInformer
// checkpointStore persists config source checkpoints to a storage layer
checkpointStore store.Store
}
// NewController constructs a new Controller object and returns it. Directory paths must be absolute.
func NewController(defaultConfig *kubeletconfig.KubeletConfiguration, dynamicConfigDir string) *Controller {
// NewController constructs a new Controller object and returns it. The dynamicConfigDir
// path must be absolute. transform applies an arbitrary transformation to config after loading, and before validation.
// This can be used, for example, to include config from flags before the controller's validation step.
// If transform returns an error, loadConfig will fail, and an InternalError will be reported.
// Be wary if using this function as an extension point, in most cases the controller should
// probably just be natively extended to do what you need. Injecting flag precedence transformations
// is something of an exception because the caller of this controller (cmd/) is aware of flags, but this
// controller's tree (pkg/) is not.
func NewController(dynamicConfigDir string, transform TransformFunc) *Controller {
return &Controller{
defaultConfig: defaultConfig,
transform: transform,
// channels must have capacity at least 1, since we signal with non-blocking writes
pendingConfigSource: make(chan bool, 1),
configOk: status.NewConfigOkCondition(),
checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, checkpointsDir)),
configStatus: status.NewNodeConfigStatus(),
checkpointStore: store.NewFsStore(utilfs.DefaultFs{}, filepath.Join(dynamicConfigDir, storeDir)),
}
}
// Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
// or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
// If the pre-existing local configuration should be used, Bootstrap returns a nil config.
func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
utillog.Infof("starting controller")
// ALWAYS validate the local config. This makes incorrectly provisioned nodes an error.
// It must be valid because it is the default last-known-good config.
utillog.Infof("validating local config")
if err := validation.ValidateKubeletConfiguration(cc.defaultConfig); err != nil {
return nil, fmt.Errorf("local config failed validation, error: %v", err)
}
// ensure the filesystem is initialized
if err := cc.initializeDynamicConfigDir(); err != nil {
return nil, err
}
assigned, curSource, reason, err := cc.loadAssignedConfig(cc.defaultConfig)
// determine assigned source and set status
assignedSource, err := cc.checkpointStore.Assigned()
if err != nil {
return nil, err
}
if assignedSource != nil {
cc.configStatus.SetAssigned(assignedSource.NodeConfigSource())
}
// determine last-known-good source and set status
lastKnownGoodSource, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return nil, err
}
if lastKnownGoodSource != nil {
cc.configStatus.SetLastKnownGood(lastKnownGoodSource.NodeConfigSource())
}
// if the assigned source is nil, return nil to indicate local config
if assignedSource == nil {
return nil, nil
}
// attempt to load assigned config
assignedConfig, reason, err := cc.loadConfig(assignedSource)
if err == nil {
// set the status to indicate we will use the assigned config
if curSource != nil {
cc.configOk.Set(fmt.Sprintf(status.CurRemoteMessageFmt, curSource.APIPath()), reason, apiv1.ConditionTrue)
} else {
cc.configOk.Set(status.CurLocalMessage, reason, apiv1.ConditionTrue)
}
// update the active source to the non-nil assigned source
cc.configStatus.SetActive(assignedSource.NodeConfigSource())
// update the last-known-good config if necessary, and start a timer that
// periodically checks whether the last-known good needs to be updated
@ -106,134 +140,126 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
// wait.Forever will call the func once before starting the timer
go wait.Forever(func() { cc.checkTrial(configTrialDuration) }, 10*time.Second)
return assigned, nil
} // Assert: the assigned config failed to load, parse, or validate
return assignedConfig, nil
} // Assert: the assigned config failed to load or validate
// TODO(mtaufen): consider re-attempting download when a load/verify/parse/validate
// error happens outside trial period, we already made it past the trial so it's probably filesystem corruption
// or something else scary (unless someone is using a 0-length trial period)
// load from checkpoint
// or something else scary
// log the reason and error details for the failure to load the assigned config
utillog.Errorf(fmt.Sprintf("%s, error: %v", reason, err))
// load the last-known-good config
lkg, lkgSource, err := cc.loadLastKnownGoodConfig(cc.defaultConfig)
// set status to indicate the failure with the assigned config
cc.configStatus.SetError(reason)
// if the last-known-good source is nil, return nil to indicate local config
if lastKnownGoodSource == nil {
return nil, nil
}
// attempt to load the last-known-good config
lastKnownGoodConfig, _, err := cc.loadConfig(lastKnownGoodSource)
if err != nil {
// we failed to load the last-known-good, so something is really messed up and we just return the error
return nil, err
}
// set the status to indicate that we had to roll back to the lkg for the reason reported when we tried to load the assigned config
if lkgSource != nil {
cc.configOk.Set(fmt.Sprintf(status.LkgRemoteMessageFmt, lkgSource.APIPath()), reason, apiv1.ConditionFalse)
} else {
cc.configOk.Set(status.LkgLocalMessage, reason, apiv1.ConditionFalse)
}
// return the last-known-good config
return lkg, nil
// set status to indicate the active source is the non-nil last-known-good source
cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource())
return lastKnownGoodConfig, nil
}
// StartSync launches the controller's sync loops if `client` is non-nil and `nodeName` is non-empty.
// It will always start the Node condition reporting loop, and will also start the dynamic conifg sync loops
// if dynamic config is enabled on the controller. If `nodeName` is empty but `client` is non-nil, an error is logged.
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) {
// StartSync tells the controller to start the goroutines that sync status/config to/from the API server.
// The clients must be non-nil, and the nodeName must be non-empty.
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error {
const errFmt = "cannot start Kubelet config sync: %s"
if client == nil {
utillog.Infof("nil client, will not start sync loops")
return
} else if len(nodeName) == 0 {
utillog.Errorf("cannot start sync loops with empty nodeName")
return
return fmt.Errorf(errFmt, "nil client")
}
if eventClient == nil {
return fmt.Errorf(errFmt, "nil event client")
}
if nodeName == "" {
return fmt.Errorf(errFmt, "empty nodeName")
}
// start the ConfigOk condition sync loop
go utilpanic.HandlePanic(func() {
utillog.Infof("starting ConfigOk condition sync loop")
wait.JitterUntil(func() {
cc.configOk.Sync(client, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})()
cc.informer = newSharedNodeInformer(client, nodeName,
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
// start the informer loop
// Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet,
// we use HandlePanic to manually call the panic handlers and then crash.
// We have a better chance of recovering normal operation if we just restart the Kubelet in the event
// of a Go runtime error.
go utilpanic.HandlePanic(func() {
utillog.Infof("starting Node informer sync loop")
cc.informer.Run(wait.NeverStop)
})()
// NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run!
// This was EVIL to debug (difficult to see missing `()`).
// The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion.
// start the config source sync loop
go utilpanic.HandlePanic(func() {
utillog.Infof("starting config source sync loop")
// status sync worker
statusSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting status sync loop")
wait.JitterUntil(func() {
cc.configStatus.Sync(client, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})
// remote config source informer, if we have a remote source to watch
assignedSource, err := cc.checkpointStore.Assigned()
if err != nil {
return fmt.Errorf(errFmt, err)
} else if assignedSource == nil {
utillog.Infof("local source is assigned, will not start remote config source informer")
} else {
cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{
AddFunc: cc.onAddRemoteConfigSourceEvent,
UpdateFunc: cc.onUpdateRemoteConfigSourceEvent,
DeleteFunc: cc.onDeleteRemoteConfigSourceEvent,
},
)
}
remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() {
if cc.remoteConfigSourceInformer != nil {
utillog.Infof("starting remote config source informer")
cc.remoteConfigSourceInformer.Run(wait.NeverStop)
}
})
// node informer
cc.nodeInformer = newSharedNodeInformer(client, nodeName,
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
nodeInformerFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Node informer")
cc.nodeInformer.Run(wait.NeverStop)
})
// config sync worker
configSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Kubelet config sync loop")
wait.JitterUntil(func() {
cc.syncConfigSource(client, eventClient, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})()
})
go statusSyncLoopFunc()
go remoteConfigSourceInformerFunc()
go nodeInformerFunc()
go configSyncLoopFunc()
return nil
}
// loadAssignedConfig loads the Kubelet's currently assigned config,
// based on the setting in the local checkpoint store.
// It returns the loaded configuration, the checkpoint store's config source record,
// a clean success or failure reason that can be reported in the status, and any error that occurs.
// If the local config should be used, it will be returned. You should validate local before passing it to this function.
func (cc *Controller) loadAssignedConfig(local *kubeletconfig.KubeletConfiguration) (*kubeletconfig.KubeletConfiguration, checkpoint.RemoteConfigSource, string, error) {
src, err := cc.checkpointStore.Current()
// loadConfig loads Kubelet config from a checkpoint
// It returns the loaded configuration or a clean failure reason (for status reporting) and an error.
func (cc *Controller) loadConfig(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, string, error) {
// load KubeletConfiguration from checkpoint
kc, err := cc.checkpointStore.Load(source)
if err != nil {
return nil, nil, fmt.Sprintf(status.CurFailLoadReasonFmt, "unknown"), err
return nil, status.LoadError, err
}
// nil source is the signal to use the local config
if src == nil {
return local, src, status.CurLocalOkayReason, nil
// apply any required transformations to the KubeletConfiguration
if cc.transform != nil {
if err := cc.transform(kc); err != nil {
return nil, status.InternalError, err
}
}
curUID := src.UID()
// load from checkpoint
checkpoint, err := cc.checkpointStore.Load(curUID)
if err != nil {
return nil, src, fmt.Sprintf(status.CurFailLoadReasonFmt, src.APIPath()), err
// validate the result
if err := validation.ValidateKubeletConfiguration(kc); err != nil {
return nil, status.ValidateError, err
}
cur, err := checkpoint.Parse()
if err != nil {
return nil, src, fmt.Sprintf(status.CurFailParseReasonFmt, src.APIPath()), err
}
if err := validation.ValidateKubeletConfiguration(cur); err != nil {
return nil, src, fmt.Sprintf(status.CurFailValidateReasonFmt, src.APIPath()), err
}
return cur, src, status.CurRemoteOkayReason, nil
}
// loadLastKnownGoodConfig loads the Kubelet's last-known-good config,
// based on the setting in the local checkpoint store.
// It returns the loaded configuration, the checkpoint store's config source record,
// and any error that occurs.
// If the local config should be used, it will be returned. You should validate local before passing it to this function.
func (cc *Controller) loadLastKnownGoodConfig(local *kubeletconfig.KubeletConfiguration) (*kubeletconfig.KubeletConfiguration, checkpoint.RemoteConfigSource, error) {
src, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return nil, nil, fmt.Errorf("unable to determine last-known-good config, error: %v", err)
}
// nil source is the signal to use the local config
if src == nil {
return local, src, nil
}
lkgUID := src.UID()
// load from checkpoint
checkpoint, err := cc.checkpointStore.Load(lkgUID)
if err != nil {
return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailLoadReasonFmt, src.APIPath()), err)
}
lkg, err := checkpoint.Parse()
if err != nil {
return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailParseReasonFmt, src.APIPath()), err)
}
if err := validation.ValidateKubeletConfiguration(lkg); err != nil {
return nil, src, fmt.Errorf("%s, error: %v", fmt.Sprintf(status.LkgFailValidateReasonFmt, src.APIPath()), err)
}
return lkg, src, nil
return kc, "", nil
}
// initializeDynamicConfigDir makes sure that the storage layers for various controller components are set up correctly
@ -249,17 +275,16 @@ func (cc *Controller) checkTrial(duration time.Duration) {
if trial, err := cc.inTrial(duration); err != nil {
utillog.Errorf("failed to check trial period for assigned config, error: %v", err)
} else if !trial {
utillog.Infof("assigned config passed trial period, will set as last-known-good")
if err := cc.graduateAssignedToLastKnownGood(); err != nil {
utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err)
}
}
}
// inTrial returns true if the time elapsed since the last modification of the current config does not exceed `trialDur`, false otherwise
// inTrial returns true if the time elapsed since the last modification of the assigned config does not exceed `trialDur`, false otherwise
func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
now := time.Now()
t, err := cc.checkpointStore.CurrentModified()
t, err := cc.checkpointStore.AssignedModified()
if err != nil {
return false, err
}
@ -269,16 +294,31 @@ func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
return false, nil
}
// graduateAssignedToLastKnownGood sets the last-known-good UID on the checkpointStore
// to the same value as the current UID maintained by the checkpointStore
// graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
// to the same value as the assigned config maintained by the checkpointStore
func (cc *Controller) graduateAssignedToLastKnownGood() error {
curUID, err := cc.checkpointStore.Current()
// get assigned
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
return err
}
err = cc.checkpointStore.SetLastKnownGood(curUID)
// get last-known-good
lastKnownGood, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return err
}
// if the sources are equal, no need to change
if assigned == lastKnownGood ||
assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned, lastKnownGood) {
return nil
}
// update last-known-good
err = cc.checkpointStore.SetLastKnownGood(assigned)
if err != nil {
return err
}
// update the status to reflect the new last-known-good config
cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource())
utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion())
return nil
}

View File

@ -10,14 +10,12 @@ go_library(
srcs = ["status.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/kubelet/kubeletconfig/util/equal:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)

View File

@ -19,302 +19,184 @@ package status
import (
"fmt"
"sync"
"time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
utilequal "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/equal"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
"k8s.io/kubernetes/pkg/kubelet/metrics"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
// TODO(mtaufen): s/current/assigned, as this is more accurate e.g. if you are using lkg, you aren't currently using "current" :)
const (
// CurLocalMessage indicates that the Kubelet is using its local config, which consists of defaults, flags, and/or local files
CurLocalMessage = "using current: local"
// LkgLocalMessage indicates that the Kubelet is using its local config, which consists of defaults, flags, and/or local files
LkgLocalMessage = "using last-known-good: local"
// LoadError indicates that the Kubelet failed to load the config checkpoint
LoadError = "failed to load config, see Kubelet log for details"
// ValidateError indicates that the Kubelet failed to validate the config checkpoint
ValidateError = "failed to validate config, see Kubelet log for details"
// AllNilSubfieldsError is used when no subfields are set
// This could happen in the case that an old client tries to read an object from a newer API server with a set subfield it does not know about
AllNilSubfieldsError = "invalid NodeConfigSource, exactly one subfield must be non-nil, but all were nil"
// DownloadError is used when the download fails, e.g. due to network issues
DownloadError = "failed to download config, see Kubelet log for details"
// InternalError indicates that some internal error happened while trying to sync config, e.g. filesystem issues
InternalError = "internal failure, see Kubelet log for details"
// CurRemoteMessageFmt indicates the Kubelet is using its current config, which is from an API source
CurRemoteMessageFmt = "using current: %s"
// LkgRemoteMessageFmt indicates the Kubelet is using its last-known-good config, which is from an API source
LkgRemoteMessageFmt = "using last-known-good: %s"
// CurLocalOkayReason indicates that the Kubelet is using its local config
CurLocalOkayReason = "when the config source is nil, the Kubelet uses its local config"
// CurRemoteOkayReason indicates that the config referenced by Node.ConfigSource is currently passing all checks
CurRemoteOkayReason = "passing all checks"
// CurFailLoadReasonFmt indicates that the Kubelet failed to load the current config checkpoint for an API source
CurFailLoadReasonFmt = "failed to load current: %s"
// CurFailParseReasonFmt indicates that the Kubelet failed to parse the current config checkpoint for an API source
CurFailParseReasonFmt = "failed to parse current: %s"
// CurFailValidateReasonFmt indicates that the Kubelet failed to validate the current config checkpoint for an API source
CurFailValidateReasonFmt = "failed to validate current: %s"
// LkgFail*ReasonFmt reasons are currently used to print errors in the Kubelet log, but do not appear in Node.Status.Conditions
// LkgFailLoadReasonFmt indicates that the Kubelet failed to load the last-known-good config checkpoint for an API source
LkgFailLoadReasonFmt = "failed to load last-known-good: %s"
// LkgFailParseReasonFmt indicates that the Kubelet failed to parse the last-known-good config checkpoint for an API source
LkgFailParseReasonFmt = "failed to parse last-known-good: %s"
// LkgFailValidateReasonFmt indicates that the Kubelet failed to validate the last-known-good config checkpoint for an API source
LkgFailValidateReasonFmt = "failed to validate last-known-good: %s"
// FailSyncReasonFmt is used when the system couldn't sync the config, due to a malformed Node.Spec.ConfigSource, a download failure, etc.
FailSyncReasonFmt = "failed to sync, reason: %s"
// FailSyncReasonAllNilSubfields is used when no subfields are set
FailSyncReasonAllNilSubfields = "invalid NodeConfigSource, exactly one subfield must be non-nil, but all were nil"
// FailSyncReasonPartialObjectReference is used when some required subfields remain unset
FailSyncReasonPartialObjectReference = "invalid ObjectReference, all of UID, Name, and Namespace must be specified"
// FailSyncReasonUIDMismatchFmt is used when there is a UID mismatch between the referenced and downloaded ConfigMaps,
// this can happen because objects must be downloaded by namespace/name, rather than by UID
FailSyncReasonUIDMismatchFmt = "invalid ConfigSource.ConfigMapRef.UID: %s does not match %s.UID: %s"
// FailSyncReasonDownloadFmt is used when the download fails, e.g. due to network issues
FailSyncReasonDownloadFmt = "failed to download: %s"
// FailSyncReasonInformer is used when the informer fails to report the Node object
FailSyncReasonInformer = "failed to read Node from informer object cache"
// FailSyncReasonReset is used when we can't reset the local configuration references, e.g. due to filesystem issues
FailSyncReasonReset = "failed to reset to local config"
// FailSyncReasonCheckpointExistenceFmt is used when we can't determine if a checkpoint already exists, e.g. due to filesystem issues
FailSyncReasonCheckpointExistenceFmt = "failed to determine whether object %s with UID %s was already checkpointed"
// FailSyncReasonSaveCheckpointFmt is used when we can't save a checkpoint, e.g. due to filesystem issues
FailSyncReasonSaveCheckpointFmt = "failed to save config checkpoint for object %s with UID %s"
// FailSyncReasonSetCurrentDefault is used when we can't set the current config checkpoint to the local default, e.g. due to filesystem issues
FailSyncReasonSetCurrentLocal = "failed to set current config checkpoint to local config"
// FailSyncReasonSetCurrentUIDFmt is used when we can't set the current config checkpoint to a checkpointed object, e.g. due to filesystem issues
FailSyncReasonSetCurrentUIDFmt = "failed to set current config checkpoint to object %s with UID %s"
// EmptyMessage is a placeholder in the case that we accidentally set the condition's message to the empty string.
// Doing so can result in a partial patch, and thus a confusing status; this makes it clear that the message was not provided.
EmptyMessage = "unknown - message not provided"
// EmptyReason is a placeholder in the case that we accidentally set the condition's reason to the empty string.
// Doing so can result in a partial patch, and thus a confusing status; this makes it clear that the reason was not provided.
EmptyReason = "unknown - reason not provided"
// SyncErrorFmt is used when the system couldn't sync the config, due to a malformed Node.Spec.ConfigSource, a download failure, etc.
SyncErrorFmt = "failed to sync: %s"
)
// ConfigOkCondition represents a ConfigOk NodeCondition
type ConfigOkCondition interface {
// Set sets the Message, Reason, and Status of the condition
Set(message, reason string, status apiv1.ConditionStatus)
// SetFailSyncCondition sets the condition for when syncing Kubelet config fails
SetFailSyncCondition(reason string)
// ClearFailSyncCondition clears the overlay from SetFailSyncCondition
ClearFailSyncCondition()
// Sync patches the current condition into the Node identified by `nodeName`
// NodeConfigStatus represents Node.Status.Config
type NodeConfigStatus interface {
// SetActive sets the active source in the status
SetActive(source *apiv1.NodeConfigSource)
// SetAssigned sets the assigned source in the status
SetAssigned(source *apiv1.NodeConfigSource)
// SetLastKnownGood sets the last-known-good source in the status
SetLastKnownGood(source *apiv1.NodeConfigSource)
// SetError sets the error associated with the status
SetError(err string)
// SetErrorOverride sets an error that overrides the base error set by SetError.
// If the override is set to the empty string, the base error is reported in
// the status, otherwise the override is reported.
SetErrorOverride(err string)
// Sync patches the current status into the Node identified by `nodeName` if an update is pending
Sync(client clientset.Interface, nodeName string)
}
// configOkCondition implements ConfigOkCondition
type configOkCondition struct {
// conditionMux is a mutex on the condition, alternate between setting and syncing the condition
conditionMux sync.Mutex
// message will appear as the condition's message
message string
// reason will appear as the condition's reason
reason string
// status will appear as the condition's status
status apiv1.ConditionStatus
// failedSyncReason is sent in place of the usual reason when the Kubelet is failing to sync the remote config
failedSyncReason string
// pendingCondition; write to this channel to indicate that ConfigOk needs to be synced to the API server
pendingCondition chan bool
type nodeConfigStatus struct {
// status is the core NodeConfigStatus that we report
status apiv1.NodeConfigStatus
// mux is a mutex on the nodeConfigStatus, alternate between setting and syncing the status
mux sync.Mutex
// errorOverride is sent in place of the usual error if it is non-empty
errorOverride string
// syncCh; write to this channel to indicate that the status needs to be synced to the API server
syncCh chan bool
}
// NewConfigOkCondition returns a new ConfigOkCondition
func NewConfigOkCondition() ConfigOkCondition {
return &configOkCondition{
message: EmptyMessage,
reason: EmptyReason,
status: apiv1.ConditionUnknown,
// channels must have capacity at least 1, since we signal with non-blocking writes
pendingCondition: make(chan bool, 1),
// NewNodeConfigStatus returns a new NodeConfigStatus interface
func NewNodeConfigStatus() NodeConfigStatus {
// channels must have capacity at least 1, since we signal with non-blocking writes
syncCh := make(chan bool, 1)
// prime new status managers to sync with the API server on the first call to Sync
syncCh <- true
return &nodeConfigStatus{
syncCh: syncCh,
}
}
// unsafeSet sets the current state of the condition
// it does not grab the conditionMux lock, so you should generally use setConfigOk unless you need to grab the lock
// at a higher level to synchronize additional operations
func (c *configOkCondition) unsafeSet(message, reason string, status apiv1.ConditionStatus) {
// We avoid an empty Message, Reason, or Status on the condition. Since we use Patch to update conditions, an empty
// field might cause a value from a previous condition to leak through, which can be very confusing.
// message
if len(message) == 0 {
message = EmptyMessage
}
c.message = message
// reason
if len(reason) == 0 {
reason = EmptyReason
}
c.reason = reason
// status
if len(string(status)) == 0 {
status = apiv1.ConditionUnknown
}
c.status = status
// always poke worker after update
c.pokeSyncWorker()
// transact grabs the lock, performs the fn, records the need to sync, and releases the lock
func (s *nodeConfigStatus) transact(fn func()) {
s.mux.Lock()
defer s.mux.Unlock()
fn()
s.sync()
}
func (c *configOkCondition) Set(message, reason string, status apiv1.ConditionStatus) {
c.conditionMux.Lock()
defer c.conditionMux.Unlock()
c.unsafeSet(message, reason, status)
func (s *nodeConfigStatus) SetAssigned(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.Assigned = source
})
}
// SetFailSyncCondition updates the ConfigOk status to reflect that we failed to sync to the latest config,
// e.g. due to a malformed Node.Spec.ConfigSource, a download failure, etc.
func (c *configOkCondition) SetFailSyncCondition(reason string) {
c.conditionMux.Lock()
defer c.conditionMux.Unlock()
// set the reason overlay and poke the sync worker to send the update
c.failedSyncReason = fmt.Sprintf(FailSyncReasonFmt, reason)
c.pokeSyncWorker()
func (s *nodeConfigStatus) SetActive(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.Active = source
})
}
// ClearFailSyncCondition removes the "failed to sync" reason overlay
func (c *configOkCondition) ClearFailSyncCondition() {
c.conditionMux.Lock()
defer c.conditionMux.Unlock()
// clear the reason overlay and poke the sync worker to send the update
c.failedSyncReason = ""
c.pokeSyncWorker()
func (s *nodeConfigStatus) SetLastKnownGood(source *apiv1.NodeConfigSource) {
s.transact(func() {
s.status.LastKnownGood = source
})
}
// pokeSyncWorker notes that the ConfigOk condition needs to be synced to the API server
func (c *configOkCondition) pokeSyncWorker() {
func (s *nodeConfigStatus) SetError(err string) {
s.transact(func() {
s.status.Error = err
})
}
func (s *nodeConfigStatus) SetErrorOverride(err string) {
s.transact(func() {
s.errorOverride = err
})
}
// sync notes that the status needs to be synced to the API server
func (s *nodeConfigStatus) sync() {
select {
case c.pendingCondition <- true:
case s.syncCh <- true:
default:
}
}
// Sync attempts to sync `c.condition` with the Node object for this Kubelet,
// Sync attempts to sync the status with the Node object for this Kubelet,
// if syncing fails, an error is logged, and work is queued for retry.
func (c *configOkCondition) Sync(client clientset.Interface, nodeName string) {
func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) {
select {
case <-c.pendingCondition:
case <-s.syncCh:
default:
// no work to be done, return
return
}
utillog.Infof("updating Node.Status.Config")
// grab the lock
c.conditionMux.Lock()
defer c.conditionMux.Unlock()
s.mux.Lock()
defer s.mux.Unlock()
// if the sync fails, we want to retry
var err error
defer func() {
if err != nil {
utillog.Errorf(err.Error())
c.pokeSyncWorker()
s.sync()
}
}()
// get the Node so we can check the current condition
node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
// get the Node so we can check the current status
oldNode, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
err = fmt.Errorf("could not get Node %q, will not sync ConfigOk condition, error: %v", nodeName, err)
err = fmt.Errorf("could not get Node %q, will not sync status, error: %v", nodeName, err)
return
}
// construct the node condition
condition := &apiv1.NodeCondition{
Type: apiv1.NodeKubeletConfigOk,
Message: c.message,
Reason: c.reason,
Status: c.status,
status := &s.status
// override error, if necessary
if len(s.errorOverride) > 0 {
// copy the status, so we don't overwrite the prior error
// with the override
status = status.DeepCopy()
status.Error = s.errorOverride
}
// overlay failed sync reason, if necessary
if len(c.failedSyncReason) > 0 {
condition.Reason = c.failedSyncReason
condition.Status = apiv1.ConditionFalse
}
// set timestamps
syncTime := metav1.NewTime(time.Now())
condition.LastHeartbeatTime = syncTime
if remote := getKubeletConfigOk(node.Status.Conditions); remote == nil || !utilequal.KubeletConfigOkEq(remote, condition) {
// update transition time the first time we create the condition,
// or if we are semantically changing the condition
condition.LastTransitionTime = syncTime
} else {
// since the conditions are semantically equal, use lastTransitionTime from the condition currently on the Node
// we need to do this because the field will always be represented in the patch generated below, and this copy
// prevents nullifying the field during the patch operation
condition.LastTransitionTime = remote.LastTransitionTime
}
// generate the patch
mediaType := "application/json"
info, ok := kuberuntime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), mediaType)
if !ok {
err = fmt.Errorf("unsupported media type %q", mediaType)
return
}
versions := legacyscheme.Registry.EnabledVersionsForGroup(api.GroupName)
if len(versions) == 0 {
err = fmt.Errorf("no enabled versions for group %q", api.GroupName)
return
}
// the "best" version supposedly comes first in the list returned from apiv1.Registry.EnabledVersionsForGroup
encoder := legacyscheme.Codecs.EncoderForVersion(info.Serializer, versions[0])
before, err := kuberuntime.Encode(encoder, node)
// update metrics based on the status we will sync
metrics.SetConfigError(len(status.Error) > 0)
err = metrics.SetAssignedConfig(status.Assigned)
if err != nil {
err = fmt.Errorf(`failed to encode "before" node while generating patch, error: %v`, err)
err = fmt.Errorf("failed to update Assigned config metric, error: %v", err)
return
}
err = metrics.SetActiveConfig(status.Active)
if err != nil {
err = fmt.Errorf("failed to update Active config metric, error: %v", err)
return
}
err = metrics.SetLastKnownGoodConfig(status.LastKnownGood)
if err != nil {
err = fmt.Errorf("failed to update LastKnownGood config metric, error: %v", err)
return
}
patchConfigOk(node, condition)
after, err := kuberuntime.Encode(encoder, node)
if err != nil {
err = fmt.Errorf(`failed to encode "after" node while generating patch, error: %v`, err)
return
}
// apply the status to a copy of the node so we don't modify the object in the informer's store
newNode := oldNode.DeepCopy()
newNode.Status.Config = status
patch, err := strategicpatch.CreateTwoWayMergePatch(before, after, apiv1.Node{})
if err != nil {
err = fmt.Errorf("failed to generate patch for updating ConfigOk condition, error: %v", err)
return
}
// patch the remote Node object
_, err = client.CoreV1().Nodes().PatchStatus(nodeName, patch)
if err != nil {
err = fmt.Errorf("could not update ConfigOk condition, error: %v", err)
return
// patch the node with the new status
if _, _, err := nodeutil.PatchNodeStatus(client.CoreV1(), types.NodeName(nodeName), oldNode, newNode); err != nil {
utillog.Errorf("failed to patch node status, error: %v", err)
}
}
// patchConfigOk replaces or adds the ConfigOk condition to the node
func patchConfigOk(node *apiv1.Node, configOk *apiv1.NodeCondition) {
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == apiv1.NodeKubeletConfigOk {
// edit the condition
node.Status.Conditions[i] = *configOk
return
}
}
// append the condition
node.Status.Conditions = append(node.Status.Conditions, *configOk)
}
// getKubeletConfigOk returns the first NodeCondition in `cs` with Type == apiv1.NodeKubeletConfigOk,
// or if no such condition exists, returns nil.
func getKubeletConfigOk(cs []apiv1.NodeCondition) *apiv1.NodeCondition {
for i := range cs {
if cs[i].Type == apiv1.NodeKubeletConfigOk {
return &cs[i]
}
}
return nil
}

View File

@ -13,7 +13,9 @@ go_library(
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core/install:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/scheme:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
],
)

View File

@ -23,23 +23,51 @@ import (
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
)
// TODO(mtaufen): allow an encoder to be injected into checkpoint objects at creation time? (then we could ultimately instantiate only one encoder)
// EncodeKubeletConfig encodes an internal KubeletConfiguration to an external YAML representation
func EncodeKubeletConfig(internal *kubeletconfig.KubeletConfiguration, targetVersion schema.GroupVersion) ([]byte, error) {
encoder, err := NewKubeletconfigYAMLEncoder(targetVersion)
if err != nil {
return nil, err
}
// encoder will convert to external version
data, err := runtime.Encode(encoder, internal)
if err != nil {
return nil, err
}
return data, nil
}
// NewJSONEncoder generates a new runtime.Encoder that encodes objects to JSON
func NewJSONEncoder(groupName string) (runtime.Encoder, error) {
// encode to json
mediaType := "application/json"
// NewKubeletconfigYAMLEncoder returns an encoder that can write objects in the kubeletconfig API group to YAML
func NewKubeletconfigYAMLEncoder(targetVersion schema.GroupVersion) (runtime.Encoder, error) {
_, codecs, err := scheme.NewSchemeAndCodecs()
if err != nil {
return nil, err
}
mediaType := "application/yaml"
info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)
}
return codecs.EncoderForVersion(info.Serializer, targetVersion), nil
}
// NewYAMLEncoder generates a new runtime.Encoder that encodes objects to YAML
func NewYAMLEncoder(groupName string) (runtime.Encoder, error) {
// encode to YAML
mediaType := "application/yaml"
info, ok := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unsupported media type %q", mediaType)
}
versions := legacyscheme.Registry.EnabledVersionsForGroup(groupName)
versions := legacyscheme.Scheme.PrioritizedVersionsForGroup(groupName)
if len(versions) == 0 {
return nil, fmt.Errorf("no enabled versions for group %q", groupName)
}

View File

@ -18,33 +18,6 @@ package equal
import apiv1 "k8s.io/api/core/v1"
// ConfigSourceEq returns true if the two config sources are semantically equivalent in the context of dynamic config
func ConfigSourceEq(a, b *apiv1.NodeConfigSource) bool {
if a == b {
return true
} else if a == nil || b == nil {
// not equal, and one is nil
return false
}
// check equality of config source subifelds
if a.ConfigMapRef != b.ConfigMapRef {
return ObjectRefEq(a.ConfigMapRef, b.ConfigMapRef)
}
// all internal subfields of the config source are equal
return true
}
// ObjectRefEq returns true if the two object references are semantically equivalent in the context of dynamic config
func ObjectRefEq(a, b *apiv1.ObjectReference) bool {
if a == b {
return true
} else if a == nil || b == nil {
// not equal, and one is nil
return false
}
return a.UID == b.UID && a.Namespace == b.Namespace && a.Name == b.Name
}
// KubeletConfigOkEq returns true if the two conditions are semantically equivalent in the context of dynamic config
func KubeletConfigOkEq(a, b *apiv1.NodeCondition) bool {
return a.Message == b.Message && a.Reason == b.Reason && a.Status == b.Status

View File

@ -3,6 +3,7 @@ package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -24,3 +25,13 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["files_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
"//pkg/util/filesystem:go_default_library",
],
)

View File

@ -24,7 +24,10 @@ import (
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const defaultPerm = 0666
const (
defaultPerm = 0755
tmptag = "tmp_" // additional prefix to prevent accidental collisions
)
// FileExists returns true if a regular file exists at `path`, false if `path` does not exist, otherwise an error
func FileExists(fs utilfs.Filesystem, path string) (bool, error) {
@ -64,9 +67,10 @@ func EnsureFile(fs utilfs.Filesystem, path string) error {
}
// WriteTmpFile creates a temporary file at `path`, writes `data` into it, and fsyncs the file
// Expects the parent directory to exist.
func WriteTmpFile(fs utilfs.Filesystem, path string, data []byte) (tmpPath string, retErr error) {
dir := filepath.Dir(path)
prefix := filepath.Base(path)
prefix := tmptag + filepath.Base(path)
// create the tmp file
tmpFile, err := fs.TempFile(dir, prefix)
@ -81,7 +85,7 @@ func WriteTmpFile(fs utilfs.Filesystem, path string, data []byte) (tmpPath strin
// if there was an error writing, syncing, or closing, delete the temporary file and return the error
if retErr != nil {
if err := fs.Remove(tmpPath); err != nil {
retErr = fmt.Errorf("attempted to remove temporary file %q after error %v, but failed due to error: %v", path, retErr, err)
retErr = fmt.Errorf("attempted to remove temporary file %q after error %v, but failed due to error: %v", tmpPath, retErr, err)
}
tmpPath = ""
}
@ -100,7 +104,8 @@ func WriteTmpFile(fs utilfs.Filesystem, path string, data []byte) (tmpPath strin
}
// ReplaceFile replaces the contents of the file at `path` with `data` by writing to a tmp file in the same
// dir as `path` and renaming the tmp file over `path`. The file does not have to exist to use ReplaceFile.
// dir as `path` and renaming the tmp file over `path`. The file does not have to exist to use ReplaceFile,
// but the parent directory must exist.
// Note ReplaceFile calls fsync.
func ReplaceFile(fs utilfs.Filesystem, path string, data []byte) error {
// write data to a temporary file
@ -118,7 +123,7 @@ func DirExists(fs utilfs.Filesystem, path string) (bool, error) {
if info.IsDir() {
return true, nil
}
return false, fmt.Errorf("expected dir at %q, but mode is is %q", path, info.Mode().String())
return false, fmt.Errorf("expected dir at %q, but mode is %q", path, info.Mode().String())
} else if os.IsNotExist(err) {
return false, nil
} else {
@ -137,3 +142,88 @@ func EnsureDir(fs utilfs.Filesystem, path string) error {
// create the dir
return fs.MkdirAll(path, defaultPerm)
}
// WriteTempDir creates a temporary dir at `path`, writes `files` into it, and fsyncs all the files
// The keys of `files` represent file names. These names must not:
// - be empty
// - be a path that contains more than the base name of a file (e.g. foo/bar is invalid, as is /bar)
// - match `.` or `..` exactly
// - be longer than 255 characters
// The above validation rules are based on atomic_writer.go, though in this case are more restrictive
// because we only allow a flat hierarchy.
func WriteTempDir(fs utilfs.Filesystem, path string, files map[string]string) (tmpPath string, retErr error) {
// validate the filename keys; for now we only allow a flat keyset
for name := range files {
// invalidate empty names
if name == "" {
return "", fmt.Errorf("invalid file key: must not be empty: %q", name)
}
// invalidate: foo/bar and /bar
if name != filepath.Base(name) {
return "", fmt.Errorf("invalid file key %q, only base names are allowed", name)
}
// invalidate `.` and `..`
if name == "." || name == ".." {
return "", fmt.Errorf("invalid file key, may not be '.' or '..'")
}
// invalidate length > 255 characters
if len(name) > 255 {
return "", fmt.Errorf("invalid file key %q, must be less than 255 characters", name)
}
}
// write the temp directory in parent dir and return path to the tmp directory
dir := filepath.Dir(path)
prefix := tmptag + filepath.Base(path)
// create the tmp dir
var err error
tmpPath, err = fs.TempDir(dir, prefix)
if err != nil {
return "", err
}
// be sure to clean up if there was an error
defer func() {
if retErr != nil {
if err := fs.RemoveAll(tmpPath); err != nil {
retErr = fmt.Errorf("attempted to remove temporary directory %q after error %v, but failed due to error: %v", tmpPath, retErr, err)
}
}
}()
// write data
for name, data := range files {
// create the file
file, err := fs.Create(filepath.Join(tmpPath, name))
if err != nil {
return tmpPath, err
}
// be sure to close the file when we're done
defer func() {
// close the file when we're done, don't overwrite primary retErr if close fails
if err := file.Close(); retErr == nil {
retErr = err
}
}()
// write the file
if _, err := file.Write([]byte(data)); err != nil {
return tmpPath, err
}
// sync the file, to ensure it's written in case a hard reset happens
if err := file.Sync(); err != nil {
return tmpPath, err
}
}
return tmpPath, nil
}
// ReplaceDir replaces the contents of the dir at `path` with `files` by writing to a tmp dir in the same
// dir as `path` and renaming the tmp dir over `path`. The dir does not have to exist to use ReplaceDir.
func ReplaceDir(fs utilfs.Filesystem, path string, files map[string]string) error {
// write data to a temporary directory
tmpPath, err := WriteTempDir(fs, path, files)
if err != nil {
return err
}
// rename over target directory
return fs.Rename(tmpPath, path)
}

View File

@ -0,0 +1,476 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package files
import (
"fmt"
"os"
"path/filepath"
"testing"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
const (
prefix = "test-util-files"
)
type file struct {
name string
// mode distinguishes file type,
// we only check for regular vs. directory in these tests,
// specify regular as 0, directory as os.ModeDir
mode os.FileMode
data string // ignored if mode == os.ModeDir
}
func (f *file) write(fs utilfs.Filesystem, dir string) error {
path := filepath.Join(dir, f.name)
if f.mode.IsDir() {
if err := fs.MkdirAll(path, defaultPerm); err != nil {
return err
}
} else if f.mode.IsRegular() {
// create parent directories, if necessary
parents := filepath.Dir(path)
if err := fs.MkdirAll(parents, defaultPerm); err != nil {
return err
}
// create the file
handle, err := fs.Create(path)
if err != nil {
return err
}
_, err = handle.Write([]byte(f.data))
if err != nil {
if cerr := handle.Close(); cerr != nil {
return fmt.Errorf("error %v closing file after error: %v", cerr, err)
}
return err
}
} else {
return fmt.Errorf("mode not implemented for testing %s", f.mode.String())
}
return nil
}
func (f *file) expect(fs utilfs.Filesystem, dir string) error {
path := filepath.Join(dir, f.name)
if f.mode.IsDir() {
info, err := fs.Stat(path)
if err != nil {
return err
}
if !info.IsDir() {
return fmt.Errorf("expected directory, got mode %s", info.Mode().String())
}
} else if f.mode.IsRegular() {
info, err := fs.Stat(path)
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return fmt.Errorf("expected regular file, got mode %s", info.Mode().String())
}
data, err := fs.ReadFile(path)
if err != nil {
return err
}
if f.data != string(data) {
return fmt.Errorf("expected file data %q, got %q", f.data, string(data))
}
} else {
return fmt.Errorf("mode not implemented for testing %s", f.mode.String())
}
return nil
}
// write files, perform some function, then attempt to read files back
// if err is non-empty, expects an error from the function performed in the test
// and skips reading back the expected files
type test struct {
desc string
writes []file
expects []file
fn func(fs utilfs.Filesystem, dir string, c *test) []error
err string
}
func (c *test) write(t *testing.T, fs utilfs.Filesystem, dir string) {
for _, f := range c.writes {
if err := f.write(fs, dir); err != nil {
t.Fatalf("error pre-writing file: %v", err)
}
}
}
// you can optionally skip calling t.Errorf by passing a nil t, and process the
// returned errors instead
func (c *test) expect(t *testing.T, fs utilfs.Filesystem, dir string) []error {
errs := []error{}
for _, f := range c.expects {
if err := f.expect(fs, dir); err != nil {
msg := fmt.Errorf("expect %#v, got error: %v", f, err)
errs = append(errs, msg)
if t != nil {
t.Errorf("%s", msg)
}
}
}
return errs
}
// run a test case, with an arbitrary function to execute between write and expect
// if c.fn is nil, errors from c.expect are checked against c.err, instead of errors
// from fn being checked against c.err
func (c *test) run(t *testing.T, fs utilfs.Filesystem) {
// isolate each test case in a new temporary directory
dir, err := fs.TempDir("", prefix)
if err != nil {
t.Fatalf("error creating temporary directory for test: %v", err)
}
c.write(t, fs, dir)
// if fn exists, check errors from fn, then check expected files
if c.fn != nil {
errs := c.fn(fs, dir, c)
if len(errs) > 0 {
for _, err := range errs {
utiltest.ExpectError(t, err, c.err)
}
// skip checking expected files if we expected errors
// (usually means we didn't create file)
return
}
c.expect(t, fs, dir)
return
}
// just check expected files, and compare errors from c.expect to c.err
// (this lets us test the helper functions above)
errs := c.expect(nil, fs, dir)
for _, err := range errs {
utiltest.ExpectError(t, err, c.err)
}
}
// simple test of the above helper functions
func TestHelpers(t *testing.T) {
// omitting the test.fn means test.err is compared to errors from test.expect
cases := []test{
{
desc: "regular file",
writes: []file{{name: "foo", data: "bar"}},
expects: []file{{name: "foo", data: "bar"}},
},
{
desc: "directory",
writes: []file{{name: "foo", mode: os.ModeDir}},
expects: []file{{name: "foo", mode: os.ModeDir}},
},
{
desc: "deep regular file",
writes: []file{{name: "foo/bar", data: "baz"}},
expects: []file{{name: "foo/bar", data: "baz"}},
},
{
desc: "deep directory",
writes: []file{{name: "foo/bar", mode: os.ModeDir}},
expects: []file{{name: "foo/bar", mode: os.ModeDir}},
},
{
desc: "missing file",
expects: []file{{name: "foo", data: "bar"}},
err: "no such file or directory",
},
{
desc: "missing directory",
expects: []file{{name: "foo/bar", mode: os.ModeDir}},
err: "no such file or directory",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestFileExists(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
ok, err := FileExists(fs, filepath.Join(dir, "foo"))
if err != nil {
return []error{err}
}
if !ok {
return []error{fmt.Errorf("does not exist (test)")}
}
return nil
}
cases := []test{
{
fn: fn,
desc: "file exists",
writes: []file{{name: "foo"}},
},
{
fn: fn,
desc: "file does not exist",
err: "does not exist (test)",
},
{
fn: fn,
desc: "object has non-file mode",
writes: []file{{name: "foo", mode: os.ModeDir}},
err: "expected regular file",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestEnsureFile(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
var errs []error
for _, f := range c.expects {
if err := EnsureFile(fs, filepath.Join(dir, f.name)); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "file exists",
writes: []file{{name: "foo"}},
expects: []file{{name: "foo"}},
},
{
fn: fn,
desc: "file does not exist",
expects: []file{{name: "bar"}},
},
{
fn: fn,
desc: "neither parent nor file exists",
expects: []file{{name: "baz/quux"}},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
// Note: This transitively tests WriteTmpFile
func TestReplaceFile(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
var errs []error
for _, f := range c.expects {
if err := ReplaceFile(fs, filepath.Join(dir, f.name), []byte(f.data)); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "file exists",
writes: []file{{name: "foo"}},
expects: []file{{name: "foo", data: "bar"}},
},
{
fn: fn,
desc: "file does not exist",
expects: []file{{name: "foo", data: "bar"}},
},
{
fn: func(fs utilfs.Filesystem, dir string, c *test) []error {
if err := ReplaceFile(fs, filepath.Join(dir, "foo/bar"), []byte("")); err != nil {
return []error{err}
}
return nil
},
desc: "neither parent nor file exists",
err: "no such file or directory",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestDirExists(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
ok, err := DirExists(fs, filepath.Join(dir, "foo"))
if err != nil {
return []error{err}
}
if !ok {
return []error{fmt.Errorf("does not exist (test)")}
}
return nil
}
cases := []test{
{
fn: fn,
desc: "dir exists",
writes: []file{{name: "foo", mode: os.ModeDir}},
},
{
fn: fn,
desc: "dir does not exist",
err: "does not exist (test)",
},
{
fn: fn,
desc: "object has non-dir mode",
writes: []file{{name: "foo"}},
err: "expected dir",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestEnsureDir(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
var errs []error
for _, f := range c.expects {
if err := EnsureDir(fs, filepath.Join(dir, f.name)); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "dir exists",
writes: []file{{name: "foo", mode: os.ModeDir}},
expects: []file{{name: "foo", mode: os.ModeDir}},
},
{
fn: fn,
desc: "dir does not exist",
expects: []file{{name: "bar", mode: os.ModeDir}},
},
{
fn: fn,
desc: "neither parent nor dir exists",
expects: []file{{name: "baz/quux", mode: os.ModeDir}},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}
func TestWriteTempDir(t *testing.T) {
// writing a tmp dir is covered by TestReplaceDir, but we additionally test filename validation here
c := test{
desc: "invalid file key",
err: "invalid file key",
fn: func(fs utilfs.Filesystem, dir string, c *test) []error {
if _, err := WriteTempDir(fs, filepath.Join(dir, "tmpdir"), map[string]string{"foo/bar": ""}); err != nil {
return []error{err}
}
return nil
},
}
c.run(t, utilfs.DefaultFs{})
}
func TestReplaceDir(t *testing.T) {
fn := func(fs utilfs.Filesystem, dir string, c *test) []error {
errs := []error{}
// compute filesets from expected files and call ReplaceDir for each
// we don't nest dirs in test cases, order of ReplaceDir call is not guaranteed
dirs := map[string]map[string]string{}
// allocate dirs
for _, f := range c.expects {
if f.mode.IsDir() {
path := filepath.Join(dir, f.name)
if _, ok := dirs[path]; !ok {
dirs[path] = map[string]string{}
}
} else if f.mode.IsRegular() {
path := filepath.Join(dir, filepath.Dir(f.name))
if _, ok := dirs[path]; !ok {
// require an expectation for the parent directory if there is an expectation for the file
errs = append(errs, fmt.Errorf("no prior parent directory in c.expects for file %s", f.name))
continue
}
dirs[path][filepath.Base(f.name)] = f.data
}
}
// short-circuit test case validation errors
if len(errs) > 0 {
return errs
}
// call ReplaceDir for each desired dir
for path, files := range dirs {
if err := ReplaceDir(fs, path, files); err != nil {
errs = append(errs, err)
}
}
return errs
}
cases := []test{
{
fn: fn,
desc: "fn catches invalid test case",
expects: []file{{name: "foo/bar"}},
err: "no prior parent directory",
},
{
fn: fn,
desc: "empty dir",
expects: []file{{name: "foo", mode: os.ModeDir}},
},
{
fn: fn,
desc: "dir with files",
expects: []file{
{name: "foo", mode: os.ModeDir},
{name: "foo/bar", data: "baz"},
{name: "foo/baz", data: "bar"},
},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
c.run(t, utilfs.DefaultFs{})
})
}
}

View File

@ -21,6 +21,21 @@ import (
"testing"
)
// ExpectError calls t.Fatalf if the error does not contain a substr match.
// If substr is empty, a nil error is expected.
// It is useful to call ExpectError from subtests.
func ExpectError(t *testing.T, err error, substr string) {
if err != nil {
if len(substr) == 0 {
t.Fatalf("expect nil error but got %q", err.Error())
} else if !strings.Contains(err.Error(), substr) {
t.Fatalf("expect error to contain %q but got %q", substr, err.Error())
}
} else if len(substr) > 0 {
t.Fatalf("expect error to contain %q but got nil error", substr)
}
}
// SkipRest returns true if there was a non-nil error or if we expected an error that didn't happen,
// and logs the appropriate error on the test object.
// The return value indicates whether we should skip the rest of the test case due to the error result.

View File

@ -21,13 +21,13 @@ import (
"time"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
utilequal "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/equal"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
@ -86,6 +86,7 @@ func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{})
}
if oldObj == nil {
// Node was just added, need to sync
utillog.Infof("initial Node watch event")
cc.pokeConfigSourceWorker()
return
}
@ -94,32 +95,60 @@ func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{})
utillog.Errorf("failed to cast old object to Node, couldn't handle event")
return
}
if !utilequal.ConfigSourceEq(oldNode.Spec.ConfigSource, newNode.Spec.ConfigSource) {
if !apiequality.Semantic.DeepEqual(oldNode.Spec.ConfigSource, newNode.Spec.ConfigSource) {
utillog.Infof("Node.Spec.ConfigSource was updated")
cc.pokeConfigSourceWorker()
}
}
// onDeleteNodeEvent logs a message if the Node was deleted and may log errors
// if an unexpected DeletedFinalStateUnknown was received.
// onDeleteNodeEvent logs a message if the Node was deleted
// We allow the sync-loop to continue, because it is possible that the Kubelet detected
// a Node with unexpected externalID and is attempting to delete and re-create the Node
// (see pkg/kubelet/kubelet_node_status.go), or that someone accidentally deleted the Node
// (the Kubelet will re-create it).
func (cc *Controller) onDeleteNodeEvent(deletedObj interface{}) {
node, ok := deletedObj.(*apiv1.Node)
// For this case, we just log the event.
// We don't want to poke the worker, because a temporary deletion isn't worth reporting an error for.
// If the Node is deleted because the VM is being deleted, then the Kubelet has nothing to do.
utillog.Infof("Node was deleted")
}
// onAddRemoteConfigSourceEvent calls onUpdateConfigMapEvent with the new object and a nil old object
func (cc *Controller) onAddRemoteConfigSourceEvent(newObj interface{}) {
cc.onUpdateRemoteConfigSourceEvent(nil, newObj)
}
// onUpdateRemoteConfigSourceEvent checks whether the configSource changed between oldObj and newObj,
// and pokes the sync worker if there was a change
func (cc *Controller) onUpdateRemoteConfigSourceEvent(oldObj interface{}, newObj interface{}) {
// since ConfigMap is currently the only source type, we handle that here
newConfigMap, ok := newObj.(*apiv1.ConfigMap)
if !ok {
tombstone, ok := deletedObj.(cache.DeletedFinalStateUnknown)
if !ok {
utillog.Errorf("couldn't cast deleted object to DeletedFinalStateUnknown, object: %+v", deletedObj)
return
}
node, ok = tombstone.Obj.(*apiv1.Node)
if !ok {
utillog.Errorf("received DeletedFinalStateUnknown object but it did not contain a Node, object: %+v", deletedObj)
return
}
utillog.Infof("Node was deleted (DeletedFinalStateUnknown), sync-loop will continue because the Kubelet might recreate the Node, node: %+v", node)
utillog.Errorf("failed to cast new object to ConfigMap, couldn't handle event")
return
}
utillog.Infof("Node was deleted, sync-loop will continue because the Kubelet might recreate the Node, node: %+v", node)
if oldObj == nil {
// ConfigMap was just added, need to sync
utillog.Infof("initial ConfigMap watch event")
cc.pokeConfigSourceWorker()
return
}
oldConfigMap, ok := oldObj.(*apiv1.ConfigMap)
if !ok {
utillog.Errorf("failed to cast old object to ConfigMap, couldn't handle event")
return
}
if !apiequality.Semantic.DeepEqual(oldConfigMap, newConfigMap) {
utillog.Infof("assigned ConfigMap was updated")
cc.pokeConfigSourceWorker()
}
}
// onDeleteRemoteConfigSourceEvent logs a message if the ConfigMap was deleted and pokes the sync worker
func (cc *Controller) onDeleteRemoteConfigSourceEvent(deletedObj interface{}) {
// If the ConfigMap we're watching is deleted, we log the event and poke the sync worker.
// This requires a sync, because if the Node is still configured to use the deleted ConfigMap,
// the Kubelet should report a DownloadError.
utillog.Infof("assigned ConfigMap was deleted")
cc.pokeConfigSourceWorker()
}