Root‑Cause Solutions for Kafka Message Duplicates
The article analyzes why Kafka messages are duplicated on both producer and consumer sides, explains at‑most‑once, at‑least‑once, and exactly‑once delivery semantics, and walks through three concrete remedies—idempotent producers, transactions, and consumer‑side deduplication—backed by configuration details, code samples, and practical deployment steps.
Problem Overview
Message duplication is a common issue in Kafka pipelines; it can arise anywhere from the producer to the consumer. On the producer side, duplicates usually come from retries. Typical triggers include LeaderNotAvailableException when a partition leader is down, NotControllerException during controller election, and various network exceptions. If the broker had already written the record before its acknowledgment was lost, the retry stores the record a second time. On the consumer side, if a batch is polled and processed but the process crashes before committing the offset, the same batch will be polled again after the restart, causing duplicate consumption.
Delivery Semantics
At most once – the message is sent only once; loss is possible but duplication never occurs (e.g., MQTT QoS 0).
At least once – the message is guaranteed to be delivered, but duplicates may appear (e.g., MQTT QoS 1).
Exactly once – the message is delivered once without loss or duplication (e.g., MQTT QoS 2).
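The difference between the first two guarantees comes down to whether the consumer commits its offset before or after processing. The following is a minimal in-memory sketch of that ordering, not Kafka client code: the class name DeliverySemanticsSketch, the run method, and the single simulated crash are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class DeliverySemanticsSketch {

    /**
     * Simulates a consumer that reads `log` from offset 0, crashes once while
     * handling the record at `crashAt`, and restarts from the last committed
     * offset. Returns every record whose processing completed, in order.
     */
    static List<String> run(List<String> log, int crashAt, boolean commitBeforeProcessing) {
        List<String> processed = new ArrayList<>();
        int committed = 0;        // next offset to read after a restart
        boolean crashed = false;  // the simulated crash happens only once
        while (committed < log.size()) {
            int offset = committed;
            if (commitBeforeProcessing) {
                committed = offset + 1;               // at most once: commit first
                if (!crashed && offset == crashAt) {  // crash before processing finishes
                    crashed = true;
                    continue;                         // restart: the record is lost
                }
                processed.add(log.get(offset));
            } else {
                processed.add(log.get(offset));       // at least once: process first
                if (!crashed && offset == crashAt) {  // crash before the commit
                    crashed = true;
                    continue;                         // restart: the record is read again
                }
                committed = offset + 1;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "c");
        System.out.println(run(log, 1, true));   // [a, c]       "b" is lost
        System.out.println(run(log, 1, false));  // [a, b, b, c] "b" is handled twice
    }
}
```

Exactly once requires that neither outcome can happen, which is why the remedies in the next section combine broker-side deduplication with atomic commits or an idempotent consumer.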
How to Achieve Exactly‑Once
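Idempotent Producer
Enable idempotence with enable.idempotence=true, set acks=all, and limit max.in.flight.requests.per.connection to ≤ 5. This guarantees that a producer will not write duplicate records to a single partition within a single session. The broker assigns a producer ID (pid) to each producer instance, and the producer attaches a per-<topic,partition> sequence number to what it sends. The broker accepts a record only when its sequence number is exactly one greater than the last one accepted for that pid and partition (seq == lastSeq + 1). A sequence number that is not greater than lastSeq marks a retry of a record already written and is discarded as a duplicate; a larger gap means an earlier record is missing, and the broker rejects the request as out of order.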
Idempotence means that executing the same operation any number of times yields the same result.
Properties props = new Properties();
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("max.in.flight.requests.per.connection", 5);
Limitations: works only for a single partition and a single session (the pid changes after a restart).
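The sequence-number check described above can be sketched as a small in-memory model. This is a simplification under stated assumptions, not the broker's actual implementation: the class SequenceCheckSketch, its append method, and the Result enum are hypothetical names, and the model tracks one sequence number per record, whereas the real broker works on batches.

```java
import java.util.HashMap;
import java.util.Map;

class SequenceCheckSketch {

    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number, keyed by "pid:topic-partition".
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result append(long pid, String topicPartition, int seq) {
        String key = pid + ":" + topicPartition;
        int last = lastSeq.getOrDefault(key, -1);  // -1: nothing accepted yet
        if (seq <= last) {
            return Result.DUPLICATE;               // retry of a record already written
        }
        if (seq != last + 1) {
            return Result.OUT_OF_ORDER;            // gap: an earlier record is missing
        }
        lastSeq.put(key, seq);
        return Result.ACCEPTED;
    }

    public static void main(String[] args) {
        SequenceCheckSketch broker = new SequenceCheckSketch();
        System.out.println(broker.append(1L, "myTopic-0", 0)); // ACCEPTED
        System.out.println(broker.append(1L, "myTopic-0", 0)); // DUPLICATE (retry)
        System.out.println(broker.append(1L, "myTopic-0", 2)); // OUT_OF_ORDER (1 is missing)
        System.out.println(broker.append(2L, "myTopic-0", 0)); // ACCEPTED (new pid)
    }
}
```

The last call illustrates the single-session limitation: a restarted producer receives a new pid, so the model has no way to recognize that it is resending a record the previous session already wrote.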
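Kafka Transactions
Transactions extend idempotence to multiple partitions and multiple sessions. Configure the producer with transactional.id, call initTransactions() once, then wrap sends between beginTransaction() and commitTransaction(). On a recoverable error, call abortTransaction(); on a fatal error such as ProducerFencedException, close the producer instead. Consumers must set isolation.level=read_committed to see only committed records.
props.put("transactional.id", "my-transactional-id");
// Create the producer from props before this point; initTransactions() is called once per producer.
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>(topic, key, value));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: this producer instance can no longer be used, so close it.
    producer.close();
} catch (KafkaException e) {
    // Recoverable: abort so that none of the sends in this transaction are committed.
    producer.abortTransaction();
}
Using transactions removes the single-partition limitation of plain idempotence.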
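On the reading side, the read_committed requirement is a single consumer setting. The sketch below builds the consumer properties with plain string keys so that it compiles without the Kafka client on the classpath. The helper name consumerProps, the group id my-group, and the choice of String deserializers are assumptions for illustration.

```java
import java.util.Properties;

class ReadCommittedConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Return only records from committed transactions; the default is read_uncommitted.
        props.put("isolation.level", "read_committed");
        // Commit offsets manually so that the commit can follow successful processing.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "my-group");
        System.out.println(props.getProperty("isolation.level")); // read_committed
    }
}
```

These properties would be passed to a KafkaConsumer constructor. Without isolation.level=read_committed, a consumer also sees records from aborted transactions.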
Consumer-Side Idempotency
Even with an idempotent producer, the consumer may still process a record more than once, for example when it crashes after processing a batch but before committing the offset. A common pattern is to store each consumed message's unique Id in a local deduplication table, in the same database transaction as the business update. If the INSERT fails because the Id already exists, the transaction rolls back and the duplicate is skipped.
In short, the answer to "How to solve message duplication?" is to make the consumer itself idempotent.
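The deduplication pattern can be sketched in memory as follows. This is only an illustration under stated assumptions: a HashSet stands in for the deduplication table with its unique constraint on the message Id, a list stands in for the business side effect, and the names DedupConsumerSketch and handle are hypothetical. In a real system both writes would go through one database transaction so that they succeed or fail together.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DedupConsumerSketch {

    // Stands in for a database table with a unique constraint on the message Id.
    private final Set<String> processedIds = new HashSet<>();
    // Stands in for the business side effect, such as rows written by the handler.
    private final List<String> applied = new ArrayList<>();

    /** Returns true if the message was applied, false if it was skipped as a duplicate. */
    boolean handle(String messageId, String payload) {
        // Set.add returns false when the Id is already present, like a failed unique INSERT.
        if (!processedIds.add(messageId)) {
            return false;
        }
        applied.add(payload);
        return true;
    }

    List<String> applied() {
        return applied;
    }

    public static void main(String[] args) {
        DedupConsumerSketch consumer = new DedupConsumerSketch();
        consumer.handle("order-1", "charge 10");
        consumer.handle("order-1", "charge 10"); // redelivered after a crash: skipped
        consumer.handle("order-2", "charge 25");
        System.out.println(consumer.applied());  // [charge 10, charge 25]
    }
}
```

The approach depends on every message carrying a stable unique Id; an Id that is regenerated on each retry would defeat the check.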
Practical Setup
Run ZooKeeper in Docker:
$ docker run -d --name zookeeper -p 2181:2181 zookeeper
Start Kafka 2.7.1 from source.
Create a topic with 1 replica and 2 partitions:
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2
Sample producer code (Java) demonstrates the configuration above and shows how to read a file line-by-line and send each line as a record.
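import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerApplication {
    private final Producer<String, String> producer;
    private final String outTopic;

    public KafkaProducerApplication(Producer<String, String> producer, String topic) {
        this.producer = producer;
        this.outTopic = topic;
    }

    // Sends one line of the form "key-value"; a line without "-" is sent with a null key.
    public void produce(String message) {
        // A limit of 2 keeps any further "-" characters inside the value.
        String[] parts = message.split("-", 2);
        String key = parts.length > 1 ? parts[0] : null;
        String value = parts.length > 1 ? parts[1] : parts[0];
        ProducerRecord<String, String> record = new ProducerRecord<>(outTopic, key, value);
        // The callback runs once the broker acknowledges the record or the send fails.
        producer.send(record, (metadata, e) -> {
            if (e != null) e.printStackTrace();
            else System.out.println("key/value " + key + "/" + value + " written to topic[" + metadata.topic() + "] partition[" + metadata.partition() + "] at offset " + metadata.offset());
        });
    }

    // close() waits for previously sent records to complete.
    public void shutdown() { producer.close(); }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address; localhost:9092 matches the topic-creation command above.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        String topic = "myTopic";
        Producer<String, String> producer = new KafkaProducer<>(props);
        KafkaProducerApplication app = new KafkaProducerApplication(producer, topic);
        // read file and produce …
        app.shutdown();
    }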
}
With enable.idempotence=true, the producer requires acks=all. Explicitly setting acks to 0 or 1, or setting max.in.flight.requests.per.connection above 5, makes the producer fail at startup with a ConfigException.
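The constraints that trigger the ConfigException can be checked before the producer is created. The sketch below is a hypothetical pre-flight helper, not part of the Kafka client: the class IdempotenceConfigCheckSketch and its problem method are assumed names. It mirrors the three rules the client enforces when idempotence is explicitly enabled (acks=all, at most 5 in-flight requests, non-zero retries) and expects all values to be stored as strings.

```java
import java.util.Properties;

class IdempotenceConfigCheckSketch {

    /** Returns null if the settings are compatible with idempotence, else a reason. */
    static String problem(Properties props) {
        if (!"true".equals(props.getProperty("enable.idempotence"))) {
            return null; // idempotence not requested, nothing to check
        }
        // Unset values fall back to settings that are compatible with idempotence.
        String acks = props.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {
            return "acks must be all";
        }
        int inFlight = Integer.parseInt(
                props.getProperty("max.in.flight.requests.per.connection", "5"));
        if (inFlight > 5) {
            return "max.in.flight.requests.per.connection must be at most 5";
        }
        int retries = Integer.parseInt(props.getProperty("retries", "2147483647"));
        if (retries == 0) {
            return "retries must be greater than 0";
        }
        return null;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "1");
        System.out.println(problem(props)); // acks must be all
    }
}
```

A check like this only reports the conflict earlier and with an application-specific message; the client's own validation remains the source of truth.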
When to Use Idempotence
If the application already uses acks=all, enabling idempotence adds protection against retry duplicates at little extra cost.
If the application favors throughput over strict consistency and sets acks=0 or acks=1, leave idempotence off: it requires acks=all, so the two settings cannot be combined.
Programmer XiaoFu
xiaofucode.com – a programmer learning guide driven by the pursuit of profit