Fresh dep ensure

Mike Cronce
2018-11-26 13:23:56 -05:00
parent 93cb8a04d7
commit 407478ab9a
9016 changed files with 551394 additions and 279685 deletions


@@ -0,0 +1,64 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"cache.go",
"interface.go",
"node_tree.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/cache",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"cache_test.go",
"main_test.go",
"node_tree_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/scheduler/internal/cache/debugger:all-srcs",
"//pkg/scheduler/internal/cache/fake:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,553 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/klog"
)
var (
cleanAssumedPeriod = 1 * time.Second
)
// New returns a Cache implementation.
// It automatically starts a goroutine that manages expiration of assumed pods.
// "ttl" is how long an assumed pod is kept before it expires.
// "stop" is the channel that, when closed, stops the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
cache.run()
return cache
}
type schedulerCache struct {
stop <-chan struct{}
ttl time.Duration
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.RWMutex
// a set of assumed pod keys.
// The key could further be used to get an entry in podStates.
assumedPods map[string]bool
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*schedulercache.NodeInfo
nodeTree *NodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
type podState struct {
pod *v1.Pod
// Used by assumedPod to determine expiration.
deadline *time.Time
// Used to block the cache from expiring assumedPod if binding is still in progress.
bindingFinished bool
}
type imageState struct {
// Size of the image
size int64
// A set of node names for nodes having this image present
nodes sets.String
}
// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulercache.ImageStateSummary {
return &schedulercache.ImageStateSummary{
Size: state.size,
NumNodes: len(state.nodes),
}
}
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
return &schedulerCache{
ttl: ttl,
period: period,
stop: stop,
nodes: make(map[string]*schedulercache.NodeInfo),
nodeTree: newNodeTree(nil),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
imageStates: make(map[string]*imageState),
}
}
// Snapshot takes a snapshot of the current scheduler internal cache. The method has a performance impact,
// and should only be used off the critical path.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make(map[string]*schedulercache.NodeInfo)
for k, v := range cache.nodes {
nodes[k] = v.Clone()
}
assumedPods := make(map[string]bool)
for k, v := range cache.assumedPods {
assumedPods[k] = v
}
return &Snapshot{
Nodes: nodes,
AssumedPods: assumedPods,
}
}
func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*schedulercache.NodeInfo) error {
cache.mu.Lock()
defer cache.mu.Unlock()
for name, info := range cache.nodes {
if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
// Transient scheduler info is reset here.
info.TransientInfo.ResetTransientSchedulerInfo()
}
if current, ok := nodeNameToInfo[name]; !ok || current.GetGeneration() != info.GetGeneration() {
nodeNameToInfo[name] = info.Clone()
}
}
for name := range nodeNameToInfo {
if _, ok := cache.nodes[name]; !ok {
delete(nodeNameToInfo, name)
}
}
return nil
}
func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
alwaysTrue := func(p *v1.Pod) bool { return true }
return cache.FilteredList(alwaysTrue, selector)
}
func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We
// can avoid expensive array growth without wasting too much memory by
// pre-allocating capacity.
maxSize := 0
for _, info := range cache.nodes {
maxSize += len(info.Pods())
}
pods := make([]*v1.Pod, 0, maxSize)
for _, info := range cache.nodes {
for _, pod := range info.Pods() {
if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
}
}
}
return pods, nil
}
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
}
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
return cache.finishBinding(pod, time.Now())
}
// finishBinding exists to make tests deterministic by injecting "now" as an argument
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
currState, ok := cache.podStates[key]
if ok && cache.assumedPods[key] {
dl := now.Add(cache.ttl)
currState.bindingFinished = true
currState.deadline = &dl
}
return nil
}
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
}
switch {
// Only an assumed pod can be forgotten.
case ok && cache.assumedPods[key]:
err := cache.removePod(pod)
if err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
default:
return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
}
return nil
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = schedulercache.NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.AddPod(pod)
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
if err := cache.removePod(oldPod); err != nil {
return err
}
cache.addPod(newPod)
return nil
}
// Assumes that lock is already acquired.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
n := cache.nodes[pod.Spec.NodeName]
if err := n.RemovePod(pod); err != nil {
return err
}
if len(n.Pods()) == 0 && n.Node() == nil {
delete(cache.nodes, pod.Spec.NodeName)
}
return nil
}
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods[key]:
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than the one it was assumed to be on.
klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
// Clean this up.
cache.removePod(currState.pod)
cache.addPod(pod)
}
delete(cache.assumedPods, key)
cache.podStates[key].deadline = nil
cache.podStates[key].pod = pod
case !ok:
// Pod was expired. We should add it back.
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
default:
return fmt.Errorf("pod %v was already in added state", key)
}
return nil
}
func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
key, err := schedulercache.GetPodKey(oldPod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
// An assumed pod won't have an Update/Remove event. It needs to have an Add event
// before an Update event, in which case its state would change from Assumed to Added.
case ok && !cache.assumedPods[key]:
if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
klog.Errorf("Pod %v updated on a different node than previously added to.", key)
klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
}
if err := cache.updatePod(oldPod, newPod); err != nil {
return err
}
currState.pod = newPod
default:
return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
}
return nil
}
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
key, err := schedulercache.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
// An assumed pod won't have a Delete/Remove event. It needs to have an Add event
// before a Remove event, in which case its state would change from Assumed to Added.
case ok && !cache.assumedPods[key]:
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
}
err := cache.removePod(currState.pod)
if err != nil {
return err
}
delete(cache.podStates, key)
default:
return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
}
return nil
}
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
key, err := schedulercache.GetPodKey(pod)
if err != nil {
return false, err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
b, found := cache.assumedPods[key]
if !found {
return false, nil
}
return b, nil
}
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
key, err := schedulercache.GetPodKey(pod)
if err != nil {
return nil, err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
podState, ok := cache.podStates[key]
if !ok {
return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key)
}
return podState.pod, nil
}
func (cache *schedulerCache) AddNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
n = schedulercache.NewNodeInfo()
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.Node())
}
cache.nodeTree.AddNode(node)
cache.addNodeImageStates(node, n)
return n.SetNode(node)
}
func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[newNode.Name]
if !ok {
n = schedulercache.NewNodeInfo()
cache.nodes[newNode.Name] = n
} else {
cache.removeNodeImageStates(n.Node())
}
cache.nodeTree.UpdateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n)
return n.SetNode(newNode)
}
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n := cache.nodes[node.Name]
if err := n.RemoveNode(node); err != nil {
return err
}
// We remove NodeInfo for this node only if there aren't any pods on this node.
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
if len(n.Pods()) == 0 && n.Node() == nil {
delete(cache.nodes, node.Name)
}
cache.nodeTree.RemoveNode(node)
cache.removeNodeImageStates(node)
return nil
}
// addNodeImageStates adds the states of the images on the given node to the given nodeInfo and updates the imageStates in
// the scheduler cache. This function assumes the lock on the scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulercache.NodeInfo) {
newSum := make(map[string]*schedulercache.ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
// update the entry in imageStates
state, ok := cache.imageStates[name]
if !ok {
state = &imageState{
size: image.SizeBytes,
nodes: sets.NewString(node.Name),
}
cache.imageStates[name] = state
} else {
state.nodes.Insert(node.Name)
}
// create the imageStateSummary for this image
if _, ok := newSum[name]; !ok {
newSum[name] = cache.createImageStateSummary(state)
}
}
}
nodeInfo.SetImageStates(newSum)
}
// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
if node == nil {
return
}
for _, image := range node.Status.Images {
for _, name := range image.Names {
state, ok := cache.imageStates[name]
if ok {
state.nodes.Delete(node.Name)
if len(state.nodes) == 0 {
// Remove the unused image to make sure the length of
// imageStates represents the total number of different
// images on all nodes
delete(cache.imageStates, name)
}
}
}
}
}
func (cache *schedulerCache) run() {
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
cache.cleanupAssumedPods(time.Now())
}
// cleanupAssumedPods exists to make tests deterministic by taking the current time as an input argument.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
// The size of assumedPods should be small
for key := range cache.assumedPods {
ps, ok := cache.podStates[key]
if !ok {
panic("Key found in assumed set but not in podStates. Potentially a logical error.")
}
if !ps.bindingFinished {
klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
ps.pod.Namespace, ps.pod.Name)
continue
}
if now.After(*ps.deadline) {
klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
if err := cache.expirePod(key, ps); err != nil {
klog.Errorf("ExpirePod failed for %s: %v", key, err)
}
}
}
}
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
if err := cache.removePod(ps.pod); err != nil {
return err
}
delete(cache.assumedPods, key)
delete(cache.podStates, key)
return nil
}
func (cache *schedulerCache) NodeTree() *NodeTree {
return cache.nodeTree
}
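
A minimal usage sketch (not part of this commit) of the assume/confirm lifecycle implemented above; the pod name, UID, node name and TTL are made up for illustration:

package main

import (
	"fmt"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

func main() {
	stop := make(chan struct{})
	defer close(stop)

	// Assumed pods expire 30 seconds after binding finishes unless an Add event confirms them.
	schedCache := internalcache.New(30*time.Second, stop)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default", UID: "uid-1"}}
	pod.Spec.NodeName = "node-1" // the scheduler's tentative placement

	if err := schedCache.AssumePod(pod); err != nil {
		fmt.Println("assume failed:", err)
		return
	}
	_ = schedCache.FinishBinding(pod) // the TTL countdown starts here

	// The informer's Add event later confirms the pod and clears its deadline.
	if err := schedCache.AddPod(pod); err != nil {
		fmt.Println("add failed:", err)
	}
	assumed, _ := schedCache.IsAssumedPod(pod)
	fmt.Println("still assumed after Add:", assumed) // false: the pod has moved to Added
}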

File diff suppressed because it is too large


@@ -0,0 +1,46 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"comparer.go",
"debugger.go",
"dumper.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger",
visibility = ["//pkg/scheduler:__subpackages__"],
deps = [
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["comparer_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,135 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import (
"sort"
"strings"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
)
// CacheComparer is an implementation of the Scheduler's cache comparer.
type CacheComparer struct {
NodeLister corelisters.NodeLister
PodLister corelisters.PodLister
Cache schedulerinternalcache.Cache
PodQueue internalqueue.SchedulingQueue
}
// Compare compares the nodes and pods of NodeLister with Cache.Snapshot.
func (c *CacheComparer) Compare() error {
klog.V(3).Info("cache comparer started")
defer klog.V(3).Info("cache comparer finished")
nodes, err := c.NodeLister.List(labels.Everything())
if err != nil {
return err
}
pods, err := c.PodLister.List(labels.Everything())
if err != nil {
return err
}
snapshot := c.Cache.Snapshot()
waitingPods := c.PodQueue.WaitingPods()
if missed, redundant := c.CompareNodes(nodes, snapshot.Nodes); len(missed)+len(redundant) != 0 {
klog.Warningf("cache mismatch: missed nodes: %s; redundant nodes: %s", missed, redundant)
}
if missed, redundant := c.ComparePods(pods, waitingPods, snapshot.Nodes); len(missed)+len(redundant) != 0 {
klog.Warningf("cache mismatch: missed pods: %s; redundant pods: %s", missed, redundant)
}
return nil
}
// CompareNodes compares actual nodes with cached nodes.
func (c *CacheComparer) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, node := range nodes {
actual = append(actual, node.Name)
}
cached := []string{}
for nodeName := range nodeinfos {
cached = append(cached, nodeName)
}
return compareStrings(actual, cached)
}
// ComparePods compares actual pods with cached pods.
func (c *CacheComparer) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*schedulercache.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, pod := range pods {
actual = append(actual, string(pod.UID))
}
cached := []string{}
for _, nodeinfo := range nodeinfos {
for _, pod := range nodeinfo.Pods() {
cached = append(cached, string(pod.UID))
}
}
for _, pod := range waitingPods {
cached = append(cached, string(pod.UID))
}
return compareStrings(actual, cached)
}
func compareStrings(actual, cached []string) (missed, redundant []string) {
missed, redundant = []string{}, []string{}
sort.Strings(actual)
sort.Strings(cached)
compare := func(i, j int) int {
if i == len(actual) {
return 1
} else if j == len(cached) {
return -1
}
return strings.Compare(actual[i], cached[j])
}
for i, j := 0, 0; i < len(actual) || j < len(cached); {
switch compare(i, j) {
case 0:
i++
j++
case -1:
missed = append(missed, actual[i])
i++
case 1:
redundant = append(redundant, cached[j])
j++
}
}
return
}
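
For intuition, a small in-package sketch (hypothetical, in the style of comparer_test.go below) of what the merge-style compareStrings above reports for concrete input:

package debugger

import (
	"reflect"
	"testing"
)

func TestCompareStringsSketch(t *testing.T) {
	actual := []string{"a", "b", "d"} // what the API server reports
	cached := []string{"b", "c"}      // what the scheduler cache holds
	missed, redundant := compareStrings(actual, cached)
	// "a" and "d" exist in the cluster but are missing from the cache; "c" is cached but gone.
	if !reflect.DeepEqual(missed, []string{"a", "d"}) || !reflect.DeepEqual(redundant, []string{"c"}) {
		t.Errorf("unexpected result: missed=%v redundant=%v", missed, redundant)
	}
}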


@@ -0,0 +1,192 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
func TestCompareNodes(t *testing.T) {
tests := []struct {
name string
actual []string
cached []string
missing []string
redundant []string
}{
{
name: "redundant cached value",
actual: []string{"foo", "bar"},
cached: []string{"bar", "foo", "foobar"},
missing: []string{},
redundant: []string{"foobar"},
},
{
name: "missing cached value",
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foo"},
missing: []string{"foobar"},
redundant: []string{},
},
{
name: "proper cache set",
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foobar", "foo"},
missing: []string{},
redundant: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testCompareNodes(test.actual, test.cached, test.missing, test.redundant, t)
})
}
}
func testCompareNodes(actual, cached, missing, redundant []string, t *testing.T) {
compare := CacheComparer{}
nodes := []*v1.Node{}
for _, nodeName := range actual {
node := &v1.Node{}
node.Name = nodeName
nodes = append(nodes, node)
}
nodeInfo := make(map[string]*schedulercache.NodeInfo)
for _, nodeName := range cached {
nodeInfo[nodeName] = &schedulercache.NodeInfo{}
}
m, r := compare.CompareNodes(nodes, nodeInfo)
if !reflect.DeepEqual(m, missing) {
t.Errorf("missing expected to be %s; got %s", missing, m)
}
if !reflect.DeepEqual(r, redundant) {
t.Errorf("redundant expected to be %s; got %s", redundant, r)
}
}
func TestComparePods(t *testing.T) {
tests := []struct {
name string
actual []string
cached []string
queued []string
missing []string
redundant []string
}{
{
name: "redundant cached value",
actual: []string{"foo", "bar"},
cached: []string{"bar", "foo", "foobar"},
queued: []string{},
missing: []string{},
redundant: []string{"foobar"},
},
{
name: "redundant and queued values",
actual: []string{"foo", "bar"},
cached: []string{"foo", "foobar"},
queued: []string{"bar"},
missing: []string{},
redundant: []string{"foobar"},
},
{
name: "missing cached value",
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foo"},
queued: []string{},
missing: []string{"foobar"},
redundant: []string{},
},
{
name: "missing and queued values",
actual: []string{"foo", "bar", "foobar"},
cached: []string{"foo"},
queued: []string{"bar"},
missing: []string{"foobar"},
redundant: []string{},
},
{
name: "correct cache set",
actual: []string{"foo", "bar", "foobar"},
cached: []string{"bar", "foobar", "foo"},
queued: []string{},
missing: []string{},
redundant: []string{},
},
{
name: "queued cache value",
actual: []string{"foo", "bar", "foobar"},
cached: []string{"foobar", "foo"},
queued: []string{"bar"},
missing: []string{},
redundant: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testComparePods(test.actual, test.cached, test.queued, test.missing, test.redundant, t)
})
}
}
func testComparePods(actual, cached, queued, missing, redundant []string, t *testing.T) {
compare := CacheComparer{}
pods := []*v1.Pod{}
for _, uid := range actual {
pod := &v1.Pod{}
pod.UID = types.UID(uid)
pods = append(pods, pod)
}
queuedPods := []*v1.Pod{}
for _, uid := range queued {
pod := &v1.Pod{}
pod.UID = types.UID(uid)
queuedPods = append(queuedPods, pod)
}
nodeInfo := make(map[string]*schedulercache.NodeInfo)
for _, uid := range cached {
pod := &v1.Pod{}
pod.UID = types.UID(uid)
pod.Namespace = "ns"
pod.Name = uid
nodeInfo[uid] = schedulercache.NewNodeInfo(pod)
}
m, r := compare.ComparePods(pods, queuedPods, nodeInfo)
if !reflect.DeepEqual(m, missing) {
t.Errorf("missing expected to be %s; got %s", missing, m)
}
if !reflect.DeepEqual(r, redundant) {
t.Errorf("redundant expected to be %s; got %s", redundant, r)
}
}


@@ -0,0 +1,50 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import (
corelisters "k8s.io/client-go/listers/core/v1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
)
// CacheDebugger provides ways to check and write cache information for debugging.
type CacheDebugger struct {
Comparer CacheComparer
Dumper CacheDumper
}
// New creates a CacheDebugger.
func New(
nodeLister corelisters.NodeLister,
podLister corelisters.PodLister,
cache internalcache.Cache,
podQueue internalqueue.SchedulingQueue,
) *CacheDebugger {
return &CacheDebugger{
Comparer: CacheComparer{
NodeLister: nodeLister,
PodLister: podLister,
Cache: cache,
PodQueue: podQueue,
},
Dumper: CacheDumper{
cache: cache,
podQueue: podQueue,
},
}
}
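
A short wiring sketch (assumed caller code, not in this commit) showing how the constructor above is typically used: the caller already has listers, the internal cache and the scheduling queue, and triggers a dump plus a comparison:

package example // hypothetical consumer inside pkg/scheduler

import (
	corelisters "k8s.io/client-go/listers/core/v1"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	"k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
)

func debugOnce(nodeLister corelisters.NodeLister, podLister corelisters.PodLister,
	c internalcache.Cache, q internalqueue.SchedulingQueue) error {
	d := debugger.New(nodeLister, podLister, c, q)
	d.Dumper.DumpAll()          // write cached nodes and queued pods to the scheduler logs
	return d.Comparer.Compare() // log any mismatch between the listers and the cache
}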


@@ -0,0 +1,78 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import (
"fmt"
"strings"
"k8s.io/klog"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/scheduler/cache"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
)
// CacheDumper writes some information from the scheduler cache and the scheduling queue to the
// scheduler logs for debugging purposes.
type CacheDumper struct {
cache internalcache.Cache
podQueue queue.SchedulingQueue
}
// DumpAll writes cached nodes and scheduling queue information to the scheduler logs.
func (d *CacheDumper) DumpAll() {
d.dumpNodes()
d.dumpSchedulingQueue()
}
// dumpNodes writes NodeInfo to the scheduler logs.
func (d *CacheDumper) dumpNodes() {
snapshot := d.cache.Snapshot()
klog.Info("Dump of cached NodeInfo")
for _, nodeInfo := range snapshot.Nodes {
klog.Info(printNodeInfo(nodeInfo))
}
}
// dumpSchedulingQueue writes pods in the scheduling queue to the scheduler logs.
func (d *CacheDumper) dumpSchedulingQueue() {
waitingPods := d.podQueue.WaitingPods()
var podData strings.Builder
for _, p := range waitingPods {
podData.WriteString(printPod(p))
}
klog.Infof("Dump of scheduling queue:\n%s", podData.String())
}
// printNodeInfo writes parts of NodeInfo to a string.
func printNodeInfo(n *cache.NodeInfo) string {
var nodeData strings.Builder
nodeData.WriteString(fmt.Sprintf("\nNode name: %+v\nRequested Resources: %+v\nAllocatable Resources: %+v\nNumber of Pods: %v\nPods:\n",
n.Node().Name, n.RequestedResource(), n.AllocatableResource(), len(n.Pods())))
// Dumping Pod Info
for _, p := range n.Pods() {
nodeData.WriteString(printPod(p))
}
return nodeData.String()
}
// printPod writes parts of a Pod object to a string.
func printPod(p *v1.Pod) string {
return fmt.Sprintf("name: %v, namespace: %v, uid: %v, phase: %v, nominated node: %v\n", p.Name, p.Namespace, p.UID, p.Status.Phase, p.Status.NominatedNodeName)
}


@@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["fake_cache.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake",
visibility = ["//pkg/scheduler:__subpackages__"],
deps = [
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,96 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
// Cache is used for testing
type Cache struct {
AssumeFunc func(*v1.Pod)
ForgetFunc func(*v1.Pod)
IsAssumedPodFunc func(*v1.Pod) bool
GetPodFunc func(*v1.Pod) *v1.Pod
}
// AssumePod is a fake method for testing.
func (c *Cache) AssumePod(pod *v1.Pod) error {
c.AssumeFunc(pod)
return nil
}
// FinishBinding is a fake method for testing.
func (c *Cache) FinishBinding(pod *v1.Pod) error { return nil }
// ForgetPod is a fake method for testing.
func (c *Cache) ForgetPod(pod *v1.Pod) error {
c.ForgetFunc(pod)
return nil
}
// AddPod is a fake method for testing.
func (c *Cache) AddPod(pod *v1.Pod) error { return nil }
// UpdatePod is a fake method for testing.
func (c *Cache) UpdatePod(oldPod, newPod *v1.Pod) error { return nil }
// RemovePod is a fake method for testing.
func (c *Cache) RemovePod(pod *v1.Pod) error { return nil }
// IsAssumedPod is a fake method for testing.
func (c *Cache) IsAssumedPod(pod *v1.Pod) (bool, error) {
return c.IsAssumedPodFunc(pod), nil
}
// GetPod is a fake method for testing.
func (c *Cache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
return c.GetPodFunc(pod), nil
}
// AddNode is a fake method for testing.
func (c *Cache) AddNode(node *v1.Node) error { return nil }
// UpdateNode is a fake method for testing.
func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }
// RemoveNode is a fake method for testing.
func (c *Cache) RemoveNode(node *v1.Node) error { return nil }
// UpdateNodeNameToInfoMap is a fake method for testing.
func (c *Cache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
return nil
}
// List is a fake method for testing.
func (c *Cache) List(s labels.Selector) ([]*v1.Pod, error) { return nil, nil }
// FilteredList is a fake method for testing.
func (c *Cache) FilteredList(filter schedulerinternalcache.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
return nil, nil
}
// Snapshot is a fake method for testing
func (c *Cache) Snapshot() *schedulerinternalcache.Snapshot {
return &schedulerinternalcache.Snapshot{}
}
// NodeTree is a fake method for testing.
func (c *Cache) NodeTree() *schedulerinternalcache.NodeTree { return nil }
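
A brief sketch (hypothetical test code) of how this fake is meant to be used: only the hooks a test cares about are filled in, everything else is a no-op:

package example // hypothetical test package inside pkg/scheduler

import (
	"testing"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
)

func TestWithFakeCache(t *testing.T) {
	var assumed []string
	c := &fakecache.Cache{
		AssumeFunc:       func(p *v1.Pod) { assumed = append(assumed, p.Name) },
		IsAssumedPodFunc: func(p *v1.Pod) bool { return len(assumed) > 0 },
	}
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
	if err := c.AssumePod(pod); err != nil {
		t.Fatal(err)
	}
	if ok, _ := c.IsAssumedPod(pod); !ok {
		t.Errorf("expected pod %q to be reported as assumed", pod.Name)
	}
}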


@@ -0,0 +1,122 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
// PodFilter is a function to filter a pod. It returns true if the pod passes the filter and false otherwise.
type PodFilter func(*v1.Pod) bool
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for the generic scheduler to do efficient lookups.
// Cache's operations are pod-centric. It does incremental updates based on pod events.
// Pod events are sent via network. We don't have guaranteed delivery of all events:
// We use Reflector to list and watch from remote.
// Reflector might be slow and do a relist, which would lead to missing events.
//
// State Machine of a pod's events in scheduler's cache:
//
//
//   +-------------------------------------------+  +----+
//   |                            Add            |  |    |
//   |                                           |  |    | Update
//   +      Assume                Add            v  v    |
//Initial +--------> Assumed +------------+---> Added <--+
//   ^                +   +               |       +
//   |                |   |               |       |
//   |                |   |           Add |       | Remove
//   |                |   |               |       |
//   |                |   |               +       |
//   +----------------+   +-----------> Expired  +----> Deleted
//         Forget             Expire
//
//
// Note that an assumed pod can expire, because if we haven't received an Add event confirming it
// for a while, there might be some problem and we shouldn't keep the pod in the cache any longer.
//
// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
// Based on existing use cases, we are making the following assumptions:
// - No pod would be assumed twice
// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event.
// - If a pod wasn't added, it wouldn't be removed or updated.
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
// AssumePod assumes a pod is scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy for expiring a pod before it is confirmed (i.e., before the Add event is received).
// After expiration, its information is subtracted.
AssumePod(pod *v1.Pod) error
// FinishBinding signals that cache for assumed pod can be expired
FinishBinding(pod *v1.Pod) error
// ForgetPod removes an assumed pod from cache.
ForgetPod(pod *v1.Pod) error
// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
// If added back, the pod's information would be added again.
AddPod(pod *v1.Pod) error
// UpdatePod removes oldPod's information and adds newPod's information.
UpdatePod(oldPod, newPod *v1.Pod) error
// RemovePod removes a pod. The pod's information would be subtracted from assigned node.
RemovePod(pod *v1.Pod) error
// GetPod returns the pod from the cache with the same namespace and the
// same name of the specified pod.
GetPod(pod *v1.Pod) (*v1.Pod, error)
// IsAssumedPod returns true if the pod is assumed and not expired.
IsAssumedPod(pod *v1.Pod) (bool, error)
// AddNode adds overall information about node.
AddNode(node *v1.Node) error
// UpdateNode updates overall information about node.
UpdateNode(oldNode, newNode *v1.Node) error
// RemoveNode removes overall information about node.
RemoveNode(node *v1.Node) error
// UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node.
UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error
// List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*v1.Pod, error)
// FilteredList returns all cached pods that pass the filter.
FilteredList(filter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
// Snapshot takes a snapshot of the current cache
Snapshot() *Snapshot
// NodeTree returns a node tree structure
NodeTree() *NodeTree
}
// Snapshot is a snapshot of cache state
type Snapshot struct {
AssumedPods map[string]bool
Nodes map[string]*schedulercache.NodeInfo
}
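
As an illustration of the PodFilter hook, a minimal helper sketch (assumed, not part of this commit) that lists only the cached pods already bound to a node:

package example // hypothetical caller

import (
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

// listAssignedPods returns every cached pod that has a node assigned, ignoring labels.
func listAssignedPods(c internalcache.Cache) ([]*v1.Pod, error) {
	assignedOnly := func(p *v1.Pod) bool { return p.Spec.NodeName != "" }
	return c.FilteredList(assignedOnly, labels.Everything())
}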


@@ -0,0 +1,29 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
_ "k8s.io/kubernetes/pkg/features"
)
func TestMain(m *testing.M) {
utilfeaturetesting.VerifyFeatureGatesUnchanged(utilfeature.DefaultFeatureGate, m.Run)
}


@@ -0,0 +1,186 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sync"
"k8s.io/api/core/v1"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/klog"
)
// NodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// keys to "NodeTree.tree" and values of "NodeTree.tree" are arrays of node names.
type NodeTree struct {
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
zoneIndex int
NumNodes int
mu sync.RWMutex
}
// nodeArray is a struct that has nodes that are in a zone.
// We use a slice (as opposed to a set/map) to store the nodes because iterating over the nodes is
// a lot more frequent than searching them by name.
type nodeArray struct {
nodes []string
lastIndex int
}
func (na *nodeArray) next() (nodeName string, exhausted bool) {
if len(na.nodes) == 0 {
klog.Error("The nodeArray is empty. It should have been deleted from NodeTree.")
return "", false
}
if na.lastIndex >= len(na.nodes) {
return "", true
}
nodeName = na.nodes[na.lastIndex]
na.lastIndex++
return nodeName, false
}
// newNodeTree creates a NodeTree from nodes.
func newNodeTree(nodes []*v1.Node) *NodeTree {
nt := &NodeTree{
tree: make(map[string]*nodeArray),
}
for _, n := range nodes {
nt.AddNode(n)
}
return nt
}
// AddNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// is added to the array of nodes in that zone.
func (nt *NodeTree) AddNode(n *v1.Node) {
nt.mu.Lock()
defer nt.mu.Unlock()
nt.addNode(n)
}
func (nt *NodeTree) addNode(n *v1.Node) {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na.nodes {
if nodeName == n.Name {
klog.Warningf("node %v already exist in the NodeTree", n.Name)
return
}
}
na.nodes = append(na.nodes, n.Name)
} else {
nt.zones = append(nt.zones, zone)
nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
}
klog.V(5).Infof("Added node %v in group %v to NodeTree", n.Name, zone)
nt.NumNodes++
}
// RemoveNode removes a node from the NodeTree.
func (nt *NodeTree) RemoveNode(n *v1.Node) error {
nt.mu.Lock()
defer nt.mu.Unlock()
return nt.removeNode(n)
}
func (nt *NodeTree) removeNode(n *v1.Node) error {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na.nodes {
if nodeName == n.Name {
na.nodes = append(na.nodes[:i], na.nodes[i+1:]...)
if len(na.nodes) == 0 {
nt.removeZone(zone)
}
klog.V(5).Infof("Removed node %v in group %v from NodeTree", n.Name, zone)
nt.NumNodes--
return nil
}
}
}
klog.Errorf("Node %v in group %v was not found", n.Name, zone)
return fmt.Errorf("node %v in group %v was not found", n.Name, zone)
}
// removeZone removes a zone from the tree.
// This function must be called while the writer lock is held.
func (nt *NodeTree) removeZone(zone string) {
delete(nt.tree, zone)
for i, z := range nt.zones {
if z == zone {
nt.zones = append(nt.zones[:i], nt.zones[i+1:]...)
}
}
}
// UpdateNode updates a node in the NodeTree.
func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
var oldZone string
if old != nil {
oldZone = utilnode.GetZoneKey(old)
}
newZone := utilnode.GetZoneKey(new)
// If the zone ID of the node has not changed, we don't need to do anything. The name of the node
// cannot be changed in an update.
if oldZone == newZone {
return
}
nt.mu.Lock()
defer nt.mu.Unlock()
nt.removeNode(old) // No error checking. We ignore whether the old node exists or not.
nt.addNode(new)
}
func (nt *NodeTree) resetExhausted() {
for _, na := range nt.tree {
na.lastIndex = 0
}
nt.zoneIndex = 0
}
// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
func (nt *NodeTree) Next() string {
nt.mu.Lock()
defer nt.mu.Unlock()
if len(nt.zones) == 0 {
return ""
}
numExhaustedZones := 0
for {
if nt.zoneIndex >= len(nt.zones) {
nt.zoneIndex = 0
}
zone := nt.zones[nt.zoneIndex]
nt.zoneIndex++
// We do not check the exhausted zones before calling next() on the zone. This ensures
// that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
nodeName, exhausted := nt.tree[zone].next()
if exhausted {
numExhaustedZones++
if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. we should reset.
nt.resetExhausted()
}
} else {
return nodeName
}
}
}
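
To make the iteration order concrete, an in-package sketch (hypothetical, in the style of the test file that follows) of the zone round-robin behaviour described above: both zones are visited before the second node of zone-a is returned.

package cache

import (
	"reflect"
	"testing"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
)

func TestNodeTreeRoundRobinSketch(t *testing.T) {
	mkNode := func(name, zone string) *v1.Node {
		return &v1.Node{ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: map[string]string{kubeletapis.LabelZoneFailureDomain: zone},
		}}
	}
	nt := newNodeTree([]*v1.Node{mkNode("a1", "zone-a"), mkNode("a2", "zone-a"), mkNode("b1", "zone-b")})
	got := []string{nt.Next(), nt.Next(), nt.Next(), nt.Next()}
	want := []string{"a1", "b1", "a2", "a1"} // zones alternate; the iteration wraps around after exhaustion
	if !reflect.DeepEqual(got, want) {
		t.Errorf("unexpected order. Expected: %v, Got: %v", want, got)
	}
}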


@@ -0,0 +1,448 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
)
var allNodes = []*v1.Node{
// Node 0: a node without any region-zone label
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
},
// Node 1: a node with region label only
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
},
},
},
// Node 2: a node with zone label only
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
Labels: map[string]string{
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 3: a node with proper region and zone labels
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-3",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 4: a node with proper region and zone labels
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-4",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 5: a node with proper region and zone labels in a different zone, same region as above
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-5",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-3",
},
},
},
// Node 6: a node with proper region and zone labels in a new region and zone
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-6",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-2",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 7: a node with proper region and zone labels in a region and zone as node-6
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-7",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-2",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 8: a node with proper region and zone labels in a region and zone as node-6
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-8",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-2",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
}}
func verifyNodeTree(t *testing.T, nt *NodeTree, expectedTree map[string]*nodeArray) {
expectedNumNodes := int(0)
for _, na := range expectedTree {
expectedNumNodes += len(na.nodes)
}
if nt.NumNodes != expectedNumNodes {
t.Errorf("unexpected NodeTree.numNodes. Expected: %v, Got: %v", expectedNumNodes, nt.NumNodes)
}
if !reflect.DeepEqual(nt.tree, expectedTree) {
t.Errorf("The node tree is not the same as expected. Expected: %v, Got: %v", expectedTree, nt.tree)
}
if len(nt.zones) != len(expectedTree) {
t.Errorf("Number of zones in NodeTree.zones is not expected. Expected: %v, Got: %v", len(expectedTree), len(nt.zones))
}
for _, z := range nt.zones {
if _, ok := expectedTree[z]; !ok {
t.Errorf("zone %v is not expected to exist in NodeTree.zones", z)
}
}
}
func TestNodeTree_AddNode(t *testing.T) {
tests := []struct {
name string
nodesToAdd []*v1.Node
expectedTree map[string]*nodeArray
}{
{
name: "single node no labels",
nodesToAdd: allNodes[:1],
expectedTree: map[string]*nodeArray{"": {[]string{"node-0"}, 0}},
},
{
name: "mix of nodes with and without proper labels",
nodesToAdd: allNodes[:4],
expectedTree: map[string]*nodeArray{
"": {[]string{"node-0"}, 0},
"region-1:\x00:": {[]string{"node-1"}, 0},
":\x00:zone-2": {[]string{"node-2"}, 0},
"region-1:\x00:zone-2": {[]string{"node-3"}, 0},
},
},
{
name: "mix of nodes with and without proper labels and some zones with multiple nodes",
nodesToAdd: allNodes[:7],
expectedTree: map[string]*nodeArray{
"": {[]string{"node-0"}, 0},
"region-1:\x00:": {[]string{"node-1"}, 0},
":\x00:zone-2": {[]string{"node-2"}, 0},
"region-1:\x00:zone-2": {[]string{"node-3", "node-4"}, 0},
"region-1:\x00:zone-3": {[]string{"node-5"}, 0},
"region-2:\x00:zone-2": {[]string{"node-6"}, 0},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(nil)
for _, n := range test.nodesToAdd {
nt.AddNode(n)
}
verifyNodeTree(t, nt, test.expectedTree)
})
}
}
func TestNodeTree_RemoveNode(t *testing.T) {
tests := []struct {
name string
existingNodes []*v1.Node
nodesToRemove []*v1.Node
expectedTree map[string]*nodeArray
expectError bool
}{
{
name: "remove a single node with no labels",
existingNodes: allNodes[:7],
nodesToRemove: allNodes[:1],
expectedTree: map[string]*nodeArray{
"region-1:\x00:": {[]string{"node-1"}, 0},
":\x00:zone-2": {[]string{"node-2"}, 0},
"region-1:\x00:zone-2": {[]string{"node-3", "node-4"}, 0},
"region-1:\x00:zone-3": {[]string{"node-5"}, 0},
"region-2:\x00:zone-2": {[]string{"node-6"}, 0},
},
},
{
name: "remove a few nodes including one from a zone with multiple nodes",
existingNodes: allNodes[:7],
nodesToRemove: allNodes[1:4],
expectedTree: map[string]*nodeArray{
"": {[]string{"node-0"}, 0},
"region-1:\x00:zone-2": {[]string{"node-4"}, 0},
"region-1:\x00:zone-3": {[]string{"node-5"}, 0},
"region-2:\x00:zone-2": {[]string{"node-6"}, 0},
},
},
{
name: "remove all nodes",
existingNodes: allNodes[:7],
nodesToRemove: allNodes[:7],
expectedTree: map[string]*nodeArray{},
},
{
name: "remove non-existing node",
existingNodes: nil,
nodesToRemove: allNodes[:5],
expectedTree: map[string]*nodeArray{},
expectError: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(test.existingNodes)
for _, n := range test.nodesToRemove {
err := nt.RemoveNode(n)
if test.expectError == (err == nil) {
t.Errorf("unexpected returned error value: %v", err)
}
}
verifyNodeTree(t, nt, test.expectedTree)
})
}
}
func TestNodeTree_UpdateNode(t *testing.T) {
tests := []struct {
name string
existingNodes []*v1.Node
nodeToUpdate *v1.Node
expectedTree map[string]*nodeArray
}{
{
name: "update a node without label",
existingNodes: allNodes[:7],
nodeToUpdate: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
expectedTree: map[string]*nodeArray{
"region-1:\x00:": {[]string{"node-1"}, 0},
":\x00:zone-2": {[]string{"node-2"}, 0},
"region-1:\x00:zone-2": {[]string{"node-3", "node-4", "node-0"}, 0},
"region-1:\x00:zone-3": {[]string{"node-5"}, 0},
"region-2:\x00:zone-2": {[]string{"node-6"}, 0},
},
},
{
name: "update the only existing node",
existingNodes: allNodes[:1],
nodeToUpdate: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
expectedTree: map[string]*nodeArray{
"region-1:\x00:zone-2": {[]string{"node-0"}, 0},
},
},
{
name: "update non-existing node",
existingNodes: allNodes[:1],
nodeToUpdate: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-new",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
expectedTree: map[string]*nodeArray{
"": {[]string{"node-0"}, 0},
"region-1:\x00:zone-2": {[]string{"node-new"}, 0},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(test.existingNodes)
var oldNode *v1.Node
for _, n := range allNodes {
if n.Name == test.nodeToUpdate.Name {
oldNode = n
break
}
}
if oldNode == nil {
oldNode = &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "nonexisting-node"}}
}
nt.UpdateNode(oldNode, test.nodeToUpdate)
verifyNodeTree(t, nt, test.expectedTree)
})
}
}
func TestNodeTree_Next(t *testing.T) {
tests := []struct {
name string
nodesToAdd []*v1.Node
numRuns int // number of times to run Next()
expectedOutput []string
}{
{
name: "empty tree",
nodesToAdd: nil,
numRuns: 2,
expectedOutput: []string{"", ""},
},
{
name: "should go back to the first node after finishing a round",
nodesToAdd: allNodes[:1],
numRuns: 2,
expectedOutput: []string{"node-0", "node-0"},
},
{
name: "should go back to the first node after going over all nodes",
nodesToAdd: allNodes[:4],
numRuns: 5,
expectedOutput: []string{"node-0", "node-1", "node-2", "node-3", "node-0"},
},
{
name: "should go to all zones before going to the second nodes in the same zone",
nodesToAdd: allNodes[:9],
numRuns: 11,
expectedOutput: []string{"node-0", "node-1", "node-2", "node-3", "node-5", "node-6", "node-4", "node-7", "node-8", "node-0", "node-1"},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(test.nodesToAdd)
var output []string
for i := 0; i < test.numRuns; i++ {
output = append(output, nt.Next())
}
if !reflect.DeepEqual(output, test.expectedOutput) {
t.Errorf("unexpected output. Expected: %v, Got: %v", test.expectedOutput, output)
}
})
}
}
func TestNodeTreeMultiOperations(t *testing.T) {
tests := []struct {
name string
nodesToAdd []*v1.Node
nodesToRemove []*v1.Node
operations []string
expectedOutput []string
}{
{
name: "add and remove all nodes between two Next operations",
nodesToAdd: allNodes[2:9],
nodesToRemove: allNodes[2:9],
operations: []string{"add", "add", "next", "add", "remove", "remove", "remove", "next"},
expectedOutput: []string{"node-2", ""},
},
{
name: "add and remove some nodes between two Next operations",
nodesToAdd: allNodes[2:9],
nodesToRemove: allNodes[2:9],
operations: []string{"add", "add", "next", "add", "remove", "remove", "next"},
expectedOutput: []string{"node-2", "node-4"},
},
{
name: "remove nodes already iterated on and add new nodes",
nodesToAdd: allNodes[2:9],
nodesToRemove: allNodes[2:9],
operations: []string{"add", "add", "next", "next", "add", "remove", "remove", "next"},
expectedOutput: []string{"node-2", "node-3", "node-4"},
},
{
name: "add more nodes to an exhausted zone",
nodesToAdd: append(allNodes[4:9], allNodes[3]),
nodesToRemove: nil,
operations: []string{"add", "add", "add", "add", "add", "next", "next", "next", "next", "add", "next", "next", "next"},
expectedOutput: []string{"node-4", "node-5", "node-6", "node-7", "node-3", "node-8", "node-4"},
},
{
name: "remove zone and add new to ensure exhausted is reset correctly",
nodesToAdd: append(allNodes[3:5], allNodes[6:8]...),
nodesToRemove: allNodes[3:5],
operations: []string{"add", "add", "next", "next", "remove", "add", "add", "next", "next", "remove", "next", "next"},
expectedOutput: []string{"node-3", "node-4", "node-6", "node-7", "node-6", "node-7"},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(nil)
addIndex := 0
removeIndex := 0
var output []string
for _, op := range test.operations {
switch op {
case "add":
if addIndex >= len(test.nodesToAdd) {
t.Error("more add operations than nodesToAdd")
} else {
nt.AddNode(test.nodesToAdd[addIndex])
addIndex++
}
case "remove":
if removeIndex >= len(test.nodesToRemove) {
t.Error("more remove operations than nodesToRemove")
} else {
nt.RemoveNode(test.nodesToRemove[removeIndex])
removeIndex++
}
case "next":
output = append(output, nt.Next())
default:
t.Errorf("unknow operation: %v", op)
}
}
if !reflect.DeepEqual(output, test.expectedOutput) {
t.Errorf("unexpected output. Expected: %v, Got: %v", test.expectedOutput, output)
}
})
}
}


@@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["scheduling_queue.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/internal/queue",
visibility = ["//pkg/scheduler:__subpackages__"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["scheduling_queue_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)


@@ -0,0 +1,769 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file contains structures that implement scheduling queue types.
// Scheduling queues hold pods waiting to be scheduled. This file has two types
// of scheduling queue: 1) a FIFO, which is mostly the same as cache.FIFO, 2) a
// priority queue which has two sub-queues. One sub-queue holds pods that are
// being considered for scheduling. This is called activeQ. The other holds
// pods that have already been tried and are determined to be unschedulable. The
// latter is called unschedulableQ.
// FIFO is here for flag-gating purposes and allows us to use the traditional
// scheduling queue when util.PodPriorityEnabled() returns false.
package queue
import (
"container/heap"
"fmt"
"reflect"
"sync"
"k8s.io/klog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/pkg/scheduler/util"
)
var (
queueClosed = "scheduling queue is closed"
)
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
Add(pod *v1.Pod) error
AddIfNotPresent(pod *v1.Pod) error
AddUnschedulableIfNotPresent(pod *v1.Pod) error
// Pop removes the head of the queue and returns it. It blocks if the
// queue is empty and waits until a new item is added to the queue.
Pop() (*v1.Pod, error)
Update(oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveQueue()
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
WaitingPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
DeleteNominatedPodIfExists(pod *v1.Pod)
}
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
// enabled a priority queue is returned. If it is disabled, a FIFO is returned.
func NewSchedulingQueue() SchedulingQueue {
if util.PodPriorityEnabled() {
return NewPriorityQueue()
}
return NewFIFO()
}
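// Illustrative sketch (assumes a trivial placeholder pod): a minimal caller of
// the queue returned above. The real scheduler wires Pop() into its scheduling
// cycle; here the "scheduling attempt" is only a log line.
func exampleSchedulingQueueUsage() {
    q := NewSchedulingQueue()
    defer q.Close()
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default", UID: "example-uid"}}
    if err := q.Add(pod); err != nil {
        klog.Errorf("error adding pod %v/%v: %v", pod.Namespace, pod.Name, err)
        return
    }
    // Pop blocks until an item is available or the queue is closed.
    p, err := q.Pop()
    if err != nil {
        klog.Errorf("error popping pod: %v", err)
        return
    }
    klog.V(4).Infof("a scheduling attempt for %v/%v would happen here", p.Namespace, p.Name)
}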
// FIFO is basically a simple wrapper around cache.FIFO to make it compatible
// with the SchedulingQueue interface.
type FIFO struct {
*cache.FIFO
}
var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue.
// Add adds a pod to the FIFO.
func (f *FIFO) Add(pod *v1.Pod) error {
return f.FIFO.Add(pod)
}
// AddIfNotPresent adds a pod to the FIFO if it is absent in the FIFO.
func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error {
return f.FIFO.AddIfNotPresent(pod)
}
// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In
// FIFO it is added to the end of the queue.
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
return f.FIFO.AddIfNotPresent(pod)
}
// Update updates a pod in the FIFO.
func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
return f.FIFO.Update(newPod)
}
// Delete deletes a pod in the FIFO.
func (f *FIFO) Delete(pod *v1.Pod) error {
return f.FIFO.Delete(pod)
}
// Pop removes the head of FIFO and returns it.
// This is just a copy/paste of cache.Pop(queue Queue) from fifo.go, which the
// scheduler has always been using. There is a comment in that file saying that
// this method shouldn't be used in production code, but the scheduler has always used it.
// This function does minimal error checking.
func (f *FIFO) Pop() (*v1.Pod, error) {
result, err := f.FIFO.Pop(func(obj interface{}) error { return nil })
if err == cache.FIFOClosedError {
return nil, fmt.Errorf(queueClosed)
}
return result.(*v1.Pod), err
}
// WaitingPods returns all the waiting pods in the queue.
func (f *FIFO) WaitingPods() []*v1.Pod {
result := []*v1.Pod{}
for _, pod := range f.FIFO.List() {
result = append(result, pod.(*v1.Pod))
}
return result
}
// FIFO does not need to react to events, as all pods are always in the active
// scheduling queue anyway.
// AssignedPodAdded does nothing here.
func (f *FIFO) AssignedPodAdded(pod *v1.Pod) {}
// AssignedPodUpdated does nothing here.
func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {}
// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue.
func (f *FIFO) MoveAllToActiveQueue() {}
// WaitingPodsForNode returns pods that are nominated to run on the given node;
// FIFO does not support this, so it always returns nil.
func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod {
return nil
}
// Close closes the FIFO queue.
func (f *FIFO) Close() {
f.FIFO.Close()
}
// DeleteNominatedPodIfExists does nothing in FIFO.
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
// NewFIFO creates a FIFO object.
func NewFIFO() *FIFO {
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
}
// NominatedNodeName returns the nominated node name of a Pod.
func NominatedNodeName(pod *v1.Pod) string {
return pod.Status.NominatedNodeName
}
// PriorityQueue implements a scheduling queue. It is an alternative to FIFO.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has two sub queues. One sub-queue holds pods that are being considered for
// scheduling. This is called activeQ and is a Heap. Another queue holds
// pods that have already been tried and are determined to be unschedulable. The latter
// is called unschedulableQ.
type PriorityQueue struct {
lock sync.RWMutex
cond sync.Cond
// activeQ is a heap structure that the scheduler actively looks at to find pods to
// schedule. The head of the heap is the highest priority pod.
activeQ *Heap
// unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulableQ.
nominatedPods map[string][]*v1.Pod
// receivedMoveRequest is set to true whenever we receive a request to move a
// pod from the unschedulableQ to the activeQ, and is set to false when we pop
// a pod from the activeQ. It indicates if we received a move request when a
// pod was in flight (we were trying to schedule it). In such a case, we put
// the pod back into the activeQ if it is determined unschedulable.
receivedMoveRequest bool
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
closed bool
}
// Making sure that PriorityQueue implements SchedulingQueue.
var _ = SchedulingQueue(&PriorityQueue{})
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue() *PriorityQueue {
pq := &PriorityQueue{
activeQ: newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: map[string][]*v1.Pod{},
}
pq.cond.L = &pq.lock
return pq
}
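// Illustrative sketch of the activeQ/unschedulableQ flow described above,
// assuming a scheduling attempt that fails and marks the pod unschedulable.
// Error handling is intentionally minimal.
func examplePriorityQueueFlow(pq *PriorityQueue, pod *v1.Pod) {
    _ = pq.Add(pod)  // a newly created pod enters activeQ
    p, _ := pq.Pop() // the scheduler pops the highest priority pod
    // ... the scheduling attempt fails and the PodScheduled=False/Unschedulable
    // condition is set on p ...
    _ = pq.AddUnschedulableIfNotPresent(p) // parked in unschedulableQ unless a move request raced in
    pq.MoveAllToActiveQueue()              // a cluster event makes all parked pods candidates again
}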
// addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and does not
// already exist in the map. Adding an existing pod does not update it.
func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) > 0 {
for _, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
return
}
}
p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod)
}
}
// deleteNominatedPodIfExists deletes a pod from the nominatedPods.
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) > 0 {
for i, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
p.nominatedPods[nnn] = append(p.nominatedPods[nnn][:i], p.nominatedPods[nnn][i+1:]...)
if len(p.nominatedPods[nnn]) == 0 {
delete(p.nominatedPods, nnn)
}
break
}
}
}
}
// updateNominatedPod updates a pod in the nominatedPods.
func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
// Even if the nominated node name of the Pod is not changed, we must delete and add it again
// to ensure that its pointer is updated.
p.deleteNominatedPodIfExists(oldPod)
p.addNominatedPodIfNeeded(newPod)
}
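// Illustrative sketch: even when the NominatedNodeName is unchanged, the entry
// in nominatedPods is replaced so that the stored pointer matches newPod.
// (Assumes the caller holds p.lock, as for all nominatedPods helpers.)
func exampleUpdateNominatedPod(p *PriorityQueue, oldPod, newPod *v1.Pod) {
    p.lock.Lock()
    defer p.lock.Unlock()
    p.updateNominatedPod(oldPod, newPod) // the old entry (if any) is removed and newPod is re-added under its nominated node
}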
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in either queue.
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
err := p.activeQ.Add(pod)
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
if p.unschedulableQ.get(pod) != nil {
klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
p.deleteNominatedPodIfExists(pod)
p.unschedulableQ.delete(pod)
}
p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast()
}
return err
}
// AddIfNotPresent adds a pod to the active queue if it is not present in either
// of the two queues. If it is already present in one of them, it does nothing.
func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.unschedulableQ.get(pod) != nil {
return nil
}
if _, exists, _ := p.activeQ.Get(pod); exists {
return nil
}
err := p.activeQ.Add(pod)
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast()
}
return err
}
func isPodUnschedulable(pod *v1.Pod) bool {
_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
return cond != nil && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable
}
// AddUnschedulableIfNotPresent returns an error if the pod is already present in
// either queue. Otherwise it adds the pod to the unschedulable queue if no move
// request has been received and the pod is marked unschedulable; in all other
// cases it adds the pod to the activeQ.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.unschedulableQ.get(pod) != nil {
return fmt.Errorf("pod is already present in unschedulableQ")
}
if _, exists, _ := p.activeQ.Get(pod); exists {
return fmt.Errorf("pod is already present in the activeQ")
}
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
p.unschedulableQ.addOrUpdate(pod)
p.addNominatedPodIfNeeded(pod)
return nil
}
err := p.activeQ.Add(pod)
if err == nil {
p.addNominatedPodIfNeeded(pod)
p.cond.Broadcast()
}
return err
}
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It also
// clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
func (p *PriorityQueue) Pop() (*v1.Pod, error) {
p.lock.Lock()
defer p.lock.Unlock()
for len(p.activeQ.data.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until a new item is enqueued.
// When Close() is called, p.closed is set and the condition is broadcast,
// which causes this loop to continue and return from Pop().
if p.closed {
return nil, fmt.Errorf(queueClosed)
}
p.cond.Wait()
}
obj, err := p.activeQ.Pop()
if err != nil {
return nil, err
}
pod := obj.(*v1.Pod)
p.receivedMoveRequest = false
return pod, err
}
// isPodUpdated checks whether the pod was updated in a way that may have made it
// schedulable. It drops the pod's status and compares the result with the old version.
func isPodUpdated(oldPod, newPod *v1.Pod) bool {
strip := func(pod *v1.Pod) *v1.Pod {
p := pod.DeepCopy()
p.ResourceVersion = ""
p.Generation = 0
p.Status = v1.PodStatus{}
return p
}
return !reflect.DeepEqual(strip(oldPod), strip(newPod))
}
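// Illustrative sketch: a status-only change does not count as an update, so
// Update() keeps such a pod in unschedulableQ instead of moving it to activeQ.
func exampleIsPodUpdated() bool {
    oldPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"}}
    newPod := oldPod.DeepCopy()
    newPod.Status.Message = "still pending" // status is stripped before comparison
    return isPodUpdated(oldPod, newPod)     // false
}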
// Update updates a pod in the active queue if present. Otherwise, if the update
// may have made an unschedulable pod schedulable, it moves the pod from the
// unschedulable queue to the active queue. A pod that is in neither queue is
// simply added to the active queue.
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(newPod); exists {
p.updateNominatedPod(oldPod, newPod)
err := p.activeQ.Update(newPod)
return err
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPod := p.unschedulableQ.get(newPod); usPod != nil {
p.updateNominatedPod(oldPod, newPod)
if isPodUpdated(oldPod, newPod) {
p.unschedulableQ.delete(usPod)
err := p.activeQ.Add(newPod)
if err == nil {
p.cond.Broadcast()
}
return err
}
p.unschedulableQ.addOrUpdate(newPod)
return nil
}
// If the pod is not in either of the two queues, we put it in the active queue.
err := p.activeQ.Add(newPod)
if err == nil {
p.addNominatedPodIfNeeded(newPod)
p.cond.Broadcast()
}
return err
}
// Delete deletes the item from either of the two queues. It assumes the pod is
// only in one queue.
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.deleteNominatedPodIfExists(pod)
err := p.activeQ.Delete(pod)
if err != nil { // The item was probably not found in the activeQ.
p.unschedulableQ.delete(pod)
}
return nil
}
// AssignedPodAdded is called when a bound pod is added. Creation of this pod
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
p.lock.Unlock()
}
// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod))
p.lock.Unlock()
}
// MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This
// function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives it after all the pods are in the
// queue and the head is the highest priority pod.
// TODO(bsalamat): We should add a back-off mechanism here so that a high priority
// pod which is unschedulable does not go to the head of the queue frequently. For
// example, in a cluster where a lot of pods are being deleted, such a high priority
// pod can deprive other pods of getting scheduled.
func (p *PriorityQueue) MoveAllToActiveQueue() {
p.lock.Lock()
defer p.lock.Unlock()
for _, pod := range p.unschedulableQ.pods {
if err := p.activeQ.Add(pod); err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
}
}
p.unschedulableQ.clear()
p.receivedMoveRequest = true
p.cond.Broadcast()
}
// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveQueue(pods []*v1.Pod) {
for _, pod := range pods {
if err := p.activeQ.Add(pod); err == nil {
p.unschedulableQ.delete(pod)
} else {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
}
}
p.receivedMoveRequest = true
p.cond.Broadcast()
}
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*v1.Pod {
var podsToMove []*v1.Pod
for _, up := range p.unschedulableQ.pods {
affinity := up.Spec.Affinity
if affinity != nil && affinity.PodAffinity != nil {
terms := predicates.GetPodAffinityTerms(affinity.PodAffinity)
for _, term := range terms {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(up, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
klog.Errorf("Error getting label selectors for pod: %v.", up.Name)
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
podsToMove = append(podsToMove, up)
break
}
}
}
}
return podsToMove
}
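// Illustrative sketch (hypothetical pods, not upstream fixtures): an
// unschedulable pod with a required pod-affinity term selecting app=web is
// returned once an assigned pod carrying that label is passed in, which is
// what AssignedPodAdded relies on to unblock it.
func exampleAffinityMatch(p *PriorityQueue) []*v1.Pod {
    pending := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "pending", Namespace: "ns"},
        Spec: v1.PodSpec{
            Affinity: &v1.Affinity{
                PodAffinity: &v1.PodAffinity{
                    RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
                        {
                            LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}},
                            TopologyKey:   "kubernetes.io/hostname",
                        },
                    },
                },
            },
        },
    }
    p.lock.Lock()
    defer p.lock.Unlock()
    p.unschedulableQ.addOrUpdate(pending)
    assigned := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Name: "assigned", Namespace: "ns", Labels: map[string]string{"app": "web"}},
        Spec:       v1.PodSpec{NodeName: "node-1"},
    }
    return p.getUnschedulablePodsWithMatchingAffinityTerm(assigned) // contains "pending"
}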
// WaitingPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node before they
// can be actually scheduled.
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
p.lock.RLock()
defer p.lock.RUnlock()
if list, ok := p.nominatedPods[nodeName]; ok {
return list
}
return nil
}
// WaitingPods returns all the waiting pods in the queue.
func (p *PriorityQueue) WaitingPods() []*v1.Pod {
p.lock.Lock()
defer p.lock.Unlock()
result := []*v1.Pod{}
for _, pod := range p.activeQ.List() {
result = append(result, pod.(*v1.Pod))
}
for _, pod := range p.unschedulableQ.pods {
result = append(result, pod)
}
return result
}
// Close closes the priority queue.
func (p *PriorityQueue) Close() {
p.lock.Lock()
defer p.lock.Unlock()
p.closed = true
p.cond.Broadcast()
}
// DeleteNominatedPodIfExists deletes pod from internal cache if it's a nominatedPod
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
p.lock.Lock()
p.deleteNominatedPodIfExists(pod)
p.lock.Unlock()
}
// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {
// pods is a map keyed by a pod's full name; the value is a pointer to the pod.
pods map[string]*v1.Pod
keyFunc func(*v1.Pod) string
}
// addOrUpdate adds a pod to the unschedulable pods map, or updates the entry if it already exists.
func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
u.pods[u.keyFunc(pod)] = pod
}
// delete removes a pod from the unschedulable pods map.
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
delete(u.pods, u.keyFunc(pod))
}
// get returns the pod if a pod with the same key as the key of the given "pod"
// is found in the map. It returns nil otherwise.
func (u *UnschedulablePodsMap) get(pod *v1.Pod) *v1.Pod {
podKey := u.keyFunc(pod)
if p, exists := u.pods[podKey]; exists {
return p
}
return nil
}
// clear removes all entries from the unschedulable pods map.
func (u *UnschedulablePodsMap) clear() {
u.pods = make(map[string]*v1.Pod)
}
// newUnschedulablePodsMap initializes a new object of UnschedulablePodsMap.
func newUnschedulablePodsMap() *UnschedulablePodsMap {
return &UnschedulablePodsMap{
pods: make(map[string]*v1.Pod),
keyFunc: util.GetPodFullName,
}
}
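// Illustrative sketch: entries are keyed by the pod's full name (namespace and
// name via util.GetPodFullName), so adding the same pod twice overwrites the
// stored pointer rather than creating a second entry.
func exampleUnschedulablePodsMap() *v1.Pod {
    m := newUnschedulablePodsMap()
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"}}
    m.addOrUpdate(pod)
    m.addOrUpdate(pod.DeepCopy()) // same key, replaces the previous pointer
    return m.get(pod)             // returns the most recently stored copy
}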
// Below is the implementation of a heap. The logic is pretty much the same
// as cache.heap; however, this heap does not perform synchronization. It leaves
// synchronization to the SchedulingQueue.
// LessFunc is a function type to compare two objects.
type LessFunc func(interface{}, interface{}) bool
// KeyFunc is a function type to get the key from an object.
type KeyFunc func(obj interface{}) (string, error)
type heapItem struct {
obj interface{} // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
}
type itemKeyValue struct {
key string
obj interface{}
}
// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData struct {
// items is a map from key of the objects to the objects and their index.
// We depend on the property that items in the map are in the queue and vice versa.
items map[string]*heapItem
// queue implements a heap data structure and keeps the order of elements
// according to the heap invariant. The queue keeps the keys of objects stored
// in "items".
queue []string
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc
// lessFunc is used to compare two objects in the heap.
lessFunc LessFunc
}
var (
_ = heap.Interface(&heapData{}) // heapData is a standard heap
)
// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
func (h *heapData) Less(i, j int) bool {
if i >= len(h.queue) || j >= len(h.queue) {
return false
}
itemi, ok := h.items[h.queue[i]]
if !ok {
return false
}
itemj, ok := h.items[h.queue[j]]
if !ok {
return false
}
return h.lessFunc(itemi.obj, itemj.obj)
}
// Len returns the number of items in the Heap.
func (h *heapData) Len() int { return len(h.queue) }
// Swap implements swapping of two elements in the heap. This is part of the standard
// heap interface and should never be called directly.
func (h *heapData) Swap(i, j int) {
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
item = h.items[h.queue[j]]
item.index = j
}
// Push is supposed to be called by heap.Push only.
func (h *heapData) Push(kv interface{}) {
keyValue := kv.(*itemKeyValue)
n := len(h.queue)
h.items[keyValue.key] = &heapItem{keyValue.obj, n}
h.queue = append(h.queue, keyValue.key)
}
// Pop is supposed to be called by heap.Pop only.
func (h *heapData) Pop() interface{} {
key := h.queue[len(h.queue)-1]
h.queue = h.queue[0 : len(h.queue)-1]
item, ok := h.items[key]
if !ok {
// This is an error
return nil
}
delete(h.items, key)
return item.obj
}
// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {
// data stores objects and has a queue that keeps their ordering according
// to the heap invariant.
data *heapData
}
// Add inserts an item, and puts it in the queue. The item is updated if it
// already exists.
func (h *Heap) Add(obj interface{}) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
if _, exists := h.data.items[key]; exists {
h.data.items[key].obj = obj
heap.Fix(h.data, h.data.items[key].index)
} else {
heap.Push(h.data, &itemKeyValue{key, obj})
}
return nil
}
// AddIfNotPresent inserts an item, and puts it in the queue. If an item with
// the same key is already present in the map, no changes are made to the item.
func (h *Heap) AddIfNotPresent(obj interface{}) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
if _, exists := h.data.items[key]; !exists {
heap.Push(h.data, &itemKeyValue{key, obj})
}
return nil
}
// Update is the same as Add in this implementation. When the item does not
// exist, it is added.
func (h *Heap) Update(obj interface{}) error {
return h.Add(obj)
}
// Delete removes an item.
func (h *Heap) Delete(obj interface{}) error {
key, err := h.data.keyFunc(obj)
if err != nil {
return cache.KeyError{Obj: obj, Err: err}
}
if item, ok := h.data.items[key]; ok {
heap.Remove(h.data, item.index)
return nil
}
return fmt.Errorf("object not found")
}
// Pop returns the head of the heap.
func (h *Heap) Pop() (interface{}, error) {
obj := heap.Pop(h.data)
if obj != nil {
return obj, nil
}
return nil, fmt.Errorf("object was removed from heap data")
}
// Get returns the requested item, or sets exists=false.
func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
key, err := h.data.keyFunc(obj)
if err != nil {
return nil, false, cache.KeyError{Obj: obj, Err: err}
}
return h.GetByKey(key)
}
// GetByKey returns the requested item, or sets exists=false.
func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
item, exists := h.data.items[key]
if !exists {
return nil, false, nil
}
return item.obj, true, nil
}
// List returns a list of all the items.
func (h *Heap) List() []interface{} {
list := make([]interface{}, 0, len(h.data.items))
for _, item := range h.data.items {
list = append(list, item.obj)
}
return list
}
// newHeap returns a Heap which can be used to queue up items to process.
func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
return &Heap{
data: &heapData{
items: map[string]*heapItem{},
queue: []string{},
keyFunc: keyFn,
lessFunc: lessFn,
},
}
}
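// Illustrative sketch: the Heap orders items with the supplied lessFunc. With
// util.HigherPriorityPod, as used for activeQ in NewPriorityQueue, the highest
// priority pod is popped first.
func exampleHeapOrdering() (interface{}, error) {
    h := newHeap(cache.MetaNamespaceKeyFunc, util.HigherPriorityPod)
    low, high := int32(0), int32(1000)
    _ = h.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "low", Namespace: "ns"}, Spec: v1.PodSpec{Priority: &low}})
    _ = h.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "high", Namespace: "ns"}, Spec: v1.PodSpec{Priority: &high}})
    return h.Pop() // the "high" pod comes out first
}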

View File

@ -0,0 +1,514 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"fmt"
"reflect"
"sync"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/util"
)
var negPriority, lowPriority, midPriority, highPriority, veryHighPriority = int32(-100), int32(0), int32(100), int32(1000), int32(10000)
var mediumPriority = (lowPriority + highPriority) / 2
var highPriorityPod, highPriNominatedPod, medPriorityPod, unschedulablePod = v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "hpp",
Namespace: "ns1",
UID: "hppns1",
},
Spec: v1.PodSpec{
Priority: &highPriority,
},
},
v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "hpp",
Namespace: "ns1",
UID: "hppns1",
},
Spec: v1.PodSpec{
Priority: &highPriority,
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "mpp",
Namespace: "ns2",
UID: "mppns2",
Annotations: map[string]string{
"annot2": "val2",
},
},
Spec: v1.PodSpec{
Priority: &mediumPriority,
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "up",
Namespace: "ns1",
UID: "upns1",
Annotations: map[string]string{
"annot2": "val2",
},
},
Spec: v1.PodSpec{
Priority: &lowPriority,
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
},
},
NominatedNodeName: "node1",
},
}
func TestPriorityQueue_Add(t *testing.T) {
q := NewPriorityQueue()
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
q.Add(&highPriorityPod)
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&medPriorityPod, &unschedulablePod},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
if p, err := q.Pop(); err != nil || p != &highPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
}
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
if len(q.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
}
}
func TestPriorityQueue_AddIfNotPresent(t *testing.T) {
q := NewPriorityQueue()
q.unschedulableQ.addOrUpdate(&highPriNominatedPod)
q.AddIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddIfNotPresent(&medPriorityPod)
q.AddIfNotPresent(&unschedulablePod)
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&medPriorityPod, &unschedulablePod},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if p, err := q.Pop(); err != nil || p != &unschedulablePod {
t.Errorf("Expected: %v after Pop, but got: %v", unschedulablePod.Name, p.Name)
}
if len(q.nominatedPods["node1"]) != 2 {
t.Errorf("Expected medPriorityPod and unschedulablePod to be still present in nomindatePods: %v", q.nominatedPods["node1"])
}
if q.unschedulableQ.get(&highPriNominatedPod) != &highPriNominatedPod {
t.Errorf("Pod %v was not found in the unschedulableQ.", highPriNominatedPod.Name)
}
}
func TestPriorityQueue_AddUnschedulableIfNotPresent(t *testing.T) {
q := NewPriorityQueue()
q.Add(&highPriNominatedPod)
q.AddUnschedulableIfNotPresent(&highPriNominatedPod) // Must not add anything.
q.AddUnschedulableIfNotPresent(&medPriorityPod) // This should go to activeQ.
q.AddUnschedulableIfNotPresent(&unschedulablePod)
expectedNominatedPods := map[string][]*v1.Pod{
"node1": {&highPriNominatedPod, &medPriorityPod, &unschedulablePod},
}
if !reflect.DeepEqual(q.nominatedPods, expectedNominatedPods) {
t.Errorf("Unexpected nominated map after adding pods. Expected: %v, got: %v", expectedNominatedPods, q.nominatedPods)
}
if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriNominatedPod.Name, p.Name)
}
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if len(q.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have one element: %v", q.nominatedPods)
}
if q.unschedulableQ.get(&unschedulablePod) != &unschedulablePod {
t.Errorf("Pod %v was not found in the unschedulableQ.", unschedulablePod.Name)
}
}
func TestPriorityQueue_Pop(t *testing.T) {
q := NewPriorityQueue()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
if p, err := q.Pop(); err != nil || p != &medPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", medPriorityPod.Name, p.Name)
}
if len(q.nominatedPods["node1"]) != 1 {
t.Errorf("Expected medPriorityPod to be present in nomindatePods: %v", q.nominatedPods["node1"])
}
}()
q.Add(&medPriorityPod)
wg.Wait()
}
func TestPriorityQueue_Update(t *testing.T) {
q := NewPriorityQueue()
q.Update(nil, &highPriorityPod)
if _, exists, _ := q.activeQ.Get(&highPriorityPod); !exists {
t.Errorf("Expected %v to be added to activeQ.", highPriorityPod.Name)
}
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
}
// Update highPriorityPod and add a nominatedNodeName to it.
q.Update(&highPriorityPod, &highPriNominatedPod)
if q.activeQ.data.Len() != 1 {
t.Error("Expected only one item in activeQ.")
}
if len(q.nominatedPods) != 1 {
t.Errorf("Expected one item in nomindatePods map: %v", q.nominatedPods)
}
// Updating an unschedulable pod which is not in any of the two queues, should
// add the pod to activeQ.
q.Update(&unschedulablePod, &unschedulablePod)
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
t.Errorf("Expected %v to be added to activeQ.", unschedulablePod.Name)
}
// Updating a pod that is already in activeQ, should not change it.
q.Update(&unschedulablePod, &unschedulablePod)
if len(q.unschedulableQ.pods) != 0 {
t.Error("Expected unschedulableQ to be empty.")
}
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
t.Errorf("Expected: %v to be added to activeQ.", unschedulablePod.Name)
}
if p, err := q.Pop(); err != nil || p != &highPriNominatedPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
}
}
func TestPriorityQueue_Delete(t *testing.T) {
q := NewPriorityQueue()
q.Update(&highPriorityPod, &highPriNominatedPod)
q.Add(&unschedulablePod)
q.Delete(&highPriNominatedPod)
if _, exists, _ := q.activeQ.Get(&unschedulablePod); !exists {
t.Errorf("Expected %v to be in activeQ.", unschedulablePod.Name)
}
if _, exists, _ := q.activeQ.Get(&highPriNominatedPod); exists {
t.Errorf("Didn't expect %v to be in activeQ.", highPriorityPod.Name)
}
if len(q.nominatedPods) != 1 {
t.Errorf("Expected nomindatePods to have only 'unschedulablePod': %v", q.nominatedPods)
}
q.Delete(&unschedulablePod)
if len(q.nominatedPods) != 0 {
t.Errorf("Expected nomindatePods to be empty: %v", q.nominatedPods)
}
}
func TestPriorityQueue_MoveAllToActiveQueue(t *testing.T) {
q := NewPriorityQueue()
q.Add(&medPriorityPod)
q.unschedulableQ.addOrUpdate(&unschedulablePod)
q.unschedulableQ.addOrUpdate(&highPriorityPod)
q.MoveAllToActiveQueue()
if q.activeQ.data.Len() != 3 {
t.Error("Expected all items to be in activeQ.")
}
}
// TestPriorityQueue_AssignedPodAdded tests AssignedPodAdded. It checks that
// when a pod with pod affinity is in unschedulableQ and another pod with a
// matching label is added, the unschedulable pod is moved to activeQ.
func TestPriorityQueue_AssignedPodAdded(t *testing.T) {
affinityPod := unschedulablePod.DeepCopy()
affinityPod.Name = "afp"
affinityPod.Spec = v1.PodSpec{
Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"securityscan", "value2"},
},
},
},
TopologyKey: "region",
},
},
},
},
Priority: &mediumPriority,
}
labelPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "lbp",
Namespace: affinityPod.Namespace,
Labels: map[string]string{"service": "securityscan"},
},
Spec: v1.PodSpec{NodeName: "machine1"},
}
q := NewPriorityQueue()
q.Add(&medPriorityPod)
// Add a couple of pods to the unschedulableQ.
q.unschedulableQ.addOrUpdate(&unschedulablePod)
q.unschedulableQ.addOrUpdate(affinityPod)
// Simulate addition of an assigned pod. The pod has matching labels for
// affinityPod. So, affinityPod should go to activeQ.
q.AssignedPodAdded(&labelPod)
if q.unschedulableQ.get(affinityPod) != nil {
t.Error("affinityPod is still in the unschedulableQ.")
}
if _, exists, _ := q.activeQ.Get(affinityPod); !exists {
t.Error("affinityPod is not moved to activeQ.")
}
// Check that the other pod is still in the unschedulableQ.
if q.unschedulableQ.get(&unschedulablePod) == nil {
t.Error("unschedulablePod is not in the unschedulableQ.")
}
}
func TestPriorityQueue_WaitingPodsForNode(t *testing.T) {
q := NewPriorityQueue()
q.Add(&medPriorityPod)
q.Add(&unschedulablePod)
q.Add(&highPriorityPod)
if p, err := q.Pop(); err != nil || p != &highPriorityPod {
t.Errorf("Expected: %v after Pop, but got: %v", highPriorityPod.Name, p.Name)
}
expectedList := []*v1.Pod{&medPriorityPod, &unschedulablePod}
if !reflect.DeepEqual(expectedList, q.WaitingPodsForNode("node1")) {
t.Error("Unexpected list of nominated Pods for node.")
}
if q.WaitingPodsForNode("node2") != nil {
t.Error("Expected list of nominated Pods for node2 to be empty.")
}
}
func TestUnschedulablePodsMap(t *testing.T) {
var pods = []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "p0",
Namespace: "ns1",
Annotations: map[string]string{
"annot1": "val1",
},
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "p1",
Namespace: "ns1",
Annotations: map[string]string{
"annot": "val",
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "p2",
Namespace: "ns2",
Annotations: map[string]string{
"annot2": "val2", "annot3": "val3",
},
},
Status: v1.PodStatus{
NominatedNodeName: "node3",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "p3",
Namespace: "ns4",
},
Status: v1.PodStatus{
NominatedNodeName: "node1",
},
},
}
var updatedPods = make([]*v1.Pod, len(pods))
updatedPods[0] = pods[0].DeepCopy()
updatedPods[1] = pods[1].DeepCopy()
updatedPods[3] = pods[3].DeepCopy()
tests := []struct {
name string
podsToAdd []*v1.Pod
expectedMapAfterAdd map[string]*v1.Pod
podsToUpdate []*v1.Pod
expectedMapAfterUpdate map[string]*v1.Pod
podsToDelete []*v1.Pod
expectedMapAfterDelete map[string]*v1.Pod
}{
{
name: "create, update, delete subset of pods",
podsToAdd: []*v1.Pod{pods[0], pods[1], pods[2], pods[3]},
expectedMapAfterAdd: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): pods[0],
util.GetPodFullName(pods[1]): pods[1],
util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3],
},
podsToUpdate: []*v1.Pod{updatedPods[0]},
expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): updatedPods[0],
util.GetPodFullName(pods[1]): pods[1],
util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3],
},
podsToDelete: []*v1.Pod{pods[0], pods[1]},
expectedMapAfterDelete: map[string]*v1.Pod{
util.GetPodFullName(pods[2]): pods[2],
util.GetPodFullName(pods[3]): pods[3],
},
},
{
name: "create, update, delete all",
podsToAdd: []*v1.Pod{pods[0], pods[3]},
expectedMapAfterAdd: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): pods[0],
util.GetPodFullName(pods[3]): pods[3],
},
podsToUpdate: []*v1.Pod{updatedPods[3]},
expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[0]): pods[0],
util.GetPodFullName(pods[3]): updatedPods[3],
},
podsToDelete: []*v1.Pod{pods[0], pods[3]},
expectedMapAfterDelete: map[string]*v1.Pod{},
},
{
name: "delete non-existing and existing pods",
podsToAdd: []*v1.Pod{pods[1], pods[2]},
expectedMapAfterAdd: map[string]*v1.Pod{
util.GetPodFullName(pods[1]): pods[1],
util.GetPodFullName(pods[2]): pods[2],
},
podsToUpdate: []*v1.Pod{updatedPods[1]},
expectedMapAfterUpdate: map[string]*v1.Pod{
util.GetPodFullName(pods[1]): updatedPods[1],
util.GetPodFullName(pods[2]): pods[2],
},
podsToDelete: []*v1.Pod{pods[2], pods[3]},
expectedMapAfterDelete: map[string]*v1.Pod{
util.GetPodFullName(pods[1]): updatedPods[1],
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
upm := newUnschedulablePodsMap()
for _, p := range test.podsToAdd {
upm.addOrUpdate(p)
}
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterAdd) {
t.Errorf("Unexpected map after adding pods. Expected: %v, got: %v",
test.expectedMapAfterAdd, upm.pods)
}
if len(test.podsToUpdate) > 0 {
for _, p := range test.podsToUpdate {
upm.addOrUpdate(p)
}
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterUpdate) {
t.Errorf("Unexpected map after updating pods. Expected: %v, got: %v",
test.expectedMapAfterUpdate, upm.pods)
}
}
for _, p := range test.podsToDelete {
upm.delete(p)
}
if !reflect.DeepEqual(upm.pods, test.expectedMapAfterDelete) {
t.Errorf("Unexpected map after deleting pods. Expected: %v, got: %v",
test.expectedMapAfterDelete, upm.pods)
}
upm.clear()
if len(upm.pods) != 0 {
t.Errorf("Expected the map to be empty, but has %v elements.", len(upm.pods))
}
})
}
}
func TestSchedulingQueue_Close(t *testing.T) {
tests := []struct {
name string
q SchedulingQueue
expectedErr error
}{
{
name: "FIFO close",
q: NewFIFO(),
expectedErr: fmt.Errorf(queueClosed),
},
{
name: "PriorityQueue close",
q: NewPriorityQueue(),
expectedErr: fmt.Errorf(queueClosed),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
pod, err := test.q.Pop()
if err.Error() != test.expectedErr.Error() {
t.Errorf("Expected err %q from Pop() if queue is closed, but got %q", test.expectedErr.Error(), err.Error())
}
if pod != nil {
t.Errorf("Expected pod nil from Pop() if queue is closed, but got: %v", pod)
}
}()
test.q.Close()
wg.Wait()
})
}
}