Understanding Kafka Producer Idempotence: Mechanisms and Implementation Details
This article explains how Kafka achieves producer idempotence by assigning unique producer IDs and sequence numbers, describes the broker’s validation process, and walks through the relevant producer‑side and broker‑side code paths, highlighting configuration considerations and limitations.
Idempotence
Under normal operation a Producer sends messages to a Broker, which appends them to the appropriate topic partition and returns an ACK. If the ACK is lost after the message is written, the Producer may resend the message, causing duplicates.
Before Kafka 0.11 this problem could not be solved by Kafka itself, so only at‑least‑once semantics were possible and downstream systems had to perform deduplication. Starting with version 0.11, enabling idempotence by setting enable.idempotence to true guarantees that a resent message is written only once.
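As a minimal sketch, the settings below show how a producer might enable idempotence. The property keys are standard Kafka producer configuration names. The broker address, the class name IdempotentProducerConfig, and the helper build() are placeholders for illustration. Only java.util.Properties is used, so the snippet runs without the Kafka client on the classpath. In a real application the resulting Properties object would be passed to the KafkaProducer constructor.

```java
import java.util.Properties;

final class IdempotentProducerConfig {

    // Hypothetical helper: collects the settings that idempotence relies on.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Turn on PID and sequence-number tracking.
        props.put("enable.idempotence", "true");
        // Idempotence requires acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        // Must not exceed 5 when idempotence is enabled.
        props.put("max.in.flight.requests.per.connection", "5");
        // Retries must be greater than zero so lost ACKs trigger a resend.
        props.put("retries", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092"); // placeholder address
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Setting acks, retries, and the in-flight limit explicitly keeps the requirements visible in one place and makes a conflicting override easier to spot.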
Kafka achieves this by adding two markers to each message batch:
PID – a unique producer identifier assigned when the Producer session starts.
Sequence number – a monotonically increasing number (starting at 0) attached to each ProducerBatch.
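The Broker maintains the PID and last sequence number for each TopicPartition. When a batch arrives, the Broker checks that its sequence number is exactly the previous value plus one. A batch whose sequence number has already been written is not appended a second time, which prevents duplicates, and a batch that skips ahead is rejected as out of order.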
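The check described above can be sketched roughly as follows. This is a simplified illustration and not the broker's actual code: the class SequenceValidator, its Result enum, and the string key are hypothetical, each batch is assumed to carry a single sequence number, and producer epochs and sequence wrap-around are ignored.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceValidator {

    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number per "pid:topicPartition" key (hypothetical key format).
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result validate(long pid, String topicPartition, int seq) {
        String key = pid + ":" + topicPartition;
        int last = lastSeq.getOrDefault(key, -1); // -1 means nothing accepted yet
        if (seq == last + 1) {
            lastSeq.put(key, seq);                // expected next value: accept and remember it
            return Result.APPENDED;
        }
        if (seq <= last) {
            return Result.DUPLICATE;              // already written: do not append again
        }
        return Result.OUT_OF_ORDER;               // a gap: an earlier batch is missing
    }

    public static void main(String[] args) {
        SequenceValidator v = new SequenceValidator();
        System.out.println(v.validate(1L, "orders-0", 0)); // APPENDED
        System.out.println(v.validate(1L, "orders-0", 0)); // DUPLICATE (resend after a lost ACK)
        System.out.println(v.validate(1L, "orders-0", 2)); // OUT_OF_ORDER (sequence 1 is missing)
        System.out.println(v.validate(1L, "orders-0", 1)); // APPENDED
    }
}
```

Keeping the state per PID and partition is what limits the guarantee to one producer session and one partition: a new PID starts again from sequence 0.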
If a Producer restarts (changing its PID) or writes across topics/partitions, plain idempotence is insufficient and transactional support is required.
Producer‑Side Processing Logic
Key components involved in the Producer side are:
KafkaProducer – the Producer instance.
Sender – the internal thread that sends messages to the Broker.
RecordAccumulator – buffers records into ProducerBatches until a batch is full or linger.ms expires.
TransactionManager – records PID, per‑partition sequence numbers and transaction state when idempotence or transactions are enabled.
When KafkaProducer.send() is called, the message is placed into a ProducerBatch inside the RecordAccumulator without PID or sequence number. The Sender thread later invokes maybeWaitForProducerId() to obtain a PID if needed. The relevant code is:
private void maybeWaitForProducerId() {
    // Keep trying until a PID is assigned or the TransactionManager records an error.
    while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
        try {
            Node node = awaitLeastLoadedNodeReady(requestTimeout);
            if (node != null) {
                ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
                InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
                Errors error = initProducerIdResponse.error();
                if (error == Errors.NONE) {
                    // Success: store the PID and epoch for later batches.
                    ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                            initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                    transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
                    return;
                } else if (error.exception() instanceof RetriableException) {
                    log.debug("Retriable error from InitProducerId response: {}", error.message());
                } else {
                    transactionManager.transitionToFatalError(error.exception());
                    break;
                }
            } else {
                log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
                        "We will back off and try again.");
            }
        } catch (UnsupportedVersionException e) {
            transactionManager.transitionToFatalError(e);
            break;
        } catch (IOException e) {
            // The node is out of scope here, so log the exception without a broker placeholder.
            log.debug("Disconnected while awaiting InitProducerId response", e);
        }
        // Back off, then refresh metadata before the next attempt.
        log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
        time.sleep(retryBackoffMs);
        metadata.requestUpdate();
    }
}
After a PID is obtained, the Sender calls sendProducerData(), which eventually invokes RecordAccumulator.drain(). The drain method contains several checks related to idempotence, such as ensuring the batch is not in a retry interval, the PID and epoch are valid, and no previous batch for the same partition is still in-flight.
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    // ... additional logic omitted for brevity ...
}
If the batch passes all checks, the Producer assigns PID, epoch, and a new sequence number, increments the internal sequence counter, and marks the batch as in-flight.
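The sequence assignment step can be sketched in isolation. The class SequenceAssigner and its assign() method are hypothetical names, not part of the Kafka client. The sketch assumes the counter is kept per TopicPartition and advances by the number of records in the batch, so that each batch receives a base sequence number.

```java
import java.util.HashMap;
import java.util.Map;

final class SequenceAssigner {

    // Next sequence number to hand out, per topic partition (a missing entry means 0).
    private final Map<String, Integer> nextSeq = new HashMap<>();

    // Returns the base sequence for a batch and advances the counter by recordCount.
    int assign(String topicPartition, int recordCount) {
        int base = nextSeq.getOrDefault(topicPartition, 0);
        nextSeq.put(topicPartition, base + recordCount);
        return base;
    }

    public static void main(String[] args) {
        SequenceAssigner assigner = new SequenceAssigner();
        System.out.println(assigner.assign("orders-0", 3)); // 0
        System.out.println(assigner.assign("orders-0", 2)); // 3
        System.out.println(assigner.assign("orders-1", 1)); // 0 (independent counter)
    }
}
```

Assigning the number only when the batch is drained, rather than in send(), means a batch that never leaves the accumulator does not consume a sequence number and leave a gap.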
Broker‑Side Processing Logic
Key data structures on the Broker side include:
BatchMetadata – stores metadata for a batch, such as the last sequence number and offset.
ProducerIdEntry – a fixed‑size (5) queue of BatchMetadata for a single PID.
ProducerStateManager – manages the ProducerIdEntry objects of every PID that writes to a TopicPartition; each partition log has its own instance.
When a ProduceRequest arrives, KafkaApis.handleProduceRequest() eventually calls Log.append(), which validates PID and sequence number via analyzeAndValidateProducerState:
private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
    (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
  val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
  val completedTxns = ListBuffer.empty[CompletedTxn]
  for (batch <- records.batches.asScala if batch.hasProducerId) {
    val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
    if (isFromClient)
      maybeLastEntry.flatMap(_.duplicateOf(batch)).foreach { duplicate =>
        return (updatedProducers, completedTxns.toList, Some(duplicate))
      }
    val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
    maybeCompletedTxn.foreach(completedTxns += _)
  }
  (updatedProducers, completedTxns.toList, None)
}
The method duplicateOf checks whether the incoming batch’s sequence range matches a stored batch, indicating a duplicate. If not a duplicate, checkSequence validates that the sequence numbers are strictly increasing and consistent with the producer epoch:
private def checkSequence(producerEpoch: Short, firstSeq: Int, lastSeq: Int): Unit = {
  if (producerEpoch != currentEntry.producerEpoch) {
    if (firstSeq != 0) {
      if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
        throw new OutOfOrderSequenceException(...)
      } else {
        throw new UnknownProducerIdException(...)
      }
    }
  } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
    throw new OutOfOrderSequenceException(...)
  } else if (isDuplicate(firstSeq, lastSeq)) {
    throw new DuplicateSequenceException(...)
  } else if (!inSequence(firstSeq, lastSeq)) {
    throw new OutOfOrderSequenceException(...)
  }
}
Because the ProducerIdEntry cache holds only five BatchMetadata objects, the configuration max.in.flight.requests.per.connection must not exceed 5 when idempotence is enabled; otherwise metadata may be evicted and retries could fail.
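To illustrate why the limit matters, here is a small sketch of a bounded cache of recent batches. It is a simplified model under stated assumptions, not the broker's ProducerIdEntry: the class RecentBatchCache and its methods are hypothetical, and each cached entry stores only a first and last sequence number.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class RecentBatchCache {

    static final int MAX_CACHED = 5; // mirrors the fixed size described above

    // Each element is {firstSeq, lastSeq} of an appended batch, oldest first.
    private final Deque<int[]> recent = new ArrayDeque<>();

    // Remember an appended batch, evicting the oldest entry once the cache is full.
    void record(int firstSeq, int lastSeq) {
        if (recent.size() == MAX_CACHED) {
            recent.removeFirst();
        }
        recent.addLast(new int[] {firstSeq, lastSeq});
    }

    // A retry is recognised only if its exact sequence range is still cached.
    boolean isKnownDuplicate(int firstSeq, int lastSeq) {
        for (int[] range : recent) {
            if (range[0] == firstSeq && range[1] == lastSeq) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RecentBatchCache cache = new RecentBatchCache();
        for (int seq = 0; seq < 6; seq++) {
            cache.record(seq, seq); // six single-record batches, one more than the cache holds
        }
        System.out.println(cache.isKnownDuplicate(0, 0)); // false: batch 0 was evicted
        System.out.println(cache.isKnownDuplicate(1, 1)); // true: still cached
    }
}
```

With six batches outstanding, a retry of the oldest one can no longer be matched against the cache, which is the failure mode the limit of 5 avoids.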
Overall, Kafka’s idempotence guarantees exactly‑once delivery for a single Producer session on a single TopicPartition by coupling a unique PID with a strictly monotonic sequence number, and the broker enforces this contract during log appends.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
