How Kafka Sends and Receives Messages: A Deep Dive into Its Network Layer

This article thoroughly explains Kafka's network layer by dissecting the complete send and receive workflow, covering message pre‑sending, selector polling, write and read operations, buffer management, and callback handling, all illustrated with core source code snippets.

Xiao Lou's Tech Notes

01 Overview

Using a scenario‑driven approach, the article first outlines the steps required for Kafka to transmit and receive messages over the network.

This walkthrough addresses several questions. How does Kafka wrap Java NIO's SocketChannel to implement basic network connections and read/write operations? How does KafkaChannel encapsulate the transport layer and buffer operations? How does this production-grade NIO implementation use bitwise operations to register event interest, and how does it frame messages so that TCP's coalescing or splitting of packets (the so-called sticky-packet problem) is handled? How does Kafka wrap the Selector multiplexer, and how does the Selector connect to the broker after initialization and perform network I/O? Finally, how does Kafka send messages and receive responses?

02 Message Sending Process

02.1 Message Pre‑send

The client first prepares the messages to be sent. The Sender thread drains batches from RecordAccumulator and wraps them in a ClientRequest. An InFlightRequest is then created and tracked in inFlightRequests, and a NetworkSend object is built and passed to the selector's send method. That method only stages the data on the channel; nothing is written to the socket yet.

/**
 * Message pre-send: stage a Send on its channel without writing to the socket.
 */
public void send(Send send) {
    // 1. Get the connectionId of the destination node from the Send
    String connectionId = send.destination();
    // 2. Retrieve the corresponding channel (throws if none exists)
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    // 3. If the channel is already closing, record the send as failed
    if (closingChannels.containsKey(connectionId)) {
        this.failedSends.add(connectionId);
    } else {
        try {
            // 4. Stage the data for sending (one Send per channel at a time)
            channel.setSend(send);
        } catch (Exception e) {
            // 5. Mark the channel state as a failed send
            channel.state(ChannelState.FAILED_SEND);
            // 6. Record the failure
            this.failedSends.add(connectionId);
            // 7. Close the channel
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            ...
        }
    }
}

The setSend method stores the Send object in the channel and registers an OP_WRITE interest.

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
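
The interest-set bookkeeping behind setSend comes down to two bitwise operations on an int. The following is a minimal sketch of that idea, not Kafka's code: InterestOpsSketch and its methods are hypothetical names, and the only real API used is the SelectionKey constants from java.nio.

```java
import java.nio.channels.SelectionKey;

// Hypothetical stand-in for a channel; this is not Kafka's KafkaChannel.
class InterestOpsSketch {
    private int interestOps = SelectionKey.OP_READ; // start out listening for reads only
    private Object pendingSend;                     // at most one send may be staged

    // Stage a send and start listening for writability.
    void setSend(Object send) {
        if (pendingSend != null)
            throw new IllegalStateException("prior send still in progress");
        pendingSend = send;
        interestOps |= SelectionKey.OP_WRITE;  // set the OP_WRITE bit
    }

    // Hand back the staged send and stop listening for writability.
    Object completeSend() {
        Object done = pendingSend;
        pendingSend = null;
        interestOps &= ~SelectionKey.OP_WRITE; // clear the OP_WRITE bit, keep the others
        return done;
    }

    // True if the given operation bit is currently set.
    boolean interestedIn(int op) {
        return (interestOps & op) != 0;
    }
}
```

Because each operation occupies its own bit, adding or removing OP_WRITE leaves the OP_READ interest untouched, which is why a channel can keep listening for responses while a request is being written.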

02.2 Actual Message Send

The Sender thread invokes Selector.poll() to dispatch pending writes.

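public void poll(long timeout) throws IOException {
    int numReadyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
        // Channels with data already buffered but nothing new on the socket
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); // avoid polling a channel twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>();
            pollSelectionKeys(toPoll, false, endSelect);
        }
        // Channels whose sockets are ready for I/O
        pollSelectionKeys(readyKeys, false, endSelect);
        readyKeys.clear();
        // Channels that connected immediately
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        ...
    }
}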

pollSelectionKeys processes each SelectionKey, handling read and write events.

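void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        String nodeId = channel.id();
        boolean sendFailed = false;
        // Simplified here: the full source may substitute a per-channel start time
        long nowNanos = currentTimeNanos;
        try {
            // Read only if the channel is ready, has data available, has no
            // unprocessed completed receive, and is not explicitly muted
            if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
                attemptRead(channel);
            }
            try {
                attemptWrite(key, channel, nowNanos);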
            } catch (Exception e) {
                sendFailed = true;
                throw e;
            }
        } catch (Exception e) {
            ...
        } finally {
            ...
        }
    }
}
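
The select-then-dispatch pattern underneath poll and pollSelectionKeys can be tried out with plain Java NIO. The sketch below is an illustration under assumptions, not Kafka's Selector: it uses a java.nio Pipe instead of a socket so that it runs without a broker, and SelectorLoopSketch and roundTrip are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class SelectorLoopSketch {
    // Writes one short message into a pipe, then uses a Selector to discover and read it.
    static String roundTrip(String message) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {
            // Only non-blocking channels can be registered with a selector
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

            StringBuilder received = new StringBuilder();
            if (selector.select(1000) > 0) {              // wait up to one second for ready keys
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) {
                        ByteBuffer buf = ByteBuffer.allocate(64);
                        ((Pipe.SourceChannel) key.channel()).read(buf);
                        buf.flip();                       // switch the buffer from filling to draining
                        received.append(StandardCharsets.UTF_8.decode(buf));
                    }
                }
                // Selected keys are not removed automatically; clear them before the next select
                selector.selectedKeys().clear();
            }
            return received.toString();
        }
    }
}
```

Clearing the selected-key set after each pass matters for the same reason it does in a real event loop: a key left in the set would be reported again on the next pass even if nothing new had arrived.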

attemptWrite performs the actual write only when four conditions hold: the channel has a pending Send, the channel is ready, the key is writable, and the channel is not beginning client re-authentication.

private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
    if (channel.hasSend() && channel.ready() && key.isWritable() && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
        write(channel);
    }
}

The write method sends bytes, records metrics, and checks for completion.

void write(KafkaChannel channel) throws IOException {
    String nodeId = channel.id();
    long bytesSent = channel.write();
    Send send = channel.maybeCompleteSend();
    if (bytesSent > 0 || send != null) {
        long currentTimeMs = time.milliseconds();
        if (bytesSent > 0)
            this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
        if (send != null) {
            this.completedSends.add(send);
            this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
        }
    }
}

maybeCompleteSend determines whether the send has finished and, if so, removes the OP_WRITE interest and clears the channel's pending Send so the next one can be staged.

public Send maybeCompleteSend() {
    if (send != null && send.completed()) {
        midWrite = false;
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
        Send result = send;
        send = null;
        return result;
    }
    return null;
}
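
A single write call is not guaranteed to drain the whole Send, which is why completion is checked separately after every write. The sketch below illustrates this with hypothetical classes (ThrottledChannel, BufferSend, drain); it is not Kafka's implementation, and the per-call byte limit merely imitates a socket buffer that is nearly full.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class PartialWriteSketch {
    // Hypothetical channel that accepts at most maxPerCall bytes per write call.
    static class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int maxPerCall;

        ThrottledChannel(int maxPerCall) { this.maxPerCall = maxPerCall; }

        @Override public int write(ByteBuffer src) {
            int n = Math.min(maxPerCall, src.remaining());
            for (int i = 0; i < n; i++) sink.write(src.get());
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    // Hypothetical send: completed once its buffer has no bytes left.
    static class BufferSend {
        private final ByteBuffer buffer;

        BufferSend(byte[] data) { this.buffer = ByteBuffer.wrap(data); }

        long writeTo(WritableByteChannel ch) throws IOException { return ch.write(buffer); }
        boolean completed() { return !buffer.hasRemaining(); }
    }

    // Returns how many write calls were needed to drain the send.
    static int drain(BufferSend send, WritableByteChannel ch) throws IOException {
        int calls = 0;
        while (!send.completed()) {  // each iteration stands in for one poll pass
            send.writeTo(ch);
            calls++;
        }
        return calls;
    }
}
```

With a 10-byte send and a channel that takes 4 bytes per call, drain needs three calls (4, 4, then 2 bytes). In the real event loop those calls would be spread over successive poll passes, with OP_WRITE interest kept until the send completes.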

03 Receive Response Process

03.1 Read Response Result

attemptRead reads bytes from the channel into a NetworkReceive object.

private void attemptRead(KafkaChannel channel) throws IOException {
    String nodeId = channel.id();
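    long bytesReceived = channel.read();
    if (bytesReceived != 0) {
        long currentTimeMs = time.milliseconds();
        sensors.recordBytesReceived(nodeId, bytesReceived, currentTimeMs);
        NetworkReceive receive = channel.maybeCompleteReceive();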
        if (receive != null) {
            addToCompletedReceives(channel, receive, currentTimeMs);
        }
    }
}

The channel creates a NetworkReceive when needed and fills it.

public long read() throws IOException {
    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
    }
    long bytesReceived = receive(receive);
    return bytesReceived;
}
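
maybeCompleteReceive checks whether the receive is complete, meaning the whole response has been read. If so, it rewinds the payload buffer so that it can be parsed from the start, and clears the channel's current receive.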

public NetworkReceive maybeCompleteReceive() {
    if (receive != null && receive.complete()) {
        receive.payload().rewind();
        NetworkReceive result = receive;
        receive = null;
        return result;
    }
    return null;
}
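
Deciding when a receive is complete relies on length-prefixed framing: a 4-byte size is read first, then exactly that many payload bytes. The sketch below shows one way such a receive can tolerate data arriving in arbitrary pieces. FramedReceiveSketch and assemble are hypothetical names, the chunks imitate separate reads, and unlike production code it does not validate the size against a maximum.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

class FramedReceiveSketch {
    private final ByteBuffer size = ByteBuffer.allocate(4); // 4-byte length prefix
    private ByteBuffer payload;                              // allocated once the size is known

    // Reads whatever is available; call repeatedly until complete() is true.
    long readFrom(ReadableByteChannel ch) throws IOException {
        long read = 0;
        if (size.hasRemaining()) {
            int n = ch.read(size);
            if (n < 0) return read;                          // channel exhausted
            read += n;
            if (!size.hasRemaining()) {
                size.rewind();
                payload = ByteBuffer.allocate(size.getInt()); // getInt moves the position back to 4
            }
        }
        if (payload != null && payload.hasRemaining()) {
            int n = ch.read(payload);                        // never reads past this frame's end
            if (n > 0) read += n;
        }
        return read;
    }

    boolean complete() {
        return !size.hasRemaining() && payload != null && !payload.hasRemaining();
    }

    byte[] payloadBytes() {
        payload.rewind();
        byte[] out = new byte[payload.remaining()];
        payload.get(out);
        return out;
    }

    // Feeds the chunks one at a time, as if each arrived in a separate read.
    static byte[] assemble(byte[]... chunks) throws IOException {
        FramedReceiveSketch receive = new FramedReceiveSketch();
        for (byte[] chunk : chunks) {
            ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(chunk));
            while (!receive.complete() && receive.readFrom(ch) > 0) { }
        }
        return receive.complete() ? receive.payloadBytes() : null;
    }
}
```

Because the payload buffer is sized from the prefix, a read can never consume bytes belonging to the next frame, and a frame split across several reads simply stays incomplete until its last byte arrives.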

03.2 Parse Response Message

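Each completed receive is then parsed: the response header and body are read from its buffer, and correlate checks that the correlation ID in the response header matches the one in the request header.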

private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, Sensor throttleTimeSensor, long now) {
    ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer, requestHeader.apiKey().responseHeaderVersion(requestHeader.apiVersion()));
    Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
    correlate(requestHeader, responseHeader);
    ...
    return responseBody;
}
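
The correlation check can be pictured as matching each response against the oldest request still in flight on that connection. The sketch below is a simplified illustration with hypothetical names (CorrelationSketch, send, receive). It assumes responses on a connection arrive in request order and that the response bytes begin with a 4-byte correlation ID.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class CorrelationSketch {
    // New requests are pushed at the head, so the oldest request sits at the tail.
    private final Deque<Integer> inFlight = new ArrayDeque<>();
    private int nextCorrelationId = 0;

    // Registers a request and returns the correlation id that would go in its header.
    int send() {
        int id = nextCorrelationId++;
        inFlight.addFirst(id);
        return id;
    }

    // Reads the id from the response bytes and checks it against the oldest in-flight request.
    int receive(ByteBuffer response) {
        int responseId = response.getInt();
        Integer expected = inFlight.peekLast();
        if (expected == null || expected != responseId)
            throw new IllegalStateException(
                "Correlation id " + responseId + " does not match expected " + expected);
        inFlight.pollLast(); // the request is now answered
        return responseId;
    }

    int inFlightCount() { return inFlight.size(); }
}
```

A mismatch leaves the in-flight queue untouched and raises an error, since it means the client and broker disagree about which request is being answered.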

03.3 Process Callback

After parsing, each ClientResponse invokes its callback.

private void completeResponses(List<ClientResponse> responses) {
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}

public void onComplete() {
    if (callback != null)
        callback.onComplete(this);
}
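
Wrapping each callback in its own try/catch keeps one faulty callback from preventing the rest from running. The sketch below demonstrates that behaviour with hypothetical types (CallbackSketch, Response, completeAll); it counts failures instead of logging them so the effect is easy to observe.

```java
import java.util.List;
import java.util.function.Consumer;

class CallbackSketch {
    // Hypothetical response: a body plus an optional completion callback.
    static class Response {
        final String body;
        final Consumer<Response> callback;

        Response(String body, Consumer<Response> callback) {
            this.body = body;
            this.callback = callback;
        }

        void onComplete() {
            if (callback != null) callback.accept(this);
        }
    }

    // Runs every callback in order and returns the number that threw.
    static int completeAll(List<Response> responses) {
        int failures = 0;
        for (Response r : responses) {
            try {
                r.onComplete();
            } catch (Exception e) {
                failures++; // a real client would log this and continue
            }
        }
        return failures;
    }
}
```

If the second of three callbacks throws, the first and third still run and completeAll reports one failure.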

04 Summary

The article provides a complete walkthrough of Kafka's network layer, separating the sending and receiving workflows, and dives into the source code that implements each step, from pre‑sending buffering to selector polling, actual socket writes, response reading, parsing, and callback execution.
