Understanding Kafka Producer Idempotence: PID, Sequence Numbers, and Implementation Details
This article explains how Apache Kafka implements producer idempotence by introducing Producer IDs (PIDs) and sequence numbers, describes the request‑response flow for PID allocation, details server‑side PID management, explains the exactly‑once guarantee mechanism, and answers common configuration questions with code examples.
Apache Kafka added transactional support in version 0.11.0, which includes three aspects: idempotence, transactions, and exactly‑once semantics for Kafka Streams. This article focuses on the first two, especially the producer idempotence mechanism that forms the basis for transactional messaging.
Producer Idempotence ensures that a message sent multiple times by the same producer within a single session is persisted only once on the broker. It does not guarantee cross‑session or cross‑partition idempotence; for those cases, full transactions are required.
Idempotence Example
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all"); // default when idempotence is true
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>(topic, "test"));
The idempotent producer hides the complexity of guaranteeing exactly‑once delivery from the application.
Why Idempotence Is Needed
Before 0.11.0, Kafka provided at‑least‑once delivery. Network retries could cause duplicate messages because the broker could not tell whether a retried request had already been processed. For use‑cases such as payment processing that require precise counting, duplicates must be eliminated, which idempotence achieves.
Implementation Principles
Idempotence relies on two mechanisms:
PID (Producer ID) – a globally unique identifier assigned to each producer client.
Sequence numbers – a per‑partition, per‑PID counter that the broker uses to detect duplicates.
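To make the interplay of these two pieces concrete, here is a standalone sketch (my own illustration, not Kafka source) of a broker-side dedupe table keyed by (PID, partition): a batch is appended only when its sequence is exactly the last acknowledged sequence plus one, and an already-seen sequence is acknowledged without being persisted again.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: tracks the last acknowledged sequence per (PID, partition)
// and silently drops retried batches whose sequence was already accepted.
public class DedupSketch {
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Returns true if the batch is appended, false if it is a duplicate. */
    public boolean append(long pid, int partition, int seq) {
        String key = pid + "-" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (seq <= last)
            return false;               // duplicate retry: ack without persisting
        if (seq != last + 1)
            throw new IllegalStateException("out of order: expected " + (last + 1));
        lastSeq.put(key, seq);
        return true;                    // appended
    }

    public static void main(String[] args) {
        DedupSketch broker = new DedupSketch();
        System.out.println(broker.append(42L, 0, 0)); // true
        System.out.println(broker.append(42L, 0, 1)); // true
        System.out.println(broker.append(42L, 0, 1)); // false (retried batch)
    }
}
```

The retried batch in the last call models a network-level resend: the broker recognizes the sequence and acknowledges it without writing a second copy.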
PID Allocation
def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
val initProducerIdRequest = request.body[InitProducerIdRequest]
val transactionalId = initProducerIdRequest.transactionalId
if (transactionalId != null) {
if (!authorize(request.session, Write, Resource(TransactionalId, transactionalId, LITERAL))) {
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
} else if (!authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return
}
// allocate PID
val producerId = producerIdManager.generateProducerId()
responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
}
The broker assigns a PID via ProducerIdManager.generateProducerId(), which obtains a block of IDs from ZooKeeper (node /latest_producer_id_block) and updates the block atomically.
{"version":1,"broker":35,"block_start":"4000","block_end":"4999"}
When a block is exhausted, the manager requests a new block, writes it back to ZooKeeper with a conditional version check, and retries if the write fails.
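The block arithmetic itself is straightforward: the next block starts immediately after the current block's end. A standalone sketch (my own illustration, not Kafka source), assuming a fixed block size of 1000 IDs as in the 4000..4999 example above:

```java
// Sketch only: compute the next producer-ID block from the current one,
// mirroring the block handoff stored under /latest_producer_id_block.
public class ProducerIdBlockSketch {
    static final long BLOCK_SIZE = 1000; // assumed; matches the example block

    static class Block {
        final long start, end;
        Block(long start, long end) { this.start = start; this.end = end; }
    }

    static Block nextBlock(Block current) {
        long start = current.end + 1;            // next block begins after the old one
        return new Block(start, start + BLOCK_SIZE - 1);
    }

    public static void main(String[] args) {
        Block next = nextBlock(new Block(4000, 4999));
        System.out.println(next.start + ".." + next.end); // prints 5000..5999
    }
}
```

The conditional version check shown below is what makes this handoff safe when several brokers race to claim the next block.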
private def getNewProducerIdBlock(): Unit = {
var zkWriteComplete = false
while (!zkWriteComplete) {
val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
// parse current block, compute next block, write back
val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
zkWriteComplete = succeeded
}
}
Each call to generateProducerId() returns the next available PID from the current block, synchronizing access to ensure monotonic allocation.
def generateProducerId(): Long = {
this.synchronized {
if (nextProducerId > currentProducerIdBlock.blockEndId) {
getNewProducerIdBlock()
nextProducerId = currentProducerIdBlock.blockStartId + 1
} else {
nextProducerId += 1
}
nextProducerId - 1
}
}
Sequence Numbers
When a batch is ready, the producer calls ProducerBatch.setProducerState() to attach PID, epoch, base sequence, and transaction flag. The broker validates these fields in analyzeAndValidateProducerState() and stores the last five batches per PID/partition for duplicate detection.
def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean): (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
for (batch <- records.batches.asScala if batch.hasProducerId) {
val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
if (isFromClient) {
maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate =>
return (updatedProducers, completedTxns.toList, Some(duplicate))
}
}
// further validation and state update
}
(updatedProducers, completedTxns.toList, None)
}
The broker checks for duplicates by comparing the batch’s sequence range with the cached metadata. If a duplicate is found, the broker acknowledges the write without persisting new data.
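The duplicate check boils down to matching the incoming batch's epoch and (firstSeq, lastSeq) range against the metadata cached for the last five batches. A standalone sketch of that comparison (my own illustration, not Kafka source):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch only: cache metadata for the last five batches of one producer on
// one partition, and report a duplicate when an incoming batch's epoch and
// sequence range match a cached entry exactly.
public class DuplicateBatchSketch {
    static class BatchMetadata {
        final short epoch; final int firstSeq; final int lastSeq;
        BatchMetadata(short epoch, int firstSeq, int lastSeq) {
            this.epoch = epoch; this.firstSeq = firstSeq; this.lastSeq = lastSeq;
        }
    }

    private static final int MAX_CACHED = 5;          // broker keeps 5 batches
    private final Deque<BatchMetadata> cache = new ArrayDeque<>(); // newest first

    void record(short epoch, int firstSeq, int lastSeq) {
        if (cache.size() == MAX_CACHED)
            cache.removeLast();                       // evict the oldest entry
        cache.addFirst(new BatchMetadata(epoch, firstSeq, lastSeq));
    }

    boolean isDuplicate(short epoch, int firstSeq, int lastSeq) {
        for (BatchMetadata m : cache)
            if (m.epoch == epoch && m.firstSeq == firstSeq && m.lastSeq == lastSeq)
                return true;
        return false;
    }

    public static void main(String[] args) {
        DuplicateBatchSketch state = new DuplicateBatchSketch();
        for (int seq = 0; seq < 6; seq++)
            state.record((short) 0, seq, seq);        // six single-record batches
        System.out.println(state.isDuplicate((short) 0, 5, 5)); // true (still cached)
        System.out.println(state.isDuplicate((short) 0, 0, 0)); // false (evicted)
    }
}
```

Note how the sixth append evicts the first batch's metadata: a retry of that evicted batch can no longer be recognized as a duplicate, which is exactly why the cache size bounds the in-flight request count discussed later.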
Client Sending Flow
The producer’s send() method adds records to a RecordAccumulator. The background Sender thread obtains a PID if needed via maybeWaitForProducerId(), then calls sendProducerData(). During drain(), the accumulator ensures that a retried batch is only sent after all earlier in‑flight batches for the same partition have completed, effectively limiting in‑flight requests to one when retries are pending.
private void maybeWaitForProducerId() {
while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeoutMs);
if (node != null) {
ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
if (initProducerIdResponse.error() == Errors.NONE) {
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(initProducerIdResponse.producerId(), initProducerIdResponse.epoch()));
return;
}
// handle retriable and fatal errors
}
} catch (Exception e) {
// retry with backoff
}
}
}
When a batch fails, RecordAccumulator.reenqueue() inserts it back into the queue in sequence order based on its sequence number, preserving ordering even with MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION greater than one.
public void reenqueue(ProducerBatch batch, long now) {
batch.reenqueued(now);
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
if (transactionManager != null)
insertInSequenceOrder(deque, batch);
else
deque.addFirst(batch);
}
}
Server‑Side Produce Request Handling
The broker’s handleProduceRequest() validates permissions, checks for PID presence, and then calls analyzeAndValidateProducerState(). It verifies that the incoming sequence number is continuous; otherwise it throws OutOfOrderSequenceException or UnknownProducerIdException, prompting the client to retry.
private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = {
if (producerEpoch != updatedEntry.producerEpoch) {
if (appendFirstSeq != 0) {
if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH)
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch")
else
throw new UnknownProducerIdException(s"Found no record of producerId=$producerId")
}
} else {
val currentLastSeq = if (updatedEntry.isEmpty) RecordBatch.NO_SEQUENCE else updatedEntry.lastSeq
if (!inSequence(currentLastSeq, appendFirstSeq))
throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId")
}
}
Because the broker retains only the last five batches per PID/partition, the producer must keep MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION ≤ 5; otherwise older batches could be evicted, leading to OutOfOrderSequenceException on retry.
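The inSequence check referenced above can be sketched in isolation (my own illustration, not Kafka source). One subtlety worth capturing: sequence numbers are 32-bit and wrap around, so the legal successor of Integer.MAX_VALUE is 0.

```java
// Sketch only: sequence-continuity check with 32-bit wraparound.
public class SequenceCheckSketch {
    static boolean inSequence(int lastSeq, int nextSeq) {
        // compare in long arithmetic so lastSeq + 1 cannot overflow
        return nextSeq == lastSeq + 1L
            || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(inSequence(41, 42));               // true: next in line
        System.out.println(inSequence(41, 43));               // false: gap of one
        System.out.println(inSequence(Integer.MAX_VALUE, 0)); // true: wraparound
    }
}
```

Any gap fails this check, which is what surfaces to the producer as OutOfOrderSequenceException.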
Key Takeaways
Idempotence solves duplicate‑message problems by using PID and per‑partition sequence numbers.
PID allocation is coordinated through ZooKeeper blocks, ensuring global uniqueness.
Both client and broker enforce strict sequence continuity, providing exactly‑once semantics within a session.
Configuration limits (e.g., max in‑flight requests) are tied to the broker’s five‑batch cache.
Even with multiple in‑flight requests, ordering is preserved by re‑enqueueing retries in sequence order and by the sender waiting for the earliest pending batch to complete.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.