Kafka Consumer Deep Dive: Offset Management, Rebalance Strategies, and Thread Safety
This article explains Kafka consumer semantics such as at‑most‑once, at‑least‑once and exactly‑once delivery, shows how to configure and commit offsets, describes the rebalance process and partition‑assignment strategies, and discusses thread‑safety and task scheduling with illustrative Java code examples.
Greetings, I am a senior architect sharing practical knowledge about Kafka consumer internals.
Pre‑knowledge: Delivery Guarantees
At most once – messages may be lost but never duplicated.
At least once – messages are never lost but may be duplicated.
Exactly once – each message is delivered only once.
Exactly‑once Implementation
Producer side: assign a globally unique ID to each message and let the consumer filter duplicates.
Consumer side: two main approaches – process‑then‑commit (risks at‑least‑once, i.e. duplicates) or commit‑then‑process (risks at‑most‑once, i.e. loss). Detailed steps are listed in the ordered list below.
Process the message first, then commit the offset (auto‑commit or manual). If the consumer crashes after processing but before the offset is persisted, the same message is re‑processed on restart (at‑least‑once: duplicates possible).
Commit the offset first, then process the message. If the consumer crashes after the commit but before processing completes, the message is lost (at‑most‑once: loss possible).
Consumer‑side solution: disable auto‑commit and persist the offset together with the processing result as a single transaction in a database or Redis. On restart or failure, call KafkaConsumer.seek to resume from the stored offset.
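The approach above can be sketched with a hypothetical in‑memory offset store standing in for the database or Redis. In production, processAndCommit would run inside one real transaction together with the business side effects, and on restart the consumer would pass resumeOffset(...) to KafkaConsumer.seek (shown only in a comment here):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical offset store: in production this would be a database or Redis,
// and processAndCommit would execute inside a single transaction.
public class TransactionalOffsetStore {
    private final Map<String, Long> nextOffset = new ConcurrentHashMap<>();

    // Process the record, then persist the next offset atomically with the result.
    public void processAndCommit(String topicPartition, long offset, Runnable businessLogic) {
        businessLogic.run();                        // process the message
        nextOffset.put(topicPartition, offset + 1); // committed together with the result
    }

    // On restart, feed this value to KafkaConsumer.seek(partition, offset).
    public long resumeOffset(String topicPartition) {
        return nextOffset.getOrDefault(topicPartition, 0L);
    }
}
```

Because the offset only advances when processing succeeds, a crash between the two steps can never leave them inconsistent.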
Consumer Rebalance Triggers
Change in the number of group members (new consumer joins or leaves).
Change in the number of subscribed topics.
Change in the number of partitions of a subscribed topic.
(Diagram omitted: the rebalance process, originally from the Kafka rebalance documentation.)
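A toy illustration of why a membership change forces a rebalance, assuming round‑robin assignment (this demo class is hypothetical, not Kafka's implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration: changing the number of group members redistributes
// partitions, which is exactly what a rebalance does.
public class RebalanceDemo {
    // Assigns partitions 0..numPartitions-1 round-robin across numConsumers.
    public static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) assignment.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++)
            assignment.get(p % numConsumers).add(p);  // round-robin distribution
        return assignment;
    }
}
```

With 6 partitions and 2 consumers each member owns 3 partitions; when a third consumer joins, every member's set changes, so the group must pause, revoke the old assignment, and reassign. The Java client exposes these two moments to the application through ConsumerRebalanceListener's onPartitionsRevoked and onPartitionsAssigned callbacks.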
Partition Assignment Strategies
RoundRobinAssignor: lists all topic‑partitions and all consumer members, then distributes partitions in a round‑robin fashion.
RangeAssignor: assigns contiguous ranges of partitions to each consumer; an example with 7 partitions and 5 consumers follows.
// Example of calculating the RangeAssignor partition distribution
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
// With 7 partitions and 5 consumers: 7 / 5 = 1 and 7 % 5 = 2, so the first two
// consumers each get 2 partitions and the remaining three get 1 each:
// C0 -> [0,1], C1 -> [2,3], C2 -> [4], C3 -> [5], C4 -> [6]

KafkaConsumer Analysis – Thread Safety
KafkaConsumer is not thread‑safe; the responsibility is shifted to the caller.
Solution: use two thread pools – one pool of polling threads, each owning its own KafkaConsumer instance, and another pool that processes the records, typically decoupled through a queue.
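A minimal sketch of that pattern, with a plain String standing in for a ConsumerRecord so the threading structure is visible without a broker; the pool sizes and queue capacity are illustrative, not recommendations:

```java
import java.util.List;
import java.util.concurrent.*;

// One dedicated polling thread (stand-in for a thread owning a KafkaConsumer)
// hands records to a worker pool through a bounded queue.
public class TwoPoolConsumer {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
    private final ExecutorService pollers = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final CopyOnWriteArrayList<String> processed = new CopyOnWriteArrayList<>();

    public void start(List<String> fakeRecords) {
        pollers.submit(() -> {
            // In a real consumer this loop would call consumer.poll(...)
            for (String record : fakeRecords) {
                try { queue.put(record); }
                catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
            }
        });
        for (int i = 0; i < 4; i++) {
            workers.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        String record = queue.poll(200, TimeUnit.MILLISECONDS);
                        if (record == null) return;   // queue idle: stop (demo only)
                        processed.add(record);        // "process" the record
                    } catch (InterruptedException e) { return; }
                }
            });
        }
    }

    public List<String> shutdownAndGet() {
        pollers.shutdown();
        workers.shutdown();
        try { workers.awaitTermination(5, TimeUnit.SECONDS); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return processed;
    }
}
```

The key point is that only the single polling thread ever touches the (non‑thread‑safe) consumer, while record processing parallelizes freely behind the queue.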
The client uses a highly abstracted request/response model (e.g., HeartbeatTask, AutoCommitTask) with RequestFuture and listeners to propagate success or failure.
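A stripped‑down model of that future‑plus‑listener pattern (a hypothetical class, not Kafka's internal RequestFuture, but the same idea: callbacks registered before or after completion both fire):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal future with listeners, modeled after the idea behind Kafka's
// internal RequestFuture: completing or failing triggers the callbacks.
public class MiniFuture<T> {
    private T value;
    private RuntimeException error;
    private boolean done;
    private final List<Consumer<T>> onSuccess = new ArrayList<>();
    private final List<Consumer<RuntimeException>> onFailure = new ArrayList<>();

    public synchronized void addListener(Consumer<T> success, Consumer<RuntimeException> failure) {
        if (done) {  // already completed: fire the matching callback immediately
            if (error == null) success.accept(value); else failure.accept(error);
        } else {
            onSuccess.add(success);
            onFailure.add(failure);
        }
    }

    public synchronized void complete(T v) {
        value = v; done = true;
        for (Consumer<T> l : onSuccess) l.accept(v);
    }

    public synchronized void raise(RuntimeException e) {
        error = e; done = true;
        for (Consumer<RuntimeException> l : onFailure) l.accept(e);
    }
}
```

Tasks like heartbeats and auto‑commits chain on such futures instead of blocking, which is how one network thread can drive many in‑flight requests.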
public static void main(String[] args) {
    Properties props = new Properties();
    String topic = "test";
    String group = "test0";
    props.put("bootstrap.servers", "XXX:9092,XXX:9092");
    props.put("group.id", group);
    props.put("auto.offset.reset", "earliest");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
        }
    }
}

Overall Consumer Workflow
SubscriptionState manages the set of subscribed topics and partition assignments.
Fetch retrieves data from the broker.
ConsumerCoordinator coordinates with the GroupCoordinator for rebalance and offset commits.
ConsumerNetworkClient wraps NetworkClient to send requests.
The core poll method acquires a lock, calls pollOnce , sends fetches, and handles delayed tasks such as heartbeats and auto‑commits.
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();  // guard against concurrent access from multiple threads
    try {
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // (pre-sending of the next fetch is elided in this excerpt)
                if (this.interceptors == null) {
                    return new ConsumerRecords<>(records);
                } else {
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
            }
            remaining = timeout - (time.milliseconds() - start);
        } while (remaining > 0);
        return ConsumerRecords.empty();
    } finally {
        release();
    }
}

The article concludes with an invitation for readers to discuss, ask questions, and join the author's community.
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.