Kafka Consumer Group Rebalance: Mechanisms, Strategies, Protocols, and Java Implementation
This article provides a comprehensive overview of Kafka consumer group rebalance, covering version compatibility, rebalance triggers, assignment strategies, generation handling, protocol details, the full rebalance workflow, listener usage, and complete Java code examples for offset management with database integration.
1. Kafka Version
This article uses Kafka 1.1.1; most of what follows also applies to Kafka 0.10.x and later.
2. Rebalance
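In a consumer group, multiple consumer instances share the partitions of the subscribed topics, and each partition is consumed by exactly one consumer in the group at a time. When a new consumer joins, it takes over some of the partitions that other members were reading. When a consumer leaves or crashes, its partitions are reassigned to the remaining members. This redistribution is called a rebalance: a protocol through which all members of the group agree on which consumer owns which partition. A rebalance is triggered in three cases:
- group membership changes (a consumer joins, leaves, or is considered dead);
- the number of partitions of a subscribed topic changes;
- the set of subscribed topics changes (for example, a new topic matches a regex subscription).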
Rebalancing provides high availability and scalability. However, while a rebalance is in progress no consumer in the group can fetch messages, so the whole group is briefly unavailable.
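Consumers maintain group membership by sending heartbeats to the GroupCoordinator, the broker that manages the group. If the coordinator receives no heartbeat within session.timeout.ms, it considers the consumer dead and starts a rebalance. Heartbeat handling changed in Kafka 0.10.1:
- Earlier clients sent heartbeats only from within poll(), so slow message processing could get a live consumer evicted.
- Since 0.10.1, a dedicated background thread sends heartbeats, and a separate setting, max.poll.interval.ms, bounds the time allowed between poll() calls.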
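To make the session timeout concrete, here is a simplified, in-memory model of the coordinator's side of heartbeating. SessionTracker and its methods are hypothetical names for illustration. The real GroupCoordinator uses a delayed expiration operation per member rather than scanning a map, but the rule is the same: a member whose last heartbeat is older than session.timeout.ms is removed, and that removal starts a rebalance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of coordinator-side session tracking;
// not Kafka's implementation (the real coordinator uses delayed operations).
class SessionTracker {
    private final long sessionTimeoutMs;
    // memberId -> time (ms) of the last heartbeat the coordinator received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    SessionTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // A heartbeat refreshes the member's session.
    void heartbeat(String memberId, long nowMs) {
        lastHeartbeat.put(memberId, nowMs);
    }

    // Removes and returns (sorted) every member whose session has expired;
    // in Kafka, removing a member from the group is what starts a rebalance.
    List<String> expireMembers(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMs - e.getValue() > sessionTimeoutMs) {
                expired.add(e.getKey());
            }
        }
        for (String memberId : expired) {
            lastHeartbeat.remove(memberId);
        }
        Collections.sort(expired);
        return expired;
    }

    int memberCount() {
        return lastHeartbeat.size();
    }
}
```

For example, with a 10-second timeout, a member whose last heartbeat was at t=0 is expired at t=12s. A member that heartbeated at t=8s is kept.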
3. Rebalance Strategy
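The assignment strategy determines which consumer receives which partition. Kafka ships three built-in strategies, selected by setting the partition.assignment.strategy property to the assignor's class name:
range (org.apache.kafka.clients.consumer.RangeAssignor, the default): for each topic, the partitions are sorted and split into contiguous ranges, one per consumer. When the partitions do not divide evenly, the first consumers in sorted order receive one extra partition, which can skew the load when a group subscribes to many topics.
round-robin (org.apache.kafka.clients.consumer.RoundRobinAssignor): the partitions of all subscribed topics are sorted and dealt out to the consumers one at a time, in turn.
sticky (org.apache.kafka.clients.consumer.StickyAssignor, introduced in 0.11.0.0): keeps the assignment as balanced as possible while preserving as many previous assignments as it can, so fewer partitions change owners during a rebalance.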
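The two simpler strategies can be sketched in a few lines of plain Java. This is a single-topic simplification. The real RangeAssignor applies the range logic separately to each topic, and RoundRobinAssignor deals out the sorted partitions of all subscribed topics together. AssignmentSketch is a hypothetical helper, not a Kafka class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-topic models of the range and round-robin strategies.
// Partitions are identified by number (0..numPartitions-1), consumers by member ID.
class AssignmentSketch {

    // Range: contiguous blocks; the first (numPartitions % numConsumers)
    // consumers, in sorted order, receive one extra partition.
    static Map<String, List<Integer>> range(List<String> consumers, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sorted.isEmpty()) return result;
        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                parts.add(p);
            }
            result.put(sorted.get(i), parts);
        }
        return result;
    }

    // Round-robin: deal the partitions to the sorted consumers one at a time.
    static Map<String, List<Integer>> roundRobin(List<String> consumers, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sorted.isEmpty()) return result;
        for (String c : sorted) {
            result.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        return result;
    }
}
```

With consumers c1, c2, c3 and 7 partitions, the two strategies assign:
- range: c1 gets partitions 0-2, c2 gets 3-4, c3 gets 5-6;
- round-robin: c1 gets {0, 3, 6}, c2 gets {1, 4}, c3 gets {2, 5}.

The difference grows with more topics. For two 7-partition topics, range gives c1 six partitions and the others four each. Round-robin over all 14 partitions yields five, five and four.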
4. Rebalance Generation
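Each rebalance is identified by a generation number (generationId). It starts at 0 and increments with every successful rebalance, and every member that takes part in a rebalance receives the new value. The generation fences off stale members. Suppose a consumer has already been removed from the group (for example, because it spent too long between polls) and then tries to commit an offset with its old generation. The coordinator rejects the commit with an ILLEGAL_GENERATION error, which the Java client surfaces as a CommitFailedException.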
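The fencing rule can be modeled as follows. GenerationFencing is a hypothetical, simplified class. The real coordinator also checks the member ID (rejecting unknown members with UNKNOWN_MEMBER_ID) and persists commits to __consumer_offsets. The generation comparison shown here is the part that stops a stale member from overwriting offsets.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of generation fencing on the coordinator.
class GenerationFencing {
    enum CommitResult { OK, ILLEGAL_GENERATION }

    private int generationId = 0; // a new, empty group starts at generation 0
    private final Map<Integer, Long> committed = new HashMap<>(); // partition -> committed offset

    // Each completed rebalance moves the group to a new generation.
    int completeRebalance() {
        generationId++;
        return generationId;
    }

    // Accept the commit only if the member belongs to the current generation.
    CommitResult commit(int memberGeneration, int partition, long offset) {
        if (memberGeneration != generationId) {
            return CommitResult.ILLEGAL_GENERATION; // stale member: reject
        }
        committed.put(partition, offset);
        return CommitResult.OK;
    }

    Long committedOffset(int partition) {
        return committed.get(partition);
    }
}
```

Suppose a member committed at generation 1 and then missed a rebalance. Its next commit gets ILLEGAL_GENERATION, so the offset written by the partition's current owner is not overwritten.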
5. Rebalance Protocol
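The rebalance protocol consists of several request types:
JoinGroup: a consumer asks to join (or rejoin) the group and lists the assignment strategies it supports.
SyncGroup: sent by every member after the join phase; the leader's request carries the assignment it computed, and each response delivers a member's own partitions.
Heartbeat: a periodic liveness check; the coordinator also uses the heartbeat response to tell members that a rebalance has started.
LeaveGroup: a consumer tells the coordinator it is leaving, so the rebalance can start without waiting for its session to time out.
DescribeGroups: an administrative query of the group's state and members.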
During a rebalance, the coordinator collects the JoinGroup requests of all members. It then answers their SyncGroup requests with the leader's assignment, so that each member receives its own partition list.
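One JoinGroup/SyncGroup round can be walked through with an in-memory model. RebalanceRound and its methods are hypothetical, and the model keeps only the essentials:
- in a new group, the first member to join becomes the leader;
- the generation is bumped when the join phase completes;
- only the leader submits an assignment;
- each member's SyncGroup response contains just its own partitions.

The leader's strategy here is a simple round-robin over partitions 0..n-1.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory walk-through of one JoinGroup/SyncGroup round.
// Not Kafka's classes: no networking, timeouts or persistence.
class RebalanceRound {
    int generationId = 0;
    String leaderId;
    private final List<String> members = new ArrayList<>();
    private Map<String, List<Integer>> assignment = Collections.emptyMap();

    // JoinGroup: in a new group, the first member to join becomes the leader.
    void join(String memberId) {
        if (leaderId == null) {
            leaderId = memberId;
        }
        members.add(memberId);
    }

    // End of the join phase: start a new generation and give the leader the member list.
    List<String> completeJoin() {
        generationId++;
        return new ArrayList<>(members);
    }

    // SyncGroup from the leader: the coordinator stores the assignment it computed.
    void syncFromLeader(String memberId, Map<String, List<Integer>> leaderAssignment) {
        if (!memberId.equals(leaderId)) {
            throw new IllegalStateException(memberId + " is not the leader");
        }
        assignment = leaderAssignment;
    }

    // SyncGroup response: each member receives only its own partitions.
    List<Integer> syncResponse(String memberId) {
        return assignment.getOrDefault(memberId, Collections.<Integer>emptyList());
    }

    // Runs on the leader, not the broker: a simple round-robin over partitions 0..n-1.
    static Map<String, List<Integer>> leaderAssign(List<String> memberIds, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String m : memberIds) {
            result.put(m, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(memberIds.get(p % memberIds.size())).add(p);
        }
        return result;
    }
}
```

With consumer-a joining first and four partitions, consumer-a leads generation 1 and receives partitions 0 and 2, while consumer-b receives 1 and 3.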
6. Rebalance Process
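The process runs as follows:
Find the coordinator: the consumer sends a FindCoordinator request. The coordinator is the broker that leads partition abs(groupId.hashCode()) % offsets.topic.num.partitions (50 by default) of the internal __consumer_offsets topic.
Join: the consumers send JoinGroup requests. The coordinator picks a leader (in a new group, the first member to join) and, once the members have joined, sends the leader the member list together with their subscriptions.
Sync: the leader computes the assignment with the chosen strategy. Every member then sends a SyncGroup request, and the leader's request carries the assignment.
Distribute: the coordinator returns each consumer its share of the assignment in the SyncGroup response. The consumer then resumes from the committed offsets of its new partitions, or seeks to offsets it stores elsewhere.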
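The first step can be reproduced on the client side, which is handy for finding out which broker coordinates a given group. The sketch mirrors the broker's mapping, abs(groupId.hashCode()) % offsets.topic.num.partitions, including the special case that maps Integer.MIN_VALUE to 0 (as Kafka's Utils.abs does). CoordinatorLookup is a hypothetical name, and the partition count passed in must match the broker's setting (50 by default).

```java
// Hypothetical helper mirroring how a group ID maps to a __consumer_offsets partition.
class CoordinatorLookup {
    // Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // The leader of the returned partition is the group's coordinator.
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return abs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }
}
```

For the group used later in this article, CoordinatorLookup.partitionFor("test_group_offset_db11", 50) gives the __consumer_offsets partition whose leader coordinates that group.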
Assignment is computed in the client rather than on the broker. A new or custom strategy (for example, rack-aware placement) can therefore be rolled out by updating the consumers alone, without upgrading or restarting the brokers.
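Because the assignment runs in the client, the broker never has to understand a strategy; it only needs the members to agree on one by name. The sketch models how the coordinator makes that choice:
- keep the strategies that every member lists in its JoinGroup request;
- let each member vote for its most-preferred candidate;
- take the candidate with the most votes.

StrategyVote is a hypothetical, simplified helper, and ties are broken arbitrarily here. In Kafka, a member whose strategies share nothing with the rest of the group is rejected when it joins rather than at selection time. The strings range, roundrobin and sticky are the names Kafka's built-in assignors register.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of choosing one assignment strategy for a group.
class StrategyVote {
    static String select(Map<String, List<String>> preferencesByMember) {
        // Candidates: strategies supported by every member.
        Set<String> candidates = null;
        for (List<String> prefs : preferencesByMember.values()) {
            if (candidates == null) {
                candidates = new LinkedHashSet<>(prefs);
            } else {
                candidates.retainAll(prefs);
            }
        }
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalStateException("no common assignment strategy");
        }
        // Each member votes for the first candidate in its own preference order.
        Map<String, Integer> votes = new HashMap<>();
        for (List<String> prefs : preferencesByMember.values()) {
            for (String strategy : prefs) {
                if (candidates.contains(strategy)) {
                    votes.merge(strategy, 1, Integer::sum);
                    break;
                }
            }
        }
        // The strategy with the most votes wins.
        String winner = null;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (winner == null || e.getValue() > votes.get(winner)) {
                winner = e.getKey();
            }
        }
        return winner;
    }
}
```

Suppose two members prefer roundrobin over range and a third prefers range. Every member supports both, and roundrobin wins two votes to one.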
7. Rebalance Listener
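When subscribing with a consumer group, you can pass a ConsumerRebalanceListener to subscribe(). It has two callbacks:
- onPartitionsRevoked runs before a rebalance, while the consumer still owns its partitions, so it is the place to save offsets to external storage or run audit actions.
- onPartitionsAssigned runs after the rebalance with the new assignment, and is where a consumer that stores offsets externally calls seek() to resume.

With enable.auto.commit=true, the consumer commits offsets to Kafka by itself. Manual or external offset management needs a custom listener.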
Code Example: Consumer with External Offset Storage
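```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// OffsetService and Offset are the author's DAO classes for the t_offset table; they are not shown in the article.
public class ConsumerOffsetSaveDB {
    private final static Logger logger = LoggerFactory.getLogger("kafka-consumer");

    @Test
    public void testConsumerOffsetSaveDB() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-master:9092,kafka-slave1:9093,kafka-slave2:9094");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        // Offsets live in MySQL, so Kafka's auto commit is off (the commit interval below then has no effect).
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Applies when Kafka has no committed offset for the group; a seek() to a DB offset overrides it.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        String groupId = "test_group_offset_db11";
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "testTopic";
        // Next offset to read per partition; shared with the rebalance listener for auditing.
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
        consumer.subscribe(Collections.singletonList(topic), new SaveOffsetsRebalance(consumer, offsetAndMetadataMap, groupId));
        // poll(0) triggers the group join so that assignment() is populated; the listener's
        // onPartitionsAssigned runs inside this call, and the loop below repeats its seek().
        consumer.poll(0);
        OffsetService offsetService = new OffsetService();
        for (TopicPartition partition : consumer.assignment()) {
            Offset offset = offsetService.getOffset(groupId, partition.topic(), partition.partition());
            if (offset != null && offset.getOffset() != null) {
                consumer.seek(partition, offset.getOffset());
            } else {
                logger.info("Initial DB has no offset");
            }
        }
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("key:{},value:{},offset:{}", record.key(), record.value(), record.offset());
                    // Store offset + 1: the position of the next record to read.
                    offsetAndMetadataMap.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, ""));
                    offsetService.insertOffset(groupId, record.topic(), record.partition(), record.offset() + 1);
                }
            }
        } catch (WakeupException e) {
            // Expected when another thread calls consumer.wakeup() to stop the loop.
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            consumer.close();
        }
    }
}
```
Rebalance Listener Implementation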
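```java
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaveOffsetsRebalance implements ConsumerRebalanceListener {
    private final Logger logger = LoggerFactory.getLogger(SaveOffsetsRebalance.class);
    private final Consumer<?, ?> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> map;
    private final String groupId;
    private final OffsetService offsetService = new OffsetService();

    public SaveOffsetsRebalance(Consumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> map, String groupId) {
        this.consumer = consumer;
        this.map = map;
        this.groupId = groupId;
    }

    // Called before the rebalance, while this consumer still owns the partitions.
    // The poll loop already writes every offset to the DB, so this callback only
    // logs (audits) the current position against the last recorded offset.
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.info("before rebalance, revoked partitions: {}", partitions.size());
        for (TopicPartition tp : partitions) {
            long position = consumer.position(tp);
            OffsetAndMetadata oam = map.get(tp);
            if (oam != null) {
                logger.info("position:{}, offset:{}", position, oam.offset());
            } else {
                logger.info("position:{}, offset:null", position);
            }
        }
    }

    // Called after the rebalance: resume each newly assigned partition from the offset stored in the DB.
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.info("after rebalance, assigned partitions: {}", partitions.size());
        for (TopicPartition tp : partitions) {
            Offset offset = offsetService.getOffset(this.groupId, tp.topic(), tp.partition());
            if (offset != null && offset.getOffset() != null) {
                consumer.seek(tp, offset.getOffset());
            }
        }
    }
}
```
Database Schema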
```sql
DROP TABLE IF EXISTS `t_offset`;
CREATE TABLE `t_offset` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `group_id` varchar(50) NOT NULL,
  `topic` varchar(50) NOT NULL COMMENT 'topic',
  `partition` int(11) NOT NULL,
  `offset` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `unique_gtp` (`group_id`,`topic`,`partition`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='offset storage';
```
Dependencies
```groovy
compile group: 'commons-dbutils', name: 'commons-dbutils', version: '1.7'
compile group: 'mysql', name: 'mysql-connector-java', version: '5.1.30'
compile group: 'org.aeonbits.owner', name: 'owner', version: '1.0.9'
```
Configuration (config.properties)
```properties
config.mysql.url=jdbc:mysql://jannal.mac.com:3306/test
config.mysql.username=root
config.mysql.password=root
config.mysql.driverClass=com.mysql.jdbc.Driver
```
Utility Classes
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class JdbcUtils {
    public static Connection getConnection(String driverClass, String url, String username, String password)
            throws SQLException, ClassNotFoundException {
        Class.forName(driverClass); // loads and registers the JDBC driver class
        return DriverManager.getConnection(url, username, password);
    }
}
```
```java
import org.aeonbits.owner.Config.Key;
import org.aeonbits.owner.Config.Sources;
import org.aeonbits.owner.ConfigFactory;
import org.aeonbits.owner.Reloadable;

// Binds the keys of config.properties (loaded from the classpath) through the OWNER library.
@Sources({"classpath:config.properties"})
public interface MysqlJdbcConfig extends Reloadable {
    @Key("config.mysql.url") String url();
    @Key("config.mysql.username") String username();
    @Key("config.mysql.password") String password();
    @Key("config.mysql.driverClass") String driverClass();
    public static final MysqlJdbcConfig instance = ConfigFactory.create(MysqlJdbcConfig.class);
}
```
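The article does not show OffsetService, but the schema constrains it. The unique key unique_gtp on (group_id, topic, partition) allows only one row per partition. If insertOffset issued a plain INSERT, every save after the first for a partition would fail with a duplicate-key error. A minimal sketch, assuming MySQL and plain JDBC, is an upsert with INSERT ... ON DUPLICATE KEY UPDATE. JdbcOffsetStore is a hypothetical class, not the author's OffsetService; its Connection could come from JdbcUtils.getConnection with the values in MysqlJdbcConfig.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical DAO for the t_offset table (the article's OffsetService is not shown).
// UNIQUE KEY unique_gtp (group_id, topic, partition) lets one statement insert the
// first offset for a partition and overwrite it on every later save.
class JdbcOffsetStore {
    static final String UPSERT_SQL =
            "INSERT INTO t_offset (group_id, topic, `partition`, `offset`) VALUES (?, ?, ?, ?) "
            + "ON DUPLICATE KEY UPDATE `offset` = VALUES(`offset`)";
    static final String SELECT_SQL =
            "SELECT `offset` FROM t_offset WHERE group_id = ? AND topic = ? AND `partition` = ?";

    private final Connection connection;

    JdbcOffsetStore(Connection connection) {
        this.connection = connection;
    }

    // Saves the offset of the next record to read for (groupId, topic, partition).
    void saveOffset(String groupId, String topic, int partition, long offset) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(UPSERT_SQL)) {
            ps.setString(1, groupId);
            ps.setString(2, topic);
            ps.setInt(3, partition);
            ps.setLong(4, offset);
            ps.executeUpdate();
        }
    }

    // Returns the stored offset, or null if nothing has been saved yet.
    Long loadOffset(String groupId, String topic, int partition) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(SELECT_SQL)) {
            ps.setString(1, groupId);
            ps.setString(2, topic);
            ps.setInt(3, partition);
            try (ResultSet rs = ps.executeQuery()) {
                if (!rs.next()) {
                    return null;
                }
                long value = rs.getLong(1);
                if (rs.wasNull()) {
                    return null;
                }
                return value;
            }
        }
    }
}
```

VALUES() in the UPDATE clause works on MySQL 5.x. MySQL 8.0.20 and later deprecate it in favour of row aliases.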
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.