
Inside Kubernetes kube-scheduler: A Deep Dive into Its Code Structure and Scheduling Logic

This article dissects the internal architecture of Kubernetes' kube-scheduler, walking through its initialization with Cobra, the Setup function, the creation of scheduler instances, the priority queue mechanics, scheduling cycles, and binding processes, providing comprehensive code examples to illuminate each step of the scheduling workflow.

MaGe Linux Operations

0. Preface

This section introduces the kube-scheduler code structure and explains that the article will analyze the scheduling logic from a source‑code perspective.

1. Start kube-scheduler

kube-scheduler uses the Cobra framework to initialize parameters, configuration, and the command-line interface.

func main() {
    // Start kube-scheduler entry point
    command := app.NewSchedulerCommand()
    ...
}

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    // Create kube-scheduler options
    opts := options.NewOptions()
    cmd := &cobra.Command{
        Use: "kube-scheduler",
        ...
        RunE: func(cmd *cobra.Command, args []string) error {
            return runCommand(cmd, opts, registryOptions...)
        },
        ...
    }
    ...
}

func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    // Create kube-scheduler config cc and instance sched
    cc, sched, err := Setup(ctx, opts, registryOptions...)
    if err != nil {
        return err
    }
    ...
    return Run(ctx, cc, sched)
}

The key function is Setup, which creates the scheduler configuration cc and the scheduler instance sched.

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    // Validate options
    if errs := opts.Validate(); len(errs) > 0 {
        return nil, nil, utilerrors.NewAggregate(errs)
    }
    // Create config from options
    c, err := opts.Config(ctx)
    if err != nil {
        return nil, nil, err
    }
    cc := c.Complete()
    // Register out‑of‑tree plugins
    outOfTreeRegistry := make(runtime.Registry)
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil {
            return nil, nil, err
        }
    }
    ...
    // Create scheduler instance
    sched, err := scheduler.New(ctx, cc.Client, cc.InformerFactory, cc.DynInformerFactory, recorderFactory,
        scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
        scheduler.WithKubeConfig(cc.KubeConfig),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
        scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
        scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
            completedProfiles = append(completedProfiles, profile)
        }),
    )
    ...
    return &cc, sched, nil
}
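
The scheduler.WithXxx arguments passed to scheduler.New follow Go's functional options pattern. The following is a minimal sketch of that pattern, not the scheduler's actual code: the schedulerOptions struct, its three fields, and the default values are trimmed-down assumptions made for illustration.

```go
package main

import "fmt"

// schedulerOptions is a hypothetical, trimmed-down stand-in for the
// scheduler's internal option struct.
type schedulerOptions struct {
	percentageOfNodesToScore int32
	podMaxBackoffSeconds     int64
	parallelism              int32
}

// Option mutates schedulerOptions; each WithXxx helper returns one.
type Option func(*schedulerOptions)

func WithPercentageOfNodesToScore(p int32) Option {
	return func(o *schedulerOptions) { o.percentageOfNodesToScore = p }
}

func WithPodMaxBackoffSeconds(s int64) Option {
	return func(o *schedulerOptions) { o.podMaxBackoffSeconds = s }
}

func WithParallelism(n int32) Option {
	return func(o *schedulerOptions) { o.parallelism = n }
}

// newOptions starts from defaults (illustrative values) and applies the
// caller's options in order, so later options override earlier ones.
func newOptions(opts ...Option) schedulerOptions {
	o := schedulerOptions{podMaxBackoffSeconds: 10, parallelism: 16}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := newOptions(WithParallelism(8), WithPercentageOfNodesToScore(50))
	fmt.Printf("%+v\n", o)
}
```

The benefit of this design is that a constructor with many optional settings keeps a stable signature: callers pass only the options they care about, and unset fields keep their defaults.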

The scheduler.New function creates the scheduler instance sched and ties together the snapshot, event handlers, and profiles.

func New(ctx context.Context, client clientset.Interface, informerFactory informers.SharedInformerFactory, dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, recorderFactory profile.RecorderFactory, opts ...Option) (*Scheduler, error) {
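    logger := klog.FromContext(ctx)
    // Apply the functional options on top of the defaults
    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }
    // Register in-tree plugins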
    registry := frameworkplugins.NewInTreeRegistry()
    // Merge in‑tree and out‑of‑tree plugins
    if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
        return nil, err
    }
    metrics.Register()
    extenders, err := buildExtenders(logger, options.extenders, options.profiles)
    if err != nil {
        return nil, fmt.Errorf("couldn't build extenders: %w", err)
    }
    podLister := informerFactory.Core().V1().Pods().Lister()
    nodeLister := informerFactory.Core().V1().Nodes().Lister()
    snapshot := internalcache.NewEmptySnapshot()
    profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
        frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
        frameworkruntime.WithClientSet(client),
        frameworkruntime.WithKubeConfig(options.kubeConfig),
        frameworkruntime.WithInformerFactory(informerFactory),
        frameworkruntime.WithSnapshotSharedLister(snapshot),
        frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
        frameworkruntime.WithParallelism(int(options.parallelism)),
        frameworkruntime.WithExtenders(extenders),
        frameworkruntime.WithMetricsRecorder(metricsRecorder),
    )
    ...
    podQueue := internalqueue.NewSchedulingQueue(
        profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
        informerFactory,
        internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
        internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
        internalqueue.WithPodLister(podLister),
        internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
        internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
        internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
        internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
        internalqueue.WithMetricsRecorder(*metricsRecorder),
    )
    schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
    sched := &Scheduler{
        Cache: schedulerCache,
        client: client,
        nodeInfoSnapshot: snapshot,
        percentageOfNodesToScore: options.percentageOfNodesToScore,
        Extenders: extenders,
        StopEverything: stopEverything,
        SchedulingQueue: podQueue,
        Profiles: profiles,
        logger: logger,
    }
    sched.NextPod = podQueue.Pop
    if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
        return nil, fmt.Errorf("adding event handlers: %w", err)
    }
    return sched, nil
}

The scheduler instance now holds the cache, the snapshot, the scheduling queue, and the profiles, and its event handlers are registered on the informers, so it is ready for the main scheduling loop.
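
Merging the in-tree and out-of-tree registries can be pictured as merging two maps keyed by plugin name, where a name collision is an error. The sketch below assumes that behavior; PluginFactory is simplified to a function returning a string, and the plugin name MyCustomPlugin is hypothetical.

```go
package main

import "fmt"

// PluginFactory is a simplified, hypothetical factory type. The real
// factory builds a plugin from its configuration and a framework handle.
type PluginFactory func() string

// Registry maps plugin names to their factories.
type Registry map[string]PluginFactory

// Register adds one plugin and rejects duplicate names.
func (r Registry) Register(name string, f PluginFactory) error {
	if _, ok := r[name]; ok {
		return fmt.Errorf("a plugin named %q already exists", name)
	}
	r[name] = f
	return nil
}

// Merge registers every plugin from in, failing on the first collision.
func (r Registry) Merge(in Registry) error {
	for name, f := range in {
		if err := r.Register(name, f); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	inTree := Registry{"NodeResourcesFit": func() string { return "in-tree" }}
	outOfTree := Registry{"MyCustomPlugin": func() string { return "out-of-tree" }}

	fmt.Println(inTree.Merge(outOfTree), len(inTree)) // first merge succeeds
	fmt.Println(inTree.Merge(outOfTree))              // second merge collides
}
```

Rejecting duplicates means an out-of-tree plugin cannot silently replace a built-in plugin with the same name.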

2. Run kube-scheduler

After all objects are created, the scheduler is started by calling Run, which performs leader election, starts informers, and begins processing pods.

func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
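    // Leader election state: isLeader reports true once waitingForLeader is closed.
    // The election itself only runs when cc.LeaderElection is configured.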
    waitingForLeader := make(chan struct{})
    isLeader := func() bool {
        select {
        case _, ok := <-waitingForLeader:
            return !ok
        default:
            return false
        }
    }
    // Start informers and wait for caches to sync
    startInformersAndWaitForSync := func(ctx context.Context) {
        cc.InformerFactory.Start(ctx.Done())
        if cc.DynInformerFactory != nil {
            cc.DynInformerFactory.Start(ctx.Done())
        }
        cc.InformerFactory.WaitForCacheSync(ctx.Done())
        if cc.DynInformerFactory != nil {
            cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
        }
        if err := sched.WaitForHandlersSync(ctx); err != nil {
            logger.Error(err, "waiting for handlers to sync")
        }
        logger.V(3).Info("Handlers synced")
    }
    if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
        startInformersAndWaitForSync(ctx)
    }
    // Run the scheduler main loop
    close(waitingForLeader)
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}

The Run function contains three main steps: leader election, informer synchronization, and the scheduler loop.

Elect a leader (skipped when leader election is disabled, for example when only one scheduler instance runs).

Run informers to watch pod and node changes.

Execute the scheduler.
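
The isLeader closure in Run relies on a Go idiom: receiving from a closed channel never blocks and returns ok == false, so close(waitingForLeader) acts as a one-shot flag that any number of readers can check. A minimal standalone sketch of the same idiom follows; the helper name newLeaderCheck is an assumption for illustration.

```go
package main

import "fmt"

// newLeaderCheck returns a non-blocking check that reports true only
// after ch has been closed.
func newLeaderCheck(ch <-chan struct{}) func() bool {
	return func() bool {
		select {
		case _, ok := <-ch:
			// A receive on a closed channel succeeds immediately with ok == false.
			return !ok
		default:
			// Channel still open and empty: not the leader yet.
			return false
		}
	}
}

func main() {
	waitingForLeader := make(chan struct{})
	isLeader := newLeaderCheck(waitingForLeader)

	fmt.Println(isLeader()) // prints false
	close(waitingForLeader)
	fmt.Println(isLeader()) // prints true
}
```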

The scheduler loop repeatedly pulls pods from the priority queue and processes them.

func (sched *Scheduler) Run(ctx context.Context) {
    // Start the queue's background flush goroutines
    sched.SchedulingQueue.Run(logger)
    // Call scheduleOne in a loop until the context is cancelled
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    <-ctx.Done()
    ...
}
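
With a period of 0, wait.UntilWithContext behaves like a tight loop that calls the function again as soon as it returns, until the context is cancelled. The sketch below reproduces that behavior with the standard library only. It is an approximation, not the apimachinery implementation, and scheduleN is a hypothetical driver that stands in for scheduleOne.

```go
package main

import (
	"context"
	"fmt"
)

// untilWithContext calls f back-to-back until ctx is cancelled.
// There is no sleep between iterations, mirroring a zero period.
func untilWithContext(ctx context.Context, f func(context.Context)) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		f(ctx)
	}
}

// scheduleN "schedules" pods one at a time and cancels the loop once
// at least n pods have been handled. It returns the number handled.
func scheduleN(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	scheduled := 0
	untilWithContext(ctx, func(context.Context) {
		scheduled++ // stand-in for one scheduleOne call
		if scheduled >= n {
			cancel()
		}
	})
	return scheduled
}

func main() {
	fmt.Println(scheduleN(3)) // prints 3
}
```

In the scheduler the loop does not spin, because each iteration blocks in NextPod until a pod is available.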

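The priority queue consists of ActiveQ, BackoffQ, and UnschedulableQ. Run starts two goroutines that periodically move pods between them: every second, flushBackoffQCompleted moves pods whose backoff period has expired out of BackoffQ, and every 30 seconds, flushUnschedulablePodsLeftover moves pods that have stayed in UnschedulableQ too long back toward scheduling.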

func (p *PriorityQueue) Run(logger klog.Logger) {
    go wait.Until(func() { p.flushBackoffQCompleted(logger) }, 1*time.Second, p.stop)
    go wait.Until(func() { p.flushUnschedulablePodsLeftover(logger) }, 30*time.Second, p.stop)
}

sched.NextPod blocks until ActiveQ has a pod, then returns it for scheduling.

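func (p *PriorityQueue) Pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
    // cond.Wait requires the lock to be held; it releases the lock while waiting
    p.lock.Lock()
    defer p.lock.Unlock()
    for p.activeQ.Len() == 0 {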
        if p.closed {
            logger.V(2).Info("Scheduling queue is closed")
            return nil, nil
        }
        p.cond.Wait()
    }
    obj, err := p.activeQ.Pop()
    if err != nil {
        return nil, err
    }
    return obj.(*framework.QueuedPodInfo), nil
}
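
The blocking behavior of Pop comes from pairing a heap with a condition variable. The following sketch shows that combination using container/heap and sync.Cond. The pod type and the ordering rule (higher priority first) are simplifying assumptions; in the scheduler the ordering comes from the profile's queue sort function.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

type pod struct {
	name     string
	priority int32
}

// podHeap orders pods by descending priority (assumed ordering).
type podHeap []pod

func (h podHeap) Len() int            { return len(h) }
func (h podHeap) Less(i, j int) bool  { return h[i].priority > h[j].priority }
func (h podHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *podHeap) Push(x interface{}) { *h = append(*h, x.(pod)) }
func (h *podHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

type activeQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  podHeap
	closed bool
}

func newActiveQueue() *activeQueue {
	q := &activeQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add inserts a pod and wakes any goroutine blocked in Pop.
func (q *activeQueue) Add(p pod) {
	q.mu.Lock()
	defer q.mu.Unlock()
	heap.Push(&q.items, p)
	q.cond.Broadcast()
}

// Close marks the queue closed and wakes all waiters.
func (q *activeQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// Pop blocks until a pod is available; ok is false once the queue is
// closed and empty.
func (q *activeQueue) Pop() (p pod, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.items.Len() == 0 {
		if q.closed {
			return pod{}, false
		}
		q.cond.Wait() // releases mu while waiting, reacquires before returning
	}
	return heap.Pop(&q.items).(pod), true
}

func main() {
	q := newActiveQueue()
	go q.Add(pod{name: "web-1", priority: 10})
	p, _ := q.Pop() // blocks until the goroutine above adds the pod
	fmt.Println(p.name)
}
```

The wait sits inside a for loop rather than an if because a woken goroutine must re-check the condition: another consumer may have taken the pod first.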

The scheduling cycle picks a node with SchedulePod, assumes the pod on that node, and then runs the Reserve and Permit plugins:

func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, podInfo *framework.QueuedPodInfo, start time.Time, podsToActivate *framework.PodsToActivate) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
    pod := podInfo.Pod
    // Pick a node for the pod (filter, then score)
    scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
    ...
    // Work on a copy so the queued pod is not mutated
    assumedPodInfo := podInfo.DeepCopy()
    assumedPod := assumedPodInfo.Pod
    err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
    ...
    // Run Reserve plugins
    if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        ...
    }
    // Run Permit plugins
    runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
    if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
        ...
    }
    return scheduleResult, assumedPodInfo, nil
}

SchedulePod finds the nodes that pass the filter plugins, then runs the score plugins to pick the best node. If exactly one node is feasible, scoring is skipped.

func (sched *Scheduler) SchedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
    feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
    if err != nil {
        return ScheduleResult{}, err
    }
    if len(feasibleNodes) == 1 {
        return ScheduleResult{SuggestedHost: feasibleNodes[0].Node().Name, EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap), FeasibleNodes: 1}, nil
    }
    priorityList, err := sched.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
    if err != nil {
        return ScheduleResult{}, err
    }
    host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
    return ScheduleResult{SuggestedHost: host, EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap), FeasibleNodes: len(feasibleNodes)}, err
}
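
The filter-then-score flow can be sketched in a few lines. This is a simplified model, not the scheduler's code: nodes carry only a free-CPU figure, the filter and score functions are hypothetical stand-ins for plugins, and ties are resolved by taking the first highest-scoring node.

```go
package main

import "fmt"

type node struct {
	name    string
	freeCPU int64 // millicores
}

type filterFn func(podCPU int64, n node) bool
type scoreFn func(podCPU int64, n node) int64

// schedulePod keeps the nodes that pass filter, then returns the
// feasible node with the highest score.
func schedulePod(podCPU int64, nodes []node, filter filterFn, score scoreFn) (string, error) {
	var feasible []node
	for _, n := range nodes {
		if filter(podCPU, n) {
			feasible = append(feasible, n)
		}
	}
	if len(feasible) == 0 {
		return "", fmt.Errorf("0/%d nodes are available", len(nodes))
	}
	if len(feasible) == 1 {
		return feasible[0].name, nil // nothing to compare, skip scoring
	}
	best, bestScore := feasible[0], score(podCPU, feasible[0])
	for _, n := range feasible[1:] {
		if s := score(podCPU, n); s > bestScore {
			best, bestScore = n, s
		}
	}
	return best.name, nil
}

// fits and mostFree are example filter and score functions.
func fits(podCPU int64, n node) bool      { return n.freeCPU >= podCPU }
func mostFree(podCPU int64, n node) int64 { return n.freeCPU - podCPU }

func main() {
	nodes := []node{{"node-a", 500}, {"node-b", 4000}, {"node-c", 2000}}
	host, err := schedulePod(1000, nodes, fits, mostFree)
	fmt.Println(host, err) // prints node-b <nil>
}
```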

The assume step updates the pod's NodeName and adds it to the scheduler cache, allowing the scheduler to treat the pod as already placed without waiting for the API server.

func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
    assumed.Spec.NodeName = host
    if err := sched.Cache.AssumePod(logger, assumed); err != nil {
        logger.Error(err, "Scheduler cache AssumePod failed")
        return err
    }
    return nil
}
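
The point of assuming is that the next scheduling decision already sees the resources claimed by this pod, even though the binding has not reached the API server yet. A minimal sketch of such an optimistic cache follows; the assumeCache type, its CPU-only accounting, and the pod and node names are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

type placement struct {
	node string
	cpu  int64 // millicores
}

// assumeCache tracks optimistic placements and per-node CPU usage.
type assumeCache struct {
	mu      sync.Mutex
	assumed map[string]placement // pod name -> assumed placement
	usedCPU map[string]int64     // node name -> millicores claimed
}

func newAssumeCache() *assumeCache {
	return &assumeCache{assumed: map[string]placement{}, usedCPU: map[string]int64{}}
}

// AssumePod records the placement immediately and rejects a pod that is
// already tracked.
func (c *assumeCache) AssumePod(pod, node string, cpu int64) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.assumed[pod]; ok {
		return fmt.Errorf("pod %q is already in the cache", pod)
	}
	c.assumed[pod] = placement{node: node, cpu: cpu}
	c.usedCPU[node] += cpu
	return nil
}

// ForgetPod rolls back an assumed placement, for example after a failed binding.
func (c *assumeCache) ForgetPod(pod string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	p, ok := c.assumed[pod]
	if !ok {
		return
	}
	c.usedCPU[p.node] -= p.cpu
	delete(c.assumed, pod)
}

// UsedCPU reports the CPU currently claimed on a node.
func (c *assumeCache) UsedCPU(node string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.usedCPU[node]
}

func main() {
	c := newAssumeCache()
	_ = c.AssumePod("web-1", "node-a", 500)
	fmt.Println(c.UsedCPU("node-a")) // prints 500
	c.ForgetPod("web-1")
	fmt.Println(c.UsedCPU("node-a")) // prints 0
}
```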

After a successful scheduling cycle, the binding cycle runs asynchronously to bind the pod to the chosen node.

func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework, scheduleResult ScheduleResult, assumedPodInfo *framework.QueuedPodInfo, start time.Time, podsToActivate *framework.PodsToActivate) *framework.Status {
    assumedPod := assumedPodInfo.Pod
    // Wait on Permit plugins
    if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
        return status
    }
    // Run PreBind plugins
    if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
        return status
    }
    // Run Bind plugins (default binder sends a Binding request to the API server)
    if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
        return status
    }
    // Run PostBind plugins
    fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
    return nil
}

func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) *framework.Status {
    return fwk.RunBindPlugins(ctx, state, assumed, targetNode)
}
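
The binding cycle is an ordered pipeline that stops at the first failing step and runs on its own goroutine so that the scheduling loop can move on to the next pod. The sketch below models that shape. The step type, the forget callback, and the rollback on failure are assumptions made for illustration; they stand in for the plugin calls and for the error handling that the excerpt above omits.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// step is a hypothetical stand-in for one plugin extension point.
type step func(pod, node string) error

var errBindRejected = errors.New("binding rejected")

// bindingCycle runs preBind, then bind, then postBind. On any failure it
// calls forget so the assumed placement can be rolled back.
func bindingCycle(pod, node string, preBind, bind step, postBind func(pod, node string), forget func(pod string)) error {
	for _, s := range []step{preBind, bind} {
		if err := s(pod, node); err != nil {
			forget(pod)
			return err
		}
	}
	postBind(pod, node) // informational only, cannot fail the cycle
	return nil
}

func main() {
	ok := func(pod, node string) error { return nil }
	failing := func(pod, node string) error { return errBindRejected }
	postBind := func(pod, node string) { fmt.Println("bound", pod, "to", node) }
	forget := func(pod string) { fmt.Println("forgot", pod) }

	var wg sync.WaitGroup
	for i, bind := range []step{ok, failing} {
		wg.Add(1)
		// Each binding runs off the scheduling goroutine.
		go func(pod string, bind step) {
			defer wg.Done()
			if err := bindingCycle(pod, "node-a", ok, bind, postBind, forget); err != nil {
				fmt.Println("binding failed for", pod+":", err)
			}
		}(fmt.Sprintf("web-%d", i), bind)
	}
	wg.Wait()
}
```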

The default bind plugin creates a v1.Binding object and calls the Kubernetes API server to bind the pod to the node.

func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
    logger := klog.FromContext(ctx)
    logger.V(3).Info("Attempting to bind pod to node", "pod", klog.KObj(p), "node", klog.KRef("", nodeName))
    // The Binding names the pod in its metadata and the target node in Target
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
        Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
    }
    err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
    if err != nil {
        return framework.AsStatus(err)
    }
    return nil
}

3. Summary

The article provides a comprehensive source‑code analysis of the Kubernetes kube-scheduler workflow, covering initialization, configuration, the scheduling queue, scheduling cycles, and binding operations, thereby offering a clear understanding of how pods are scheduled and bound within the cluster.
