Inside Kafka Broker: How Its Network Architecture Handles Millions of Requests
This article deeply dissects Kafka Broker's network architecture and request‑processing pipeline, covering sequential, multithreaded, and event‑driven designs, the Reactor pattern, Acceptor and Processor threads, core request flow, and practical tuning parameters for high‑throughput, low‑latency deployments.
Overall Overview
Kafka follows a classic Request/Response model: producers, consumers, and other brokers communicate with a broker via network requests, and the broker must process those requests efficiently to deliver Kafka's high-throughput promise.
Sequential Processing Model
The simplest implementation uses a single while‑loop that accepts connections and processes them one by one, which leads to two fatal drawbacks: request blocking (each request must wait for the previous one to finish) and extremely low throughput.
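A minimal sketch of this sequential model in plain Java (illustrative only, not Kafka code): one loop, one connection at a time. While the thread is blocked in accept() or readLine(), every other client simply waits.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SequentialServer {

    // Accept exactly one client, read one line, echo it back, close.
    static void serveOne(ServerSocket server) throws IOException {
        try (Socket client = server.accept();
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream()));
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            out.println("echo: " + in.readLine());
        }
    }

    // The "single while-loop" server: requests are handled strictly in order,
    // so a single slow client stalls the entire broker.
    static void serveForever(ServerSocket server) throws IOException {
        while (true) {
            serveOne(server);
        }
    }
}
```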
Multithreaded Asynchronous Model
Creating a dedicated thread for each incoming request (the "connection‑per‑thread" model) removes blocking and improves throughput, but the overhead of spawning a thread per request quickly becomes a bottleneck under heavy load.
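The connection-per-thread variant can be sketched like this (again plain Java, not Kafka code): the accept loop never does I/O itself, so a slow client no longer blocks others, but every connection now costs an OS thread.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class ThreadPerConnectionServer {

    // Runs on a dedicated thread: echo one line, then close the connection.
    static void handle(Socket client) {
        try (client;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(client.getInputStream()));
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            out.println("echo: " + in.readLine());
        } catch (IOException ignored) {
        }
    }

    // Accept up to maxConnections clients, spawning one thread per connection.
    static void acceptLoop(ServerSocket server, int maxConnections) throws IOException {
        for (int i = 0; i < maxConnections; i++) {
            Socket client = server.accept();
            new Thread(() -> handle(client)).start(); // one OS thread per connection
        }
    }
}
```

Under thousands of concurrent connections the per-thread stack memory and context-switch overhead dominate, which is exactly the bottleneck the article describes.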
Event‑Driven Multiplexing
A single thread can instead use I/O multiplexing: one NIO Selector (backed by select/epoll) watches all connections, and the thread acts only on those the kernel reports as ready. At very high connection counts that single selector thread itself becomes the bottleneck, so the design introduces multiple selectors that share the load, allowing the system to scale to very high concurrency.
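A minimal single-selector echo server in plain Java NIO (a sketch, not Kafka code) shows the multiplexing loop:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class MultiplexedServer {

    // Register the (non-blocking) listening channel for OP_ACCEPT.
    static Selector start(ServerSocketChannel server) throws IOException {
        server.configureBlocking(false);
        Selector selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);
        return selector;
    }

    // One pass over the ready set: accept new connections, echo ready reads.
    static void pollOnce(Selector selector) throws IOException {
        selector.select(500);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isAcceptable()) {
                SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                if (client != null) {
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                }
            } else if (key.isReadable()) {
                SocketChannel client = (SocketChannel) key.channel();
                ByteBuffer buf = ByteBuffer.allocate(1024);
                int n = client.read(buf);
                if (n < 0) { client.close(); continue; }
                buf.flip();
                client.write(buf); // echo the bytes back
            }
        }
    }
}
```

One thread now serves many connections without ever blocking on an idle one; the remaining question, answered by the Reactor pattern below, is how to spread this work across several selector threads.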
Reactor Pattern
The broker adopts the Reactor pattern (as described in Doug Lea’s "Scalable I/O in Java"). A Dispatcher (Acceptor) receives connections and distributes them to a pool of Request Handlers (Processors) which perform the actual I/O work.
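The dispatch half of this pattern can be sketched generically (hypothetical class and method names, not Kafka's): the dispatcher never performs I/O itself, it only parks each accepted connection on one handler's queue, cycling through handlers round-robin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class RoundRobinDispatcher<T> {
    private final List<ArrayBlockingQueue<T>> handlerQueues = new ArrayList<>();
    private int next = 0;

    public RoundRobinDispatcher(int handlers, int queueCapacity) {
        for (int i = 0; i < handlers; i++) {
            handlerQueues.add(new ArrayBlockingQueue<>(queueCapacity));
        }
    }

    // Called by the acceptor thread for each accepted connection.
    public void dispatch(T connection) {
        handlerQueues.get(next).offer(connection);
        next = (next + 1) % handlerQueues.size(); // round-robin across handlers
    }

    // Each handler thread drains its own queue and does the actual I/O work.
    public ArrayBlockingQueue<T> queueFor(int handler) {
        return handlerQueues.get(handler);
    }
}
```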
Acceptor Thread
<code>/**
 * Thread that accepts and configures new connections. There is one of these per endpoint.
 */
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  // 1. Create an NIO Selector for accept events (NSelector aliases java.nio.channels.Selector)
  private val nioSelector = NSelector.open()
  // 2. Open the ServerSocketChannel for this endpoint
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  // 3. The pool of Processor threads this Acceptor feeds
  private val processors = new ArrayBuffer[Processor]()
  ...

  def run(): Unit = {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessorIndex = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val iter = nioSelector.selectedKeys().iterator()
            while (iter.hasNext && isRunning) {
              val key = iter.next()
              iter.remove()
              if (key.isAcceptable) {
                // Accept the connection and hand it to the next Processor, round-robin
                accept(key).foreach { socketChannel =>
                  val processor = synchronized {
                    currentProcessorIndex = currentProcessorIndex % processors.length
                    processors(currentProcessorIndex)
                  }
                  currentProcessorIndex += 1
                  assignNewConnection(socketChannel, processor, mayBlock = true)
                }
              }
            }
          }
        } catch { case e: Throwable => error("Error occurred while accepting connection", e) }
      }
    } finally { shutdownComplete.countDown() }
  }
}
</code>The Acceptor creates a NIO selector, opens a ServerSocketChannel, registers OP_ACCEPT, and continuously accepts new TCP connections, assigning each to a Processor thread from a round‑robin pool.
Processor Thread
<code>override def run(): Unit = {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // register connections the Acceptor queued onto newConnections
        configureNewConnections()
        // register OP_WRITE for queued responses and start sending them
        processNewResponses()
        // poll the selector for channels ready to read or write
        poll()
        // turn completed reads into Request objects on the request queue
        processCompletedReceives()
        ...
      } catch { case e: Throwable => error("Processor got uncaught exception", e) }
    }
  } finally { /* close channels and the selector */ }
}

// Per-Processor data structures
val newConnections = new ArrayBlockingQueue[SocketChannel](ConnectionQueueSize)
val inflightResponses = mutable.Map[String, RequestChannel.Response]()
val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
</code>Each Processor registers the connections handed to it by the Acceptor, registers OP_READ/OP_WRITE on those channels, polls for ready events, turns completed reads into Request objects, and maintains three per-thread structures: the newConnections queue, the inflightResponses map, and the responseQueue.
Request Handling Core Flow
1. Clients open TCP connections to the broker's listener and send requests over them.
2. The Acceptor thread creates a NIO selector, opens a ServerSocketChannel, and registers OP_ACCEPT.
3. The Acceptor also creates a pool of Processor threads (sized by num.network.threads) and enqueues each accepted connection into a Processor's newConnections queue.
4. Processor threads poll their selectors, register OP_READ/OP_WRITE, and turn completed reads into Request objects placed on the RequestChannel's request queue.
5. KafkaRequestHandler threads (sized by num.io.threads) continuously fetch Request objects and invoke KafkaApis.handle.
6. Each response is put back on the owning Processor's responseQueue and eventually written to the client socket.
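The hand-off between Processor threads and the KafkaRequestHandler pool can be sketched with plain queues (simplified, hypothetical names; the real RequestChannel carries far more state):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class MiniRequestChannel {
    // Each request remembers which Processor owns its client connection.
    public record Request(int processorId, String payload) {}

    // One shared request queue, one response queue per Processor.
    public final BlockingQueue<Request> requestQueue = new ArrayBlockingQueue<>(500);
    public final List<BlockingQueue<String>> responseQueues = new ArrayList<>();

    public MiniRequestChannel(int processors) {
        for (int i = 0; i < processors; i++) {
            responseQueues.add(new LinkedBlockingDeque<>());
        }
    }

    // What a KafkaRequestHandler-style worker thread would run per request:
    // take a request, execute the API logic, route the response back to the
    // response queue of the Processor that owns the connection.
    public void handleOne() throws InterruptedException {
        Request req = requestQueue.take();
        String response = "handled:" + req.payload(); // stand-in for KafkaApis.handle
        responseQueues.get(req.processorId()).offer(response);
    }
}
```

Keeping one response queue per Processor is what lets a response travel back over the exact socket, and selector, it arrived on.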
System Tuning
num.network.threads: set to CPU cores × 2 to provide enough Processor threads.
num.io.threads: set to disk count × 2 for the KafkaRequestHandler pool.
num.replica.fetchers: set to CPU cores / 4 to improve follower-to-leader replication parallelism.
compression.type: use lz4 to reduce network traffic while keeping CPU overhead low.
auto.leader.rebalance.enable: set to false to avoid unpredictable leader movements.
Leave the log-flush parameters (log.flush.scheduler.interval.ms, log.flush.interval.ms, log.flush.interval.messages) at their defaults and let the OS page cache handle flushing.
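As an illustration only, these rules of thumb might translate into a server.properties fragment like the following for a hypothetical broker with 8 CPU cores and 4 data disks; the numbers are derived from the formulas above, not recommendations to copy, and must be validated against your own hardware and load.

```properties
# Hypothetical sizing: 8 cores, 4 data disks
num.network.threads=16            # CPU cores x 2 (Processor threads)
num.io.threads=8                  # disk count x 2 (KafkaRequestHandler pool)
num.replica.fetchers=2            # CPU cores / 4 (replication parallelism)
compression.type=lz4              # cheap CPU-wise, cuts network traffic
auto.leader.rebalance.enable=false
# log.flush.* intentionally left at defaults: rely on the OS page cache
```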
Conclusion
The article demonstrates that a high‑performance Kafka broker must combine a non‑blocking, event‑driven Reactor architecture with carefully tuned thread pools and configuration parameters to achieve the desired throughput and latency characteristics.
Sanyou's Java Diary
Passionate about technology, though not great at solving problems; eager to share, never tire of learning!