Inside Kafka Consumer SyncGroupRequest: How Rebalance Works
The article walks through the complete lifecycle of a Kafka consumer SyncGroupRequest, detailing request headers and bodies, coordinator selection, state handling on the GroupCoordinator, metadata persistence, and the client‑side response processing that transitions members to a stable state.
1. Request Parameters
Every SyncGroupRequest consists of a header and a body. The header includes header_version (resolved via ApiMessageType#requestHeaderVersion; for SyncGroup it is 1 for API versions 0 to 3 and 2 for the flexible versions 4 and later), api_key (the numeric ID for SyncGroupRequest, 14), api_version (the schema version; 5 is the latest defined for SyncGroupRequest), client_id, and a monotonically increasing correlation_id.
The body carries the fields required for group coordination:
group_id – the consumer group identifier
member_id – assigned by the coordinator during JoinGroup (it is empty only on a member's first JoinGroup request, so it is already set by the time SyncGroup is sent)
group_instance_id – optional static member identifier (Kafka 2.3+)
generation_id – the current generation of the group
protocol_type – usually consumer
protocol_name – the partition‑assignment strategy (e.g., range)
assignments – the list of SyncGroupRequestAssignment objects; only the leader member populates this field.
Leader request construction (simplified):
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        Map<String, ByteBuffer> groupAssignment = performAssignment(
            joinResponse.data().leader(),
            joinResponse.data().protocolName(),
            joinResponse.data().members());
        List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
        for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
            groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
                .setMemberId(assignment.getKey())
                .setAssignment(Utils.toArray(assignment.getValue())));
        }
        SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(
            new SyncGroupRequestData()
                .setGroupId(rebalanceConfig.groupId)
                .setMemberId(generation.memberId)
                .setProtocolType(protocolType())
                .setProtocolName(generation.protocolName)
                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                .setGenerationId(generation.generationId)
                .setAssignments(groupAssignmentList));
        log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}",
            this.coordinator, this.generation, requestBuilder);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
Follower request (empty assignments):
private RequestFuture<ByteBuffer> onJoinFollower() {
    SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(
        new SyncGroupRequestData()
            .setGroupId(rebalanceConfig.groupId)
            .setMemberId(generation.memberId)
            .setProtocolType(protocolType())
            .setProtocolName(generation.protocolName)
            .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
            .setGenerationId(generation.generationId)
            .setAssignments(Collections.emptyList()));
    log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}",
        this.coordinator, this.generation, requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}
2. Initiating the Request
After a successful JoinGroup, the client immediately sends a SyncGroup request. The target node is not the least‑loaded broker but the group's coordinator: the broker that leads the __consumer_offsets partition computed as abs(hash(group.id)) % partitionCount, where partitionCount is the number of partitions of __consumer_offsets. If that partition's leader is unavailable, the client receives the COORDINATOR_NOT_AVAILABLE error ("The coordinator is not available").
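The partition lookup can be sketched in a few lines. This is a minimal illustration, assuming the broker takes a non-negative hash of the group id modulo the partition count of __consumer_offsets (50 by default through offsets.topic.num.partitions). The class and method names are hypothetical and are not part of Kafka's API.

```java
final class CoordinatorPartitionSketch {

    /** Non-negative absolute value: Integer.MIN_VALUE maps to 0 instead of staying negative. */
    static int nonNegative(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    /** Hypothetical helper: index of the __consumer_offsets partition that owns this group. */
    static int partitionFor(String groupId, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return nonNegative(groupId.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        int partitionCount = 50; // assumed default of offsets.topic.num.partitions
        for (String groupId : new String[] {"a", "ab", "orders-service"}) {
            System.out.println(groupId + " -> __consumer_offsets-" + partitionFor(groupId, partitionCount));
        }
    }
}
```

Whichever broker currently leads the resulting partition acts as the coordinator, so a leadership change on that partition also moves the coordinator.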
The request timing is tied to the JoinGroupResponseHandler callbacks (onJoinLeader or onJoinFollower), ensuring that every member participates in the rebalance as soon as the join phase finishes.
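The choice between the two callbacks comes down to whether the leader id in the JoinGroup response matches the member's own id. The sketch below illustrates that decision with plain Java types; the class, method, and parameter names are hypothetical stand-ins rather than the client's actual code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class SyncGroupDispatchSketch {

    /** A member is the leader when the coordinator named it as such in the JoinGroup response. */
    static boolean isLeader(String leaderId, String memberId) {
        return leaderId != null && leaderId.equals(memberId);
    }

    /**
     * Assignments a member places in its SyncGroup body: the leader sends the
     * full computed assignment, every follower sends an empty collection.
     */
    static Map<String, byte[]> assignmentsToSend(String leaderId,
                                                 String memberId,
                                                 Map<String, byte[]> computedByLeader) {
        return isLeader(leaderId, memberId)
            ? computedByLeader
            : Collections.<String, byte[]>emptyMap();
    }

    public static void main(String[] args) {
        Map<String, byte[]> computed = new LinkedHashMap<>();
        computed.put("member-1", new byte[] {1});
        computed.put("member-2", new byte[] {2});
        System.out.println("leader sends " + assignmentsToSend("member-1", "member-1", computed).size());
        System.out.println("follower sends " + assignmentsToSend("member-1", "member-2", computed).size());
    }
}
```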
3. Coordinator Processing
The entry point on the broker side is KafkaApis#handleSyncGroupRequest, which forwards to GroupCoordinator#handleSyncGroup:
def handleSyncGroup(groupId: String,
                    generation: Int,
                    memberId: String,
                    protocolType: Option[String],
                    protocolName: Option[String],
                    groupInstanceId: Option[String],
                    groupAssignment: Map[String, Array[Byte]],
                    responseCallback: SyncCallback): Unit = {
  validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
    case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
      responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
    case Some(error) =>
      responseCallback(SyncGroupResult(error))
    case None =>
      groupManager.getGroup(groupId) match {
        case None => responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
        case Some(group) => doSyncGroup(group, generation, memberId, protocolType, protocolName,
          groupInstanceId, groupAssignment, responseCallback)
      }
  }
}
The doSyncGroup routine (code omitted) performs a series of state checks:
If the group state is Empty, it returns UNKNOWN_MEMBER_ID.
If the state is PreparingRebalance, it returns REBALANCE_IN_PROGRESS because the join phase has not completed.
If the state is Stable, the coordinator simply returns the member's current assignment.
If the state is CompletingRebalance, the coordinator holds each member's response until the leader's request arrives. When the request comes from the leader, the coordinator stores the group metadata, propagates the assignments to all members, and transitions the group to Stable. Errors during storage trigger a reset of all assignments and a subsequent rebalance.
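The state checks above can be condensed into a small decision table. The following is a sketch only: the enum and method names are hypothetical, and the follower branch (parking the response until the leader's assignment arrives) reflects my reading of the flow rather than a quote from the broker code.

```java
final class SyncGroupStateSketch {

    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    enum Outcome {
        UNKNOWN_MEMBER_ID,          // error returned to the caller
        REBALANCE_IN_PROGRESS,      // error returned to the caller
        RETURN_CURRENT_ASSIGNMENT,  // group already stable
        STORE_AND_PROPAGATE,        // leader arrived: persist, then answer everyone
        AWAIT_LEADER                // follower arrived first: response is held back
    }

    /** Maps the group state and the sender's role to what the coordinator does next. */
    static Outcome onSyncGroup(GroupState state, boolean fromLeader) {
        switch (state) {
            case EMPTY:
                return Outcome.UNKNOWN_MEMBER_ID;
            case PREPARING_REBALANCE:
                return Outcome.REBALANCE_IN_PROGRESS;
            case STABLE:
                return Outcome.RETURN_CURRENT_ASSIGNMENT;
            case COMPLETING_REBALANCE:
                return fromLeader ? Outcome.STORE_AND_PROPAGATE : Outcome.AWAIT_LEADER;
            default:
                throw new IllegalStateException("Unhandled state " + state);
        }
    }

    public static void main(String[] args) {
        for (GroupState state : GroupState.values()) {
            System.out.println(state + " leader=" + onSyncGroup(state, true)
                + " follower=" + onSyncGroup(state, false));
        }
    }
}
```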
Storing group metadata involves locating the __consumer_offsets partition derived from group.id and appending a record to it, using the configured compression codec (default NONE) and honoring offsets.commit.timeout.ms (default 5000 ms) and offsets.commit.required.acks (default ‑1, meaning all in-sync replicas must acknowledge the write). On write failure the coordinator resets the assignments, propagates the resulting error to the members waiting on a SyncGroup response, and invokes maybePrepareRebalance to start a new rebalance cycle.
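The two outcomes of the metadata write can be modeled as follows. This is a simplified sketch under stated assumptions: errors are plain strings, the write result is passed in as a parameter instead of coming from a real log append, and every name here is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StoreAndPropagateSketch {

    /** What a single member receives in its SyncGroup response. */
    static final class MemberResult {
        final String error;      // "NONE" on success
        final byte[] assignment; // empty when the write failed

        MemberResult(String error, byte[] assignment) {
            this.error = error;
            this.assignment = assignment;
        }
    }

    /**
     * @param leaderAssignment assignment per member id, as sent by the leader
     * @param storeError       null if the metadata write succeeded, otherwise the error name
     */
    static Map<String, MemberResult> propagate(Map<String, byte[]> leaderAssignment, String storeError) {
        Map<String, MemberResult> results = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : leaderAssignment.entrySet()) {
            if (storeError == null) {
                results.put(entry.getKey(), new MemberResult("NONE", entry.getValue()));
            } else {
                // Failure: assignments are reset and every member sees the error
                results.put(entry.getKey(), new MemberResult(storeError, new byte[0]));
            }
        }
        return results;
    }

    /** Group state after the write: Stable on success, back to a new rebalance on failure. */
    static String nextGroupState(String storeError) {
        return storeError == null ? "Stable" : "PreparingRebalance";
    }
}
```

Passing the write result in as a parameter keeps the sketch independent of which specific error code the broker reports.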
4. Client‑Side Response Handling
The response is processed by SyncGroupResponseHandler:
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
    private SyncGroupResponseHandler(final Generation generation) {
        super(generation);
    }

    @Override
    public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
        Errors error = syncResponse.error();
        if (error == Errors.NONE) {
            if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {
                // omitted
            } else {
                synchronized (AbstractCoordinator.this) {
                    if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
                        // protocolNameInconsistent: the protocol name in the response differs
                        // from generation.protocolName (computation omitted)
                        if (protocolNameInconsistent) {
                            // omitted
                        } else {
                            // Success: the member becomes stable and the assignment is handed to the caller
                            state = MemberState.STABLE;
                            rejoinNeeded = false;
                            lastRebalanceEndMs = time.milliseconds();
                            lastRebalanceStartMs = -1L;
                            future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
                        }
                    } else {
                        // The generation was reset before the response arrived;
                        // this rebalance is marked as failed and retried (omitted)
                    }
                }
            }
        } else {
            // Error responses: the member asks to rejoin, then handles specific errors
            requestRejoin();
            if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                // omitted
            }
            // other error handling omitted
        }
    }
}
When the member state moves from CompletingRebalance to Stable, the client updates its metadata, validates the protocol, refreshes regex‑based topic subscriptions if needed, invokes the assignor’s onAssignment callback, and caches the new assignment.
Final client callback:
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // update metadata, perform simple checks, handle regex subscriptions,
    // invoke assignor.onAssignment, cache the assignment
}
5. Summary
The article provides a step‑by‑step walkthrough of the Kafka consumer SyncGroupRequest flow: constructing request headers and bodies, selecting the correct coordinator, handling the request on the broker with precise state transitions, persisting group metadata, and finally processing the response on the client to achieve a stable assignment. The included code snippets and diagrams illustrate each stage of the rebalance lifecycle.
ShiZhen AI
Tech blogger with over 10 years of experience at leading tech firms, AI efficiency and delivery expert focusing on AI productivity. Covers tech gadgets, AI-driven efficiency, and leisure— AI leisure community. 🛰 szzdzhp001
