How Kafka Uses a Timing Wheel for Efficient Timeout Handling
Many Kafka requests must wait for an asynchronous operation or a condition, so each carries a timeout; if the condition is not met in time, Kafka returns a timeout response. Kafka tracks these timeouts with a hierarchical Timing Wheel, a data structure that offers O(1) insertion and cheap expiration checks.
In Kafka, many requests are not answered immediately; they wait for asynchronous operations or certain conditions. Each request carries a timeout parameter, and if the server cannot satisfy the condition before the timeout expires, Kafka returns a timeout response so the client knows the request has timed out. For example, a producer request with acks=-1 must wait until all ISR replicas have persisted the data, or else return a timeout response.
This scenario can be implemented with a delayed task: define a task that runs after the timeout, checks whether the return condition is satisfied, and either returns the required response or a timeout response.
Java’s built‑in implementations, Timer and ScheduledThreadPoolExecutor, keep pending tasks in a queue backed by a min‑heap, so inserting or removing a task costs O(log N). For a high‑throughput system like Kafka, where many requests can be pending at once, that cost adds up, so Kafka’s designers adopted a Timing Wheel data structure, which achieves O(1) insertion.
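For comparison, the delayed-task approach can be sketched with the JDK's ScheduledThreadPoolExecutor. This is a minimal illustration and not Kafka code: the DelayedCheck object, the awaitOrTimeout helper, and the conditionMet flag are hypothetical names introduced only for this example.

```scala
import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical helper, not part of Kafka.
object DelayedCheck {
  // Schedules a check that runs after timeoutMs and reports "ok" if the
  // condition was met by then, or "timeout" otherwise.
  def awaitOrTimeout(conditionMet: AtomicBoolean, timeoutMs: Long): String = {
    val executor = new ScheduledThreadPoolExecutor(1)
    val done = new CountDownLatch(1)
    val response = new AtomicReference[String]("")
    val check = new Runnable {
      override def run(): Unit = {
        response.set(if (conditionMet.get()) "ok" else "timeout")
        done.countDown()
      }
    }
    // The executor keeps scheduled tasks in a heap-ordered queue, so this
    // call costs O(log N) in the number of pending tasks.
    executor.schedule(check, timeoutMs, TimeUnit.MILLISECONDS)
    done.await()
    executor.shutdown()
    response.get()
  }
}
```

Calling DelayedCheck.awaitOrTimeout(new AtomicBoolean(false), 20) blocks for roughly 20 ms and then yields the timeout response. The timing wheel keeps the same behavior but replaces the per-task heap insertion.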
Timing Wheel
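A timing wheel is a circular array of slots, each holding the tasks that expire within one tick. As a simple illustration, picture a wheel with 8 slots whose pointer (the current time) is at slot 0. Kafka’s implementation is hierarchical: it creates additional, coarser wheels when a delay does not fit in the existing ones.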
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  private[this] val interval = tickMs * wheelSize
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
}

tickMs: time range represented by one slot (default 1 ms).
wheelSize: number of slots (default 20).
startMs: start time of the wheel.
taskCounter: counter of pending tasks, shared by all wheels.
queue: a DelayQueue of TimerTaskList objects.
interval: total time span the wheel can represent (tickMs × wheelSize).
buckets: array of TimerTaskList objects representing the slots.
currentTime: the time the wheel pointer currently points to, always a multiple of tickMs.
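The two derived values, interval and currentTime, are simple arithmetic on the constructor parameters. The sketch below reproduces them outside the class; the WheelFields object and its method names are assumptions made for illustration.

```scala
// Illustrative helpers, not part of Kafka.
object WheelFields {
  // Total span one wheel can represent: tickMs * wheelSize.
  def interval(tickMs: Long, wheelSize: Int): Long = tickMs * wheelSize

  // Round a start time down to the nearest multiple of tickMs,
  // as the currentTime initializer does.
  def alignedStart(startMs: Long, tickMs: Long): Long = startMs - (startMs % tickMs)
}
```

With the defaults, WheelFields.interval(1, 20) is 20 ms. For a coarser wheel with tickMs = 20, a start time of 1234567 ms is aligned down to 1234560 ms.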
Operation Principle
When adding a delayed task, Kafka computes the target slot with:
buckets[expiration / tickMs % wheelSize]

For example, with tickMs = 1 ms, a task with delayMs = 2 ms added at currentTime = 0 ms gets an expiration of 2 ms and is placed in slot 2. The task is wrapped in a TimerTaskEntry and added to the corresponding TimerTaskList.
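The slot formula can be tried in isolation. The following is a small sketch; SlotIndex and slotFor are hypothetical names, and the function assumes the expiration already falls inside the wheel's current window.

```scala
// Illustrative helper, not part of Kafka.
object SlotIndex {
  // Index of the bucket that holds a task with the given expiration.
  def slotFor(expirationMs: Long, tickMs: Long, wheelSize: Int): Int =
    ((expirationMs / tickMs) % wheelSize).toInt
}
```

SlotIndex.slotFor(2, 1, 20) gives slot 2, matching the example above. Because of the modulo, slots are reused as time moves on: an expiration of 25 ms on the same wheel maps to slot 5.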
A dedicated thread advances the wheel pointer by polling the DelayQueue:
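var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
  writeLock.lock()
  try {
    while (bucket != null) {
      timingWheel.advanceClock(bucket.getExpiration())
      bucket.flush(reinsert)
      bucket = delayQueue.poll()
    }
  } finally {
    writeLock.unlock()
  }
}

The wheel’s advanceClock method updates currentTime and, if an overflow wheel exists, advances it as well. The call to bucket.flush(reinsert) then removes every entry from the expired bucket and adds it to the timer again, so each task is either executed because it is due or placed in a finer-grained wheel.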
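The clock-advancing behavior described here can be sketched as follows. This is a simplified stand-in that models only the clock: ClockOnlyWheel and its Option-based overflow field are assumptions made for this example, and the real class carries buckets and a queue as well.

```scala
// Simplified model of one wheel level; only the clock logic is shown.
class ClockOnlyWheel(val tickMs: Long, startMs: Long, val overflow: Option[ClockOnlyWheel]) {
  var currentTime: Long = startMs - (startMs % tickMs)

  def advanceClock(timeMs: Long): Unit = {
    // Move the pointer only when at least one full tick has elapsed.
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      // Propagate the new time to the coarser wheel, if there is one.
      overflow.foreach(_.advanceClock(currentTime))
    }
  }
}
```

With a 1 ms wheel stacked under a 20 ms wheel, advancing to 45 ms sets the lower clock to 45 and the upper clock to 40, because each wheel rounds down to a multiple of its own tick.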
Handling Time Overflow
Kafka’s default wheel (tickMs = 1 ms, wheelSize = 20) can represent delays up to 20 ms. For larger delays, Kafka creates a higher‑level wheel (overflow wheel). The overflow wheel’s slots each cover the full range of the lower wheel (20 ms), and its own wheelSize remains 20, extending the representable range (e.g., up to 400 ms with two layers).
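The growth in range per layer is a geometric progression, which the sketch below works out. WheelLevels, span, and levelsNeeded are hypothetical names; the calculation measures the delay from an aligned currentTime and treats a delay as fitting only when it is strictly smaller than the span.

```scala
// Illustrative arithmetic, not part of Kafka.
object WheelLevels {
  // Span covered by `levels` stacked wheels: tickMs * wheelSize^levels.
  def span(tickMs: Long, wheelSize: Int, levels: Int): Long =
    (1 to levels).foldLeft(tickMs)((acc, _) => acc * wheelSize)

  // Smallest number of levels whose span is strictly larger than delayMs.
  def levelsNeeded(delayMs: Long, tickMs: Long, wheelSize: Int): Int = {
    var levels = 1
    var covered = tickMs * wheelSize
    while (delayMs >= covered) {
      covered *= wheelSize
      levels += 1
    }
    levels
  }
}
```

With the defaults, one, two, and three levels span 20 ms, 400 ms, and 8000 ms. A 30-second timeout (30000 ms) therefore needs four levels, since the fourth level extends the span to 160000 ms.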
overflowWheel = new TimingWheel(
  tickMs = interval,
  wheelSize = wheelSize,
  startMs = currentTime,
  taskCounter = taskCounter,
  queue
)

When a task’s expiration is at or beyond currentTime + interval, the overflow wheel is created (if not already present) and the task is added there. This hierarchical design continues, adding more layers as needed.
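To see how a task moves from the overflow wheel down to the finer wheel as its deadline approaches, here is a single-threaded toy model. It is a sketch under heavy simplifications: MiniWheel and all of its methods are hypothetical, tasks are plain (name, expiration) pairs, and there is no DelayQueue, locking, or cancellation.

```scala
import scala.collection.mutable

// Toy hierarchical wheel; call advance() on the lowest wheel only.
class MiniWheel(tickMs: Long, wheelSize: Int, startMs: Long) {
  private val interval = tickMs * wheelSize
  private val buckets = Array.fill(wheelSize)(mutable.ListBuffer.empty[(String, Long)])
  private var currentTime = startMs - (startMs % tickMs)
  private var overflow: MiniWheel = null

  // Returns the level that stored the task (1 = this wheel), or 0 if it is already due.
  def add(name: String, expirationMs: Long): Int = {
    if (expirationMs < currentTime + tickMs) 0
    else if (expirationMs < currentTime + interval) {
      buckets(((expirationMs / tickMs) % wheelSize).toInt) += ((name, expirationMs))
      1
    } else {
      if (overflow == null) overflow = new MiniWheel(interval, wheelSize, currentTime)
      overflow.add(name, expirationMs) + 1
    }
  }

  // Level currently holding the task, or 0 if it is not stored anywhere.
  def levelOf(name: String): Int = {
    if (buckets.exists(_.exists(_._1 == name))) 1
    else if (overflow == null) 0
    else { val upper = overflow.levelOf(name); if (upper == 0) 0 else upper + 1 }
  }

  // Removes entries, at every level, whose bucket start time has been reached.
  def drain(timeMs: Long): List[(String, Long)] = {
    val ready = buckets.toList.flatMap { bucket =>
      val (due, rest) = bucket.toList.partition { case (_, exp) => (exp / tickMs) * tickMs <= timeMs }
      bucket.clear()
      bucket ++= rest
      due
    }
    ready ++ (if (overflow != null) overflow.drain(timeMs) else Nil)
  }

  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      currentTime = timeMs - (timeMs % tickMs)
      if (overflow != null) overflow.advanceClock(currentTime)
    }
  }

  // Advances time, re-adds drained entries, and returns the names that are now due.
  def advance(timeMs: Long): List[String] = {
    val drained = drain(timeMs)
    advanceClock(timeMs)
    drained.filter { case (name, exp) => add(name, exp) == 0 }.map(_._1)
  }
}
```

On a wheel created with new MiniWheel(1, 20, 0), a task expiring at 45 ms is first stored at level 2. Advancing to 40 ms drains its overflow bucket and re-adds it, which now places it at level 1; advancing to 45 ms finally reports it as due.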
Adding a New Delayed Task (Scala)
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  if (!timingWheel.add(timerTaskEntry)) {
    // Already expired or cancelled
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}

The TimingWheel.add method checks whether the task can fit in the current wheel; if not, it delegates to the overflow wheel.
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  val expiration = timerTaskEntry.expirationMs
  if (timerTaskEntry.cancelled) false // cancelled: nothing to schedule
  else if (expiration < currentTime + tickMs) false // already expired: caller runs it now
  else if (expiration < currentTime + interval) {
    // Fits in this wheel: put it in its bucket
    val virtualId = expiration / tickMs
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    bucket.add(timerTaskEntry)
    // Enqueue the bucket only when its expiration time has changed
    if (bucket.setExpiration(virtualId * tickMs)) queue.offer(bucket)
    true
  } else {
    // Out of this wheel's interval: delegate to the overflow wheel
    if (overflowWheel == null) addOverflowWheel()
    overflowWheel.add(timerTaskEntry)
  }
}

Summary
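Compared with a conventional DelayQueue that holds every task and costs O(log N) per operation, Kafka’s Timing Wheel inserts a task in O(1) time. The DelayQueue it still uses holds buckets rather than individual tasks, so the cost of finding expired work depends on the number of non-empty buckets, not on the number of pending tasks. Moreover, the wheel checks whether a task has been cancelled before inserting it, which makes it especially efficient when tasks often finish early, before their timeout.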
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
