Implementing a Sharded WorkQueue in controller-runtime for Partitioned Concurrency
The article explains how to design and implement a sharded WorkQueue in Kubernetes' controller-runtime, enabling concurrent processing while guaranteeing that resources belonging to the same node are handled sequentially within a single goroutine, using hash-based partitioning and custom queue structures.
client-go currently provides three types of work queues (generic, rate-limiting, and delaying) in the k8s.io/client-go/util/workqueue package, whose source lives under staging/src/k8s.io/client-go in the Kubernetes repository.
A new requirement arose: the controller must reconcile resources concurrently, yet ensure that each group of related resources (e.g., all Pods on the same Node) is processed by the same goroutine, similar to Kafka's partitioned queues.
To satisfy this, a custom Shard Queue was created. Before a Pod enters the queue, a hash value is computed to decide which sub‑queue it should be placed into.
The core structure is defined below. The shards slice holds one rate-limiting queue per shard; the hash of the request determines the target shard. HashFn computes a shard index from a reconcile.Request and the shard count. CurrentShard is the next shard index to hand out to a goroutine that has no mapping yet. GoroutineSharding maps a goroutine ID to its shard index.
// CacheKey maps a request to a shard index in the range [0, shardCount).
type CacheKey[T reconcile.Request] func(req reconcile.Request, shardCount int) int

type TypedShardedQueueConfig[T reconcile.Request] struct {
    Name              string
    shards            []workqueue.TypedRateLimitingInterface[reconcile.Request]
    HashFn            CacheKey[reconcile.Request]
    CurrentShard      int
    GoroutineSharding map[int64]int
    shardCount        int
    mu                sync.RWMutex // guards CurrentShard and GoroutineSharding
}

With the existing rate-limiting queue, concurrency comes from multiple worker goroutines that safely pull items from a single shared queue; however, those goroutines cannot be pinned to specific partitions.
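To make the partitioning idea concrete, here is a minimal, standard-library-only sketch. It is not the article's implementation: the event type, shardFor, and runPartitioned are hypothetical names, and plain channels stand in for the rate-limiting sub-queues. It only illustrates that routing by a hash of the node name, with exactly one worker per shard, keeps all events for a node on one goroutine and in order.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// event is a hypothetical stand-in for a request that already carries its node name.
type event struct {
	Node string
	Seq  int
}

// shardFor maps a node name to a shard index using FNV-1a.
// It assumes shardCount > 0.
func shardFor(node string, shardCount int) int {
	h := fnv.New32a()
	h.Write([]byte(node))
	return int(h.Sum32() % uint32(shardCount))
}

// runPartitioned routes each event to the channel chosen by shardFor and
// starts exactly one worker goroutine per channel. It returns, per worker,
// the events that worker handled, in the order it received them.
func runPartitioned(events []event, shardCount int) [][]event {
	shards := make([]chan event, shardCount)
	handled := make([][]event, shardCount)
	var wg sync.WaitGroup
	for i := range shards {
		shards[i] = make(chan event, len(events))
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			// Each worker touches only its own slot in handled.
			for ev := range shards[worker] {
				handled[worker] = append(handled[worker], ev)
			}
		}(i)
	}
	for _, ev := range events {
		shards[shardFor(ev.Node, shardCount)] <- ev
	}
	for _, ch := range shards {
		close(ch)
	}
	wg.Wait()
	return handled
}

func main() {
	events := []event{{"node-a", 1}, {"node-b", 1}, {"node-a", 2}, {"node-c", 1}, {"node-b", 2}}
	for worker, evs := range runPartitioned(events, 3) {
		fmt.Println("worker", worker, "handled", evs)
	}
}
```

Different nodes may still land on the same shard; the guarantee is only that one node never spans two workers.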
HashFn
Users implement HashFn according to their resource type. The example for Pods fetches the full Pod object via the client, extracts NodeName, and hashes it to obtain a shard index.
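func (r *CustomController) Hash(value reconcile.Request, shardCount int) int {
    var pod corev1.Pod
    // Look up the full Pod so that its NodeName can be used as the partition key.
    if err := r.Client.Get(context.Background(), client.ObjectKey{Namespace: value.Namespace, Name: value.Name}, &pod); err != nil {
        // Lookup failed (for example, the Pod no longer exists): fall back to shard 0.
        return 0
    }
    // Pods that are not scheduled yet also go to shard 0.
    if pod.Spec.NodeName == "" {
        return 0
    }
    // FNV-1a hash of the node name, reduced modulo the shard count.
    h := fnv.New32a()
    h.Write([]byte(pod.Spec.NodeName))
    if shardCount > 0 {
        return int(h.Sum32() % uint32(shardCount))
    }
    return 0
}

The Get call works because the informer writes the object to its local Store before the event handler enqueues the request, so the full object is already available when the hash is computed.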
Get method
The first time a goroutine calls Get, it looks itself up in GoroutineSharding; if no mapping exists, it claims the current shard index and records it. Every later call from that goroutine reads from the mapped shard, so each partition is always processed by the same goroutine.
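The Get method relies on a GetGoroutineID helper that the article does not show. Go deliberately offers no public API for goroutine IDs, so the sketch below is only one possible approach and an assumption about what such a helper could look like: it parses the "goroutine N [...]" header that runtime.Stack writes. That header is a textual format rather than a documented contract, so treat this as a workaround, not as the repository's actual code.

```go
package main

import (
	"fmt"
	"runtime"
	"strconv"
	"strings"
)

// GetGoroutineID returns the ID of the calling goroutine, or -1 if the
// stack header cannot be parsed. It relies on the stack trace starting
// with a header of the form "goroutine 18 [running]:".
func GetGoroutineID() int64 {
	buf := make([]byte, 64) // large enough for the header line
	n := runtime.Stack(buf, false)
	fields := strings.Fields(string(buf[:n]))
	if len(fields) < 2 || fields[0] != "goroutine" {
		return -1
	}
	id, err := strconv.ParseInt(fields[1], 10, 64)
	if err != nil {
		return -1
	}
	return id
}

func main() {
	fmt.Println("main goroutine:", GetGoroutineID())

	done := make(chan int64)
	go func() { done <- GetGoroutineID() }()
	fmt.Println("worker goroutine:", <-done)
}
```

The ID is stable for the lifetime of a goroutine, which is what lets Get use it as a map key.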
func (sq *TypedShardedQueueConfig[T]) Get() (T, bool) {
    goroutineID := GetGoroutineID()
    if goroutineID == -1 {
        panic("goroutine ID is invalid")
    }
    var zero T

    // Fast path: this goroutine already owns a shard.
    sq.mu.RLock()
    shard, ok := sq.GoroutineSharding[goroutineID]
    sq.mu.RUnlock()

    if !ok {
        // First call from this goroutine: claim the next free shard.
        // CurrentShard is never wrapped, so the number of worker
        // goroutines must not exceed shardCount.
        sq.mu.Lock()
        shard = sq.CurrentShard
        sq.GoroutineSharding[goroutineID] = shard
        sq.CurrentShard++
        sq.mu.Unlock()
    }

    // Block only on the shard owned by this goroutine.
    item, shutdown := sq.shards[shard].Get()
    if shutdown {
        return zero, true
    }
    return T(item), false
}
Add
The Add operation simply computes the shard index via HashFn and inserts the item into the corresponding sub‑queue.
func (sq *TypedShardedQueueConfig[T]) Add(item T) {
    // Pick the sub-queue from the hash and enqueue the item there.
    hashKey := sq.HashFn(reconcile.Request(item), sq.shardCount)
    klog.V(5).InfoS("Add", "hashKey", hashKey)
    sq.shards[hashKey].Add(reconcile.Request(item))
}

Other methods follow similar straightforward patterns and are omitted for brevity. The complete implementation is available at the GitHub repository.
GitHub: https://github.com/lengrongfu/study-demo/blob/main/controller-runtime/sharded_queue/README.md
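As a rough illustration of the omitted methods, queue operations that take no item (such as Len and ShutDown) cannot be routed by hash and have to visit every shard. The sketch below is an assumption about how that could look, not code from the repository: subQueue, fakeQueue, and shardedQueue are hypothetical types, trimmed to the standard library so the example runs on its own.

```go
package main

import "fmt"

// subQueue is a hypothetical, trimmed-down view of one shard, limited to
// the two item-independent methods used in this sketch.
type subQueue interface {
	Len() int
	ShutDown()
}

// fakeQueue is a toy in-memory shard used only for this example.
type fakeQueue struct {
	items    []string
	shutdown bool
}

func (q *fakeQueue) Len() int  { return len(q.items) }
func (q *fakeQueue) ShutDown() { q.shutdown = true }

// shardedQueue fans item-independent calls out to every shard.
type shardedQueue struct {
	shards []subQueue
}

// Len returns the total number of queued items across all shards.
func (sq *shardedQueue) Len() int {
	total := 0
	for _, s := range sq.shards {
		total += s.Len()
	}
	return total
}

// ShutDown shuts down every shard so that all workers can exit.
func (sq *shardedQueue) ShutDown() {
	for _, s := range sq.shards {
		s.ShutDown()
	}
}

func main() {
	a := &fakeQueue{items: []string{"default/pod-1", "default/pod-2"}}
	b := &fakeQueue{items: []string{"default/pod-3"}}
	sq := &shardedQueue{shards: []subQueue{a, b}}

	fmt.Println("total length:", sq.Len())
	sq.ShutDown()
	fmt.Println("all shut down:", a.shutdown && b.shutdown)
}
```

Methods that do take an item would instead pick a single shard through the hash function, in the same way Add does.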
Infra Learning Club
Infra Learning Club shares study notes, cutting-edge technology, and career discussions.
