Inside Kafka’s Network Layer: How Messages Are Sent and Received
This article walks through Kafka’s network‑layer internals, detailing how the client prepares, pre‑sends, and actually sends messages, then reads and parses responses, with code snippets that illustrate Selector, KafkaChannel, and the related classes Kafka builds on top of Java NIO.
Reading this article takes about 30 minutes.
Hello everyone, I am Sanyou.
Today we discuss Kafka message flow at the network layer.
Reading this article carefully will give you a deeper understanding of Kafka’s network‑layer source code.
01 Overview
Using a scenario‑driven approach, we examine how messages are transmitted over the network and what work is involved.
Send message flow analysis: message pre‑send, then the actual send.
Receive response flow analysis: read the response result, parse the response information, then handle the callback.
Only the core source code is retained for clarity.
02 Send Message Flow Analysis
02.1 Message Pre‑send
This part involves many components; a detailed analysis will follow in future articles.
The client prepares the message to be sent as follows:
1. The Sender thread pulls a batch of messages from RecordAccumulator and processes any expired data.
2. A ClientRequest object is created and handed to NetworkClient.
3. NetworkClient constructs an InFlightRequest from the ClientRequest and stores it in inFlightRequests.
4. NetworkClient constructs a NetworkSend and places it in the KafkaChannel send field, which caches the pending message; after the data has been written it is recorded in completedSends.
Pre‑send ends here.
Next we look at the concrete implementations of Selector and KafkaChannel.
02.1.1 Request Data Temporarily Stored in Memory
GitHub source links:
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
<code>/**
* Message pre‑send
*/
public void send(Send send) {
// 1. Get the connection id of the destination node
String connectionId = send.destination();
// 2. Get the corresponding channel
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
// 3. If the connection is in the closing set
if (closingChannels.containsKey(connectionId)) {
// Put connectionId into failedSends
this.failedSends.add(connectionId);
} else {
try {
// 4. Temporarily store data for pre‑send; only one send at a time
channel.setSend(send);
} catch (Exception e) {
// 5. Mark channel state as FAILED_SEND
channel.state(ChannelState.FAILED_SEND);
// 6. Add connectionId to failedSends
this.failedSends.add(connectionId);
// 7. Close the connection
close(channel, CloseMode.DISCARD_NO_NOTIFY);
...
}
}
}
</code>The setSend() method stores the ByteBuffer data in the channel and registers the OP_WRITE interest.
<code>public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
// Set the data to be sent
this.send = send;
// Register write interest
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
// PlaintextTransportLayer method
@Override
public void addInterestOps(int ops) {
// Add interest via key.interestOps() | ops
key.interestOps(key.interestOps() | ops);
}
</code>Pre‑send essentially buffers the data in KafkaChannel and adds OP_WRITE to the key's interest set. Nothing is written to the socket yet, and setSend() does not touch OP_READ.
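The one‑pending‑send rule and the interest‑bit arithmetic can be sketched in isolation. The following is a minimal sketch, not Kafka code: MiniChannel and its methods are hypothetical names, the send is reduced to a String, and it assumes the key was already registered for reads before the send was queued.

```java
import java.nio.channels.SelectionKey;

/** Hypothetical, simplified stand-in for KafkaChannel's pre-send bookkeeping. */
final class MiniChannel {
    private String pendingSend; // at most one send may be buffered at a time
    // Assumption for this sketch: read interest is already set on the key.
    private int interestOps = SelectionKey.OP_READ;

    /** Buffers the send and turns on write interest; rejects a second send. */
    void setSend(String send) {
        if (pendingSend != null)
            throw new IllegalStateException("Prior send operation still in progress");
        pendingSend = send;
        interestOps |= SelectionKey.OP_WRITE; // same arithmetic as addInterestOps()
    }

    /** Hands back the buffered send and turns write interest off again. */
    String completeSend() {
        String result = pendingSend;
        pendingSend = null;
        interestOps &= ~SelectionKey.OP_WRITE; // same arithmetic as removeInterestOps()
        return result;
    }

    boolean interestedIn(int op) {
        return (interestOps & op) != 0;
    }

    public static void main(String[] args) {
        MiniChannel channel = new MiniChannel();
        channel.setSend("produce-request");
        System.out.println("write interest after setSend: "
                + channel.interestedIn(SelectionKey.OP_WRITE));
        channel.completeSend();
        System.out.println("write interest after completion: "
                + channel.interestedIn(SelectionKey.OP_WRITE));
    }
}
```

Keeping OP_WRITE off while there is nothing to send matters because a socket is writable almost all the time; leaving the bit set would make every select() call return immediately.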
02.2 Message Actual Send
The Sender thread calls Selector.poll() to actually send the request.
02.2.1 poll()
<code>@Override
public void poll(long timeout) throws IOException {
...
// Block waiting for I/O events
int numReadyKeys = select(timeout);
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
if (dataInBuffers) {
// Process buffered data
pollSelectionKeys(toPoll, false, endSelect);
}
// Process ready events
pollSelectionKeys(readyKeys, false, endSelect);
// Process immediately connected keys
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
} else {
...
}
...
}
</code>poll() collects ready events and dispatches network operations.
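The select‑then‑dispatch pattern behind poll() can be tried out with plain Java NIO. The sketch below is an illustration under stated assumptions rather than Kafka's implementation: MiniPollLoop and roundTrip are hypothetical names, and a java.nio.channels.Pipe stands in for the socket so the example runs without a broker.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical miniature of a selector-driven poll loop. */
final class MiniPollLoop {

    /** Sends the message through a pipe using only readiness events. */
    static String roundTrip(String message) {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        ByteBuffer pending = ByteBuffer.wrap(bytes);             // data waiting to be sent
        ByteBuffer received = ByteBuffer.allocate(bytes.length); // data read back
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);
                sink.register(selector, SelectionKey.OP_WRITE);
                source.register(selector, SelectionKey.OP_READ);
                // Bounded number of rounds so the sketch can never spin forever.
                for (int round = 0; received.hasRemaining() && round < 1000; round++) {
                    selector.select(100); // block until something is ready
                    Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                    while (ready.hasNext()) {
                        SelectionKey key = ready.next();
                        ready.remove();
                        if (key.isReadable())
                            source.read(received);
                        if (key.isWritable()) {
                            sink.write(pending); // may be a partial write
                            if (!pending.hasRemaining()) // done: drop write interest
                                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        received.flip();
        return StandardCharsets.UTF_8.decode(received).toString();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello kafka"));
    }
}
```

As in the Selector code above, the loop never writes or reads speculatively; it acts only on keys the selector reports as ready.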
02.2.2 pollSelectionKeys()
<code>void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
// 1. Iterate over events
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
// 2. Get the channel registered on the key
KafkaChannel channel = channel(key);
...
// 3. Get node id
String nodeId = channel.id();
...
try {
// 4. If read is ready, attempt read
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
attemptRead(channel);
}
...
// 5. Attempt write
attemptWrite(key, channel, nowNanos);
} catch (Exception e) {
sendFailed = true;
throw e;
}
...
}
}
</code>This method handles connection, read, and write events.
02.2.3 attemptWrite()
<code>private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
// Write is performed only when four conditions are met
if (channel.hasSend()
&& channel.ready()
&& key.isWritable()
&& !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
// Perform the write
write(channel);
}
}
// KafkaChannel method
public boolean ready() {
return transportLayer.ready() && authenticator.complete();
}
// java.nio.channels.SelectionKey method
public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
}
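</code>The write is attempted only when all four conditions hold:
1. The channel has a pending send (hasSend()).
2. The channel is ready: the transport layer is connected and authentication has completed (ready()).
3. The key is writable, that is, OP_WRITE is in its ready set (isWritable()).
4. No client re‑authentication is being started (maybeBeginClientReauthentication() returns false).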
02.2.4 write()
<code>void write(KafkaChannel channel) throws IOException {
// 1. Get node id
String nodeId = channel.id();
// 2. Write data; may not finish in one call
long bytesSent = channel.write();
// 3. Check if send is complete
Send send = channel.maybeCompleteSend();
// 4. Record metrics and handle completion
if (bytesSent > 0 || send != null) {
long currentTimeMs = time.milliseconds();
if (bytesSent > 0)
this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
if (send != null) {
this.completedSends.add(send);
this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
}
}
}
</code>The method writes buffered data, records metrics, and moves completed sends to completedSends.
02.2.5 KafkaChannel.write()
<code>public long write() throws IOException {
// If send is null, nothing to write
if (send == null)
return 0;
midWrite = true;
// Delegate to ByteBufferSend.writeTo
return send.writeTo(transportLayer);
}
</code>
02.2.6 writeTo()
<code>@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
// 1. Write buffers via NIO
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
// 2. Update remaining bytes
remaining -= written;
// 3. Check if pending writes remain
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
</code>It writes the ByteBuffer array to the socket channel, handling partial writes.
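To see why a single call may not finish the job, the sketch below pushes a size header and a payload through a channel that accepts only a few bytes per call. It is an illustration only: PartialWriteDemo, ThrottledChannel, and sendFully are hypothetical names, and the retry loop stands in for the repeated poll() rounds that drive the real write.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo of gathering writes that complete over several calls. */
final class PartialWriteDemo {

    /** Test channel that accepts at most maxPerCall bytes per write call. */
    static final class ThrottledChannel implements GatheringByteChannel {
        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int maxPerCall;
        private boolean open = true;

        ThrottledChannel(int maxPerCall) {
            if (maxPerCall <= 0)
                throw new IllegalArgumentException("maxPerCall must be positive");
            this.maxPerCall = maxPerCall;
        }

        @Override
        public long write(ByteBuffer[] srcs, int offset, int length) {
            long written = 0;
            for (int i = offset; i < offset + length; i++) {
                while (srcs[i].hasRemaining() && written < maxPerCall) {
                    sink.write(srcs[i].get()); // consuming advances the buffer position
                    written++;
                }
            }
            return written;
        }

        @Override
        public long write(ByteBuffer[] srcs) { return write(srcs, 0, srcs.length); }

        @Override
        public int write(ByteBuffer src) { return (int) write(new ByteBuffer[] {src}, 0, 1); }

        @Override
        public boolean isOpen() { return open; }

        @Override
        public void close() { open = false; }

        byte[] bytes() { return sink.toByteArray(); }
    }

    /** Writes until nothing remains; returns how many write calls were needed. */
    static int sendFully(ByteBuffer[] buffers, ThrottledChannel channel) {
        long remaining = 0;
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        int calls = 0;
        while (remaining > 0) {
            long written = channel.write(buffers);
            if (written < 0)
                throw new IllegalStateException("Wrote negative bytes to channel");
            remaining -= written; // same bookkeeping as writeTo()
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        ByteBuffer header = ByteBuffer.allocate(4);
        header.putInt(5);
        header.flip();
        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        ThrottledChannel channel = new ThrottledChannel(4);
        int calls = sendFully(new ByteBuffer[] {header, payload}, channel);
        System.out.println(calls + " calls, " + channel.bytes().length + " bytes");
    }
}
```

In Kafka the retry does not happen in a tight loop: OP_WRITE stays registered while bytes remain, so the next poll() round picks up where the previous write stopped.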
02.2.7 maybeCompleteSend()
<code>public Send maybeCompleteSend() {
// If send is not null and completed, clear interest and return the result
if (send != null && send.completed()) {
midWrite = false;
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
Send result = send;
send = null;
return result;
}
return null;
}
// PlaintextTransportLayer method
@Override
public void removeInterestOps(int ops) {
// Remove interest via key.interestOps() & ~ops
key.interestOps(key.interestOps() & ~ops);
}
</code>When a send finishes, the OP_WRITE interest is cleared and the send is returned.
The completedSends collection works together with inFlightRequests: for each completed send, NetworkClient looks up the most recently sent request in the in‑flight queue for the same broker.
03 Receive Response Flow Analysis
When a read event is ready, attemptRead() is invoked.
03.1 Read Response Result
03.1.1 attemptRead()
<code>private void attemptRead(KafkaChannel channel) throws IOException {
// Get node id
String nodeId = channel.id();
// Read data into NetworkReceive
long bytesReceived = channel.read();
if (bytesReceived != 0) {
long currentTimeMs = time.milliseconds();
// Check if the receive object is complete
NetworkReceive receive = channel.maybeCompleteReceive();
if (receive != null) {
addToCompletedReceives(channel, receive, currentTimeMs);
}
}
...
}
// KafkaChannel method
public long read() throws IOException {
if (receive == null) {
// Initialize NetworkReceive
receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
// Read data into the receive object
long bytesReceived = receive(receive);
return bytesReceived;
}
</code>The method reads data, creates a NetworkReceive if needed, and adds completed receives to the collection.
03.1.2 maybeCompleteReceive()
<code>public NetworkReceive maybeCompleteReceive() {
if (receive != null && receive.complete()) {
receive.payload().rewind();
NetworkReceive result = receive;
receive = null;
return result;
}
return null;
}
// NetworkReceive method
public boolean complete() {
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
</code>Completion is determined when both the size header and the payload buffer have been fully read.
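The size‑prefixed framing behind that check can be sketched without any networking. The class below is a simplified illustration, not NetworkReceive itself: FramedReceive, readFrom, and decodeInChunks are hypothetical names, and incoming bytes are fed from an in‑memory array in small chunks to imitate a response that arrives over several reads.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified receive buffer: 4-byte size header, then payload. */
final class FramedReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer buffer; // allocated only once the size header is known

    /** Consumes what it needs from incoming; returns the number of bytes used. */
    int readFrom(ByteBuffer incoming) {
        int consumed = transfer(incoming, size);
        if (!size.hasRemaining() && buffer == null) {
            size.rewind();
            int length = size.getInt(); // position is back at 4 after this call
            if (length < 0)
                throw new IllegalStateException("Invalid receive size " + length);
            buffer = ByteBuffer.allocate(length);
        }
        if (buffer != null)
            consumed += transfer(incoming, buffer);
        return consumed;
    }

    /** Same shape as the check above: header full and payload full. */
    boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

    ByteBuffer payload() {
        return buffer;
    }

    private static int transfer(ByteBuffer src, ByteBuffer dst) {
        int n = Math.min(src.remaining(), dst.remaining());
        for (int i = 0; i < n; i++)
            dst.put(src.get());
        return n;
    }

    /** Feeds the frame chunkSize bytes at a time and decodes the payload. */
    static String decodeInChunks(byte[] frame, int chunkSize) {
        if (chunkSize <= 0)
            throw new IllegalArgumentException("chunkSize must be positive");
        FramedReceive receive = new FramedReceive();
        int pos = 0;
        while (pos < frame.length && !receive.complete()) {
            int len = Math.min(chunkSize, frame.length - pos);
            pos += receive.readFrom(ByteBuffer.wrap(frame, pos, len));
        }
        if (!receive.complete())
            throw new IllegalStateException("Frame is truncated");
        receive.payload().rewind(); // make the payload readable from the start
        return StandardCharsets.UTF_8.decode(receive.payload()).toString();
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(4 + body.length);
        frame.putInt(body.length).put(body);
        System.out.println(decodeInChunks(frame.array(), 3));
    }
}
```

With a chunk size of 3, the first chunk does not even complete the size header, which is why the receive object has to survive across several read events.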
03.2 Parse Response Message
After a full response is read, it is parsed as follows:
<code>private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
InFlightRequest req = inFlightRequests.completeNext(source);
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
throttleTimeSensor, now);
...
responses.add(req.completed(response, now));
}
}
</code>The method removes the request from inFlightRequests and parses the response.
<code>private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer,
RequestHeader requestHeader, Sensor throttleTimeSensor, long now) {
// Parse response header
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer,
requestHeader.apiKey().responseHeaderVersion(requestHeader.apiVersion()));
// Parse response body
Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
// Verify correlation IDs match
correlate(requestHeader, responseHeader);
...
return responseBody;
}
</code>It validates that the correlation IDs of the request and response match.
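The pairing of responses with requests relies on per‑broker ordering: requests are queued in send order, and each response is matched against the oldest one still outstanding. The sketch below illustrates that idea under simplifying assumptions: InFlightSketch is a hypothetical class, and a request is reduced to its correlation id.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified per-node queue of in-flight correlation ids. */
final class InFlightSketch {
    private final Map<String, Deque<Integer>> requests = new HashMap<>();

    /** Newest request goes to the head of the node's deque. */
    void add(String node, int correlationId) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(correlationId);
    }

    /** Most recently sent request: the one a completed send refers to. */
    int lastSent(String node) {
        return queue(node).peekFirst();
    }

    /** Oldest outstanding request: the one the next response must answer. */
    int completeNext(String node) {
        return queue(node).pollLast();
    }

    /** Fails loudly when a response does not belong to the expected request. */
    static void correlate(int requestId, int responseId) {
        if (requestId != responseId)
            throw new IllegalStateException("Correlation id for response (" + responseId
                    + ") does not match request (" + requestId + ")");
    }

    private Deque<Integer> queue(String node) {
        Deque<Integer> queue = requests.get(node);
        if (queue == null || queue.isEmpty())
            throw new IllegalStateException("There are no in-flight requests for node " + node);
        return queue;
    }

    public static void main(String[] args) {
        InFlightSketch inFlight = new InFlightSketch();
        inFlight.add("node-1", 1);
        inFlight.add("node-1", 2);
        int expected = inFlight.completeNext("node-1"); // oldest request first
        correlate(expected, 1);                         // response carries id 1
        System.out.println("matched correlation id " + expected);
    }
}
```

Because a broker answers requests on one connection in the order it received them, a mismatch here signals a protocol or bookkeeping error rather than ordinary reordering.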
03.3 Handle Callback
<code>private void completeResponses(List<ClientResponse> responses) {
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
// ClientResponse method
public void onComplete() {
if (callback != null)
callback.onComplete(this);
}
</code>Each response invokes its registered callback after parsing.
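The try/catch inside the loop is what keeps one misbehaving callback from blocking the rest. A minimal sketch of that isolation follows; CallbackDemo, its Callback interface, and completeAll are hypothetical names, and responses are reduced to strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical demo: each callback failure is contained to its own response. */
final class CallbackDemo {

    interface Callback {
        void onComplete(String response);
    }

    /** Invokes the callback for every response; returns how many calls failed. */
    static int completeAll(List<String> responses, Callback callback) {
        int failures = 0;
        for (String response : responses) {
            try {
                if (callback != null)
                    callback.onComplete(response);
            } catch (Exception e) {
                failures++; // the real client logs the error and carries on
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        int failures = completeAll(Arrays.asList("ok-1", "bad", "ok-2"), response -> {
            if (response.equals("bad"))
                throw new IllegalStateException("callback rejected " + response);
            delivered.add(response);
        });
        System.out.println(delivered + " delivered, " + failures + " failed");
    }
}
```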
04 Summary
We have outlined the entire Kafka network‑layer send and receive process, dissected the source code for each step, and explained how messages are buffered, written, read, parsed, and finally handed to callbacks.
The key takeaways are:
1. The flow is divided into a send path and a receive path.
2. Each path involves multiple coordinated classes such as Selector, KafkaChannel, NetworkClient, and the request/response collections.
3. Understanding these internals helps when debugging or extending Kafka clients.
Sanyou's Java Diary
Passionate about technology, though not great at solving problems; eager to share, never tire of learning!