Cloud Native 7 min read

Deep Dive into kubelet syncLoopIteration: How podUpdateCh Handles Pod Updates

The article dissects kubelet's podUpdateCh, explaining how it ingests pod data from file, URL, and the API server, the Linux‑only file watch, periodic full scans, merging logic, and the underlying code paths that push updates into the channel.

Infra Learning Club
Infra Learning Club
Infra Learning Club
Deep Dive into kubelet syncLoopIteration: How podUpdateCh Handles Pod Updates

kubelet's podUpdateCh is responsible for receiving pod updates from three sources—file, URL, and the Kubernetes API server—and feeding them into the internal processing channel.

Source constants

const (
    // Filesource identifies updates from a file.
    FileSource = "file"
    // HTTPSource identifies updates from querying a web page.
    HTTPSource = "http"
    // ApiserverSource identifies updates from Kubernetes API Server.
    ApiserverSource = "api"
    // AllSource identifies updates from all sources.
    AllSource = "*"
)

File source

The file watch works only on Linux; other OSes lack support. It converts fsnotify events into pod operation types.

func (s *sourceFile) produceWatchEvent(e *fsnotify.Event) error {
    var eventType podEventType
    switch {
    case (e.Op & fsnotify.Create) > 0:
        eventType = podAdd
    case (e.Op & fsnotify.Write) > 0:
        eventType = podModify
    case (e.Op & fsnotify.Chmod) > 0:
        eventType = podModify
    case (e.Op & fsnotify.Remove) > 0:
        eventType = podDelete
    case (e.Op & fsnotify.Rename) > 0:
        eventType = podDelete
    default:
        // Ignore rest events
        return nil
    }
    s.watchEvents <- &watchEvent{e.Name, eventType}
    return nil
}

When a watch event is consumed, the code either adds/modifies the pod in the store or deletes it, handling file‑key mappings as needed.

func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
    switch e.eventType {
    case podAdd, podModify:
        pod, _ := s.extractFromFile(e.fileName)
        return s.store.Add(pod)
    case podDelete:
        if objKey, keyExist := s.fileKeyMapping[e.fileName]; keyExist {
            pod, podExist, err := s.store.GetByKey(objKey)
            if err = s.store.Delete(pod); err != nil {
                return fmt.Errorf("failed to remove deleted pod from cache: %v", err)
            }
            // ...
        }
    }
    return nil
}

Besides reacting to file events, a time.Ticker triggers a full directory scan every 10 seconds to ensure the store stays up‑to‑date.

func (s *sourceFile) run() {
    listTicker := time.NewTicker(s.period) // default 10s
    go func() {
        if err := s.listConfig(); err != nil {
            klog.ErrorS(err, "Unable to read config path", "path", s.path)
        }
        for {
            select {
            case <-listTicker.C:
                if err := s.listConfig(); err != nil {
                    klog.ErrorS(err, "Unable to read config path", "path", s.path)
                }
            case e := <-s.watchEvents:
                if err := s.consumeWatchEvent(e); err != nil {
                    klog.ErrorS(err, "Unable to process watch event")
                }
            }
        }
    }()
    s.startWatch()
}

URL source

The URL source periodically (every 20 seconds) fetches pod definitions from a user‑configured external service via HTTP GET. The response may be a single object or an array, both of which are parsed and then merged.

func (s *sourceURL) run() {
    if err := s.extractFromURL(); err != nil {
        // handle error
    }
}

API source

For the API source, kubelet uses a list‑watch against the kube‑apiserver, filtering pods by the current node name.

lw := cache.NewListWatchFromClient(
    c.CoreV1().RESTClient(),
    "pods",
    metav1.NamespaceAll,
    fields.OneTermEqualSelector("spec.nodeName", string(nodeName)),
)

The store passed to the reflector implements a send function identical to the file source: any pod change triggers a full push of all pods to the merge step.

r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)

Merge step

The merge function combines pods from the three sources, de‑duplicates them, resolves conflicts, and produces a consistent set of operations. It returns separate slices for adds, updates, deletes, removes, and reconciles.

adds, updates, deletes, removes, reconciles := s.merge(source, change)

Determining whether an existing pod needs an update is handled by checkAndUpdatePod, which examines spec, labels, annotations, deletion timestamps, and status changes.

func checkAndUpdatePod(existing, ref *v1.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {
    // 1. Compare Spec, Labels, DeletionTimestamp, DeletionGracePeriodSeconds, Annotations
    // 2. If unchanged, compare Status; update only if Status changed
    // 3. If DeletionTimestamp is set, mark for deletion
    // ...
}

Overall, the article walks through the complete read‑write flow of podUpdateCh, from low‑level file system events to high‑level API watches, illustrating how kubelet maintains an up‑to‑date view of pods for the node.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Kuberneteskubeletsource codepodUpdateChsyncLoopIteration
Infra Learning Club
Written by

Infra Learning Club

Infra Learning Club shares study notes, cutting-edge technology, and career discussions.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.