How Kafka Sends and Receives Messages: A Deep Dive into Its Network Layer
This article thoroughly explains Kafka's network layer by dissecting the complete send and receive workflow, covering message pre‑sending, selector polling, write and read operations, buffer management, and callback handling, all illustrated with core source code snippets.
01 Overall Overview
Using a scenario‑driven approach, the article first outlines the steps required for Kafka to transmit and receive messages over the network.
How does Kafka wrap Java NIO's SocketChannel to implement basic network connections and read/write operations? How does KafkaChannel encapsulate transport layer and buffer operations? How does the industrial‑grade NIO implementation use bitwise operations for event listening, packet framing, and handling of sticky packets? How does Kafka wrap the Selector multiplexer? How does the Selector initialization connect to the broker and perform network I/O? How does Kafka send messages and receive responses?
02 Message Sending Process
02.1 Message Pre‑send
The client prepares messages to be sent. The Sender thread pulls a batch from RecordAccumulator and stores it in several collections, including inFlightRequests and completedRequests. It then creates a ClientRequest and constructs InFlightRequest and NetworkSend objects.
/** * Message pre‑send */
public void send(Send send) {
// 1. Get connectionId from the server
String connectionId = send.destination();
// 2. Retrieve the corresponding channel
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (closingChannels.containsKey(connectionId)) {
// Put connectionId into failedSends
this.failedSends.add(connectionId);
} else {
try {
// 4. Temporarily store data for pre‑send (one at a time)
channel.setSend(send);
} catch (Exception e) {
// 5. Mark send failure
channel.state(ChannelState.FAILED_SEND);
// 6. Record failure
this.failedSends.add(connectionId);
// 7. Close the channel
close(channel, CloseMode.DISCARD_NO_NOTIFY);
...
}
}
}The setSend method stores the Send object in the channel and registers an OP_WRITE interest.
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}02.2 Message Actual Send
The Sender thread invokes Selector.poll() to dispatch pending writes.
public void poll(long timeout) throws IOException {
int numReadyKeys = select(timeout);
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
if (dataInBuffers) {
pollSelectionKeys(toPoll, false, endSelect);
}
pollSelectionKeys(readyKeys, false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
} else {
...
}
} pollSelectionKeysprocesses each SelectionKey, handling read and write events.
void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
KafkaChannel channel = channel(key);
String nodeId = channel.id();
try {
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
attemptRead(channel);
}
try {
attemptWrite(key, channel, nowNanos);
} catch (Exception e) {
sendFailed = true;
throw e;
}
} catch (Exception e) {
...
} finally {
...
}
}
} attemptWriteperforms the actual write when four conditions are met: data pending, channel ready, key writable, and no client re‑authentication.
private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
if (channel.hasSend() && channel.ready() && key.isWritable() && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
write(channel);
}
}The write method sends bytes, records metrics, and checks for completion.
void write(KafkaChannel channel) throws IOException {
String nodeId = channel.id();
long bytesSent = channel.write();
Send send = channel.maybeCompleteSend();
if (bytesSent > 0 || send != null) {
long currentTimeMs = time.milliseconds();
if (bytesSent > 0)
this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
if (send != null) {
this.completedSends.add(send);
this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
}
}
} maybeCompleteSenddetermines whether the send has finished and, if so, removes the OP_WRITE interest.
public Send maybeCompleteSend() {
if (send != null && send.completed()) {
midWrite = false;
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
Send result = send;
send = null;
return result;
}
return null;
}03 Receive Response Process
03.1 Read Response Result
attemptReadreads bytes from the channel into a NetworkReceive object.
private void attemptRead(KafkaChannel channel) throws IOException {
String nodeId = channel.id();
long bytesReceived = channel.read();
if (bytesReceived != 0) {
NetworkReceive receive = channel.maybeCompleteReceive();
if (receive != null) {
addToCompletedReceives(channel, receive, currentTimeMs);
}
}
}The channel creates a NetworkReceive when needed and fills it.
public long read() throws IOException {
if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
long bytesReceived = receive(receive);
return bytesReceived;
} maybeCompleteReceivechecks if the buffer has been fully consumed.
public NetworkReceive maybeCompleteReceive() {
if (receive != null && receive.complete()) {
receive.payload().rewind();
NetworkReceive result = receive;
receive = null;
return result;
}
return null;
}03.2 Parse Response Message
Completed receives are processed to parse the response header and body.
private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, Sensor throttleTimeSensor, long now) {
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer, requestHeader.apiKey().responseHeaderVersion(requestHeader.apiVersion()));
Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
correlate(requestHeader, responseHeader);
...
return responseBody;
}03.3 Process Callback
After parsing, each ClientResponse invokes its callback.
private void completeResponses(List<ClientResponse> responses) {
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
public void onComplete() {
if (callback != null)
callback.onComplete(this);
}04 Summary
The article provides a complete walkthrough of Kafka's network layer, separating the sending and receiving workflows, and dives into the source code that implements each step, from pre‑sending buffering to selector polling, actual socket writes, response reading, parsing, and callback execution.
Xiao Lou's Tech Notes
Backend technology sharing, architecture design, performance optimization, source code reading, troubleshooting, and pitfall practices
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
