
Implementing a Sharded WorkQueue in controller-runtime for Partitioned Concurrency

The article explains how to design and implement a sharded WorkQueue in Kubernetes' controller-runtime, enabling concurrent processing while guaranteeing that resources belonging to the same node are handled sequentially within a single goroutine, using hash-based partitioning and custom queue structures.

Infra Learning Club

client-go currently provides three kinds of work queue (generic, delaying, and rate-limiting) in the k8s.io/client-go/util/workqueue package, whose source lives under staging/src/k8s.io/client-go in the Kubernetes repository.

A new requirement arose: the controller must coordinate resources concurrently, yet ensure that a group of related resources (e.g., all Pods on the same Node) are processed in the same goroutine, similar to Kafka's partitioned queues.

To satisfy this, a custom Shard Queue was created. Before a Pod enters the queue, a hash value is computed to decide which sub‑queue it should be placed into.
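
As a rough illustration of the partitioning step, the sketch below hashes a grouping key (here a Node name) with FNV-1a and reduces it modulo the shard count. The function name shardFor and the sample node names are made up for this example, and returning 0 for a non-positive shard count is an assumption chosen so that callers always get a usable index.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a grouping key to a shard index in [0, shardCount).
// Assumption: a non-positive shardCount falls back to shard 0.
func shardFor(key string, shardCount int) int {
	if shardCount <= 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(shardCount))
}

func main() {
	// The same node name always lands on the same shard.
	for _, node := range []string{"node-a", "node-b", "node-a"} {
		fmt.Printf("%s -> shard %d\n", node, shardFor(node, 4))
	}
}
```

Because the index depends only on the key and the shard count, every Pod on a given Node is routed to the same sub-queue as long as the shard count stays fixed.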

The core structure is defined as follows:

The shards slice holds one rate-limiting queue per partition, and the hash of the request determines the target shard. HashFn computes a shard index from a reconcile.Request and the shard count. CurrentShard is the next shard index to hand out to a worker goroutine. GoroutineSharding maps a goroutine ID to the shard index that goroutine owns.

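// CacheKey maps a request to a shard index in [0, shardCount).
type CacheKey[T reconcile.Request] func(item reconcile.Request, shardCount int) int

// TypedShardedQueueConfig fans requests out over several rate-limiting queues.
type TypedShardedQueueConfig[T reconcile.Request] struct {
    Name              string
    shards            []workqueue.TypedRateLimitingInterface[reconcile.Request] // one sub-queue per shard
    HashFn            CacheKey[reconcile.Request]                               // picks the shard for a request
    CurrentShard      int                                                       // next shard index to hand to a new worker
    GoroutineSharding map[int64]int                                             // goroutine ID -> owned shard index
    shardCount        int                                                       // number of shards, passed to HashFn
    mu                sync.RWMutex                                              // guards CurrentShard and GoroutineSharding
}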

The existing rate‑limited queue achieves concurrency by launching multiple goroutines that safely pull items from a single queue; however, those goroutines cannot be directly mapped to specific partitions.
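
To make the contrast concrete, here is a minimal sketch of the partitioned alternative using plain channels instead of work queues: one channel per shard and exactly one worker goroutine per channel. The event type, the process function, and the sample data are all hypothetical; it models the idea only and has none of the rate limiting or deduplication of a real workqueue.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// event is a hypothetical stand-in for a reconcile request.
type event struct {
	Node string
	Pod  string
}

func shardOf(node string, shardCount int) int {
	h := fnv.New32a()
	h.Write([]byte(node))
	return int(h.Sum32() % uint32(shardCount))
}

// process routes events to one channel per shard and runs a single worker
// per channel. It returns, for each node, the order in which its pods were
// handled.
func process(events []event, shardCount int) map[string][]string {
	if shardCount <= 0 {
		shardCount = 1 // assumption: always keep at least one shard
	}
	shards := make([]chan event, shardCount)
	for i := range shards {
		shards[i] = make(chan event, len(events))
	}

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		handled = make(map[string][]string)
	)
	for _, ch := range shards {
		wg.Add(1)
		go func(ch <-chan event) { // the only consumer of this shard
			defer wg.Done()
			for ev := range ch {
				mu.Lock()
				handled[ev.Node] = append(handled[ev.Node], ev.Pod)
				mu.Unlock()
			}
		}(ch)
	}

	for _, ev := range events {
		shards[shardOf(ev.Node, shardCount)] <- ev
	}
	for _, ch := range shards {
		close(ch)
	}
	wg.Wait()
	return handled
}

func main() {
	events := []event{
		{"node-a", "pod-1"}, {"node-b", "pod-2"},
		{"node-a", "pod-3"}, {"node-a", "pod-4"},
	}
	fmt.Println(process(events, 3))
}
```

Different shards run in parallel, but all events for one node pass through a single channel with a single consumer, so their relative order is preserved.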

HashFn

Users implement HashFn according to their resource type. The example for Pods fetches the full Pod object via the client, extracts NodeName, and hashes it to obtain a shard index.

func (r *CustomController) Hash(value reconcile.Request, shardCount int) int {
    if shardCount <= 0 {
        return 0
    }
    // Read the Pod to learn which Node it is bound to.
    var pod corev1.Pod
    if err := r.Client.Get(context.Background(), value.NamespacedName, &pod); err != nil {
        // Lookup failed (for example, the Pod was deleted): fall back to shard 0.
        return 0
    }
    if pod.Spec.NodeName == "" {
        // Not scheduled yet, so there is no Node to group by.
        return 0
    }
    // FNV-1a hash of the Node name, reduced to a shard index.
    h := fnv.New32a()
    h.Write([]byte(pod.Spec.NodeName))
    return int(h.Sum32() % uint32(shardCount))
}

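The Get call works because the informer writes the object into its local Store before the event handler enqueues the request, and the controller-runtime client reads from that cache by default, so the full object is available during hashing. The exceptions are visible in the code: if the lookup fails (for example, for a Pod that has already been deleted) or the Pod has no NodeName yet, the request falls back to shard 0.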

Get method

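The first time a goroutine calls Get, the queue looks it up in GoroutineSharding; if no mapping exists, it assigns the goroutine the shard index held in CurrentShard and advances CurrentShard. Every later call from that goroutine reads from its mapped shard, so a given partition is always processed by the same goroutine. The design therefore expects the number of worker goroutines (MaxConcurrentReconciles in controller-runtime) to match the shard count: with fewer workers some shards are never drained, and with more the shard index would run past the end of shards.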

func (sq *TypedShardedQueueConfig[T]) Get() (T, bool) {
    goroutineID := GetGoroutineID()
    if goroutineID == -1 {
        panic("goroutine ID is invalid")
    }
    var zero T

    // Fast path: this goroutine already owns a shard.
    sq.mu.RLock()
    shard, ok := sq.GoroutineSharding[goroutineID]
    sq.mu.RUnlock()

    if !ok {
        // First call from this goroutine: claim the next unassigned shard.
        sq.mu.Lock()
        if sq.CurrentShard >= len(sq.shards) {
            sq.mu.Unlock()
            // More workers than shards. Reporting shutdown (an assumption made
            // here) stops the extra worker instead of indexing out of range.
            return zero, true
        }
        shard = sq.CurrentShard
        sq.GoroutineSharding[goroutineID] = shard
        sq.CurrentShard++
        sq.mu.Unlock()
    }

    // Block on the sub-queue outside the lock so other workers are not held up.
    item, shutdown := sq.shards[shard].Get()
    if shutdown {
        return zero, true
    }
    return T(item), false
}
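
The article does not show GetGoroutineID. One common way to write such a helper, sketched here as an assumption rather than as the repository's actual code, is to parse the ID out of the header line that runtime.Stack produces. Go deliberately exposes no public API for goroutine IDs, and the header format is not a documented guarantee, so treat this as a debugging-grade technique.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"strconv"
)

// GetGoroutineID returns the calling goroutine's ID, or -1 if the stack
// header cannot be parsed. It relies on the header looking like
// "goroutine 18 [running]:", which is an implementation detail of the runtime.
func GetGoroutineID() int64 {
	buf := make([]byte, 64)
	n := runtime.Stack(buf, false) // false: current goroutine only
	fields := bytes.Fields(buf[:n])
	if len(fields) < 2 || !bytes.Equal(fields[0], []byte("goroutine")) {
		return -1
	}
	id, err := strconv.ParseInt(string(fields[1]), 10, 64)
	if err != nil {
		return -1
	}
	return id
}

func main() {
	done := make(chan int64)
	go func() { done <- GetGoroutineID() }()
	fmt.Println("main:", GetGoroutineID(), "worker:", <-done)
}
```

The ID stays the same for the lifetime of a goroutine, which is what lets Get use it as a map key for the goroutine-to-shard assignment.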

Add

The Add operation simply computes the shard index via HashFn and inserts the item into the corresponding sub‑queue.

func (sq *TypedShardedQueueConfig[T]) Add(item T) {
    hashKey := sq.HashFn(reconcile.Request(item), sq.shardCount)
    klog.V(5).InfoS("Add", "hashKey", hashKey)
    sq.shards[hashKey].Add(reconcile.Request(item))
}

Other methods follow similar straightforward patterns and are omitted for brevity. The complete implementation is available at the GitHub repository.
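
One plausible shape for the omitted methods, sketched with a stand-in shard type rather than the real workqueue: per-item calls route through the same hash as Add, while aggregate calls such as Len and ShutDown visit every shard. The names memShard, shardedQueue, and newShardedQueue are hypothetical, the model is not safe for concurrent use, and the linked repository may do this differently.

```go
package main

import "fmt"

// memShard is a hypothetical stand-in for one rate-limiting sub-queue.
type memShard struct {
	items    []string
	shutDown bool
}

// shardedQueue is a simplified, non-thread-safe model of the sharded queue.
type shardedQueue struct {
	shards []*memShard
	hashFn func(item string, shardCount int) int
}

// newShardedQueue builds n empty shards; n is assumed to be positive.
func newShardedQueue(n int, hashFn func(string, int) int) *shardedQueue {
	q := &shardedQueue{hashFn: hashFn}
	for i := 0; i < n; i++ {
		q.shards = append(q.shards, &memShard{})
	}
	return q
}

// Add routes the item to the shard chosen by the hash function.
func (q *shardedQueue) Add(item string) {
	s := q.shards[q.hashFn(item, len(q.shards))]
	s.items = append(s.items, item)
}

// Len is the total number of items across all shards.
func (q *shardedQueue) Len() int {
	total := 0
	for _, s := range q.shards {
		total += len(s.items)
	}
	return total
}

// ShutDown marks every shard as shut down.
func (q *shardedQueue) ShutDown() {
	for _, s := range q.shards {
		s.shutDown = true
	}
}

// ShuttingDown reports true only once every shard is shut down.
func (q *shardedQueue) ShuttingDown() bool {
	for _, s := range q.shards {
		if !s.shutDown {
			return false
		}
	}
	return true
}

func main() {
	byLength := func(item string, n int) int { return len(item) % n }
	q := newShardedQueue(2, byLength)
	q.Add("a")
	q.Add("bb")
	q.Add("ccc")
	fmt.Println(q.Len(), q.ShuttingDown())
	q.ShutDown()
	fmt.Println(q.ShuttingDown())
}
```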

GitHub: https://github.com/lengrongfu/study-demo/blob/main/controller-runtime/sharded_queue/README.md

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
