build: move e2e dependencies into e2e/go.mod

Several packages are only used while running the e2e suite. These
packages are less important to keep up to date, as they cannot influence
the final executable that ships in the Ceph-CSI container image.

By moving these dependencies out of the main Ceph-CSI go.mod, it becomes
easier to identify whether a reported CVE affects Ceph-CSI itself or only
the test suite (as is the case for most Kubernetes CVEs).

Signed-off-by: Niels de Vos <ndevos@ibm.com>
Author: Niels de Vos <ndevos@ibm.com>
Date: 2025-03-04 08:57:28 +01:00
Committed by: mergify[bot]
Parent: 15da101b1b
Commit: bec6090996
8047 changed files with 1407827 additions and 3453 deletions
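As a rough illustration of the split described in the commit message, the dedicated e2e module could look something like the sketch below. This is a minimal, hypothetical sketch: the module path, Go version, listed requirements, and versions are placeholders for illustration, not the actual contents of the new e2e/go.mod.

    // e2e/go.mod -- hypothetical sketch, not the file added by this commit
    module github.com/ceph/ceph-csi/e2e // assumed module path

    go 1.23 // placeholder Go version

    require (
        github.com/onsi/gomega v0.0.0      // placeholder version
        k8s.io/api v0.0.0                  // placeholder version
        k8s.io/apimachinery v0.0.0         // placeholder version
        k8s.io/client-go v0.0.0            // placeholder version
        k8s.io/kubernetes v0.0.0           // placeholder version; origin of the vendored scheduler packages below
    )

With two modules in place, dependency questions can be answered per module: running "go list -m all" or "go mod why -m <module>" from the repository root and from the e2e/ directory separately shows whether a CVE-flagged dependency is reachable from the shipped Ceph-CSI binaries or only from the test code.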

@@ -0,0 +1,760 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"context"
"errors"
"fmt"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
var (
cleanAssumedPeriod = 1 * time.Second
)
// New returns a Cache implementation.
// It automatically starts a goroutine that manages the expiration of assumed pods.
// "ttl" is how long an assumed pod may stay in the cache before it expires.
// "ctx" is the context that closes the background goroutine.
func New(ctx context.Context, ttl time.Duration) Cache {
logger := klog.FromContext(ctx)
cache := newCache(ctx, ttl, cleanAssumedPeriod)
cache.run(logger)
return cache
}
// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
// linked list. When a NodeInfo is updated, it goes to the head of the list.
// The items closer to the head are the most recently updated items.
type nodeInfoListItem struct {
info *framework.NodeInfo
next *nodeInfoListItem
prev *nodeInfoListItem
}
type cacheImpl struct {
stop <-chan struct{}
ttl time.Duration
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.RWMutex
// a set of assumed pod keys.
// The key could further be used to get an entry in podStates.
assumedPods sets.Set[string]
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*nodeInfoListItem
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem
nodeTree *nodeTree
// A map from image name to its ImageStateSummary.
imageStates map[string]*framework.ImageStateSummary
}
type podState struct {
pod *v1.Pod
// Used by assumedPod to determine expiration.
// If deadline is nil, assumedPod will never expire.
deadline *time.Time
// Used to block cache from expiring assumedPod if binding still runs
bindingFinished bool
}
func newCache(ctx context.Context, ttl, period time.Duration) *cacheImpl {
logger := klog.FromContext(ctx)
return &cacheImpl{
ttl: ttl,
period: period,
stop: ctx.Done(),
nodes: make(map[string]*nodeInfoListItem),
nodeTree: newNodeTree(logger, nil),
assumedPods: sets.New[string](),
podStates: make(map[string]*podState),
imageStates: make(map[string]*framework.ImageStateSummary),
}
}
// newNodeInfoListItem initializes a new nodeInfoListItem.
func newNodeInfoListItem(ni *framework.NodeInfo) *nodeInfoListItem {
return &nodeInfoListItem{
info: ni,
}
}
// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume cache lock is already acquired.
func (cache *cacheImpl) moveNodeInfoToHead(logger klog.Logger, name string) {
ni, ok := cache.nodes[name]
if !ok {
logger.Error(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
return
}
// if the node info list item is already at the head, we are done.
if ni == cache.headNode {
return
}
if ni.prev != nil {
ni.prev.next = ni.next
}
if ni.next != nil {
ni.next.prev = ni.prev
}
if cache.headNode != nil {
cache.headNode.prev = ni
}
ni.next = cache.headNode
ni.prev = nil
cache.headNode = ni
}
// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume cache lock is already acquired.
func (cache *cacheImpl) removeNodeInfoFromList(logger klog.Logger, name string) {
ni, ok := cache.nodes[name]
if !ok {
logger.Error(nil, "No node info with given name found in the cache", "node", klog.KRef("", name))
return
}
if ni.prev != nil {
ni.prev.next = ni.next
}
if ni.next != nil {
ni.next.prev = ni.prev
}
// if the removed item was at the head, we must update the head.
if ni == cache.headNode {
cache.headNode = ni.next
}
delete(cache.nodes, name)
}
// Dump produces a dump of the current scheduler cache. This is used for
// debugging purposes only and shouldn't be confused with UpdateSnapshot
// function.
// This method is expensive, and should be only used in non-critical path.
func (cache *cacheImpl) Dump() *Dump {
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make(map[string]*framework.NodeInfo, len(cache.nodes))
for k, v := range cache.nodes {
nodes[k] = v.info.Snapshot()
}
return &Dump{
Nodes: nodes,
AssumedPods: cache.assumedPods.Union(nil),
}
}
// UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at
// beginning of every scheduling cycle.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeInfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *cacheImpl) UpdateSnapshot(logger klog.Logger, nodeSnapshot *Snapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
// Get the last generation of the snapshot.
snapshotGeneration := nodeSnapshot.generation
// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
// or removed from the cache.
updateAllLists := false
// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with affinity to NOT having pods with affinity or the other
// way around.
updateNodesHavePodsWithAffinity := false
// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
// status from having pods with required anti-affinity to NOT having pods with required
// anti-affinity or the other way around.
updateNodesHavePodsWithRequiredAntiAffinity := false
// usedPVCSet must be re-created whenever the head node generation is greater than
// last snapshot generation.
updateUsedPVCSet := false
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
for node := cache.headNode; node != nil; node = node.next {
if node.info.Generation <= snapshotGeneration {
// all the nodes are updated before the existing snapshot. We are done.
break
}
if np := node.info.Node(); np != nil {
existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
if !ok {
updateAllLists = true
existing = &framework.NodeInfo{}
nodeSnapshot.nodeInfoMap[np.Name] = existing
}
clone := node.info.Snapshot()
// We track nodes that have pods with affinity, here we check if this node changed its
// status from having pods with affinity to NOT having pods with affinity or the other
// way around.
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
updateNodesHavePodsWithAffinity = true
}
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
updateNodesHavePodsWithRequiredAntiAffinity = true
}
if !updateUsedPVCSet {
if len(existing.PVCRefCounts) != len(clone.PVCRefCounts) {
updateUsedPVCSet = true
} else {
for pvcKey := range clone.PVCRefCounts {
if _, found := existing.PVCRefCounts[pvcKey]; !found {
updateUsedPVCSet = true
break
}
}
}
}
// We need to preserve the original pointer of the NodeInfo struct since it
// is used in the NodeInfoList, which we may not update.
*existing = *clone
}
}
// Update the snapshot generation with the latest NodeInfo generation.
if cache.headNode != nil {
nodeSnapshot.generation = cache.headNode.info.Generation
}
// Comparing to nodes in nodeTree.
// Deleted nodes get removed from the tree, but they might remain in the nodes map
// if they still have non-deleted Pods.
if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
updateAllLists = true
}
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity || updateUsedPVCSet {
cache.updateNodeInfoSnapshotList(logger, nodeSnapshot, updateAllLists)
}
if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
", length of NodeInfoMap=%v, length of nodes in cache=%v"+
", trying to recover",
len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
logger.Error(nil, errMsg)
// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
cache.updateNodeInfoSnapshotList(logger, nodeSnapshot, true)
return errors.New(errMsg)
}
return nil
}
func (cache *cacheImpl) updateNodeInfoSnapshotList(logger klog.Logger, snapshot *Snapshot, updateAll bool) {
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.usedPVCSet = sets.New[string]()
if updateAll {
// Take a snapshot of the nodes order in the tree
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
nodesList, err := cache.nodeTree.list()
if err != nil {
logger.Error(err, "Error occurred while retrieving the list of names of the nodes from node tree")
}
for _, nodeName := range nodesList {
if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
for key := range nodeInfo.PVCRefCounts {
snapshot.usedPVCSet.Insert(key)
}
} else {
logger.Error(nil, "Node exists in nodeTree but not in NodeInfoMap, this should not happen", "node", klog.KRef("", nodeName))
}
}
} else {
for _, nodeInfo := range snapshot.nodeInfoList {
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
for key := range nodeInfo.PVCRefCounts {
snapshot.usedPVCSet.Insert(key)
}
}
}
}
// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
func (cache *cacheImpl) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes
for name := range snapshot.nodeInfoMap {
if toDelete <= 0 {
break
}
if n, ok := cache.nodes[name]; !ok || n.info.Node() == nil {
delete(snapshot.nodeInfoMap, name)
toDelete--
}
}
}
// NodeCount returns the number of nodes in the cache.
// DO NOT use outside of tests.
func (cache *cacheImpl) NodeCount() int {
cache.mu.RLock()
defer cache.mu.RUnlock()
return len(cache.nodes)
}
// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
func (cache *cacheImpl) PodCount() (int, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
// Count the pods of every cached node; nodes that were deleted but still
// have non-deleted pods are included as well.
count := 0
for _, n := range cache.nodes {
count += len(n.info.Pods)
}
return count, nil
}
func (cache *cacheImpl) AssumePod(logger klog.Logger, pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v(%v) is in the cache, so can't be assumed", key, klog.KObj(pod))
}
return cache.addPod(logger, pod, true)
}
func (cache *cacheImpl) FinishBinding(logger klog.Logger, pod *v1.Pod) error {
return cache.finishBinding(logger, pod, time.Now())
}
// finishBinding exists to make tests deterministic by injecting now as an argument
func (cache *cacheImpl) finishBinding(logger klog.Logger, pod *v1.Pod, now time.Time) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
logger.V(5).Info("Finished binding for pod, can be expired", "podKey", key, "pod", klog.KObj(pod))
currState, ok := cache.podStates[key]
if ok && cache.assumedPods.Has(key) {
if cache.ttl == time.Duration(0) {
currState.deadline = nil
} else {
dl := now.Add(cache.ttl)
currState.deadline = &dl
}
currState.bindingFinished = true
}
return nil
}
func (cache *cacheImpl) ForgetPod(logger klog.Logger, pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v(%v) was assumed on %v but assigned to %v", key, klog.KObj(pod), pod.Spec.NodeName, currState.pod.Spec.NodeName)
}
// Only assumed pod can be forgotten.
if ok && cache.assumedPods.Has(key) {
return cache.removePod(logger, pod)
}
return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod))
}
// Assumes that lock is already acquired.
func (cache *cacheImpl) addPod(logger klog.Logger, pod *v1.Pod, assumePod bool) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[pod.Spec.NodeName] = n
}
n.info.AddPod(pod)
cache.moveNodeInfoToHead(logger, pod.Spec.NodeName)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
if assumePod {
cache.assumedPods.Insert(key)
}
return nil
}
// Assumes that lock is already acquired.
func (cache *cacheImpl) updatePod(logger klog.Logger, oldPod, newPod *v1.Pod) error {
if err := cache.removePod(logger, oldPod); err != nil {
return err
}
return cache.addPod(logger, newPod, false)
}
// Assumes that lock is already acquired.
// Removes a pod from the cached node info. If the node information was already
// removed and there are no more pods left in the node, cleans up the node from
// the cache.
func (cache *cacheImpl) removePod(logger klog.Logger, pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
logger.Error(nil, "Node not found when trying to remove pod", "node", klog.KRef("", pod.Spec.NodeName), "podKey", key, "pod", klog.KObj(pod))
} else {
if err := n.info.RemovePod(logger, pod); err != nil {
return err
}
if len(n.info.Pods) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(logger, pod.Spec.NodeName)
} else {
cache.moveNodeInfoToHead(logger, pod.Spec.NodeName)
}
}
delete(cache.podStates, key)
delete(cache.assumedPods, key)
return nil
}
func (cache *cacheImpl) AddPod(logger klog.Logger, pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods.Has(key):
// When assuming, we've already added the Pod to cache,
// Just update here to make sure the Pod's status is up-to-date.
if err = cache.updatePod(logger, currState.pod, pod); err != nil {
logger.Error(err, "Error occurred while updating pod")
}
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than it was assumed to.
logger.Info("Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
return nil
}
case !ok:
// Pod was expired. We should add it back.
if err = cache.addPod(logger, pod, false); err != nil {
logger.Error(err, "Error occurred while adding pod")
}
default:
return fmt.Errorf("pod %v(%v) was already in added state", key, klog.KObj(pod))
}
return nil
}
func (cache *cacheImpl) UpdatePod(logger klog.Logger, oldPod, newPod *v1.Pod) error {
key, err := framework.GetPodKey(oldPod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if !ok {
return fmt.Errorf("pod %v(%v) is not added to scheduler cache, so cannot be updated", key, klog.KObj(oldPod))
}
// An assumed pod won't have Update/Remove event. It needs to have Add event
// before Update event, in which case the state would change from Assumed to Added.
if cache.assumedPods.Has(key) {
return fmt.Errorf("assumed pod %v(%v) should not be updated", key, klog.KObj(oldPod))
}
if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
logger.Error(nil, "Pod updated on a different node than previously added to", "podKey", key, "pod", klog.KObj(oldPod))
logger.Error(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
return cache.updatePod(logger, oldPod, newPod)
}
func (cache *cacheImpl) RemovePod(logger klog.Logger, pod *v1.Pod) error {
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
currState, ok := cache.podStates[key]
if !ok {
return fmt.Errorf("pod %v(%v) is not found in scheduler cache, so cannot be removed from it", key, klog.KObj(pod))
}
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
logger.Error(nil, "Pod was added to a different node than it was assumed", "podKey", key, "pod", klog.KObj(pod), "assumedNode", klog.KRef("", pod.Spec.NodeName), "currentNode", klog.KRef("", currState.pod.Spec.NodeName))
if pod.Spec.NodeName != "" {
// An empty NodeName is possible when the scheduler misses a Delete
// event and it gets the last known state from the informer cache.
logger.Error(nil, "scheduler cache is corrupted and can badly affect scheduling decisions")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
return cache.removePod(logger, currState.pod)
}
func (cache *cacheImpl) IsAssumedPod(pod *v1.Pod) (bool, error) {
key, err := framework.GetPodKey(pod)
if err != nil {
return false, err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
return cache.assumedPods.Has(key), nil
}
// GetPod might return a pod for which its node has already been deleted from
// the main cache. This is useful to properly process pod update events.
func (cache *cacheImpl) GetPod(pod *v1.Pod) (*v1.Pod, error) {
key, err := framework.GetPodKey(pod)
if err != nil {
return nil, err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
podState, ok := cache.podStates[key]
if !ok {
return nil, fmt.Errorf("pod %v(%v) does not exist in scheduler cache", key, klog.KObj(pod))
}
return podState.pod, nil
}
func (cache *cacheImpl) AddNode(logger klog.Logger, node *v1.Node) *framework.NodeInfo {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(logger, node.Name)
cache.nodeTree.addNode(logger, node)
cache.addNodeImageStates(node, n.info)
n.info.SetNode(node)
return n.info.Snapshot()
}
func (cache *cacheImpl) UpdateNode(logger klog.Logger, oldNode, newNode *v1.Node) *framework.NodeInfo {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[newNode.Name]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[newNode.Name] = n
cache.nodeTree.addNode(logger, newNode)
} else {
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(logger, newNode.Name)
cache.nodeTree.updateNode(logger, oldNode, newNode)
cache.addNodeImageStates(newNode, n.info)
n.info.SetNode(newNode)
return n.info.Snapshot()
}
// RemoveNode removes a node from the cache's tree.
// The node might still have pods because their deletion events didn't arrive
// yet. Those pods are considered removed from the cache, as the node tree is
// the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. A ghost node is skipped from snapshots.
func (cache *cacheImpl) RemoveNode(logger klog.Logger, node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
return fmt.Errorf("node %v is not found", node.Name)
}
n.info.RemoveNode()
// We remove NodeInfo for this node only if there aren't any pods on this node.
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
if len(n.info.Pods) == 0 {
cache.removeNodeInfoFromList(logger, node.Name)
} else {
cache.moveNodeInfoToHead(logger, node.Name)
}
if err := cache.nodeTree.removeNode(logger, node); err != nil {
return err
}
cache.removeNodeImageStates(node)
return nil
}
// addNodeImageStates adds states of the images on the given node to the given nodeInfo and updates the imageStates in the
// scheduler cache. This function assumes the lock to the scheduler cache has been acquired.
func (cache *cacheImpl) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) {
newSum := make(map[string]*framework.ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
// update the entry in imageStates
state, ok := cache.imageStates[name]
if !ok {
state = &framework.ImageStateSummary{
Size: image.SizeBytes,
Nodes: sets.New(node.Name),
}
cache.imageStates[name] = state
} else {
state.Nodes.Insert(node.Name)
}
// create the ImageStateSummary for this image
if _, ok := newSum[name]; !ok {
newSum[name] = state
}
}
}
nodeInfo.ImageStates = newSum
}
// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
func (cache *cacheImpl) removeNodeImageStates(node *v1.Node) {
if node == nil {
return
}
for _, image := range node.Status.Images {
for _, name := range image.Names {
state, ok := cache.imageStates[name]
if ok {
state.Nodes.Delete(node.Name)
if state.Nodes.Len() == 0 {
// Remove the unused image to make sure the length of
// imageStates represents the total number of different
// images on all nodes
delete(cache.imageStates, name)
}
}
}
}
}
func (cache *cacheImpl) run(logger klog.Logger) {
go wait.Until(func() {
cache.cleanupAssumedPods(logger, time.Now())
}, cache.period, cache.stop)
}
// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *cacheImpl) cleanupAssumedPods(logger klog.Logger, now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
defer cache.updateMetrics()
// The size of assumedPods should be small
for key := range cache.assumedPods {
ps, ok := cache.podStates[key]
if !ok {
logger.Error(nil, "Key found in assumed set but not in podStates, potentially a logical error")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
if !ps.bindingFinished {
logger.V(5).Info("Could not expire cache for pod as binding is still in progress", "podKey", key, "pod", klog.KObj(ps.pod))
continue
}
if cache.ttl != 0 && now.After(*ps.deadline) {
logger.Info("Pod expired", "podKey", key, "pod", klog.KObj(ps.pod))
if err := cache.removePod(logger, ps.pod); err != nil {
logger.Error(err, "ExpirePod failed", "podKey", key, "pod", klog.KObj(ps.pod))
}
}
}
}
// updateMetrics updates cache size metric values for pods, assumed pods, and nodes
func (cache *cacheImpl) updateMetrics() {
metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
metrics.CacheSize.WithLabelValues("pods").Set(float64(len(cache.podStates)))
metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes)))
}

@@ -0,0 +1,135 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import (
"sort"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// CacheComparer is an implementation of the Scheduler's cache comparer.
type CacheComparer struct {
NodeLister corelisters.NodeLister
PodLister corelisters.PodLister
Cache internalcache.Cache
PodQueue internalqueue.SchedulingQueue
}
// Compare compares the nodes and pods of NodeLister with Cache.Snapshot.
func (c *CacheComparer) Compare(logger klog.Logger) error {
logger.V(3).Info("Cache comparer started")
defer logger.V(3).Info("Cache comparer finished")
nodes, err := c.NodeLister.List(labels.Everything())
if err != nil {
return err
}
pods, err := c.PodLister.List(labels.Everything())
if err != nil {
return err
}
dump := c.Cache.Dump()
pendingPods, _ := c.PodQueue.PendingPods()
if missed, redundant := c.CompareNodes(nodes, dump.Nodes); len(missed)+len(redundant) != 0 {
logger.Info("Cache mismatch", "missedNodes", missed, "redundantNodes", redundant)
}
if missed, redundant := c.ComparePods(pods, pendingPods, dump.Nodes); len(missed)+len(redundant) != 0 {
logger.Info("Cache mismatch", "missedPods", missed, "redundantPods", redundant)
}
return nil
}
// CompareNodes compares actual nodes with cached nodes.
func (c *CacheComparer) CompareNodes(nodes []*v1.Node, nodeinfos map[string]*framework.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, node := range nodes {
actual = append(actual, node.Name)
}
cached := []string{}
for nodeName := range nodeinfos {
cached = append(cached, nodeName)
}
return compareStrings(actual, cached)
}
// ComparePods compares actual pods with cached pods.
func (c *CacheComparer) ComparePods(pods, waitingPods []*v1.Pod, nodeinfos map[string]*framework.NodeInfo) (missed, redundant []string) {
actual := []string{}
for _, pod := range pods {
actual = append(actual, string(pod.UID))
}
cached := []string{}
for _, nodeinfo := range nodeinfos {
for _, p := range nodeinfo.Pods {
cached = append(cached, string(p.Pod.UID))
}
}
for _, pod := range waitingPods {
cached = append(cached, string(pod.UID))
}
return compareStrings(actual, cached)
}
func compareStrings(actual, cached []string) (missed, redundant []string) {
missed, redundant = []string{}, []string{}
sort.Strings(actual)
sort.Strings(cached)
compare := func(i, j int) int {
if i == len(actual) {
return 1
} else if j == len(cached) {
return -1
}
return strings.Compare(actual[i], cached[j])
}
for i, j := 0, 0; i < len(actual) || j < len(cached); {
switch compare(i, j) {
case 0:
i++
j++
case -1:
missed = append(missed, actual[i])
i++
case 1:
redundant = append(redundant, cached[j])
j++
}
}
return
}

@@ -0,0 +1,76 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import (
"context"
"os"
"os/signal"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
)
// CacheDebugger provides ways to check and write cache information for debugging.
type CacheDebugger struct {
Comparer CacheComparer
Dumper CacheDumper
}
// New creates a CacheDebugger.
func New(
nodeLister corelisters.NodeLister,
podLister corelisters.PodLister,
cache internalcache.Cache,
podQueue internalqueue.SchedulingQueue,
) *CacheDebugger {
return &CacheDebugger{
Comparer: CacheComparer{
NodeLister: nodeLister,
PodLister: podLister,
Cache: cache,
PodQueue: podQueue,
},
Dumper: CacheDumper{
cache: cache,
podQueue: podQueue,
},
}
}
// ListenForSignal starts a goroutine that will trigger the CacheDebugger's
// behavior when the process receives SIGINT (Windows) or SIGUSR2 (non-Windows).
func (d *CacheDebugger) ListenForSignal(ctx context.Context) {
logger := klog.FromContext(ctx)
stopCh := ctx.Done()
ch := make(chan os.Signal, 1)
signal.Notify(ch, compareSignal)
go func() {
for {
select {
case <-stopCh:
return
case <-ch:
d.Comparer.Compare(logger)
d.Dumper.DumpAll(logger)
}
}
}()
}

@@ -0,0 +1,88 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import (
"fmt"
"strings"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
"k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// CacheDumper writes some information from the scheduler cache and the scheduling queue to the
// scheduler logs for debugging purposes.
type CacheDumper struct {
cache internalcache.Cache
podQueue queue.SchedulingQueue
}
// DumpAll writes cached nodes and scheduling queue information to the scheduler logs.
func (d *CacheDumper) DumpAll(logger klog.Logger) {
d.dumpNodes(logger)
d.dumpSchedulingQueue(logger)
}
// dumpNodes writes NodeInfo to the scheduler logs.
func (d *CacheDumper) dumpNodes(logger klog.Logger) {
dump := d.cache.Dump()
nodeInfos := make([]string, 0, len(dump.Nodes))
for name, nodeInfo := range dump.Nodes {
nodeInfos = append(nodeInfos, d.printNodeInfo(name, nodeInfo))
}
// Extra blank line added between node entries for readability.
logger.Info("Dump of cached NodeInfo", "nodes", strings.Join(nodeInfos, "\n\n"))
}
// dumpSchedulingQueue writes pods in the scheduling queue to the scheduler logs.
func (d *CacheDumper) dumpSchedulingQueue(logger klog.Logger) {
pendingPods, s := d.podQueue.PendingPods()
var podData strings.Builder
for _, p := range pendingPods {
podData.WriteString(printPod(p))
}
logger.Info("Dump of scheduling queue", "summary", s, "pods", podData.String())
}
// printNodeInfo writes parts of NodeInfo to a string.
func (d *CacheDumper) printNodeInfo(name string, n *framework.NodeInfo) string {
var nodeData strings.Builder
nodeData.WriteString(fmt.Sprintf("Node name: %s\nDeleted: %t\nRequested Resources: %+v\nAllocatable Resources:%+v\nScheduled Pods(number: %v):\n",
name, n.Node() == nil, n.Requested, n.Allocatable, len(n.Pods)))
// Dumping Pod Info
for _, p := range n.Pods {
nodeData.WriteString(printPod(p.Pod))
}
// Dumping nominated pods info on the node
nominatedPodInfos := d.podQueue.NominatedPodsForNode(name)
if len(nominatedPodInfos) != 0 {
nodeData.WriteString(fmt.Sprintf("Nominated Pods(number: %v):\n", len(nominatedPodInfos)))
for _, pi := range nominatedPodInfos {
nodeData.WriteString(printPod(pi.Pod))
}
}
return nodeData.String()
}
// printPod writes parts of a Pod object to a string.
func printPod(p *v1.Pod) string {
return fmt.Sprintf("name: %v, namespace: %v, uid: %v, phase: %v, nominated node: %v\n", p.Name, p.Namespace, p.UID, p.Status.Phase, p.Status.NominatedNodeName)
}

@@ -0,0 +1,26 @@
//go:build !windows
// +build !windows
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import "syscall"
// compareSignal is the signal to trigger cache compare. For non-windows
// environment it's SIGUSR2.
var compareSignal = syscall.SIGUSR2

@@ -0,0 +1,23 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package debugger
import "os"
// compareSignal is the signal to trigger cache compare. For windows,
// it's SIGINT.
var compareSignal = os.Interrupt

@@ -0,0 +1,123 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for generic scheduler to do efficient lookup.
// Cache's operations are pod centric. It does incremental updates based on pod events.
// Pod events are sent via network. We don't have guaranteed delivery of all events:
// We use Reflector to list and watch from remote.
// Reflector might be slow and do a relist, which would lead to missing events.
//
// State Machine of a pod's events in scheduler's cache:
//
//   +-------------------------------------------+  +----+
//   |                            Add            |  |    |
//   |                                           |  |    | Update
//   +      Assume                Add            v  v    |
//
// Initial +--------> Assumed +------------+---> Added <--+
//
//   ^                +   +               |       +
//   |                |   |               |       |
//   |                |   |           Add |       | Remove
//   |                |   |               |       |
//   |                |   |               +       |
//   +----------------+   +-----------> Expired +----> Deleted
//         Forget             Expire
//
// Note that an assumed pod can expire, because if we haven't received Add event notifying us
// for a while, there might be some problems and we shouldn't keep the pod in cache anymore.
//
// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
// Based on existing use cases, we are making the following assumptions:
// - No pod would be assumed twice
// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event.
// - If a pod wasn't added, it wouldn't be removed or updated.
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
// NodeCount returns the number of nodes in the cache.
// DO NOT use outside of tests.
NodeCount() int
// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
PodCount() (int, error)
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
// After expiration, its information would be subtracted.
AssumePod(logger klog.Logger, pod *v1.Pod) error
// FinishBinding signals that cache for assumed pod can be expired
FinishBinding(logger klog.Logger, pod *v1.Pod) error
// ForgetPod removes an assumed pod from cache.
ForgetPod(logger klog.Logger, pod *v1.Pod) error
// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
// If added back, the pod's information would be added again.
AddPod(logger klog.Logger, pod *v1.Pod) error
// UpdatePod removes oldPod's information and adds newPod's information.
UpdatePod(logger klog.Logger, oldPod, newPod *v1.Pod) error
// RemovePod removes a pod. The pod's information would be subtracted from assigned node.
RemovePod(logger klog.Logger, pod *v1.Pod) error
// GetPod returns the pod from the cache with the same namespace and the
// same name of the specified pod.
GetPod(pod *v1.Pod) (*v1.Pod, error)
// IsAssumedPod returns true if the pod is assumed and not expired.
IsAssumedPod(pod *v1.Pod) (bool, error)
// AddNode adds overall information about node.
// It returns a clone of added NodeInfo object.
AddNode(logger klog.Logger, node *v1.Node) *framework.NodeInfo
// UpdateNode updates overall information about node.
// It returns a clone of updated NodeInfo object.
UpdateNode(logger klog.Logger, oldNode, newNode *v1.Node) *framework.NodeInfo
// RemoveNode removes overall information about node.
RemoveNode(logger klog.Logger, node *v1.Node) error
// UpdateSnapshot updates the passed infoSnapshot to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
UpdateSnapshot(logger klog.Logger, nodeSnapshot *Snapshot) error
// Dump produces a dump of the current cache.
Dump() *Dump
}
// Dump is a dump of the cache state.
type Dump struct {
AssumedPods sets.Set[string]
Nodes map[string]*framework.NodeInfo
}

@@ -0,0 +1,143 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"errors"
"fmt"
v1 "k8s.io/api/core/v1"
utilnode "k8s.io/component-helpers/node/topology"
"k8s.io/klog/v2"
)
// nodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// keys to "NodeTree.tree" and values of "NodeTree.tree" are arrays of node names.
// NodeTree is NOT thread-safe, any concurrent updates/reads from it must be synchronized by the caller.
// It is used only by schedulerCache, and should stay as such.
type nodeTree struct {
tree map[string][]string // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
numNodes int
}
// newNodeTree creates a NodeTree from nodes.
func newNodeTree(logger klog.Logger, nodes []*v1.Node) *nodeTree {
nt := &nodeTree{
tree: make(map[string][]string, len(nodes)),
}
for _, n := range nodes {
nt.addNode(logger, n)
}
return nt
}
// addNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// is added to the array of nodes in that zone.
func (nt *nodeTree) addNode(logger klog.Logger, n *v1.Node) {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na {
if nodeName == n.Name {
logger.Info("Did not add to the NodeTree because it already exists", "node", klog.KObj(n))
return
}
}
nt.tree[zone] = append(na, n.Name)
} else {
nt.zones = append(nt.zones, zone)
nt.tree[zone] = []string{n.Name}
}
logger.V(2).Info("Added node to NodeTree", "node", klog.KObj(n), "zone", zone)
nt.numNodes++
}
// removeNode removes a node from the NodeTree.
func (nt *nodeTree) removeNode(logger klog.Logger, n *v1.Node) error {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na {
if nodeName == n.Name {
nt.tree[zone] = append(na[:i], na[i+1:]...)
if len(nt.tree[zone]) == 0 {
nt.removeZone(zone)
}
logger.V(2).Info("Removed node from NodeTree", "node", klog.KObj(n), "zone", zone)
nt.numNodes--
return nil
}
}
}
logger.Error(nil, "Did not remove Node in NodeTree because it was not found", "node", klog.KObj(n), "zone", zone)
return fmt.Errorf("node %q in group %q was not found", n.Name, zone)
}
// removeZone removes a zone from tree.
// This function must be called while writer locks are held.
func (nt *nodeTree) removeZone(zone string) {
delete(nt.tree, zone)
for i, z := range nt.zones {
if z == zone {
nt.zones = append(nt.zones[:i], nt.zones[i+1:]...)
return
}
}
}
// updateNode updates a node in the NodeTree.
func (nt *nodeTree) updateNode(logger klog.Logger, old, new *v1.Node) {
var oldZone string
if old != nil {
oldZone = utilnode.GetZoneKey(old)
}
newZone := utilnode.GetZoneKey(new)
// If the zone ID of the node has not changed, we don't need to do anything. Name of the node
// cannot be changed in an update.
if oldZone == newZone {
return
}
nt.removeNode(logger, old) // No error checking. We ignore whether the old node exists or not.
nt.addNode(logger, new)
}
// list returns the list of node names. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
func (nt *nodeTree) list() ([]string, error) {
if len(nt.zones) == 0 {
return nil, nil
}
nodesList := make([]string, 0, nt.numNodes)
numExhaustedZones := 0
nodeIndex := 0
for len(nodesList) < nt.numNodes {
if numExhaustedZones >= len(nt.zones) { // all zones are exhausted.
return nodesList, errors.New("all zones exhausted before reaching count of nodes expected")
}
for zoneIndex := 0; zoneIndex < len(nt.zones); zoneIndex++ {
na := nt.tree[nt.zones[zoneIndex]]
if nodeIndex >= len(na) { // If the zone is exhausted, continue
if nodeIndex == len(na) { // If it is the first time the zone is exhausted
numExhaustedZones++
}
continue
}
nodesList = append(nodesList, na[nodeIndex])
}
nodeIndex++
}
return nodesList, nil
}

@@ -0,0 +1,198 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// Snapshot is a snapshot of cache NodeInfo and NodeTree order. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its operations in that cycle.
type Snapshot struct {
// nodeInfoMap a map of node name to a snapshot of its NodeInfo.
nodeInfoMap map[string]*framework.NodeInfo
// nodeInfoList is the list of nodes as ordered in the cache's nodeTree.
nodeInfoList []*framework.NodeInfo
// havePodsWithAffinityNodeInfoList is the list of nodes with at least one pod declaring affinity terms.
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
// required anti-affinity terms.
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
// usedPVCSet contains a set of PVC names that have one or more scheduled pods using them,
// keyed in the format "namespace/name".
usedPVCSet sets.Set[string]
generation int64
}
var _ framework.SharedLister = &Snapshot{}
// NewEmptySnapshot initializes a Snapshot struct and returns it.
func NewEmptySnapshot() *Snapshot {
return &Snapshot{
nodeInfoMap: make(map[string]*framework.NodeInfo),
usedPVCSet: sets.New[string](),
}
}
// NewSnapshot initializes a Snapshot struct and returns it.
func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
nodeInfoMap := createNodeInfoMap(pods, nodes)
nodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
havePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
havePodsWithRequiredAntiAffinityNodeInfoList := make([]*framework.NodeInfo, 0, len(nodeInfoMap))
for _, v := range nodeInfoMap {
nodeInfoList = append(nodeInfoList, v)
if len(v.PodsWithAffinity) > 0 {
havePodsWithAffinityNodeInfoList = append(havePodsWithAffinityNodeInfoList, v)
}
if len(v.PodsWithRequiredAntiAffinity) > 0 {
havePodsWithRequiredAntiAffinityNodeInfoList = append(havePodsWithRequiredAntiAffinityNodeInfoList, v)
}
}
s := NewEmptySnapshot()
s.nodeInfoMap = nodeInfoMap
s.nodeInfoList = nodeInfoList
s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList
s.usedPVCSet = createUsedPVCSet(pods)
return s
}
// createNodeInfoMap obtains a list of pods and pivots that list into a map
// where the keys are node names and the values are the aggregated information
// for that node.
func createNodeInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*framework.NodeInfo {
nodeNameToInfo := make(map[string]*framework.NodeInfo)
for _, pod := range pods {
nodeName := pod.Spec.NodeName
if _, ok := nodeNameToInfo[nodeName]; !ok {
nodeNameToInfo[nodeName] = framework.NewNodeInfo()
}
nodeNameToInfo[nodeName].AddPod(pod)
}
imageExistenceMap := createImageExistenceMap(nodes)
for _, node := range nodes {
if _, ok := nodeNameToInfo[node.Name]; !ok {
nodeNameToInfo[node.Name] = framework.NewNodeInfo()
}
nodeInfo := nodeNameToInfo[node.Name]
nodeInfo.SetNode(node)
nodeInfo.ImageStates = getNodeImageStates(node, imageExistenceMap)
}
return nodeNameToInfo
}
func createUsedPVCSet(pods []*v1.Pod) sets.Set[string] {
usedPVCSet := sets.New[string]()
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
for _, v := range pod.Spec.Volumes {
if v.PersistentVolumeClaim == nil {
continue
}
key := framework.GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
usedPVCSet.Insert(key)
}
}
return usedPVCSet
}
// getNodeImageStates returns the given node's image states based on the given imageExistence map.
func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.Set[string]) map[string]*framework.ImageStateSummary {
imageStates := make(map[string]*framework.ImageStateSummary)
for _, image := range node.Status.Images {
for _, name := range image.Names {
imageStates[name] = &framework.ImageStateSummary{
Size: image.SizeBytes,
NumNodes: imageExistenceMap[name].Len(),
}
}
}
return imageStates
}
// createImageExistenceMap returns a map recording on which nodes the images exist, keyed by the images' names.
func createImageExistenceMap(nodes []*v1.Node) map[string]sets.Set[string] {
imageExistenceMap := make(map[string]sets.Set[string])
for _, node := range nodes {
for _, image := range node.Status.Images {
for _, name := range image.Names {
if _, ok := imageExistenceMap[name]; !ok {
imageExistenceMap[name] = sets.New(node.Name)
} else {
imageExistenceMap[name].Insert(node.Name)
}
}
}
}
return imageExistenceMap
}
// NodeInfos returns a NodeInfoLister.
func (s *Snapshot) NodeInfos() framework.NodeInfoLister {
return s
}
// StorageInfos returns a StorageInfoLister.
func (s *Snapshot) StorageInfos() framework.StorageInfoLister {
return s
}
// NumNodes returns the number of nodes in the snapshot.
func (s *Snapshot) NumNodes() int {
return len(s.nodeInfoList)
}
// List returns the list of nodes in the snapshot.
func (s *Snapshot) List() ([]*framework.NodeInfo, error) {
return s.nodeInfoList, nil
}
// HavePodsWithAffinityList returns the list of nodes with at least one pod with inter-pod affinity
func (s *Snapshot) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) {
return s.havePodsWithAffinityNodeInfoList, nil
}
// HavePodsWithRequiredAntiAffinityList returns the list of nodes with at least one pod with
// required inter-pod anti-affinity
func (s *Snapshot) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) {
return s.havePodsWithRequiredAntiAffinityNodeInfoList, nil
}
// Get returns the NodeInfo of the given node name.
func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
if v, ok := s.nodeInfoMap[nodeName]; ok && v.Node() != nil {
return v, nil
}
return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName)
}
func (s *Snapshot) IsPVCUsedByPods(key string) bool {
return s.usedPVCSet.Has(key)
}

@@ -0,0 +1,244 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Below is the implementation of a heap. The logic is pretty much the same
// as cache.heap; however, this heap does not perform synchronization. It leaves
// synchronization to the SchedulingQueue.
package heap
import (
"container/heap"
"fmt"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// KeyFunc is a function type to get the key from an object.
type KeyFunc[T any] func(obj T) string
type heapItem[T any] struct {
obj T // The object which is stored in the heap.
index int // The index of the object's key in the Heap.queue.
}
type itemKeyValue[T any] struct {
key string
obj T
}
// data is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type data[T any] struct {
// items is a map from key of the objects to the objects and their index.
// We depend on the property that items in the map are in the queue and vice versa.
items map[string]*heapItem[T]
// queue implements a heap data structure and keeps the order of elements
// according to the heap invariant. The queue keeps the keys of objects stored
// in "items".
queue []string
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc[T]
// lessFunc is used to compare two objects in the heap.
lessFunc LessFunc[T]
}
var (
_ = heap.Interface(&data[any]{}) // heapData is a standard heap
)
// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
func (h *data[T]) Less(i, j int) bool {
if i > len(h.queue) || j > len(h.queue) {
return false
}
itemi, ok := h.items[h.queue[i]]
if !ok {
return false
}
itemj, ok := h.items[h.queue[j]]
if !ok {
return false
}
return h.lessFunc(itemi.obj, itemj.obj)
}
// Len returns the number of items in the Heap.
func (h *data[T]) Len() int { return len(h.queue) }
// Swap implements swapping of two elements in the heap. This is a part of standard
// heap interface and should never be called directly.
func (h *data[T]) Swap(i, j int) {
if i < 0 || j < 0 {
return
}
h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
item := h.items[h.queue[i]]
item.index = i
item = h.items[h.queue[j]]
item.index = j
}
// Push is supposed to be called by container/heap.Push only.
func (h *data[T]) Push(kv interface{}) {
keyValue := kv.(*itemKeyValue[T])
n := len(h.queue)
h.items[keyValue.key] = &heapItem[T]{keyValue.obj, n}
h.queue = append(h.queue, keyValue.key)
}
// Pop is supposed to be called by container/heap.Pop only.
func (h *data[T]) Pop() interface{} {
if len(h.queue) == 0 {
return nil
}
key := h.queue[len(h.queue)-1]
h.queue = h.queue[0 : len(h.queue)-1]
item, ok := h.items[key]
if !ok {
// This is an error
return nil
}
delete(h.items, key)
return item.obj
}
// Peek returns the head of the heap without removing it.
func (h *data[T]) Peek() (T, bool) {
if len(h.queue) > 0 {
return h.items[h.queue[0]].obj, true
}
var zero T
return zero, false
}
// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap[T any] struct {
// data stores objects and has a queue that keeps their ordering according
// to the heap invariant.
data *data[T]
// metricRecorder updates the counter when elements of a heap get added or
// removed, and it does nothing if it's nil
metricRecorder metrics.MetricRecorder
}
// AddOrUpdate inserts an item, and puts it in the queue. The item is updated if it
// already exists.
func (h *Heap[T]) AddOrUpdate(obj T) {
key := h.data.keyFunc(obj)
if _, exists := h.data.items[key]; exists {
h.data.items[key].obj = obj
heap.Fix(h.data, h.data.items[key].index)
} else {
heap.Push(h.data, &itemKeyValue[T]{key, obj})
if h.metricRecorder != nil {
h.metricRecorder.Inc()
}
}
}
// Delete removes an item.
func (h *Heap[T]) Delete(obj T) error {
key := h.data.keyFunc(obj)
if item, ok := h.data.items[key]; ok {
heap.Remove(h.data, item.index)
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return nil
}
return fmt.Errorf("object not found")
}
// Peek returns the head of the heap without removing it.
func (h *Heap[T]) Peek() (T, bool) {
return h.data.Peek()
}
// Pop returns the head of the heap and removes it.
func (h *Heap[T]) Pop() (T, error) {
obj := heap.Pop(h.data)
if obj != nil {
if h.metricRecorder != nil {
h.metricRecorder.Dec()
}
return obj.(T), nil
}
var zero T
return zero, fmt.Errorf("heap is empty")
}
// Get returns the requested item, or sets exists=false.
func (h *Heap[T]) Get(obj T) (T, bool) {
key := h.data.keyFunc(obj)
return h.GetByKey(key)
}
// GetByKey returns the requested item, or sets exists=false.
func (h *Heap[T]) GetByKey(key string) (T, bool) {
item, exists := h.data.items[key]
if !exists {
var zero T
return zero, false
}
return item.obj, true
}
func (h *Heap[T]) Has(obj T) bool {
key := h.data.keyFunc(obj)
_, ok := h.GetByKey(key)
return ok
}
// List returns a list of all the items.
func (h *Heap[T]) List() []T {
list := make([]T, 0, len(h.data.items))
for _, item := range h.data.items {
list = append(list, item.obj)
}
return list
}
// Len returns the number of items in the heap.
func (h *Heap[T]) Len() int {
return len(h.data.queue)
}
// New returns a Heap which can be used to queue up items to process.
func New[T any](keyFn KeyFunc[T], lessFn LessFunc[T]) *Heap[T] {
return NewWithRecorder(keyFn, lessFn, nil)
}
// NewWithRecorder wraps an optional metricRecorder to compose a Heap object.
func NewWithRecorder[T any](keyFn KeyFunc[T], lessFn LessFunc[T], metricRecorder metrics.MetricRecorder) *Heap[T] {
return &Heap[T]{
data: &data[T]{
items: map[string]*heapItem[T]{},
queue: []string{},
keyFunc: keyFn,
lessFunc: lessFn,
},
metricRecorder: metricRecorder,
}
}
// LessFunc is a function that receives two items and returns true if the first
// item should be placed before the second one when the list is sorted.
type LessFunc[T any] func(item1, item2 T) bool
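For orientation, here is a minimal usage sketch of the generic Heap shown above. It assumes the package is importable (from inside the Kubernetes module or a vendored tree) as k8s.io/kubernetes/pkg/scheduler/backend/heap, the path used by the queue code further below, and that KeyFunc[T] has the shape func(T) string, matching the string keys used internally; the task type and its priorities are invented purely for illustration.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/backend/heap"
)

type task struct {
	name     string
	priority int
}

func main() {
	// Key items by name; order so that the highest priority sits at the head.
	h := heap.New[*task](
		func(t *task) string { return t.name },
		func(a, b *task) bool { return a.priority > b.priority },
	)

	h.AddOrUpdate(&task{name: "a", priority: 1})
	h.AddOrUpdate(&task{name: "b", priority: 5})
	h.AddOrUpdate(&task{name: "a", priority: 7}) // same key: updates "a" and re-fixes the heap

	if top, ok := h.Peek(); ok {
		fmt.Println("head:", top.name, top.priority) // head: a 7
	}
	for h.Len() > 0 {
		t, _ := h.Pop()
		fmt.Println(t.name, t.priority) // a 7, then b 5
	}
}

The scheduler uses the same pattern with *framework.QueuedPodInfo as T, as newActiveQueue below shows.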

View File

@ -0,0 +1,415 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"container/list"
"fmt"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/backend/heap"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// activeQueuer is a wrapper for activeQ related operations.
// Its methods, except the "unlocked" ones, take the lock internally.
// Note: be careful when using unlocked() methods.
// underLock()/underRLock() should be used only to run unlocked() methods,
// and it is forbidden to call any other activeQueuer method while holding that lock.
type activeQueuer interface {
underLock(func(unlockedActiveQ unlockedActiveQueuer))
underRLock(func(unlockedActiveQ unlockedActiveQueueReader))
update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo
delete(pInfo *framework.QueuedPodInfo) error
pop(logger klog.Logger) (*framework.QueuedPodInfo, error)
list() []*v1.Pod
len() int
has(pInfo *framework.QueuedPodInfo) bool
listInFlightEvents() []interface{}
listInFlightPods() []*v1.Pod
clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error)
addEventsIfPodInFlight(oldPod, newPod *v1.Pod, events []framework.ClusterEvent) bool
addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool
schedulingCycle() int64
done(pod types.UID)
close()
broadcast()
}
// unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself.
// The underLock() method should be used to protect these methods.
type unlockedActiveQueuer interface {
unlockedActiveQueueReader
AddOrUpdate(pInfo *framework.QueuedPodInfo)
}
// unlockedActiveQueueReader defines activeQ read-only methods that are not protected by the lock itself.
// The underLock() or underRLock() method should be used to protect these methods.
type unlockedActiveQueueReader interface {
Get(pInfo *framework.QueuedPodInfo) (*framework.QueuedPodInfo, bool)
Has(pInfo *framework.QueuedPodInfo) bool
}
// activeQueue implements activeQueuer. All of the fields have to be protected using the lock.
type activeQueue struct {
// lock synchronizes all operations related to activeQ.
// It protects activeQ, inFlightPods, inFlightEvents, schedulingCycle and closed fields.
// Caution: DO NOT take "SchedulingQueue.lock" after taking "lock".
// You should always take "SchedulingQueue.lock" first, otherwise the queue could end up in deadlock.
// "lock" should not be taken after taking "nLock".
// Correct locking order is: SchedulingQueue.lock > lock > nominator.nLock.
lock sync.RWMutex
// queue is the heap structure (activeQ) that the scheduler actively looks at to find pods to
// schedule. The head of the heap is the highest priority pod.
queue *heap.Heap[*framework.QueuedPodInfo]
// cond is a condition that is notified when the pod is added to activeQ.
// It is used with lock.
cond sync.Cond
// inFlightPods holds the UID of all pods which have been popped out for which Done
// hasn't been called yet - in other words, all pods that are currently being
// processed (being scheduled, in permit, or in the binding cycle).
//
// The values in the map are the entry of each pod in the inFlightEvents list.
// The value of that entry is the *v1.Pod at the time that scheduling of that
// pod started, which can be useful for logging or debugging.
inFlightPods map[types.UID]*list.Element
// inFlightEvents holds the events received by the scheduling queue
// (entry value is clusterEvent) together with in-flight pods (entry
// value is *v1.Pod). Entries get added at the end while the mutex is
// locked, so they get serialized.
//
// The pod entries are added in Pop and used to track which events
// occurred after the pod scheduling attempt for that pod started.
// They get removed when the scheduling attempt is done, at which
// point all events that occurred in the meantime are processed.
//
// After removal of a pod, events at the start of the list are no
// longer needed because all of the other in-flight pods started
// later. Those events can be removed.
inFlightEvents *list.List
// schedCycle represents the sequence number of the scheduling cycle and is incremented
// when a pod is popped.
schedCycle int64
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
closed bool
// isSchedulingQueueHintEnabled indicates whether the SchedulerQueueingHints feature gate is enabled.
isSchedulingQueueHintEnabled bool
metricsRecorder metrics.MetricAsyncRecorder
}
func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder) *activeQueue {
aq := &activeQueue{
queue: queue,
inFlightPods: make(map[types.UID]*list.Element),
inFlightEvents: list.New(),
isSchedulingQueueHintEnabled: isSchedulingQueueHintEnabled,
metricsRecorder: metricRecorder,
}
aq.cond.L = &aq.lock
return aq
}
// underLock runs the fn function under the lock.Lock.
// fn can run unlockedActiveQueuer methods but should NOT run any other activeQueue method,
// as it would end up in deadlock.
func (aq *activeQueue) underLock(fn func(unlockedActiveQ unlockedActiveQueuer)) {
aq.lock.Lock()
defer aq.lock.Unlock()
fn(aq.queue)
}
// underRLock runs the fn function under the lock.RLock.
// fn can run unlockedActiveQueueReader methods but should NOT run any other activeQueue method,
// as it would end up in deadlock.
func (aq *activeQueue) underRLock(fn func(unlockedActiveQ unlockedActiveQueueReader)) {
aq.lock.RLock()
defer aq.lock.RUnlock()
fn(aq.queue)
}
// update updates the pod in activeQ if oldPodInfo is already in the queue.
// It returns the updated pod info if the pod was present, or nil otherwise.
func (aq *activeQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
aq.lock.Lock()
defer aq.lock.Unlock()
if pInfo, exists := aq.queue.Get(oldPodInfo); exists {
_ = pInfo.Update(newPod)
aq.queue.AddOrUpdate(pInfo)
return pInfo
}
return nil
}
// delete deletes the pod info from activeQ.
func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error {
aq.lock.Lock()
defer aq.lock.Unlock()
return aq.queue.Delete(pInfo)
}
// pop removes the head of the queue and returns it.
// It blocks if the queue is empty and waits until a new item is added to the queue.
// It increments scheduling cycle when a pod is popped.
func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
aq.lock.Lock()
defer aq.lock.Unlock()
return aq.unlockedPop(logger)
}
func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
for aq.queue.Len() == 0 {
// When the queue is empty, invocation of Pop() is blocked until a new item is enqueued.
// When close() is called, aq.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
if aq.closed {
logger.V(2).Info("Scheduling queue is closed")
return nil, nil
}
aq.cond.Wait()
}
pInfo, err := aq.queue.Pop()
if err != nil {
return nil, err
}
pInfo.Attempts++
// In flight, no concurrent events yet.
if aq.isSchedulingQueueHintEnabled {
// If the pod is already in the map, we shouldn't overwrite the inFlightPods otherwise it'd lead to a memory leak.
// https://github.com/kubernetes/kubernetes/pull/127016
if _, ok := aq.inFlightPods[pInfo.Pod.UID]; ok {
// Just report it as an error, but no need to stop the scheduler
// because it likely doesn't cause any visible issues from the scheduling perspective.
logger.Error(nil, "The same pod is tracked in multiple places in the scheduler; discarding the duplicate", "pod", klog.KObj(pInfo.Pod))
// Just ignore/discard this duplicated pod and try to pop the next one.
return aq.unlockedPop(logger)
}
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false)
aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod)
}
aq.schedCycle++
// Update metrics and reset the set of unschedulable plugins for the next attempt.
for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) {
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
}
pInfo.UnschedulablePlugins.Clear()
pInfo.PendingPlugins.Clear()
return pInfo, nil
}
// list returns all pods that are in the queue.
func (aq *activeQueue) list() []*v1.Pod {
aq.lock.RLock()
defer aq.lock.RUnlock()
var result []*v1.Pod
for _, pInfo := range aq.queue.List() {
result = append(result, pInfo.Pod)
}
return result
}
// len returns length of the queue.
func (aq *activeQueue) len() int {
return aq.queue.Len()
}
// has returns whether pInfo exists in the queue.
func (aq *activeQueue) has(pInfo *framework.QueuedPodInfo) bool {
aq.lock.RLock()
defer aq.lock.RUnlock()
return aq.queue.Has(pInfo)
}
// listInFlightEvents returns all inFlightEvents.
func (aq *activeQueue) listInFlightEvents() []interface{} {
aq.lock.RLock()
defer aq.lock.RUnlock()
var values []interface{}
for event := aq.inFlightEvents.Front(); event != nil; event = event.Next() {
values = append(values, event.Value)
}
return values
}
// listInFlightPods returns all inFlightPods.
func (aq *activeQueue) listInFlightPods() []*v1.Pod {
aq.lock.RLock()
defer aq.lock.RUnlock()
var pods []*v1.Pod
for _, obj := range aq.inFlightPods {
pods = append(pods, obj.Value.(*v1.Pod))
}
return pods
}
// clusterEventsForPod gets all cluster events that have happened while the pod for pInfo was being scheduled.
func (aq *activeQueue) clusterEventsForPod(logger klog.Logger, pInfo *framework.QueuedPodInfo) ([]*clusterEvent, error) {
aq.lock.RLock()
defer aq.lock.RUnlock()
logger.V(5).Info("Checking events for in-flight pod", "pod", klog.KObj(pInfo.Pod), "unschedulablePlugins", pInfo.UnschedulablePlugins, "inFlightEventsSize", aq.inFlightEvents.Len(), "inFlightPodsSize", len(aq.inFlightPods))
// AddUnschedulableIfNotPresent is called with the Pod at the end of scheduling or binding.
// So, given pInfo should have been Pop()ed before,
// we can assume pInfo must be recorded in inFlightPods and thus inFlightEvents.
inFlightPod, ok := aq.inFlightPods[pInfo.Pod.UID]
if !ok {
return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler")
}
var events []*clusterEvent
for event := inFlightPod.Next(); event != nil; event = event.Next() {
e, ok := event.Value.(*clusterEvent)
if !ok {
// Must be another in-flight Pod (*v1.Pod). Can be ignored.
continue
}
events = append(events, e)
}
return events, nil
}
// addEventsIfPodInFlight adds the cluster events to inFlightEvents if the newPod is in inFlightPods.
// It returns true if it pushed the events to inFlightEvents.
func (aq *activeQueue) addEventsIfPodInFlight(oldPod, newPod *v1.Pod, events []framework.ClusterEvent) bool {
aq.lock.Lock()
defer aq.lock.Unlock()
_, ok := aq.inFlightPods[newPod.UID]
if ok {
for _, event := range events {
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label(), 1, false)
aq.inFlightEvents.PushBack(&clusterEvent{
event: event,
oldObj: oldPod,
newObj: newPod,
})
}
}
return ok
}
// addEventIfAnyInFlight adds the cluster event to inFlightEvents if any pod is in inFlightPods.
// It returns true if it pushed the event to inFlightEvents.
func (aq *activeQueue) addEventIfAnyInFlight(oldObj, newObj interface{}, event framework.ClusterEvent) bool {
aq.lock.Lock()
defer aq.lock.Unlock()
if len(aq.inFlightPods) != 0 {
aq.metricsRecorder.ObserveInFlightEventsAsync(event.Label(), 1, false)
aq.inFlightEvents.PushBack(&clusterEvent{
event: event,
oldObj: oldObj,
newObj: newObj,
})
return true
}
return false
}
func (aq *activeQueue) schedulingCycle() int64 {
aq.lock.RLock()
defer aq.lock.RUnlock()
return aq.schedCycle
}
// done must be called for a pod returned by pop(). This allows the queue to
// keep track of which pods are currently being processed.
func (aq *activeQueue) done(pod types.UID) {
aq.lock.Lock()
defer aq.lock.Unlock()
inFlightPod, ok := aq.inFlightPods[pod]
if !ok {
// This Pod is already done()ed.
return
}
delete(aq.inFlightPods, pod)
// Remove the pod from the list.
aq.inFlightEvents.Remove(inFlightPod)
aggrMetricsCounter := map[string]int{}
// Remove events which are only referred to by this Pod
// so that the inFlightEvents list doesn't grow infinitely.
// If the pod was at the head of the list, then all
// events between it and the next pod are no longer needed
// and can be removed.
for {
e := aq.inFlightEvents.Front()
if e == nil {
// Empty list.
break
}
ev, ok := e.Value.(*clusterEvent)
if !ok {
// A pod, must stop pruning.
break
}
aq.inFlightEvents.Remove(e)
aggrMetricsCounter[ev.event.Label()]--
}
for evLabel, count := range aggrMetricsCounter {
aq.metricsRecorder.ObserveInFlightEventsAsync(evLabel, float64(count), false)
}
aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, -1,
// If it's the last Pod in inFlightPods, we should force-flush the metrics.
// Otherwise, especially in small clusters, which don't get a new Pod frequently,
// the metrics might not be flushed for a long time.
len(aq.inFlightPods) == 0)
}
// close closes the activeQueue.
func (aq *activeQueue) close() {
// We should call done() for all in-flight pods to clean up the inFlightEvents metrics.
// It's safe even if a binding cycle running asynchronously calls done() afterwards;
// done() will just be a no-op.
for pod := range aq.inFlightPods {
aq.done(pod)
}
aq.lock.Lock()
aq.closed = true
aq.lock.Unlock()
}
// broadcast notifies the pop() operation that one or more new pods were added to the activeQueue.
func (aq *activeQueue) broadcast() {
aq.cond.Broadcast()
}
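
To make the in-flight bookkeeping above easier to follow (pop() appends the pod itself to inFlightEvents, concurrent cluster events are appended behind it, and done() prunes events from the head once no in-flight pod precedes them), here is a small, self-contained sketch of just that pattern built on container/list. The types and event names are simplified stand-ins, not the scheduler's own.

package main

import (
	"container/list"
	"fmt"
	"strings"
)

// inFlight is a simplified stand-in for the activeQueue bookkeeping: popped pods
// and cluster events share one list, and a pod's own entry marks which events
// arrived after that pod was popped.
type inFlight struct {
	events *list.List
	pods   map[string]*list.Element
}

func newInFlight() *inFlight {
	return &inFlight{events: list.New(), pods: map[string]*list.Element{}}
}

// pop records that a pod started its scheduling attempt.
func (f *inFlight) pop(pod string) {
	f.pods[pod] = f.events.PushBack("pod/" + pod)
}

// event records a cluster event, but only while something is in flight.
func (f *inFlight) event(name string) {
	if len(f.pods) > 0 {
		f.events.PushBack(name)
	}
}

// eventsFor lists the events that happened after the given pod was popped.
func (f *inFlight) eventsFor(pod string) []string {
	var out []string
	for e := f.pods[pod].Next(); e != nil; e = e.Next() {
		if v := e.Value.(string); !strings.HasPrefix(v, "pod/") {
			out = append(out, v)
		}
	}
	return out
}

// done removes the pod and prunes events at the head of the list that no
// remaining in-flight pod could care about, mirroring activeQueue.done().
func (f *inFlight) done(pod string) {
	f.events.Remove(f.pods[pod])
	delete(f.pods, pod)
	for e := f.events.Front(); e != nil; e = f.events.Front() {
		if strings.HasPrefix(e.Value.(string), "pod/") {
			break
		}
		f.events.Remove(e)
	}
}

func main() {
	f := newInFlight()
	f.pop("p1")
	f.event("NodeAdd")
	f.pop("p2")
	f.event("PodDelete")
	fmt.Println(f.eventsFor("p1")) // [NodeAdd PodDelete]
	fmt.Println(f.eventsFor("p2")) // [PodDelete]
	f.done("p1")                   // NodeAdd is pruned, PodDelete is kept for p2
	f.done("p2")
	fmt.Println(f.events.Len()) // 0
}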

View File

@ -0,0 +1,195 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"slices"
"sync"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)
// nominator is a structure that stores pods nominated to run on nodes.
// It exists because the nominatedNodeName of pod objects stored in the structure
// may be different from what the scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominator struct {
// nLock synchronizes all operations related to nominator.
// It should not be used anywhere else.
// Caution: DO NOT take ("SchedulingQueue.lock" or "activeQueue.lock") after taking "nLock".
// You should always take "SchedulingQueue.lock" and "activeQueue.lock" first,
// otherwise the nominator could end up in deadlock.
// Correct locking order is: SchedulingQueue.lock > activeQueue.lock > nLock.
nLock sync.RWMutex
// podLister is used to verify if the given pod is alive.
podLister listersv1.PodLister
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulablePods.
nominatedPods map[string][]podRef
// nominatedPodToNode is a map from a Pod's UID to the name of the node where it is
// nominated.
nominatedPodToNode map[types.UID]string
}
func newPodNominator(podLister listersv1.PodLister) *nominator {
return &nominator{
podLister: podLister,
nominatedPods: make(map[string][]podRef),
nominatedPodToNode: make(map[types.UID]string),
}
}
// AddNominatedPod adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (npm *nominator) AddNominatedPod(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
npm.nLock.Lock()
npm.addNominatedPodUnlocked(logger, pi, nominatingInfo)
npm.nLock.Unlock()
}
func (npm *nominator) addNominatedPodUnlocked(logger klog.Logger, pi *framework.PodInfo, nominatingInfo *framework.NominatingInfo) {
// Always delete the pod if it already exists, to ensure we never store more than
// one instance of the pod.
npm.deleteUnlocked(pi.Pod)
var nodeName string
if nominatingInfo.Mode() == framework.ModeOverride {
nodeName = nominatingInfo.NominatedNodeName
} else if nominatingInfo.Mode() == framework.ModeNoop {
if pi.Pod.Status.NominatedNodeName == "" {
return
}
nodeName = pi.Pod.Status.NominatedNodeName
}
if npm.podLister != nil {
// If the pod was removed or if it was already scheduled, don't nominate it.
updatedPod, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name)
if err != nil {
logger.V(4).Info("Pod doesn't exist in podLister, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod))
return
}
if updatedPod.Spec.NodeName != "" {
logger.V(4).Info("Pod is already scheduled to a node, aborted adding it to the nominator", "pod", klog.KObj(pi.Pod), "node", updatedPod.Spec.NodeName)
return
}
}
npm.nominatedPodToNode[pi.Pod.UID] = nodeName
for _, np := range npm.nominatedPods[nodeName] {
if np.uid == pi.Pod.UID {
logger.V(4).Info("Pod already exists in the nominator", "pod", np.uid)
return
}
}
npm.nominatedPods[nodeName] = append(npm.nominatedPods[nodeName], podToRef(pi.Pod))
}
// UpdateNominatedPod updates the <oldPod> with <newPod>.
func (npm *nominator) UpdateNominatedPod(logger klog.Logger, oldPod *v1.Pod, newPodInfo *framework.PodInfo) {
npm.nLock.Lock()
defer npm.nLock.Unlock()
// In some cases, an Update event with no "NominatedNode" present is received right
// after a node("NominatedNode") is reserved for this pod in memory.
// In this case, we need to keep reserving the NominatedNode when updating the pod pointer.
var nominatingInfo *framework.NominatingInfo
// We won't fall into the `if` block below if the Update event represents:
// (1) NominatedNode info is added
// (2) NominatedNode info is updated
// (3) NominatedNode info is removed
if nominatedNodeName(oldPod) == "" && nominatedNodeName(newPodInfo.Pod) == "" {
if nnn, ok := npm.nominatedPodToNode[oldPod.UID]; ok {
// This is the only case we should continue reserving the NominatedNode
nominatingInfo = &framework.NominatingInfo{
NominatingMode: framework.ModeOverride,
NominatedNodeName: nnn,
}
}
}
// We update irrespective of whether the nominatedNodeName has changed, to ensure
// that the pod pointer is updated.
npm.deleteUnlocked(oldPod)
npm.addNominatedPodUnlocked(logger, newPodInfo, nominatingInfo)
}
// DeleteNominatedPodIfExists deletes <pod> from nominatedPods.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
npm.nLock.Lock()
npm.deleteUnlocked(pod)
npm.nLock.Unlock()
}
func (npm *nominator) deleteUnlocked(p *v1.Pod) {
nnn, ok := npm.nominatedPodToNode[p.UID]
if !ok {
return
}
for i, np := range npm.nominatedPods[nnn] {
if np.uid == p.UID {
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
if len(npm.nominatedPods[nnn]) == 0 {
delete(npm.nominatedPods, nnn)
}
break
}
}
delete(npm.nominatedPodToNode, p.UID)
}
// nominatedPodsForNode returns a copy of the pods nominated to run on the given node.
func (npm *nominator) nominatedPodsForNode(nodeName string) []podRef {
npm.nLock.RLock()
defer npm.nLock.RUnlock()
return slices.Clone(npm.nominatedPods[nodeName])
}
// nominatedNodeName returns nominated node name of a Pod.
func nominatedNodeName(pod *v1.Pod) string {
return pod.Status.NominatedNodeName
}
type podRef struct {
name string
namespace string
uid types.UID
}
func podToRef(pod *v1.Pod) podRef {
return podRef{
name: pod.Name,
namespace: pod.Namespace,
uid: pod.UID,
}
}
func (np podRef) toPod() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: np.name,
Namespace: np.namespace,
UID: np.uid,
},
}
}
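
The nominator above keeps two indexes in sync: nominatedPods (node name to nominated pod refs) and nominatedPodToNode (pod UID to node name). The following standalone sketch reproduces just that double-index bookkeeping, with the podLister validation and locking stripped out; all names are invented for illustration.

package main

import "fmt"

// nominations keeps the two maps in sync: one answering "which pods are
// nominated onto this node", the other "which node is this pod nominated onto".
type nominations struct {
	byNode map[string][]string // node name -> pod UIDs
	byPod  map[string]string   // pod UID -> node name
}

func newNominations() *nominations {
	return &nominations{byNode: map[string][]string{}, byPod: map[string]string{}}
}

// add nominates a pod onto a node, replacing any previous nomination so the
// pod is never stored twice (the same invariant addNominatedPodUnlocked keeps).
func (n *nominations) add(podUID, node string) {
	n.delete(podUID)
	n.byPod[podUID] = node
	n.byNode[node] = append(n.byNode[node], podUID)
}

// delete removes a pod's nomination from both indexes, dropping the node
// entry entirely once its list becomes empty.
func (n *nominations) delete(podUID string) {
	node, ok := n.byPod[podUID]
	if !ok {
		return
	}
	pods := n.byNode[node]
	for i, uid := range pods {
		if uid == podUID {
			n.byNode[node] = append(pods[:i], pods[i+1:]...)
			break
		}
	}
	if len(n.byNode[node]) == 0 {
		delete(n.byNode, node)
	}
	delete(n.byPod, podUID)
}

func main() {
	n := newNominations()
	n.add("uid-1", "node-a")
	n.add("uid-2", "node-a")
	n.add("uid-1", "node-b") // re-nomination moves uid-1 off node-a
	fmt.Println(n.byNode)    // map[node-a:[uid-2] node-b:[uid-1]]
	n.delete("uid-2")
	fmt.Println(n.byNode, n.byPod) // map[node-b:[uid-1]] map[uid-1:node-b]
}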

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,63 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"context"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/metrics"
)
// NewTestQueue creates a priority queue with an empty informer factory.
func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue {
return NewTestQueueWithObjects(ctx, lessFn, nil, opts...)
}
// NewTestQueueWithObjects creates a priority queue with an informer factory
// populated with the provided objects.
func NewTestQueueWithObjects(
ctx context.Context,
lessFn framework.LessFunc,
objs []runtime.Object,
opts ...Option,
) *PriorityQueue {
informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(objs...), 0)
// Because some major functions (e.g., Pop) require the metric recorder to be set,
// we always set a metric recorder here.
recorder := metrics.NewMetricsAsyncRecorder(10, 20*time.Microsecond, ctx.Done())
// We set it before the options that users provide, so that users can override it.
opts = append([]Option{WithMetricsRecorder(*recorder)}, opts...)
return NewTestQueueWithInformerFactory(ctx, lessFn, informerFactory, opts...)
}
func NewTestQueueWithInformerFactory(
ctx context.Context,
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option,
) *PriorityQueue {
pq := NewPriorityQueue(lessFn, informerFactory, opts...)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
return pq
}
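
A hedged sketch of how these helpers are typically driven from a unit test inside the Kubernetes tree. The framework.LessFunc shape (comparing two *framework.QueuedPodInfo) and the metrics.Register() call are assumptions based on how scheduler tests are usually set up, not something guaranteed by the snippet above.

package queue

import (
	"context"
	"testing"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
)

func TestNewTestQueueSketch(t *testing.T) {
	// Assumption: scheduler metrics need to be registered once before queue operations.
	metrics.Register()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Arbitrary ordering purely for the sketch: fewer attempts first.
	lessFn := func(a, b *framework.QueuedPodInfo) bool {
		return a.Attempts < b.Attempts
	}

	q := NewTestQueue(ctx, lessFn)
	if q == nil {
		t.Fatal("expected NewTestQueue to return a queue")
	}
}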