Understanding the Core Workflow of Kubernetes Informer in client-go

This article explains the internal workflow of the Kubernetes informer package in client-go. It covers the architecture and the key components (Reflector, DeltaFIFO, and Indexer), then walks through a step-by-step code example that shows how informers are created, registered, started, and used to handle watch events efficiently.

政采云技术

The article analyzes the basic process of starting an informer in a Kubernetes controller, emphasizing that both built‑in components and custom controllers need to watch etcd events via the apiserver to implement their control loops.

The client-go informer package offers three main capabilities: a local cache (store), an indexing mechanism (indexer), and event‑handler registration.

Informer Architecture

The upper part of the architecture is provided by client-go, while the lower part is the user‑defined control‑loop logic. The upper part consists of several components:

Reflector

Reflector interacts with the apiserver using list and watch APIs, pushing resource objects into a queue.

DeltaFIFO

DeltaFIFO maintains a FIFO queue (queue) and a map of items (items). The queue holds the order in which keys are processed, while items keeps, for each key, the object together with its change type.

type DeltaFIFO struct {
  ...
  items map[string]Deltas // pending deltas, indexed by object key
  queue []string          // FIFO order of keys
  ...
}

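The name combines two ideas:

FIFO – a first-in-first-out queue of object keys, which fixes the processing order.

Delta – a record that pairs an object with the operation type applied to it (Added, Updated, Deleted, and so on). The items map holds a list of such records per key; a Delta itself is not a map.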
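The items/queue split can be sketched in a few lines of plain Go. This is a simplified, single-goroutine model, not client-go's implementation: the toyFIFO type, its Push and Pop methods, and the string-valued Object field are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// DeltaType names the kind of change, mirroring the types listed above.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// Delta pairs one change type with the object state at that moment.
type Delta struct {
	Type   DeltaType
	Object string // stand-in for a real resource object
}

// toyFIFO is a hypothetical, non-thread-safe model of the items/queue split.
type toyFIFO struct {
	items map[string][]Delta // key -> pending deltas, oldest first
	queue []string           // keys in arrival order, each key at most once
}

func newToyFIFO() *toyFIFO {
	return &toyFIFO{items: map[string][]Delta{}}
}

// Push records a delta; the key joins the queue only if it is not already pending.
func (f *toyFIFO) Push(key string, d Delta) {
	if _, pending := f.items[key]; !pending {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], d)
}

// Pop removes the oldest key and returns every delta accumulated for it.
func (f *toyFIFO) Pop() (string, []Delta, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	return key, deltas, true
}

// describe renders a popped entry for logging.
func describe(key string, deltas []Delta) string {
	return fmt.Sprintf("%s: %v", key, deltas)
}

func main() {
	f := newToyFIFO()
	f.Push("default/web", Delta{Added, "web v1"})
	f.Push("default/db", Delta{Added, "db v1"})
	f.Push("default/web", Delta{Updated, "web v2"})

	for {
		key, deltas, ok := f.Pop()
		if !ok {
			break
		}
		fmt.Println(describe(key, deltas))
	}
}
```

In this sketch the second change to default/web does not add a second queue entry. It is appended to the deltas already pending for that key, so the consumer receives both changes in one Pop.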

Indexer

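The indexer is a local, thread-safe store that keeps resource objects and provides index functions for fast lookups (for example, by namespace). The informer updates it from the event stream so that the cache tracks the state held in etcd, and reads can be served locally instead of going to the apiserver.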
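To make the idea of a store with an index concrete, here is a minimal sketch in plain Go. It assumes a single namespace index and omits locking; the pod struct, toyIndexer, and its methods are hypothetical and only illustrate the concept, not client-go's Indexer interface.

```go
package main

import (
	"fmt"
	"sort"
)

// pod is a hypothetical stand-in for a cached resource object.
type pod struct {
	Namespace, Name string
}

// keyOf builds a "namespace/name" cache key.
func keyOf(p pod) string { return fmt.Sprintf("%s/%s", p.Namespace, p.Name) }

// toyIndexer keeps objects by key plus one secondary index (by namespace).
type toyIndexer struct {
	objects     map[string]pod
	byNamespace map[string]map[string]struct{} // namespace -> set of keys
}

func newToyIndexer() *toyIndexer {
	return &toyIndexer{
		objects:     map[string]pod{},
		byNamespace: map[string]map[string]struct{}{},
	}
}

// Add stores (or overwrites) the object and records it in the namespace index.
func (i *toyIndexer) Add(p pod) {
	key := keyOf(p)
	i.objects[key] = p
	if i.byNamespace[p.Namespace] == nil {
		i.byNamespace[p.Namespace] = map[string]struct{}{}
	}
	i.byNamespace[p.Namespace][key] = struct{}{}
}

// Delete removes the object and its index entry.
func (i *toyIndexer) Delete(p pod) {
	key := keyOf(p)
	delete(i.objects, key)
	delete(i.byNamespace[p.Namespace], key)
}

// ByNamespace answers a query from the index without scanning every object.
func (i *toyIndexer) ByNamespace(ns string) []string {
	keys := make([]string, 0, len(i.byNamespace[ns]))
	for key := range i.byNamespace[ns] {
		keys = append(keys, key)
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output
	return keys
}

func main() {
	idx := newToyIndexer()
	idx.Add(pod{"default", "web"})
	idx.Add(pod{"default", "db"})
	idx.Add(pod{"kube-system", "dns"})
	idx.Delete(pod{"default", "db"})
	fmt.Println(idx.ByNamespace("default"))
}
```

The point of the secondary map is that a namespace query touches only the keys in that namespace, which is the benefit an index gives over a flat key/value cache.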

Basic Example

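package main

import (
  "time"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/labels"
  "k8s.io/client-go/informers"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/tools/cache"
  "k8s.io/client-go/tools/clientcmd"
  "k8s.io/klog/v2"
)

// defaultResync is the resync period passed to the factory; the value here is only illustrative.
const defaultResync = 30 * time.Second

func main() {
  stopCh := make(chan struct{})
  defer close(stopCh)

  // (1) Build clientset; abort on failure, since a nil config or clientset would panic later
  masterUrl := "172.27.32.110:8080"
  config, err := clientcmd.BuildConfigFromFlags(masterUrl, "")
  if err != nil { klog.Fatalf("BuildConfigFromFlags err: %v", err) }
  clientset, err := kubernetes.NewForConfig(config)
  if err != nil { klog.Fatalf("Get clientset err: %v", err) }

  // (2) Create shared informer factory
  sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)

  // (3) Register a pod informer
  podInformer := sharedInformers.Core().V1().Pods().Informer()

  // (4) Register event handler
  podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
      // Use the checked form of the type assertion so an unexpected object cannot panic
      mObj, ok := obj.(metav1.Object)
      if !ok { return }
      klog.Infof("Get new obj: %v", mObj)
      klog.Infof("Get new obj name: %s", mObj.GetName())
    },
  })

  // (5) Start all informers
  sharedInformers.Start(stopCh)

  // (6) Wait for cache sync; the lister below is only meaningful once the cache is populated
  if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
    klog.Fatalf("Cache sync fail!")
  }

  // (7) Use lister to list pods from the local cache
  podLister := sharedInformers.Core().V1().Pods().Lister()
  pods, err := podLister.List(labels.Everything())
  if err != nil { klog.Errorf("err: %v", err) }
  klog.Infof("len(pods), %d", len(pods))
  for _, v := range pods { klog.Infof("pod: %s", v.Name) }

  // Block so the handler keeps receiving events; nothing closes stopCh in this demo
  <-stopCh
}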

The example highlights steps (2)–(5): creating a shared informer factory, obtaining a pod informer, adding an event handler, and starting the informer.

Process Analysis

3.1 Create a sharedInformers factory

Creating a sharedInformerFactory avoids creating duplicate informers that would overload the apiserver. The factory holds a map of informers and starts them on demand.

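sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)

// Inside client-go, the constructor initializes the factory as follows:
factory := &sharedInformerFactory{
  client: client,
  namespace: v1.NamespaceAll,
  defaultResync: defaultResync,
  informers: make(map[reflect.Type]cache.SharedIndexInformer), // one informer per resource type
  startedInformers: make(map[reflect.Type]bool),               // which informers Start has already launched
  customResync: make(map[reflect.Type]time.Duration),          // per-type resync overrides
}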

3.2 Register an informer

The informer is generated and stored in the factory via InformerFor and defaultInformer which ultimately calls NewFilteredPodInformer.

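podInformer := sharedInformers.Core().V1().Pods().Informer()
func (f *podInformer) Informer() cache.SharedIndexInformer {
  return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
  f.lock.Lock()
  defer f.lock.Unlock()
  // Reuse the informer if one already exists for this resource type
  informerType := reflect.TypeOf(obj)
  if informer, exists := f.informers[informerType]; exists { return informer }
  // Otherwise pick the resync period (custom or default) and build a new one
  resyncPeriod, exists := f.customResync[informerType]
  if !exists { resyncPeriod = f.defaultResync }
  informer := newFunc(f.client, resyncPeriod)
  f.informers[informerType] = informer
  return informer
}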
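The deduplication performed by the factory can be modeled without client-go. The sketch below is an assumption-laden miniature: toyFactory, toyInformer, podType, and nodeType are hypothetical names, and the created counter exists only to make the sharing visible.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// toyInformer is a hypothetical placeholder for a shared informer.
type toyInformer struct{ resource string }

// String makes the informer printable in logs.
func (i *toyInformer) String() string { return fmt.Sprintf("informer(%s)", i.resource) }

// toyFactory hands out at most one informer per Go type.
type toyFactory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*toyInformer
	created   int // how many informers were actually constructed
}

func newToyFactory() *toyFactory {
	return &toyFactory{informers: map[reflect.Type]*toyInformer{}}
}

// InformerFor returns the cached informer for obj's type, building it on first use.
func (f *toyFactory) InformerFor(obj interface{}, newFunc func() *toyInformer) *toyInformer {
	f.mu.Lock()
	defer f.mu.Unlock()

	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	f.created++
	return inf
}

// podType and nodeType stand in for resource types such as Pod and Node.
type podType struct{}
type nodeType struct{}

func newPodInformer() *toyInformer  { return &toyInformer{resource: "pods"} }
func newNodeInformer() *toyInformer { return &toyInformer{resource: "nodes"} }

func main() {
	f := newToyFactory()
	a := f.InformerFor(&podType{}, newPodInformer)
	b := f.InformerFor(&podType{}, newPodInformer)
	c := f.InformerFor(&nodeType{}, newNodeInformer)

	fmt.Println(a, b, c)
	fmt.Println("same pod informer:", a == b)
	fmt.Println("informers constructed:", f.created)
}
```

Two requests for the pod type return the same pointer, so only one list/watch connection would be needed per resource type no matter how many components ask for it.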

3.3 Register event handler

The handler is added to the shared processor, which wraps it in a listener. Each event then flows through the listener as addCh → nextCh → the appropriate callback.

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { /* ... */ } })

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
  s.processor.addListener(newProcessListener(handler, ...))
}
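The addCh → nextCh hand-off can be sketched with two channels and a buffer in between. This is a simplified model under stated assumptions: toyListener, pop, run, and deliver are illustrative names, events are plain strings, and the shutdown behaviour (flushing the buffer when addCh closes) is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// toyListener is a hypothetical model of one registered handler.
type toyListener struct {
	addCh   chan string // producer side: events arrive here
	nextCh  chan string // consumer side: feeds the handler
	handler func(event string)
}

func newToyListener(handler func(string)) *toyListener {
	return &toyListener{
		addCh:   make(chan string),
		nextCh:  make(chan string),
		handler: handler,
	}
}

// pop shuttles events from addCh to nextCh, parking them in an in-memory
// buffer whenever the handler is slower than the producer.
func (l *toyListener) pop() {
	defer close(l.nextCh)
	var pending []string
	for {
		// A nil channel blocks forever, which disables the send case
		// while there is nothing to deliver.
		var out chan string
		var head string
		if len(pending) > 0 {
			out, head = l.nextCh, pending[0]
		}
		select {
		case out <- head:
			pending = pending[1:]
		case ev, ok := <-l.addCh:
			if !ok {
				// Producer is done: flush what is left, then stop.
				for _, p := range pending {
					l.nextCh <- p
				}
				return
			}
			pending = append(pending, ev)
		}
	}
}

// run invokes the handler for each delivered event, in order.
func (l *toyListener) run() {
	for ev := range l.nextCh {
		l.handler(ev)
	}
}

// deliver pushes events through a listener and waits until all are handled.
func deliver(events []string, handler func(string)) {
	l := newToyListener(handler)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); l.pop() }()
	go func() { defer wg.Done(); l.run() }()
	for _, ev := range events {
		l.addCh <- ev
	}
	close(l.addCh)
	wg.Wait()
}

func main() {
	deliver([]string{"add pod-a", "update pod-a", "delete pod-a"}, func(ev string) {
		fmt.Println("handled:", ev)
	})
}
```

The buffer between the two channels is what keeps a slow callback from stalling the producer: the sender only waits for pop to accept the event, never for the handler to finish.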

3.4 Start all informers

Calling sharedInformers.Start(stopCh) launches each informer in a separate goroutine.

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
  for informerType, informer := range f.informers {
    if !f.startedInformers[informerType] {
      go informer.Run(stopCh)
      f.startedInformers[informerType] = true
    }
  }
}

Informer Run Logic

The run method creates a DeltaFIFO, builds a controller.Config, and starts the reflector and processing loop.

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects: s.indexer, EmitDeltaTypeReplaced: true})
  cfg := &Config{Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, Process: s.HandleDeltas, ...}
  s.controller = New(cfg)
  s.controller.Run(stopCh)
}

The controller runs the reflector (ListAndWatch) to fill the queue, plus a processing loop that pops items and passes them to HandleDeltas, which updates the indexer and then notifies the registered handlers.
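A rough picture of what the Process callback has to do for each popped batch of deltas: bring the local store up to date, then invoke the matching handler. The sketch below is a stand-alone approximation, not client-go's HandleDeltas; the delta and handlers types, the map-based store, and the function name handleDeltas are all hypothetical.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
)

// delta is one queued change for a key.
type delta struct {
	Type deltaType
	Key  string
	Obj  string // stand-in for the resource object
}

// handlers mirrors the three callbacks a user can register.
type handlers struct {
	OnAdd    func(obj string)
	OnUpdate func(oldObj, newObj string)
	OnDelete func(obj string)
}

// handleDeltas applies each delta to the store first, then notifies the handlers.
// Whether a change counts as an add or an update is decided by the store's
// contents, not by the delta type alone.
func handleDeltas(store map[string]string, h handlers, deltas []delta) error {
	for _, d := range deltas {
		switch d.Type {
		case added, updated:
			if old, exists := store[d.Key]; exists {
				store[d.Key] = d.Obj
				h.OnUpdate(old, d.Obj)
			} else {
				store[d.Key] = d.Obj
				h.OnAdd(d.Obj)
			}
		case deleted:
			delete(store, d.Key)
			h.OnDelete(d.Obj)
		default:
			return fmt.Errorf("unknown delta type %q", d.Type)
		}
	}
	return nil
}

func main() {
	store := map[string]string{}
	h := handlers{
		OnAdd:    func(obj string) { fmt.Println("add", obj) },
		OnUpdate: func(oldObj, newObj string) { fmt.Println("update", oldObj, "->", newObj) },
		OnDelete: func(obj string) { fmt.Println("delete", obj) },
	}
	deltas := []delta{
		{added, "default/web", "web v1"},
		{updated, "default/web", "web v2"},
		{deleted, "default/web", "web v2"},
	}
	if err := handleDeltas(store, h, deltas); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("objects left in store:", len(store))
}
```

Updating the store before calling the handler means a callback that reads the cache sees a state at least as new as the event it was given.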

Reflector ListAndWatch

The reflector first performs a list to obtain the current state, stores objects in the DeltaFIFO, then enters a watch loop that converts watch events into deltas and pushes them into the queue.

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  // List phase
  list, err := r.listerWatcher.List(opts)
  resourceVersion = listMetaInterface.GetResourceVersion()
  items, err := meta.ExtractList(list)
  r.syncWith(items, resourceVersion)

  // Watch phase
  for {
    w, err := r.listerWatcher.Watch(options)
    if err != nil { return err }
    r.watchHandler(start, w, &resourceVersion, errc, stopCh)
  }
}
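The reason for listing first and then watching from the list's resourceVersion is that the watch then delivers only the changes the snapshot does not already contain. The toy model below illustrates that under simplifying assumptions: toyServer, event, and listAndWatch are hypothetical, resource versions are modeled as increasing integers (the real value is an opaque string), and the watch is a finite slice rather than a stream.

```go
package main

import "fmt"

// event is a hypothetical watch notification carrying the version it produced.
type event struct {
	Type            string
	Key             string
	ResourceVersion int
}

// toyServer is a stand-in for the apiserver: a snapshot plus an ordered event log.
type toyServer struct {
	snapshot        []string // keys that exist at snapshotVersion
	snapshotVersion int
	log             []event // all events, in increasing ResourceVersion
}

// List returns the current keys and the version they correspond to.
func (s *toyServer) List() ([]string, int) { return s.snapshot, s.snapshotVersion }

// Watch returns only the events newer than the given version.
func (s *toyServer) Watch(since int) []event {
	var out []event
	for _, e := range s.log {
		if e.ResourceVersion > since {
			out = append(out, e)
		}
	}
	return out
}

// listAndWatch replays the two phases and returns what would be queued,
// together with the last version seen.
func listAndWatch(s *toyServer) ([]string, int) {
	var queued []string

	// List phase: the whole snapshot is queued at once.
	keys, rv := s.List()
	for _, k := range keys {
		queued = append(queued, fmt.Sprintf("Replaced %s", k))
	}

	// Watch phase: only changes after the snapshot version are queued.
	for _, e := range s.Watch(rv) {
		queued = append(queued, fmt.Sprintf("%s %s", e.Type, e.Key))
		rv = e.ResourceVersion // remember progress so a restarted watch can resume here
	}
	return queued, rv
}

func main() {
	s := &toyServer{
		snapshot:        []string{"default/a", "default/b"},
		snapshotVersion: 5,
		log: []event{
			{"Added", "default/b", 4}, // already reflected in the snapshot
			{"Updated", "default/a", 6},
			{"Deleted", "default/b", 7},
		},
	}
	queued, rv := listAndWatch(s)
	for _, q := range queued {
		fmt.Println(q)
	}
	fmt.Println("last resourceVersion:", rv)
}
```

The event at version 4 is skipped because the snapshot at version 5 already includes it, which is how the two phases avoid both gaps and duplicates.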

DeltaFIFO Production & Consumption

Production occurs in Reflector.watchHandler, where events are added to the FIFO via store.Add (which calls queueActionLocked). Consumption happens in controller.processLoop, which pops items, runs HandleDeltas, and distributes notifications to listeners.

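func (f *DeltaFIFO) Add(obj interface{}) error {
  // The "Locked" suffix means the caller must already hold f.lock
  f.lock.Lock()
  defer f.lock.Unlock()
  return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) queueActionLocked(action DeltaType, obj interface{}) error { /* update items and queue */ }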

func (c *controller) processLoop() {
  for {
    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    // handle err, requeue if needed
  }
}
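The producer/consumer relationship can be reproduced with a small blocking queue. The following is a minimal sketch built on sync.Cond, assuming string items and a Close method added for clean shutdown; blockingQueue and this processLoop are hypothetical and far simpler than DeltaFIFO.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue is a hypothetical, minimal queue whose Pop waits for data.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends an item and wakes one waiting consumer.
func (q *blockingQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
	q.cond.Signal()
}

// Close marks the queue finished and wakes every waiter.
func (q *blockingQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// Pop blocks until an item is available and runs process on it while holding
// the lock. It returns false once the queue is closed and drained.
func (q *blockingQueue) Pop(process func(string) error) (bool, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return false, nil
		}
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return true, process(item)
}

// processLoop drains the queue until it is closed and reports how many
// items were processed without error.
func processLoop(q *blockingQueue, process func(string) error) int {
	handled := 0
	for {
		ok, err := q.Pop(process)
		if !ok {
			return handled
		}
		if err != nil {
			fmt.Println("process error:", err)
			continue
		}
		handled++
	}
}

func main() {
	q := newBlockingQueue()
	done := make(chan int)
	go func() {
		done <- processLoop(q, func(item string) error {
			fmt.Println("processing", item)
			return nil
		})
	}()
	for _, item := range []string{"default/web", "default/db"} {
		q.Add(item)
	}
	q.Close()
	fmt.Println("handled:", <-done)
}
```

Passing the processing function into Pop, as the snippet above does with PopProcessFunc, lets the queue run the callback before any other producer or consumer can touch it. In this sketch that is done by holding the mutex for the duration of the call.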

Conclusion

The informer mechanism is essentially a producer‑consumer system built on DeltaFIFO with a local cache and indexer, providing a convenient API for registering callbacks and efficiently watching Kubernetes resources. Understanding the upper‑layer workflow—Reflector, DeltaFIFO, Indexer, and the shared informer factory—helps developers build reliable custom controllers.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

政采云技术

ZCY Technology Team (Zero), based in Hangzhou, is a growth-oriented team passionate about technology and craftsmanship. With around 500 members, we are building comprehensive engineering, project management, and talent development systems. We are committed to innovation and creating a cloud service ecosystem for government and enterprise procurement. We look forward to your joining us.
