
Inside Kubernetes Scheduler: How the SchedulingQueue and PriorityQueue Work

This article provides an in‑depth analysis of the Kubernetes scheduler's queue subsystem, detailing the internal data structures, heap implementation, priority queue logic, and the lifecycle methods that add, activate, update, and remove Pods from active, backoff, and unschedulable queues.

Cloud Native Technology Community

1. Introduction

The queue mechanism is a core component of the Kubernetes scheduler, responsible for selecting the most suitable Pod for the next scheduling cycle. Because Pods can declare various resource requirements and constraints—such as PersistentVolumes, anti‑affinity rules, or node taints—the queue must be able to postpone scheduling until all conditions are satisfied. The SchedulingQueue stores Pods waiting to be scheduled, and this article explores its design and implementation based on the Kubernetes release-1.28 source code.

2. Internal Data Structures

2.1 Pod representation in the queue

Kubernetes defines the Pod API object, but it does not include fields needed for queue management. The QueuedPodInfo struct extends the API object with scheduling‑specific information.

// QueuedPodInfo adds scheduling‑related fields to the Pod API object
type QueuedPodInfo struct {
    // Pre‑processed Pod information to speed up scheduling
    *PodInfo
    // Time when the Pod was last added to the scheduling queue (refreshed on every re-queue)
    Timestamp time.Time
    // Number of scheduling attempts before the Pod is successfully scheduled
    Attempts int
    // Timestamp of the first scheduling attempt (set once and never updated)
    InitialAttemptTimestamp *time.Time
    // Names of plugins that caused the Pod to be unschedulable during this cycle
    // Typically set in PreFilter, Filter, Reserve, or Permit stages
    UnschedulablePlugins sets.Set[string]
    // Indicates whether the Pod is gated by scheduling gates (PreEnqueuePlugins)
    Gated bool
}

2.2 Heap implementation

The queue needs both fast look-ups by key and ordered retrieval. The Heap type provides both: a map indexes every item by its key, and a slice holds the keys arranged in binary-heap order, so insertion, deletion by key, and removal of the top item all stay efficient.

// Heap is a producer/consumer queue that implements a heap data structure
type Heap struct {
    data *data
    // Metric recorder for heap operations
    metricRecorder metrics.MetricRecorder
}

type data struct {
    // Map that stores all objects by key
    items map[string]*heapItem
    // Slice that stores the keys in binary-heap order
    queue []string
    // Function to compute a deterministic key for an object
    keyFunc KeyFunc
    // Comparison function used to order items in the heap
    lessFunc lessFunc
}

type heapItem struct {
    // The object stored in the heap (e.g., *QueuedPodInfo)
    obj interface{}
    // Index of the item in the heap's slice
    index int
}
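
To make the map-plus-slice layout concrete, here is a minimal sketch built on the standard container/heap package. It is not the scheduler's own code: the item type, the keyedHeap name, and the "higher priority first" ordering are assumptions made for illustration.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical stand-in for *QueuedPodInfo.
type item struct {
	key      string
	priority int
}

// entry pairs an object with its current position in the queue slice.
type entry struct {
	obj   item
	index int
}

// keyedHeap mirrors the layout described above: items gives O(1) lookup
// by key, queue holds the keys arranged in binary-heap order.
type keyedHeap struct {
	items map[string]*entry
	queue []string
}

func newKeyedHeap() *keyedHeap { return &keyedHeap{items: map[string]*entry{}} }

// The five methods below satisfy container/heap.Interface.
func (h *keyedHeap) Len() int { return len(h.queue) }
func (h *keyedHeap) Less(i, j int) bool {
	return h.items[h.queue[i]].obj.priority > h.items[h.queue[j]].obj.priority
}
func (h *keyedHeap) Swap(i, j int) {
	h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
	h.items[h.queue[i]].index = i
	h.items[h.queue[j]].index = j
}
func (h *keyedHeap) Push(x any) {
	it := x.(item)
	h.items[it.key] = &entry{obj: it, index: len(h.queue)}
	h.queue = append(h.queue, it.key)
}
func (h *keyedHeap) Pop() any {
	last := len(h.queue) - 1
	key := h.queue[last]
	h.queue = h.queue[:last]
	it := h.items[key].obj
	delete(h.items, key)
	return it
}

// Add inserts a new item, or updates an existing one and restores heap order.
func (h *keyedHeap) Add(it item) {
	if e, ok := h.items[it.key]; ok {
		e.obj = it
		heap.Fix(h, e.index)
		return
	}
	heap.Push(h, it)
}

// Delete removes an item by key and reports whether the key was present.
func (h *keyedHeap) Delete(key string) bool {
	e, ok := h.items[key]
	if !ok {
		return false
	}
	heap.Remove(h, e.index)
	return true
}

// PopTop removes and returns the item that sorts first.
func (h *keyedHeap) PopTop() (item, bool) {
	if h.Len() == 0 {
		return item{}, false
	}
	return heap.Pop(h).(item), true
}

func main() {
	h := newKeyedHeap()
	h.Add(item{"default/a", 1})
	h.Add(item{"default/b", 5})
	h.Add(item{"default/c", 3})
	h.Delete("default/b") // deletion by key uses the stored index
	top, _ := h.PopTop()
	fmt.Println(top.key) // prints "default/c"
}
```

Storing the index next to each object is what lets deletion and update by key avoid a linear scan of the slice.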

2.3 UnschedulablePods (temporary queue)

The UnschedulablePods structure holds Pods that cannot currently be scheduled, such as those lacking a suitable Node. Although called a queue, it is implemented as a map.

// UnschedulablePods is a map‑based wrapper for temporarily unschedulable Pods
type UnschedulablePods struct {
    // Map from the Pod's full name (name plus namespace) to *QueuedPodInfo
    podInfoMap map[string]*framework.QueuedPodInfo
    // Function to compute the key (same as in the heap)
    keyFunc func(*v1.Pod) string
    // Metric recorders for add/remove operations
    unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}

2.4 SchedulingQueue abstraction

The SchedulingQueue interface abstracts the storage of Pods awaiting scheduling. It only declares behavior; the concrete PriorityQueue described in section 3 backs it with two heaps (activeQ and podBackoffQ) and the unschedulable map.

// SchedulingQueue abstracts the storage of Pods waiting to be scheduled.
type SchedulingQueue interface {
    framework.PodNominator
    // Add a Pod to the active queue
    Add(logger klog.Logger, pod *v1.Pod) error
    // Move Pods from Unschedulable or Backoff queues to the active queue
    Activate(logger klog.Logger, pods map[string]*v1.Pod)
    // Add an unschedulable Pod if it is not already present
    AddUnschedulableIfNotPresent(logger klog.Logger, podInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // Return the current scheduling cycle number
    SchedulingCycle() int64
    // Pop the highest‑priority Pod from the active queue (blocks if empty)
    Pop() (*framework.QueuedPodInfo, error)
    // Mark a Pod as processed after it leaves the queue
    Done(types.UID)
    // Update a Pod that may have changed while in the queue
    Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
    // Delete a Pod from all internal queues
    Delete(pod *v1.Pod) error
    // Gracefully close the queue
    Close()
    // Run background workers that manage backoff and unschedulable queues
    Run(logger klog.Logger)
}

3. PriorityQueue implementation

3.1 Structure

The PriorityQueue implements SchedulingQueue. It contains two sub-queues (activeQ and podBackoffQ) and the unschedulable map.

type PriorityQueue struct {
    *nominator
    stop chan struct{}
    clock clock.Clock
    // Initial backoff duration for a Pod that fails scheduling (default 1s)
    podInitialBackoffDuration time.Duration
    // Maximum backoff duration (default 10s)
    podMaxBackoffDuration time.Duration
    // Maximum time a Pod may stay in the unschedulable queue (default 5m)
    podMaxInUnschedulablePodsDuration time.Duration
    cond sync.Cond
    inFlightPods map[types.UID]inFlightPod
    receivedEvents *list.List
    // Active queue (heap) for ready Pods
    activeQ *heap.Heap
    // Backoff queue (heap) for Pods waiting for backoff to expire
    podBackoffQ *heap.Heap
    // Unschedulable Pods map
    unschedulablePods *UnschedulablePods
    schedulingCycle int64
    moveRequestCycle int64
    preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
    queueingHintMap QueueingHintMapPerProfile
    closed bool
    nsLister listersv1.NamespaceLister
    metricsRecorder metrics.MetricAsyncRecorder
    pluginMetricsSamplePercent int
    isSchedulingQueueHintEnabled bool
}
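
The two backoff fields bound how long a failed Pod waits before it may return to activeQ. The sketch below assumes the wait doubles with every scheduling attempt until it hits the maximum; the function name and the doubling rule are illustrative assumptions, while the 1s and 10s values are the defaults noted in the struct comments.

```go
package main

import (
	"fmt"
	"time"
)

const (
	initialBackoff = 1 * time.Second  // default initial backoff noted above
	maxBackoff     = 10 * time.Second // default maximum backoff noted above
)

// backoffDuration is a hypothetical helper: it starts at initial and doubles
// once per attempt after the first, never exceeding limit.
func backoffDuration(attempts int, initial, limit time.Duration) time.Duration {
	d := initial
	for i := 1; i < attempts; i++ {
		// Compare by subtraction so that doubling cannot overflow.
		if d > limit-d {
			return limit
		}
		d += d
	}
	return d
}

func main() {
	for attempts := 1; attempts <= 6; attempts++ {
		fmt.Println(attempts, backoffDuration(attempts, initialBackoff, maxBackoff))
	}
}
```

Under these assumptions the waits are 1s, 2s, 4s, and 8s for the first four attempts, and 10s from the fifth attempt on.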

3.2 runPreEnqueuePlugins

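Pre-enqueue plugins run before a Pod is placed into activeQ. If any plugin rejects the Pod, it is marked as gated and kept out of the active queue. The in-tree SchedulingGates plugin, for example, rejects a Pod for as long as its spec.schedulingGates field is non-empty.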

func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framework.QueuedPodInfo) bool {
    pod := pInfo.Pod
    // Sample a percentage of plugin executions for metrics recording
    shouldRecordMetric := rand.Intn(100) < p.pluginMetricsSamplePercent
    for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] {
        // Execute the plugin; if it succeeds, continue to the next one
        s := p.runPreEnqueuePlugin(ctx, pl, pod, shouldRecordMetric)
        if s.IsSuccess() {
            continue
        }
        // Record the unschedulable plugin name
        pInfo.UnschedulablePlugins.Insert(pl.Name())
        metrics.UnschedulableReason(pl.Name(), pod.Spec.SchedulerName).Inc()
        return false
    }
    return true
}
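
The same "first rejection wins" loop can be tried outside the scheduler. This is a simplified sketch: the pod struct, the preEnqueuePlugin interface, and the ExampleGates plugin are hypothetical stand-ins, not the framework's real types.

```go
package main

import (
	"errors"
	"fmt"
)

// pod is a hypothetical, trimmed-down stand-in for *v1.Pod.
type pod struct {
	name  string
	gates []string // loosely models spec.schedulingGates
}

// preEnqueuePlugin is a hypothetical, simplified plugin interface.
type preEnqueuePlugin interface {
	Name() string
	PreEnqueue(p pod) error
}

// gatePlugin rejects Pods that still carry at least one gate.
type gatePlugin struct{}

func (gatePlugin) Name() string { return "ExampleGates" }

func (gatePlugin) PreEnqueue(p pod) error {
	if len(p.gates) > 0 {
		return errors.New("pod still has scheduling gates")
	}
	return nil
}

// runPreEnqueue stops at the first rejecting plugin and returns its name,
// which the caller can record as the reason the Pod is unschedulable.
func runPreEnqueue(plugins []preEnqueuePlugin, p pod) (ok bool, rejectedBy string) {
	for _, pl := range plugins {
		if err := pl.PreEnqueue(p); err != nil {
			return false, pl.Name()
		}
	}
	return true, ""
}

func main() {
	plugins := []preEnqueuePlugin{gatePlugin{}}
	fmt.Println(runPreEnqueue(plugins, pod{name: "a"}))
	fmt.Println(runPreEnqueue(plugins, pod{name: "b", gates: []string{"example.com/wait"}}))
}
```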

3.3 addToActiveQ

Attempts to insert a Pod into activeQ. If the Pod fails pre‑enqueue checks, it is placed into the unschedulable map.

func (p *PriorityQueue) addToActiveQ(logger klog.Logger, pInfo *framework.QueuedPodInfo) (bool, error) {
    pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
    if pInfo.Gated {
        p.unschedulablePods.addOrUpdate(pInfo)
        return false, nil
    }
    // Ensure InitialAttemptTimestamp is set
    if pInfo.InitialAttemptTimestamp == nil {
        now := p.clock.Now()
        pInfo.InitialAttemptTimestamp = &now
    }
    if err := p.activeQ.Add(pInfo); err != nil {
        logger.Error(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
        return false, err
    }
    return true, nil
}

3.4 Add (public API)

Creates a QueuedPodInfo from a fresh v1.Pod and adds it to the active queue, removing any stale references from other queues.

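func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pInfo := p.newQueuedPodInfo(pod)
    // Capture the gated state first: addToActiveQ overwrites pInfo.Gated
    gated := pInfo.Gated
    if added, err := p.addToActiveQ(logger, pInfo); !added {
        return err
    }
    // Clean up possible stale entries in the unschedulable map and backoff queue
    if p.unschedulablePods.get(pod) != nil {
        logger.Error(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
        p.unschedulablePods.delete(pod, gated)
    }
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        logger.Error(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
    }
    // Wake any goroutine blocked in Pop()
    p.cond.Broadcast()
    return nil
}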

3.5 Activate

Moves the given Pods from the backoff queue or the unschedulable map into the active queue, and wakes any goroutine waiting in Pop() if at least one Pod was moved.

func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
    p.lock.Lock()
    defer p.lock.Unlock()
    activated := false
    for _, pod := range pods {
        if p.activate(logger, pod) {
            activated = true
        }
    }
    if activated {
        p.cond.Broadcast()
    }
}

func (p *PriorityQueue) activate(logger klog.Logger, pod *v1.Pod) bool {
    // If the pod is already in activeQ, nothing to do
    if _, exists, _ := p.activeQ.Get(newQueuedPodInfoForLookup(pod)); exists {
        return false
    }
    var pInfo *framework.QueuedPodInfo
    // Try to find the pod in unschedulable or backoff queues
    if pInfo = p.unschedulablePods.get(pod); pInfo == nil {
        if obj, exists, _ := p.podBackoffQ.Get(newQueuedPodInfoForLookup(pod)); !exists {
            logger.Error(nil, "Pod to activate not found in unschedulable or backoff queues", "pod", klog.KObj(pod))
            return false
        } else {
            pInfo = obj.(*framework.QueuedPodInfo)
        }
    }
    // Capture the gated state before addToActiveQ overwrites pInfo.Gated
    gated := pInfo.Gated
    // Add to active queue
    added, _ := p.addToActiveQ(logger, pInfo)
    if !added {
        return false
    }
    // Remove from other queues
    p.unschedulablePods.delete(pInfo.Pod, gated)
    p.podBackoffQ.Delete(pInfo)
    return true
}

3.6 AddUnschedulableIfNotPresent

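Re-queues a Pod that failed scheduling. It returns an error if the Pod is already present in any queue; otherwise it refreshes the Pod's timestamp and uses the queueing hint to place the Pod in the unschedulable map, the backoff queue, or the active queue.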

func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pod := pInfo.Pod
    if p.unschedulablePods.get(pod) != nil {
        return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
    }
    if _, exists, _ := p.activeQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
    }
    if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
    }
    // Update timestamp because the pod is being re‑queued
    pInfo.Timestamp = p.clock.Now()
    // Determine whether the pod should be re‑queued immediately or after backoff
    schedulingHint := p.determineSchedulingHintForInFlightPod(logger, pInfo, podSchedulingCycle)
    // Re‑queue based on the hint
    p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, ScheduleAttemptFailure)
    return nil
}

3.7 Pop

Retrieves the highest-priority Pod from activeQ, blocking on the condition variable while the queue is empty and not closed. The method then increments the Pod's attempt counter and the queue's scheduling cycle.

func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
    p.lock.Lock()
    defer p.lock.Unlock()
    for p.activeQ.Len() == 0 {
        if p.closed {
            klog.V(2).InfoS("Scheduling queue is closed")
            return nil, nil
        }
        p.cond.Wait()
    }
    obj, err := p.activeQ.Pop()
    if err != nil {
        return nil, err
    }
    pInfo := obj.(*framework.QueuedPodInfo)
    pInfo.Attempts++
    p.schedulingCycle++
    if p.isSchedulingQueueHintEnabled {
        p.inFlightPods[pInfo.Pod.UID] = inFlightPod{previousEvent: p.receivedEvents.Back()}
    }
    return pInfo, nil
}
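
The wait loop in Pop is the standard condition-variable pattern. The following self-contained sketch shows it with sync.Cond; the blockingQueue type is hypothetical and uses a plain FIFO slice instead of a heap to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue is a hypothetical, FIFO-only stand-in for the active queue.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends an item and wakes every goroutine blocked in Pop.
func (q *blockingQueue) Add(s string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, s)
	q.cond.Broadcast()
}

// Close marks the queue as closed and releases all waiters.
func (q *blockingQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// Pop blocks while the queue is empty. Wait releases the lock while the
// goroutine sleeps and re-acquires it before returning, so the emptiness
// check must be repeated in a loop.
func (q *blockingQueue) Pop() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return "", false
		}
		q.cond.Wait()
	}
	s := q.items[0]
	q.items = q.items[1:]
	return s, true
}

func main() {
	q := newBlockingQueue()
	done := make(chan string)
	go func() {
		s, _ := q.Pop() // blocks until Add is called
		done <- s
	}()
	q.Add("default/nginx")
	fmt.Println(<-done) // prints "default/nginx"
}
```

Every code path that makes the queue non-empty has to call Broadcast (or Signal); otherwise a goroutine waiting in Pop is never woken.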

3.8 Update

Updates a Pod that may have changed while in any of the queues. If the Pod is not found, it is added to the active queue.

func (p *PriorityQueue) Update(logger klog.Logger, oldPod, newPod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    if oldPod != nil {
        oldPodInfo := newQueuedPodInfoForLookup(oldPod)
        if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
            pInfo := updatePod(oldPodInfo, newPod)
            p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
            return p.activeQ.Update(pInfo)
        }
        if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
            pInfo := updatePod(oldPodInfo, newPod)
            p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
            return p.podBackoffQ.Update(pInfo)
        }
        if usPodInfo := p.unschedulablePods.get(newPod); usPodInfo != nil {
            pInfo := updatePod(usPodInfo, newPod)
            p.updateNominatedPodUnlocked(logger, oldPod, pInfo.PodInfo)
            if isPodUpdated(oldPod, newPod) {
                gated := usPodInfo.Gated
                if p.isPodBackingoff(usPodInfo) {
                    if err := p.podBackoffQ.Add(pInfo); err != nil {
                        return err
                    }
                    p.unschedulablePods.delete(usPodInfo.Pod, gated)
                } else {
                    if added, err := p.addToActiveQ(logger, pInfo); !added {
                        return err
                    }
                    p.unschedulablePods.delete(usPodInfo.Pod, gated)
                    // Wake goroutines blocked in Pop()
                    p.cond.Broadcast()
                }
            } else {
                p.unschedulablePods.addOrUpdate(pInfo)
            }
            return nil
        }
    }
    // Pod not found in any queue – add it to activeQ
    pInfo := p.newQueuedPodInfo(newPod)
    if added, err := p.addToActiveQ(logger, pInfo); !added {
        return err
    }
    // Wake goroutines blocked in Pop()
    p.cond.Broadcast()
    return nil
}

3.9 Delete

Removes a Pod from all internal queues and clears any nomination information.

func (p *PriorityQueue) Delete(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    p.deleteNominatedPodIfExistsUnlocked(pod)
    pInfo := newQueuedPodInfoForLookup(pod)
    if err := p.activeQ.Delete(pInfo); err != nil {
        p.podBackoffQ.Delete(pInfo)
        if pInfo = p.unschedulablePods.get(pod); pInfo != nil {
            p.unschedulablePods.delete(pod, pInfo.Gated)
        }
    }
    return nil
}

3.10 MoveAllToActiveOrBackoffQueue

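Called when a cluster event occurs (for example, a node is added or updated). It moves Pods from the unschedulable map to either the active or backoff queue based on plugin hints and backoff status; Pods that fail the optional pre-check stay where they are.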

func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) {
    p.lock.Lock()
    defer p.lock.Unlock()
    // Collect Pods that pass the optional pre‑check
    var podInfoList []*framework.QueuedPodInfo
    for _, pInfo := range p.unschedulablePods.podInfoMap {
        if preCheck == nil || preCheck(pInfo.Pod) {
            podInfoList = append(podInfoList, pInfo)
        }
    }
    p.movePodsToActiveOrBackoffQueue(logger, podInfoList, event, oldObj, newObj)
}

func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) {
    activated := false
    for _, pInfo := range podInfoList {
        schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
        if schedulingHint == framework.QueueSkip {
            // This event cannot make the Pod schedulable; leave it in the unschedulable map
            continue
        }
        // Remove the Pod from the unschedulable map before re-queueing it
        p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated)
        queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label)
        logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label, "queue", queue, "hint", schedulingHint)
        if queue == activeQ {
            activated = true
        }
    }
    p.moveRequestCycle = p.schedulingCycle
    if len(p.inFlightPods) != 0 {
        p.receivedEvents.PushBack(&clusterEvent{event: event, inFlightPodsNum: len(p.inFlightPods), oldObj: oldObj, newObj: newObj})
    }
    if activated {
        // Signal that the active queue now has work
        p.cond.Broadcast()
    }
}
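
The routing decision made by requeuePodViaQueueingHint is not shown in this article. The sketch below is one plausible reading of it, assuming three hint values (skip, re-queue after backoff, re-queue immediately); the type, constant, and function names are hypothetical.

```go
package main

import "fmt"

// hint is a hypothetical stand-in for the scheduler's queueing hint values.
type hint int

const (
	hintSkip         hint = iota // the event cannot help this Pod
	hintAfterBackoff             // re-queue, but honor the backoff timer
	hintImmediately              // re-queue right away
)

// Destination labels used only by this sketch.
const (
	toActive        = "active"
	toBackoff       = "backoff"
	toUnschedulable = "unschedulable"
)

// route picks a destination from the hint and from whether the Pod's
// backoff timer is still running.
func route(h hint, backingOff bool) string {
	switch {
	case h == hintSkip:
		return toUnschedulable
	case h == hintAfterBackoff && backingOff:
		return toBackoff
	default:
		// Either an immediate hint, or the backoff has already expired.
		return toActive
	}
}

func main() {
	fmt.Println(route(hintSkip, true))          // unschedulable
	fmt.Println(route(hintAfterBackoff, true))  // backoff
	fmt.Println(route(hintAfterBackoff, false)) // active
	fmt.Println(route(hintImmediately, true))   // active
}
```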

3.11 Run (background workers)

Starts two goroutines: one flushes the backoff queue every second, moving ready Pods to the active queue; the other flushes the unschedulable queue every 30 seconds, moving Pods that have exceeded the maximum unschedulable duration.

func (p *PriorityQueue) Run(logger klog.Logger) {
    go wait.Until(func() { p.flushBackoffQCompleted(logger) }, 1*time.Second, p.stop)
    go wait.Until(func() { p.flushUnschedulablePodsLeftover(logger) }, 30*time.Second, p.stop)
}

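func (p *PriorityQueue) flushBackoffQCompleted(logger klog.Logger) {
    p.lock.Lock()
    defer p.lock.Unlock()
    activated := false
    for {
        raw := p.podBackoffQ.Peek()
        if raw == nil {
            break
        }
        pInfo := raw.(*framework.QueuedPodInfo)
        if p.isPodBackingoff(pInfo) {
            break // The head is still backing off, so every Pod behind it is too
        }
        if _, err := p.podBackoffQ.Pop(); err != nil {
            logger.Error(err, "Unable to pop pod from backoff queue", "pod", klog.KObj(pInfo.Pod))
            break
        }
        if added, _ := p.addToActiveQ(logger, pInfo); added {
            logger.V(5).Info("Pod moved from backoff to active queue", "pod", klog.KObj(pInfo.Pod))
            activated = true
        }
    }
    if activated {
        // Wake goroutines blocked in Pop()
        p.cond.Broadcast()
    }
}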

func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
    p.lock.Lock()
    defer p.lock.Unlock()
    var podsToMove []*framework.QueuedPodInfo
    now := p.clock.Now()
    for _, pInfo := range p.unschedulablePods.podInfoMap {
        if now.Sub(pInfo.Timestamp) > p.podMaxInUnschedulablePodsDuration {
            podsToMove = append(podsToMove, pInfo)
        }
    }
    if len(podsToMove) > 0 {
        p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout, nil, nil)
    }
}
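
The backoff flush relies on the heap being ordered by expiry time: once the head has not expired, nothing behind it has either. A minimal sketch of that loop, using container/heap and hypothetical names (backoffEntry, flushCompleted, at), looks like this:

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// backoffEntry is a hypothetical record: a Pod name plus the time at which
// its backoff period ends.
type backoffEntry struct {
	name   string
	expiry time.Time
}

// backoffHeap is a min-heap ordered by expiry (earliest first).
type backoffHeap []backoffEntry

func (h backoffHeap) Len() int           { return len(h) }
func (h backoffHeap) Less(i, j int) bool { return h[i].expiry.Before(h[j].expiry) }
func (h backoffHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *backoffHeap) Push(x any)        { *h = append(*h, x.(backoffEntry)) }
func (h *backoffHeap) Pop() any {
	old := *h
	n := len(old)
	e := old[n-1]
	*h = old[:n-1]
	return e
}

// add pushes an entry while keeping heap order.
func (h *backoffHeap) add(name string, expiry time.Time) {
	heap.Push(h, backoffEntry{name: name, expiry: expiry})
}

// flushCompleted pops every entry whose backoff has ended by now and returns
// the names in expiry order. It stops at the first entry still backing off.
func flushCompleted(h *backoffHeap, now time.Time) []string {
	var ready []string
	for h.Len() > 0 {
		head := (*h)[0] // peek without removing
		if head.expiry.After(now) {
			break
		}
		heap.Pop(h)
		ready = append(ready, head.name)
	}
	return ready
}

// at builds a timestamp a given number of seconds after the Unix epoch.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	h := &backoffHeap{}
	h.add("default/a", at(5))
	h.add("default/b", at(1))
	h.add("default/c", at(3))
	fmt.Println(flushCompleted(h, at(3))) // prints "[default/b default/c]"
	fmt.Println(h.Len())                  // prints "1"
}
```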

4. Summary

New Pods are inserted into the active queue via the Add() method.

The scheduler obtains a Pod for processing by calling Pop().

If scheduling fails, AddUnschedulableIfNotPresent() re-queues the Pod: depending on the queueing hint it lands in the unschedulable map, the backoff queue, or the active queue.

When a Pod is assigned to a node, AssignedPodAdded() moves unschedulable Pods that may be affected by it (for example, through affinity terms) from the unschedulable map to either the active or backoff queue.

The priority queue flushes the unschedulable map every 30 seconds; Pods that have been waiting longer than podMaxInUnschedulablePodsDuration (default 5 minutes) are re-queued.

The backoff queue is flushed every second, moving Pods whose backoff period has expired into the active queue.

Although the implementation uses a priority queue, the actual ordering is defined by the lessFunc supplied at creation, allowing custom scheduling priorities.
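
As an illustration of what such a lessFunc can look like, the sketch below orders Pods by priority (higher first) and breaks ties by queue timestamp (earlier first). This is a common ordering rule and is offered as an assumption, not as the exact function the scheduler installs; the queuedPod type and helper names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// queuedPod is a hypothetical stand-in for *QueuedPodInfo.
type queuedPod struct {
	name      string
	priority  int32
	timestamp time.Time
}

// less reports whether a should be scheduled before b: higher priority
// wins, and equal priorities fall back to the earlier timestamp.
func less(a, b queuedPod) bool {
	if a.priority != b.priority {
		return a.priority > b.priority
	}
	return a.timestamp.Before(b.timestamp)
}

// order returns the Pod names in the sequence the queue would hand them out.
func order(pods []queuedPod) []string {
	sorted := append([]queuedPod(nil), pods...)
	sort.SliceStable(sorted, func(i, j int) bool { return less(sorted[i], sorted[j]) })
	names := make([]string, 0, len(sorted))
	for _, p := range sorted {
		names = append(names, p.name)
	}
	return names
}

// at builds a timestamp a given number of seconds after the Unix epoch.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

func main() {
	pods := []queuedPod{
		{"batch", 0, at(1)},
		{"web-2", 100, at(3)},
		{"web-1", 100, at(2)},
	}
	fmt.Println(order(pods)) // prints "[web-1 web-2 batch]"
}
```

Swapping in a different comparison changes the scheduling order without touching the heap code, which is the point of passing lessFunc at construction time.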

Kubernetes scheduling queue diagram