Kubelet Source Dive: syncLoopIteration (Part 3) – How probeCh Is Built from Probe Managers
The article explains that the apparent probeCh in kubelet is actually three separate channels—livenessCh, readinessCh, and startupCh—managed by livenessManager, readinessManager, and startupManager, details the ProbeManager implementation that creates probe workers via AddPod, and shows how syncLoopIteration processes probe updates to adjust pod status.
In the kubelet source code there is no single probeCh channel. Instead, three distinct channels (livenessCh, readinessCh, and startupCh) are created by the livenessManager, readinessManager, and startupManager. The diagram unifies them into one channel, but they remain separate in the implementation.
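Concretely, each of those three managers is a probe results cache from pkg/kubelet/prober/results, and each exposes its own Updates() channel. The following is a condensed sketch of that interface and of how the kubelet wires up one instance per probe type; treat it as a simplified rendering of the upstream code rather than a verbatim excerpt (this results.Manager is distinct from the prober.Manager described in the next section).
type Update struct {
	ContainerID kubecontainer.ContainerID
	Result      Result // proberesults.Success, Failure, or Unknown
	PodUID      types.UID
}
type Manager interface {
	// Get returns the cached result for the container with the given ID.
	Get(kubecontainer.ContainerID) (Result, bool)
	// Set caches the result and, when it changes, publishes an Update.
	Set(kubecontainer.ContainerID, Result, *v1.Pod)
	// Remove clears the cached result for the container with the given ID.
	Remove(kubecontainer.ContainerID)
	// Updates returns the channel that syncLoopIteration reads from.
	Updates() <-chan Update
}
// During kubelet construction, one results manager is created per probe type;
// their Updates() channels are the livenessCh, readinessCh, and startupCh of the diagram.
klet.livenessManager = proberesults.NewManager()
klet.readinessManager = proberesults.NewManager()
klet.startupManager = proberesults.NewManager()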
ProbeManager
The core logic resides in the probeManager (type prober.Manager). The manager creates a probe “worker” for each container that has a probe configured (via AddPod). Workers run periodically, cache their results, and the manager uses that cache when updating a pod’s Ready condition (UpdatePodStatus). Updating probe parameters is currently unsupported.
type Manager interface {
	// AddPod creates new probe workers for every container probe.
	// This should be called for every pod created.
	AddPod(pod *v1.Pod)
	// StopLivenessAndStartup handles stopping liveness and startup probes during termination.
	StopLivenessAndStartup(pod *v1.Pod)
	// RemovePod handles cleaning up the removed pod state, including terminating probe workers
	// and deleting cached results.
	RemovePod(pod *v1.Pod)
	// CleanupPods handles cleaning up pods which should no longer be running.
	// It takes a map of "desired pods" which should not be cleaned up.
	CleanupPods(desiredPods map[types.UID]sets.Empty)
	// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
	// container based on container running status, cached probe results and worker states.
	UpdatePodStatus(*v1.Pod, *v1.PodStatus)
}
The AddPod method iterates over all containers (including restartable init containers). For each configured probe type it checks whether a worker already exists; if not, it creates a new worker and launches it in its own goroutine.
func (m *manager) AddPod(pod *v1.Pod) {
	m.workerLock.Lock()
	defer m.workerLock.Unlock()
	key := probeKey{podUID: pod.UID}
	for _, c := range append(pod.Spec.Containers, getRestartableInitContainers(pod)...) {
		key.containerName = c.Name
		if c.StartupProbe != nil {
			key.probeType = startup
			if _, ok := m.workers[key]; ok {
				return
			}
			w := newWorker(m, startup, pod, c)
			m.workers[key] = w
			go w.run()
		}
		if c.ReadinessProbe != nil {
			key.probeType = readiness
			if _, ok := m.workers[key]; ok {
				return
			}
			w := newWorker(m, readiness, pod, c)
			m.workers[key] = w
			go w.run()
		}
		if c.LivenessProbe != nil {
			key.probeType = liveness
			if _, ok := m.workers[key]; ok {
				return
			}
			w := newWorker(m, liveness, pod, c)
			m.workers[key] = w
			go w.run()
		}
	}
}
Each worker loops on its probe's configured period and caches the outcome of every attempt; a condensed sketch of that loop is shown below. On each attempt the worker ultimately calls runProbe, which selects the appropriate probe implementation based on the fields set in v1.Probe (Exec, HTTPGet, TCPSocket, or GRPC) and forwards the request to the corresponding probe subsystem.
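The loop itself lives in the probe worker. Below is a condensed sketch of worker.run (field names follow the upstream worker type; first-tick jitter, the manual-trigger channel, and some cleanup details are omitted), not a verbatim excerpt.
// Condensed sketch: probe once per PeriodSeconds until the worker is stopped
// or doProbe reports that probing is finished.
func (w *worker) run() {
	ctx := context.Background()
	probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
	probeTicker := time.NewTicker(probeTickerPeriod)
	defer func() {
		probeTicker.Stop()
		// Deregister this worker from the manager once the loop exits.
		w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
	}()
	// doProbe performs one attempt: it checks the container's state, invokes the
	// prober, applies successThreshold/failureThreshold, and stores the outcome
	// in the results manager. It returns false when probing should stop.
	for w.doProbe(ctx) {
		select {
		case <-w.stopCh:
			return
		case <-probeTicker.C:
			// Next period: fall through and probe again.
		}
	}
}
Each attempt reaches runProbe through the shared prober instance: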
func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
	// Verbose klog lines from the original are trimmed here.
	timeout := time.Duration(p.TimeoutSeconds) * time.Second
	switch {
	case p.Exec != nil:
		command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
		return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))
	case p.HTTPGet != nil:
		req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
		if err != nil {
			return probe.Unknown, "", err
		}
		return pb.http.Probe(req, timeout)
	case p.TCPSocket != nil:
		port, err := probe.ResolveContainerPort(p.TCPSocket.Port, &container)
		if err != nil {
			return probe.Unknown, "", err
		}
		host := p.TCPSocket.Host
		if host == "" {
			host = status.PodIP
		}
		return pb.tcp.Probe(host, port, timeout)
	case p.GRPC != nil:
		host := status.PodIP
		service := ""
		if p.GRPC.Service != nil {
			service = *p.GRPC.Service
		}
		return pb.grpc.Probe(host, service, int(p.GRPC.Port), timeout)
	default:
		klog.InfoS("Failed to find probe builder for container", "containerName", container.Name)
		return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
	}
}
The result of every probe attempt is written back to the corresponding per-probe-type results manager, which publishes on its Updates() channel only when the cached value actually changes; a sketch of that write-back step follows. syncLoopIteration then reads from these channels and invokes handleProbeSync with the appropriate status string ("unhealthy", "ready", "started", etc.).
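The write-back is two small steps; the snippet below condenses them (names follow pkg/kubelet/prober and pkg/kubelet/prober/results, lightly simplified rather than quoted verbatim).
// In worker.doProbe, once the success/failure threshold is crossed, the worker
// stores the outcome in its results manager (the liveness, readiness, or
// startup manager, depending on the probe type):
w.resultsManager.Set(w.containerID, result, w.pod)
// Set caches the result and publishes an Update only when the value changed,
// so the sync loop is woken up only on real transitions.
func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *v1.Pod) {
	if m.setInternal(id, result) {
		m.updates <- Update{ContainerID: id, Result: result, PodUID: pod.UID}
	}
}
On the receiving end of those channels sits syncLoopIteration: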
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	// Cases for configCh, plegCh, syncCh, and housekeepingCh are elided here;
	// only the three probe-update cases are shown.
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
	case update := <-kl.readinessManager.Updates():
		ready := update.Result == proberesults.Success
		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
		status := ""
		if ready {
			status = "ready"
		}
		handleProbeSync(kl, update, handler, "readiness", status)
	case update := <-kl.startupManager.Updates():
		started := update.Result == proberesults.Success
		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
		status := "unhealthy"
		if started {
			status = "started"
		}
		handleProbeSync(kl, update, handler, "startup", status)
	}
	return true
}
Note that only a liveness failure triggers a pod sync, while readiness and startup results are first recorded in the statusManager and then always trigger a sync. The analysis clarifies that the three probe channels are distinct, how workers are created and scheduled, and how the kubelet sync loop integrates probe results into pod status updates.
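For reference, handleProbeSync itself is small: it resolves the current pod object from the kubelet's pod manager and re-syncs it through the handler. The version below is lightly trimmed from pkg/kubelet/kubelet.go.
func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
	// Look up the live pod object; the probe update only carries the pod UID,
	// container ID, and result.
	pod, ok := kl.podManager.GetPodByUID(update.PodUID)
	if !ok {
		// If the pod no longer exists, ignore the update.
		klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
		return
	}
	klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
	handler.HandlePodSyncs([]*v1.Pod{pod})
}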