Inside Kubernetes Scheduler: How the SchedulingQueue and PriorityQueue Work
This article provides an in‑depth analysis of the Kubernetes scheduler's queue subsystem, detailing the internal data structures, heap implementation, priority queue logic, and the lifecycle methods that add, activate, update, and remove Pods from active, backoff, and unschedulable queues.
1. Introduction
The queue mechanism is a core component of the Kubernetes scheduler, responsible for selecting the most suitable Pod for the next scheduling cycle. Because Pods can declare various resource requirements and constraints—such as PersistentVolumes, anti‑affinity rules, or node taints—the queue must be able to postpone scheduling until all conditions are satisfied. The SchedulingQueue stores Pods waiting to be scheduled, and this article explores its design and implementation based on the Kubernetes release-1.28 source code.
2. Internal Data Structures
2.1 Pod representation in the queue
Kubernetes defines the Pod API object, but it does not include fields needed for queue management. The QueuedPodInfo struct extends the API object with scheduling‑specific information.
// QueuedPodInfo adds scheduling‑related fields to the Pod API object
type QueuedPodInfo struct {
// Pre‑processed Pod information to speed up scheduling
*PodInfo
// Time when the Pod was first added to the queue
Timestamp time.Time
// Number of scheduling attempts before the Pod is successfully scheduled
Attempts int
// Timestamp of the first scheduling attempt (set once and never updated)
InitialAttemptTimestamp *time.Time
// Names of plugins that caused the Pod to be unschedulable during this cycle
// Typically set in PreFilter, Filter, Reserve, or Permit stages
UnschedulablePlugins sets.Set[string]
// Indicates whether the Pod is gated by scheduling gates (PreEnqueuePlugins)
Gated bool
}
2.2 Heap implementation
The queue needs fast look‑ups (provided by a map) and ordered retrieval (provided by a slice). A heap combines both capabilities, enabling efficient insertion, deletion, and priority‑based ordering.
// Heap implements a producer/consumer queue for a specific data structure
type Heap struct {
data *data
// Metric recorder for heap operations
metricRecorder metrics.MetricRecorder
}
type data struct {
// Map that stores all objects by key
items map[string]*heapItem
// Slice that stores the keys in order
queue []string
// Function to compute a deterministic key for an object
keyFunc KeyFunc
// Comparison function used to order items in the heap
lessFunc lessFunc
}
type heapItem struct {
// The object stored in the heap (e.g., *QueuedPodInfo)
obj interface{}
// Index of the item in the heap's slice
index int
}
2.3 UnschedulablePods (temporary queue)
The UnschedulablePods structure holds Pods that cannot currently be scheduled, such as those lacking a suitable Node. Although called a queue, it is implemented as a map.
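As a sketch, the map-backed "queue" reduces to the following; podRef stands in for *v1.Pod and the names are hypothetical:

```go
package main

import "fmt"

// podRef is a stand-in for *v1.Pod in this sketch.
type podRef struct{ namespace, name string }

// unschedulablePods is a simplified sketch of the scheduler's
// map-based UnschedulablePods wrapper: no ordering, only keyed access.
type unschedulablePods struct {
	podInfoMap map[string]podRef
	keyFunc    func(podRef) string
}

func newUnschedulablePods() *unschedulablePods {
	return &unschedulablePods{
		podInfoMap: map[string]podRef{},
		// The real implementation derives the key from the Pod's
		// namespace and name; this keyFunc imitates that.
		keyFunc: func(p podRef) string { return p.namespace + "/" + p.name },
	}
}

func (u *unschedulablePods) addOrUpdate(p podRef) { u.podInfoMap[u.keyFunc(p)] = p }
func (u *unschedulablePods) get(p podRef) (podRef, bool) {
	r, ok := u.podInfoMap[u.keyFunc(p)]
	return r, ok
}
func (u *unschedulablePods) delete(p podRef) { delete(u.podInfoMap, u.keyFunc(p)) }

func main() {
	u := newUnschedulablePods()
	u.addOrUpdate(podRef{"default", "web-0"})
	_, ok := u.get(podRef{"default", "web-0"})
	fmt.Println(ok)
}
```

A map suffices here because unschedulable Pods are never popped in priority order; they are only looked up, updated, or swept wholesale into the other queues.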
// UnschedulablePods is a map‑based wrapper for temporarily unschedulable Pods
type UnschedulablePods struct {
// Map from Pod name to *QueuedPodInfo
podInfoMap map[string]*framework.QueuedPodInfo
// Function to compute the key (same as in the heap)
keyFunc func(*v1.Pod) string
// Metric recorders for add/remove operations
unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}
2.4 SchedulingQueue abstraction
The SchedulingQueue interface abstracts the storage of Pods awaiting scheduling. Its default implementation, PriorityQueue, is built from a heap-based active queue (activeQ), a heap-based backoff queue, and the unschedulable map.
// SchedulingQueue abstracts the storage of Pods waiting to be scheduled.
type SchedulingQueue interface {
framework.PodNominator
// Add a Pod to the active queue
Add(logger klog.Logger, pod *v1.Pod) error
// Move Pods from Unschedulable or Backoff queues to the active queue
Activate(logger klog.Logger, pods map[string]*v1.Pod)
// Add an unschedulable Pod if it is not already present
AddUnschedulableIfNotPresent(logger klog.Logger, podInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error
// Return the current scheduling cycle number
SchedulingCycle() int64
// Pop the highest‑priority Pod from the active queue (blocks if empty)
Pop() (*framework.QueuedPodInfo, error)
// Mark a Pod as processed after it leaves the queue
Done(types.UID)
// Update a Pod that may have changed while in the queue
Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
// Delete a Pod from all internal queues
Delete(pod *v1.Pod) error
// Gracefully close the queue
Close()
// Run background workers that manage backoff and unschedulable queues
Run(logger klog.Logger)
}
3. PriorityQueue implementation
3.1 Structure
The PriorityQueue implements SchedulingQueue. It contains two heap-based sub-queues (activeQ and podBackoffQ) and the unschedulable map.
type PriorityQueue struct {
*nominator
lock *sync.RWMutex
stop chan struct{}
clock clock.Clock
// Initial backoff duration for a Pod that fails scheduling (default 1s)
podInitialBackoffDuration time.Duration
// Maximum backoff duration (default 10s)
podMaxBackoffDuration time.Duration
// Maximum time a Pod may stay in the unschedulable queue
podMaxInUnschedulablePodsDuration time.Duration
cond sync.Cond
inFlightPods map[types.UID]inFlightPod
receivedEvents *list.List
// Active queue (heap) for ready Pods
activeQ *heap.Heap
// Backoff queue (heap) for Pods waiting for backoff to expire
podBackoffQ *heap.Heap
// Unschedulable Pods map
unschedulablePods *UnschedulablePods
schedulingCycle int64
moveRequestCycle int64
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
queueingHintMap QueueingHintMapPerProfile
closed bool
nsLister listersv1.NamespaceLister
metricsRecorder metrics.MetricAsyncRecorder
pluginMetricsSamplePercent int
isSchedulingQueueHintEnabled bool
}
3.2 runPreEnqueuePlugins
Pre‑enqueue plugins run before a Pod is placed into activeQ. They enforce scheduling gates; for example, the SchedulingGates plugin keeps a Pod out of activeQ until every entry in its spec.schedulingGates has been removed.
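The gating semantics (every plugin must pass, and the first failure records its name) can be sketched independently of the framework types. All names below are illustrative; "StorageReady" in particular is a hypothetical plugin, not a real one:

```go
package main

import "fmt"

// preEnqueuePlugin is a simplified stand-in for framework.PreEnqueuePlugin:
// it reports whether a given pod may enter activeQ.
type preEnqueuePlugin struct {
	name  string
	check func(pod string) bool
}

// runPreEnqueuePlugins mirrors the all-must-pass loop: the first failing
// plugin gates the pod, and its name is recorded the way the scheduler
// records it in UnschedulablePlugins.
func runPreEnqueuePlugins(pod string, plugins []preEnqueuePlugin) (bool, []string) {
	var failed []string
	for _, pl := range plugins {
		if pl.check(pod) {
			continue // this plugin passed; try the next one
		}
		failed = append(failed, pl.name)
		return false, failed // stop at the first failure
	}
	return true, nil
}

func main() {
	plugins := []preEnqueuePlugin{
		{name: "SchedulingGates", check: func(string) bool { return true }},
		// StorageReady is a hypothetical gate used only for this demo.
		{name: "StorageReady", check: func(pod string) bool { return pod != "pvc-pending" }},
	}
	ok, failed := runPreEnqueuePlugins("pvc-pending", plugins)
	fmt.Println(ok, failed)
}
```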
func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framework.QueuedPodInfo) bool {
pod := pInfo.Pod
// Randomly decide whether to record metrics for this plugin execution
shouldRecordMetric := rand.Intn(100) < p.pluginMetricsSamplePercent
for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] {
// Execute the plugin; if it succeeds, continue to the next one
s := p.runPreEnqueuePlugin(ctx, pl, pod, shouldRecordMetric)
if s.IsSuccess() {
continue
}
// Record the unschedulable plugin name
pInfo.UnschedulablePlugins.Insert(pl.Name())
metrics.UnschedulableReason(pl.Name(), pod.Spec.SchedulerName).Inc()
return false
}
return true
}
3.3 addToActiveQ
Attempts to insert a Pod into activeQ. If the Pod fails pre‑enqueue checks, it is placed into the unschedulable map.
func (p *PriorityQueue) addToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo) (bool, error) {
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
if pInfo.Gated {
p.unschedulablePods.addOrUpdate(pInfo)
return false, nil
}
// Ensure InitialAttemptTimestamp is set
if pInfo.InitialAttemptTimestamp == nil {
now := p.clock.Now()
pInfo.InitialAttemptTimestamp = &now
}
if err := p.activeQ.Add(pInfo); err != nil {
logger.Error(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
return false, err
}
return true, nil
}
3.4 Add (public API)
Creates a QueuedPodInfo from a fresh v1.Pod and adds it to the active queue, removing any stale references from other queues.
func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) error {
pInfo := p.newQueuedPodInfo(pod)
added, err := p.addToActiveQ(logger, pInfo)
if !added {
return err
}
// Clean up possible stale entries in unschedulable and backoff queues
if p.unschedulablePods.get(pod) != nil {
p.unschedulablePods.delete(pod, pInfo.Gated)
}
// Remove a possible stale duplicate from the backoff queue
p.podBackoffQ.Delete(pInfo)
return nil
}
3.5 Activate
Moves Pods from the backoff and unschedulable queues into the active queue when they become eligible.
func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
activated := false
for _, pod := range pods {
if p.activate(logger, pod) {
activated = true
}
}
if activated {
p.cond.Broadcast()
}
}
func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
// If the pod is already in activeQ, nothing to do
if _, exists, _ := p.activeQ.Get(newQueuedPodInfoForLookup(pod)); exists {
return false
}
var pInfo *framework.QueuedPodInfo
// Try to find the pod in unschedulable or backoff queues
if pInfo = p.unschedulablePods.get(pod); pInfo == nil {
if obj, exists, _ := p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod)); !exists {
logger.Error(nil, "Pod to activate not found in unschedulable or backoff queues", "pod", klog.KObj(pod))
return false
} else {
pInfo = obj.(*framework.QueuedPodInfo)
}
}
// Add to active queue
added, _ := p.addToActiveQ(logger, pInfo)
if !added {
return false
}
// Remove from other queues
p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated)
p.podBackoffQ.Delete(pInfo)
return true
}
3.6 AddUnschedulableIfNotPresent
Places a Pod into the unschedulable map unless it already exists in any queue.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
pod := pInfo.Pod
if p.unschedulablePods.get(pod) != nil {
return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
}
if _, exists, _ := p.activeQ.Get(pInfo); exists {
return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
}
if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
}
// Update timestamp because the pod is being re‑queued
pInfo.Timestamp = p.clock.Now()
// Determine whether the pod should be re‑queued immediately or after backoff
schedulingHint := p.determineSchedulingHintForInFlightPod(logger, pInfo, podSchedulingCycle)
// Re‑queue based on the hint
p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
return nil
}
3.7 Pop
Retrieves the highest‑priority Pod from activeQ. The method increments attempt counters and updates scheduling cycle information.
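The blocking behavior rests on a condition variable: consumers wait while the queue is empty and are woken by Broadcast when work arrives. A self-contained sketch (simplified types, no priority ordering):

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue sketches Pop's blocking behavior: consumers wait on a
// condition variable until an item arrives or the queue is closed.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *blockingQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
	q.cond.Broadcast() // wake waiting consumers, like p.cond.Broadcast()
}

// Pop blocks until an item is available, mirroring PriorityQueue.Pop's
// "for activeQ.Len() == 0 { cond.Wait() }" loop.
func (q *blockingQueue) Pop() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return "", false
		}
		q.cond.Wait() // releases the lock while waiting, reacquires on wake
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, true
}

func main() {
	q := newBlockingQueue()
	done := make(chan string)
	go func() {
		item, _ := q.Pop() // blocks until Add below runs
		done <- item
	}()
	q.Add("pod-a")
	fmt.Println(<-done)
}
```

The loop around Wait matters: a woken consumer must re-check the emptiness condition, because another goroutine may have consumed the item first.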
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
p.lock.Lock()
defer p.lock.Unlock()
for p.activeQ.Len() == 0 {
if p.closed {
klog.V(2).InfoS("Scheduling queue is closed")
return nil, fmt.Errorf("scheduling queue is closed")
}
p.cond.Wait()
}
obj, err := p.activeQ.Pop()
if err != nil {
return nil, err
}
pInfo := obj.(*framework.QueuedPodInfo)
pInfo.Attempts++
p.schedulingCycle++
if p.isSchedulingQueueHintEnabled {
p.inFlightPods[pInfo.Pod.UID] = inFlightPod{previousEvent: p.receivedEvents.Back()}
}
return pInfo, nil
}
3.8 Update
Updates a Pod that may have changed while in any of the queues. If the Pod is not found, it is added to the active queue.
func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
if oldPod != nil {
oldPodInfo := newQueuedPodInfoForLookup(oldPod)
if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
return p.activeQ.Update(pInfo)
}
if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
pInfo := updatePod(oldPodInfo, newPod)
p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
return p.podBackoffQ.Update(pInfo)
}
if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
pInfo := updatePod(usPodInfo, newPod)
p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
if isPodUpdated(oldPod, newPod) {
gated := usPodInfo.Gated
if p.isPodBackingoff(usPodInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return err
}
p.unschedulablePods.delete(usPodInfo.Pod, gated)
} else {
if added, err := p.addToActiveQ(logger, pInfo); !added {
return err
}
p.unschedulablePods.delete(usPodInfo.Pod, gated)
}
} else {
p.unschedulablePods.addOrUpdate(pInfo)
}
return nil
}
}
// Pod not found in any queue – add it to activeQ
pInfo := p.newQueuedPodInfo(newPod)
if added, err := p.addToActiveQ(logger, pInfo); !added {
return err
}
return nil
}
3.9 Delete
Removes a Pod from all internal queues and clears any nomination information.
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.deleteNominatedPodIfExistsUnlocked(pod)
pInfo := newQueuedPodInfoForLookup(pod)
if err := p.activeQ.Delete(pInfo); err != nil {
p.podBackoffQ.Delete(pInfo)
if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
p.unschedulablePods.delete(pod, pInfo.Gated)
}
}
return nil
}
3.10 MoveAllToActiveOrBackoffQueue
Periodically moves Pods from the unschedulable map to either the active or backoff queue based on plugin hints and backoff status.
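The hint-based decision can be sketched as a lookup: a Pod is only worth requeuing when the event is relevant to a plugin that previously rejected it. The hint table and the plugin/event names below are hypothetical; the real implementation consults registered QueueingHintFn callbacks instead of a static map:

```go
package main

import "fmt"

// queueingHint says whether a cluster event could make a pod schedulable.
type queueingHint int

const (
	queueSkip queueingHint = iota
	queueAfterBackoff
	queueImmediately
)

// isPodWorthRequeuing (simplified): check each plugin that marked the pod
// unschedulable and take the strongest hint any of them gives for the event.
func isPodWorthRequeuing(unschedulablePlugins map[string]bool, event string, hints map[string]map[string]queueingHint) queueingHint {
	best := queueSkip
	for plugin := range unschedulablePlugins {
		if h, ok := hints[plugin][event]; ok && h > best {
			best = h
		}
	}
	return best
}

func main() {
	// Hypothetical hint table: NodeResourcesFit cares about NodeAdd events.
	hints := map[string]map[string]queueingHint{
		"NodeResourcesFit": {"NodeAdd": queueAfterBackoff},
	}
	pod := map[string]bool{"NodeResourcesFit": true}
	fmt.Println(isPodWorthRequeuing(pod, "NodeAdd", hints))  // relevant event
	fmt.Println(isPodWorthRequeuing(pod, "PodDelete", hints)) // irrelevant event
}
```

This filtering is what keeps a cluster event from stampeding every unschedulable Pod back into activeQ at once.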
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) {
p.lock.Lock()
defer p.lock.Unlock()
// Collect Pods that pass the optional pre‑check
var podInfoList []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulablePods.podInfoMap {
if preCheck == nil || preCheck(pInfo.Pod) {
podInfoList = append(podInfoList, pInfo)
}
}
p.movePodsToActiveOrBackoffQueue(logger, podInfoList, event, oldObj, newObj)
}
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) {
activated := false
for _, pInfo := range podInfoList {
schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label)
logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label, "queue", queue, "hint", schedulingHint)
if queue == activeQ {
activated = true
}
p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated)
}
p.moveRequestCycle = p.schedulingCycle
if len(p.inFlightPods) != 0 {
p.receivedEvents.PushBack(&clusterEvent{event: event, inFlightPodsNum: len(p.inFlightPods), oldObj: oldObj, newObj: newObj})
}
// Wake up consumers blocked in Pop() now that the active queue has work
if activated {
p.cond.Broadcast()
}
}
3.11 Run (background workers)
Starts two goroutines: one flushes the backoff queue every second, moving ready Pods to the active queue; the other flushes the unschedulable queue every 30 seconds, moving Pods that have exceeded the maximum unschedulable duration.
func (p *PriorityQueue) Run(logger klog.Logger) {
go wait.Until(func() { p.flushBackoffQCompleted(logger) }, 1*time.Second, p.stop)
go wait.Until(func() { p.flushUnschedulablePodsLeftover(logger) }, 30*time.Second, p.stop)
}
func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
p.lock.Lock()
defer p.lock.Unlock()
for {
raw := p.podBackoffQ.Peek()
if raw == nil {
break
}
pInfo := raw.(*framework.QueuedPodInfo)
if p.isPodBackingoff(pInfo) {
break // Backoff not yet finished
}
p.podBackoffQ.Pop()
if added, _ := p.addToActiveQ(logger, pInfo); added {
logger.V(5).Info("Pod moved from backoff to active queue", "pod", klog.KObj(pInfo.Pod))
}
}
}
func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
p.lock.Lock()
defer p.lock.Unlock()
var podsToMove []*framework.QueuedPodInfo
now := p.clock.Now()
for _, pInfo := range p.unschedulablePods.podInfoMap {
if now.Sub(pInfo.Timestamp) > p.podMaxInUnschedulablePodsDuration {
podsToMove = append(podsToMove, pInfo)
}
}
if len(podsToMove) > 0 {
p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout, nil, nil)
}
}
4. Summary
New Pods are inserted into the active queue via the Add() method.
The scheduler obtains a Pod for processing by calling Pop().
If scheduling fails, the Pod is placed into the unschedulable map with AddUnschedulableIfNotPresent().
When a relevant cluster event occurs (for example, AssignedPodAdded() reacting to a newly scheduled Pod), the affected Pods move from the unschedulable map to either the active or backoff queue.
The priority queue flushes the unschedulable map every 30 seconds; Pods that have been waiting longer than podMaxInUnschedulablePodsDuration (default 5 minutes in release-1.28) are re‑queued.
The backoff queue is flushed every second, moving Pods whose backoff period has expired into the active queue.
Although the implementation uses a priority queue, the actual ordering is defined by the lessFunc supplied at creation, allowing custom scheduling priorities.
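The lifecycle above condenses into a small state machine. The state and event names below are illustrative labels for this article's flow, not identifiers from the scheduler source:

```go
package main

import "fmt"

// queueState tracks which internal queue currently holds a pod.
type queueState string

const (
	inActiveQ       queueState = "activeQ"
	inBackoffQ      queueState = "backoffQ"
	inUnschedulable queueState = "unschedulablePods"
)

// transition sketches the moves described in this article: scheduling
// failures land in unschedulablePods, cluster events move pods out
// (to backoffQ if still backing off, else activeQ), and backoff expiry
// promotes pods back to activeQ.
func transition(state queueState, event string) queueState {
	switch {
	case state == inActiveQ && event == "scheduling-failed":
		return inUnschedulable
	case state == inUnschedulable && event == "cluster-event-while-backing-off":
		return inBackoffQ
	case state == inUnschedulable && event == "cluster-event":
		return inActiveQ
	case state == inBackoffQ && event == "backoff-expired":
		return inActiveQ
	}
	return state // all other events leave the pod where it is
}

func main() {
	s := inActiveQ
	for _, ev := range []string{"scheduling-failed", "cluster-event-while-backing-off", "backoff-expired"} {
		s = transition(s, ev)
		fmt.Println(ev, "->", s)
	}
}
```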
Cloud Native Technology Community
The Cloud Native Technology Community, part of the CNBPA Cloud Native Technology Practice Alliance, focuses on evangelizing cutting‑edge cloud‑native technologies and practical implementations. It shares in‑depth content, case studies, and event/meetup information on containers, Kubernetes, DevOps, Service Mesh, and other cloud‑native tech, along with updates from the CNBPA alliance.