Understanding Kafka's Time Wheel Implementation for Efficient Delayed Task Processing
The article explains how Kafka uses a hierarchical time wheel to manage millions of delayed tasks efficiently. A basic time wheel is a ring of buckets plus a map from ID to bucket. This gives O(1) insertion and removal and avoids scanning every task on each tick. Kafka stacks several such wheels with progressively coarser ticks, so long timeouts are covered by a small number of slots per layer.
This article explains the time wheel data structure and how Apache Kafka applies it to handle large numbers of delayed tasks efficiently. It begins with a motivating interview problem: a server holds 100,000 concurrent TCP connections and must detect every connection that has received no packets for 30 seconds.
The naive solution keeps a map from connection ID to last-packet timestamp and scans the whole map every second, which costs O(N) work per tick even when no connection is idle. The proposed solution uses a ring-based time wheel: a fixed-size array of buckets, each holding a set of connection IDs, plus a map that records which bucket each ID resides in. Each second the current index advances by one slot, and the bucket it lands on contains exactly the IDs that have timed out.
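For contrast with the wheel, here is a minimal sketch of the naive approach in Scala, the language of the Kafka snippets in this article. It assumes a hypothetical NaiveIdleDetector class, millisecond timestamps and Long connection IDs. scan has to visit every tracked connection on every tick, which is the O(N) cost the time wheel removes.

```scala
import scala.collection.mutable

// Hypothetical naive detector: remembers each connection's last packet time
// and scans all of them on every tick.
final class NaiveIdleDetector(timeoutMs: Long) {
  private val lastPacketMs = mutable.Map.empty[Long, Long]

  // Record (or refresh) the time of the latest packet for a connection.
  def onPacket(uid: Long, nowMs: Long): Unit = lastPacketMs.update(uid, nowMs)

  // O(N): visits every tracked connection, idle or not, and removes the idle ones.
  def scan(nowMs: Long): Set[Long] = {
    val idle = lastPacketMs.collect { case (uid, last) if nowMs - last >= timeoutMs => uid }.toSet
    idle.foreach(uid => lastPacketMs.remove(uid))
    idle
  }
}
```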
Three core data structures are involved. The time wheel itself is an array of buckets whose size is the timeout range divided by tickMs (30 buckets for a 30-second timeout and a 1-second tick). Each bucket is a Set<uid> holding the IDs of the connections assigned to that time slot. A Map<uid, index> records which bucket each ID currently occupies, so an ID can be removed and reinserted in O(1). When a packet arrives, the ID is removed from its current bucket and reinserted into the bucket at the current index. That bucket was processed on the most recent tick, so the index will not reach it again until it has gone all the way around the ring, i.e. after the full timeout. On each tick the current index advances by one slot and the bucket it lands on is processed: every ID in it has gone a full timeout period without a packet and can be treated as expired.
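A minimal sketch of this single-layer wheel in Scala, assuming one tick per second and Long connection IDs. The class name IdleConnectionWheel and its touch/tick methods are hypothetical: touch registers or refreshes a connection, and tick advances the index and returns the IDs that have timed out.

```scala
import scala.collection.mutable

// Hypothetical single-layer wheel for the idle-connection problem.
final class IdleConnectionWheel(timeoutTicks: Int) {
  // One Set<uid> per slot; Array.fill evaluates its argument once per slot.
  private val buckets = Array.fill(timeoutTicks)(mutable.Set.empty[Long])
  // Map<uid, index>: which slot each connection currently sits in.
  private val slotOf = mutable.Map.empty[Long, Int]
  private var current = 0

  // Called when a packet arrives (or a connection opens): O(1) move to the current slot,
  // which the index will not revisit until a full revolution has passed.
  def touch(uid: Long): Unit = {
    slotOf.get(uid).foreach(i => buckets(i) -= uid)
    buckets(current) += uid
    slotOf(uid) = current
  }

  // Called once per tick: advance, then drain the slot the index lands on.
  def tick(): Set[Long] = {
    current = (current + 1) % timeoutTicks
    val expired = buckets(current).toSet
    buckets(current).clear()
    expired.foreach(uid => slotOf.remove(uid))
    expired
  }
}
```

An ID refreshed at tick t is placed in the slot the index has just processed, so it is returned by tick t + timeoutTicks; the timeout resolution is therefore one tick.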
Kafka extends this basic time wheel into a hierarchical (multi-layer) design that supports very large timeout values while keeping insertion and deletion O(1). Each layer has its own tickMs, and in Kafka every layer uses the same wheelSize; the tickMs of an upper layer equals the interval (tickMs × wheelSize) of the layer below. When a task's expiration is at or beyond the current layer's currentTime + interval, the task is handed to the overflow (upper) layer, which is created on demand if it does not exist yet.
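A small sketch of how the layer spans grow, using tickMs = 1 and wheelSize = 20 (the defaults of Kafka's SystemTimer) as example inputs. The object name Hierarchy and its methods are hypothetical, and layerFor simplifies by assuming every layer's currentTime is 0 and that the task has not already expired.

```scala
object Hierarchy {
  // (tickMs, interval) of the first `layers` layers; each upper tick equals
  // the interval of the layer below it.
  def layerSpans(tickMs: Long, wheelSize: Int, layers: Int): Seq[(Long, Long)] =
    Iterator.iterate(tickMs)(_ * wheelSize).take(layers).map(t => (t, t * wheelSize)).toSeq

  // Lowest layer whose span covers delayMs, assuming every layer's currentTime
  // is 0 (real layers advance independently).
  def layerFor(delayMs: Long, tickMs: Long, wheelSize: Int): Int = {
    var layer = 0
    var interval = tickMs * wheelSize
    // Promote while the delay does not fit in this layer's span.
    while (delayMs >= interval) {
      layer += 1
      interval *= wheelSize
    }
    layer
  }
}
```

With these inputs three layers span 20 ms, 400 ms and 8 s, so a 5-second delay lands in the third layer instead of needing 5,000 one-millisecond slots.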
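Key parameters: tickMs (the time span of one slot), wheelSize (the number of slots per layer), interval = tickMs × wheelSize (the total time span of a layer), startMs (the layer's start time), and currentTime (the layer's current time, rounded down to a multiple of tickMs). currentTime determines which bucket is due for processing, and a layer only accepts tasks whose expiration lies in [currentTime + tickMs, currentTime + interval).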
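The arithmetic a single layer performs can be sketched with a hypothetical WheelLayer case class; the values used here (tickMs = 20, wheelSize = 20, currentTime = 1220) are illustrative. bucketFor mirrors the range checks and slot computation in Kafka's TimingWheel.add, returning None where add would either reject the task as expired or pass it to the overflow wheel.

```scala
// Hypothetical model of one layer's parameters and slot arithmetic.
final case class WheelLayer(tickMs: Long, wheelSize: Int, currentTime: Long) {
  val interval: Long = tickMs * wheelSize

  // Round an arbitrary clock reading down to a multiple of tickMs.
  def aligned(timeMs: Long): Long = timeMs - (timeMs % tickMs)

  // Slot for an expiration in [currentTime + tickMs, currentTime + interval), else None.
  def bucketFor(expirationMs: Long): Option[Int] =
    if (expirationMs < currentTime + tickMs || expirationMs >= currentTime + interval) None
    else Some(((expirationMs / tickMs) % wheelSize).toInt)
}
```

For WheelLayer(20, 20, 1220) the accepted expirations 1240 to 1619 correspond to virtual IDs 62 through 80, which map to every slot except 1, the slot of currentTime itself (1220 / 20 = 61). A new task therefore never lands in the bucket that is currently being flushed.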
Core implementation snippets (Scala, from Kafka's SystemTimer and TimingWheel classes).

SystemTimer wraps each task in a TimerTaskEntry and adds it to the timing wheel:

```scala
// In SystemTimer: a task is added to the timer wrapped as a TimerTaskEntry.
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  // Try to add the entry to the timing wheel. A false return means the task has
  // already expired or was cancelled. timingWheel holds a reference to the layer
  // above it, so this call may recurse upward.
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled: expired tasks are run asynchronously on the executor
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}
```

TimingWheel.add places the task, recursing upward until it lands in a bucket of the wheel whose span covers its expiration:

```scala
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs

  if (timerTaskEntry.cancelled) {
    // Cancelled
    false
  } else if (expiration < currentTime + tickMs) {
    // Already expired: the caller executes it
    false
  } else if (expiration < currentTime + interval) {
    // The expiration falls within this wheel's span: find this task's bucket here
    val virtualId = expiration / tickMs
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)

    // Set the bucket expiration time.
    // setExpiration returns true only when the bucket's expiration changes, i.e. the
    // bucket is being reused for a new time slot, so each bucket is enqueued in the
    // delay queue at most once per expiration.
    if (bucket.setExpiration(virtualId * tickMs)) {
      // bucket is a TimerTaskList: it implements java.util.concurrent.Delayed and
      // holds a linked list of task entries (see Figure 2).
      queue.offer(bucket)
    }
    true
  } else {
    // Out of the interval. Put it into the parent timer.
    // The expiration is beyond this wheel's span: create the upper wheel if it
    // does not exist yet and let it add the task.
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
}

private[this] def addOverflowWheel(): Unit = {
  synchronized {
    if (overflowWheel == null) {
      overflowWheel = new TimingWheel(
        tickMs = interval,
        wheelSize = wheelSize,
        startMs = currentTime,
        taskCounter = taskCounter,
        queue
      )
    }
  }
}

def advanceClock(timeMs: Long): Unit = {
  if (timeMs >= currentTime + tickMs) {
    // Round currentTime down to a multiple of this wheel's tickMs
    currentTime = timeMs - (timeMs % tickMs)

    // Try to advance the clock of the overflow wheel if present.
    // The value passed up is already rounded to this layer's tickMs; the upper
    // layer rounds it again to its own, coarser tickMs.
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}
```

SystemTimer drives the wheels from the delay queue and re-adds the tasks of every expired bucket:

```scala
// Re-add each entry of a flushed bucket: it is placed in a lower, finer-grained
// layer or, if it has expired, handed to the executor.
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

/*
 * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
 * waits up to timeoutMs before giving up.
 */
def advanceClock(timeoutMs: Long): Boolean = {
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    writeLock.lock()
    try {
      while (bucket != null) {
        // Drive the timing wheel forward to this bucket's expiration time
        timingWheel.advanceClock(bucket.getExpiration())
        // Flush the bucket: re-add every task in its list so each one moves to a
        // lower layer or, if expired, is executed
        bucket.flush(reinsert)
        // The DelayQueue only returns a bucket once its delay has elapsed, so keep
        // draining buckets that have already expired
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}
```

The hierarchical time wheel keeps task insertion and removal at O(1): a task is linked into a bucket's list, and the DelayQueue holds buckets rather than individual tasks, so its logarithmic cost grows with the number of non-empty buckets, not with the number of tasks. This makes it suitable for high-throughput systems like Kafka, where millions of delayed operations (e.g., delayed produce, fetch and delete operations) must be managed efficiently.
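The Kafka snippets depend on TimerTaskEntry, TimerTaskList, a DelayQueue and locking, so they do not run on their own. What follows is a hedged, single-threaded miniature of the same add / advanceClock / flush cycle in Scala. Assumptions: Task, Bucket, Wheel and MiniTimer are hypothetical names; a scala.collection.mutable.PriorityQueue ordered by bucket expiration stands in for java.util.concurrent.DelayQueue; advanceTo(nowMs) simulates the clock instead of blocking in poll; cancellation and thread safety are omitted.

```scala
import scala.collection.mutable

// Hypothetical names throughout; a simplified, single-threaded model only.
final case class Task(id: String, expirationMs: Long)

// Stand-in for Kafka's TimerTaskList: a task list plus the slot's expiration.
final class Bucket {
  val tasks: mutable.ListBuffer[Task] = mutable.ListBuffer.empty[Task]
  var expiration: Long = -1L
  // Like TimerTaskList.setExpiration: true only when the value actually changes.
  def setExpiration(ms: Long): Boolean = {
    val changed = expiration != ms
    expiration = ms
    changed
  }
}

final class Wheel(tickMs: Long, wheelSize: Int, startMs: Long, queue: mutable.PriorityQueue[Bucket]) {
  private val interval = tickMs * wheelSize
  private val buckets = Array.fill(wheelSize)(new Bucket)
  private var currentTime = startMs - (startMs % tickMs)
  private var overflow: Wheel = null

  // Same decision sequence as TimingWheel.add, minus cancellation.
  def add(t: Task): Boolean =
    if (t.expirationMs < currentTime + tickMs) false // expired: caller runs it
    else if (t.expirationMs < currentTime + interval) {
      val virtualId = t.expirationMs / tickMs
      val bucket = buckets((virtualId % wheelSize).toInt)
      bucket.tasks += t
      if (bucket.setExpiration(virtualId * tickMs)) queue.enqueue(bucket)
      true
    } else {
      // Create the upper layer on demand; its tick is this layer's interval.
      if (overflow == null) overflow = new Wheel(interval, wheelSize, currentTime, queue)
      overflow.add(t)
    }

  def advanceClock(timeMs: Long): Unit =
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      if (overflow != null) overflow.advanceClock(currentTime)
    }
}

final class MiniTimer(tickMs: Long, wheelSize: Int) {
  // Min-heap on bucket expiration (Scala's PriorityQueue is a max-heap, hence reverse).
  private val queue =
    mutable.PriorityQueue.empty[Bucket](Ordering.by[Bucket, Long](_.expiration).reverse)
  private val wheel = new Wheel(tickMs, wheelSize, 0L, queue)
  val fired: mutable.ListBuffer[String] = mutable.ListBuffer.empty[String]

  // Like SystemTimer.addTimerTaskEntry: a task the wheel rejects has expired and fires.
  def schedule(t: Task): Unit = if (!wheel.add(t)) fired += t.id

  // Simulated clock: flush every bucket whose expiration is <= nowMs, earliest first.
  def advanceTo(nowMs: Long): Unit =
    while (queue.nonEmpty && queue.head.expiration <= nowMs) {
      val bucket = queue.dequeue()
      wheel.advanceClock(bucket.expiration)
      val tasks = bucket.tasks.toList
      bucket.tasks.clear()
      bucket.expiration = -1L // free the slot for reuse, like TimerTaskList.flush
      tasks.foreach(schedule) // re-add: move to a lower layer or fire
    }
}
```

With MiniTimer(1, 20), a task due at 350 ms first sits in the 20 ms layer's bucket for 340 ms. When that bucket is flushed at 340 ms the task drops into the 1 ms layer, and it fires when the 350 ms bucket is flushed. Only buckets, never individual tasks, pass through the priority queue.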
Didi Tech
Official Didi technology account