Cloud Native 11 min read

Deep Dive into Kubelet syncLoopIteration: How PLEG Handles Pod Events (Part 2)

This article dissects the kubelet source code, explaining how the Pod Lifecycle Event Generator (PLEG) works in both GenericPLEG and EventedPLEG modes, how events are streamed, processed, and fed into syncLoopIteration, and how the kubelet reacts to container lifecycle changes.

Infra Learning Club
Infra Learning Club
Infra Learning Club
Deep Dive into Kubelet syncLoopIteration: How PLEG Handles Pod Events (Part 2)

Pod Lifecycle Event Generator (PLEG) Overview

PLEG (Pod Lifecycle Event Generator) updates a pod's status by reading container events. It provides two implementations: GenericPLEG , which polls events at a fixed interval, and EventedPLEG , which receives events via a push stream.

GenericPLEG

GenericPLEG is configured to read events once per second. genericPlegRelistPeriod = time.Second * 1 After Start() is called, a goroutine repeatedly invokes g.Relist every second.

func (g *GenericPLEG) Start() {
    if !g.isRunning {
        go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
    }
}

The g.Relist method performs the following steps:

Obtain all pods on the node using the CRI API methods ListPodSandbox and ListContainers.

// Get all the pods.
podList, err := g.runtime.GetPods(ctx, true)

Update the internal podRecords with the current pod list. g.podRecords.setCurrent(pods) Iterate over the combined old and current pod containers, compare their states, and record any changes.

func convertState(state kubecontainer.State) plegContainerState {
    switch state {
    case kubecontainer.ContainerStateCreated:
        // kubelet doesn't use the "created" state yet, hence convert it to "unknown".
        return plegContainerUnknown
    case kubecontainer.ContainerStateRunning:
        return plegContainerRunning
    case kubecontainer.ContainerStateExited:
        return plegContainerExited
    case kubecontainer.ContainerStateUnknown:
        return plegContainerUnknown
    default:
        panic(fmt.Sprintf("unrecognized container state: %v", state))
    }
}
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
    switch newState {
    case plegContainerRunning:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
    case plegContainerExited:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
    case plegContainerUnknown:
        return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
    case plegContainerNonExistent:
        switch oldState {
        case plegContainerExited:
            // We already reported that the container died before.
            return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
        default:
            return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
        }
    default:
        panic(fmt.Sprintf("unrecognized container state: %v", newState))
    }
}

Send the generated events to eventChannel, where they are later consumed by syncLoopIteration. If the channel is full, the event is discarded and a metric is incremented.

for pid, events := range eventsByPodID {
    for i := range events {
        if events[i].Type == ContainerChanged {
            continue
        }
        select {
        case g.eventChannel <- events[i]:
        default:
            metrics.PLEGDiscardEvents.Inc()
            klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
        }
    }
}

EventedPLEG

EventedPLEG creates a GenericPLEG as a fallback when the number of stream‑receive failures exceeds the configured limit (default 5). Its fallback interval is 300 seconds.

When Start() is invoked, two goroutines are launched: one to watch the event stream and another to periodically update a global cache.

func (e *EventedPLEG) Start() {
    go wait.Until(e.watchEventsChannel, 0, e.stopCh)
    go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)
}

The watchEventsChannel routine first checks whether the maximum retry count ( e.eventedPlegMaxStreamRetries = 5) has been reached. If so, it stops the EventedPLEG, switches the relist period back to the GenericPLEG default, and starts the GenericPLEG.

func (e *EventedPLEG) watchEventsChannel() {
    go func() {
        numAttempts := 0
        for {
            if numAttempts >= e.eventedPlegMaxStreamRetries {
                e.Stop()
                e.genericPleg.Stop()
                e.Update(e.relistDuration)
                e.genericPleg.Start()
                break
            }
            err := e.runtimeService.GetContainerEvents(context.Background(), containerEventsResponseCh, func(runtimeapi.RuntimeService_GetContainerEventsClient) {
                metrics.EventedPLEGConn.Inc()
            })
            if err != nil {
                // handle error and increment attempts
                numAttempts++
            }
        }
    }()
}

The underlying gRPC method GetContainerEvents returns a bidirectional stream, allowing the kubelet to receive container events as they occur.

rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}

Received events are processed by processCRIEvents, which filters and forwards them to the GenericPLEG's eventChannel. Specific container event types (created, started, stopped, deleted) are translated into corresponding PodLifecycleEvent values and logged.

func (e *EventedPLEG) processCRIEvent(event *runtimeapi.ContainerEventResponse) {
    switch event.ContainerEventType {
    case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
        e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
        klog.V(4).InfoS("Received Container Stopped Event", "event", event.String())
    case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
        klog.V(4).InfoS("Received Container Created Event", "event", event.String())
    case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
        e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerStarted, Data: event.ContainerId})
        klog.V(4).InfoS("Received Container Started Event", "event", event.String())
    case runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT:
        e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerDied, Data: event.ContainerId})
        e.sendPodLifecycleEvent(&PodLifecycleEvent{ID: types.UID(event.PodSandboxStatus.Metadata.Uid), Type: ContainerRemoved, Data: event.ContainerId})
        klog.V(4).InfoS("Received Container Deleted Event", "event", event)
    }
}

syncLoopIteration

The kubelet's main loop receives PLEG events via the plegCh channel. For each event, it checks whether the pod still exists, logs the event, and invokes the sync handler.

func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
            }
        }
        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    }
    return true
}

The core execution flow is:

If the event type is not ContainerRemoved, and the pod is still present in the pod manager, the kubelet calls HandlePodSyncs, which ultimately triggers an UpdatePod operation on the pod worker.

If the event type is ContainerDied, the kubelet cleans up the corresponding container.

Through this chain, the kubelet translates low‑level container lifecycle notifications into higher‑level pod sync actions, ensuring the node's view of pod status stays consistent with the container runtime.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

KubernetesPLEGsyncLoopIterationEventedPLEGGenericPLEG
Infra Learning Club
Written by

Infra Learning Club

Infra Learning Club shares study notes, cutting-edge technology, and career discussions.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.