Deep Dive into Kafka Consumer JoinGroupRequest Flow
This article walks through the complete Kafka consumer JoinGroupRequest lifecycle: how the client builds and sends the request, how it locates the group coordinator, and how the coordinator processes unknown and known members, elects a leader, selects a partition assignment protocol, and moves the group through its states.
1. Initiating JoinGroup request
When a consumer joins a group, it builds a JoinGroupRequest in AbstractCoordinator#sendJoinGroupRequest. The method first checks that a coordinator is known, then fills in groupId, sessionTimeoutMs, memberId, the optional groupInstanceId, protocolType, the list of protocols, and rebalanceTimeoutMs. The request timeout is the larger of the client's default request timeout and rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE. The request is sent to the coordinator node returned by the previous FindCoordinator lookup.
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            new JoinGroupRequestData()
                    .setGroupId(rebalanceConfig.groupId)
                    .setSessionTimeoutMs(rebalanceConfig.sessionTimeoutMs)
                    .setMemberId(generation.memberId)
                    .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
                    .setProtocolType(protocolType())
                    .setProtocols(metadata())
                    .setRebalanceTimeoutMs(rebalanceConfig.rebalanceTimeoutMs));
    int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
            rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
            .compose(new JoinGroupResponseHandler(generation));
}
1.2 Request parameters
The request header contains header_version, api_key (11 for JoinGroup) and api_version (7 in the schema examined here; the exact version depends on the Kafka release). The body includes group_id, session_timeout_ms, member_id, the optional group_instance_id, protocol_type, rebalance_timeout_ms and a list of protocols (name + serialized metadata).
1.3 Coordinator selection
The coordinator is the broker that leads one partition of __consumer_offsets. The partition number is abs(hash(groupId)) % partitionCount, where partitionCount is the number of partitions of __consumer_offsets. The client obtains the node via a FindCoordinatorRequest and then sends the JoinGroup request to that node.
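The mapping from group id to offsets-topic partition can be sketched in a few lines. This is a minimal illustration, not the broker's code: the class and method names are hypothetical, and the partition count of 50 used in main is an assumed value for offsets.topic.num.partitions.

```java
// Sketch: map a consumer group id to a partition of __consumer_offsets.
// CoordinatorPartition and its methods are hypothetical names for illustration.
class CoordinatorPartition {

    // Math.abs(Integer.MIN_VALUE) is still negative, so that one value is mapped to 0.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // partitionCount is the number of partitions of __consumer_offsets.
    static int partitionFor(String groupId, int partitionCount) {
        return abs(groupId.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // Assumed partition count of 50; the broker that leads the printed
        // partition would act as coordinator for "my-group".
        System.out.println(partitionFor("my-group", 50));
    }
}
```

Because the result depends only on the group id and the partition count, every client that looks up the same group arrives at the same partition, and therefore at the same coordinator broker.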
2. Coordinator handling
The broker-side entry point is KafkaApis#handleJoinGroupRequest, which delegates to GroupCoordinator#handleJoinGroup. The coordinator validates the request, checks that the requested session timeout (session.timeout.ms on the client) lies between group.min.session.timeout.ms and group.max.session.timeout.ms, and then dispatches to doUnknownJoinGroup (empty memberId) or doJoinGroup (known memberId).
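The validation and dispatch step can be pictured as a small pure function. The sketch below is an illustration under assumptions: JoinGroupRouting, Route and route are hypothetical names, the real coordinator performs further checks that are omitted here, and the bounds in main are example values rather than broker defaults.

```java
// Sketch of the coordinator's first decisions for a JoinGroup request.
// All names here are hypothetical; only the decision order follows the text.
class JoinGroupRouting {

    enum Route { INVALID_SESSION_TIMEOUT, UNKNOWN_MEMBER, KNOWN_MEMBER }

    static Route route(String memberId,
                       int sessionTimeoutMs,
                       int minSessionTimeoutMs,
                       int maxSessionTimeoutMs) {
        // The session timeout must lie within the broker-configured bounds.
        if (sessionTimeoutMs < minSessionTimeoutMs || sessionTimeoutMs > maxSessionTimeoutMs) {
            return Route.INVALID_SESSION_TIMEOUT;
        }
        // An empty member id means the member is not yet known to the group.
        if (memberId == null || memberId.isEmpty()) {
            return Route.UNKNOWN_MEMBER;   // handled like doUnknownJoinGroup
        }
        return Route.KNOWN_MEMBER;         // handled like doJoinGroup
    }

    public static void main(String[] args) {
        // Example bounds of 6 s and 5 min (assumed values).
        System.out.println(route("", 10_000, 6_000, 300_000));
        System.out.println(route("consumer-1-abc", 10_000, 6_000, 300_000));
        System.out.println(route("", 1_000, 6_000, 300_000));
    }
}
```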
2.1 doUnknownJoinGroup
If memberId is empty, the coordinator generates a new member id. For dynamic members the format is clientId-UUID; for static members it is groupInstanceId-UUID. When a known member id is required (requireKnownMemberId), it returns the error MEMBER_ID_REQUIRED together with the generated id, and the client retries the join with that id.
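The id format described above can be sketched as follows. MemberIds and generate are hypothetical names; the only thing taken from the text is the prefix-dash-UUID layout, with the prefix chosen by whether a groupInstanceId is present.

```java
import java.util.Optional;
import java.util.UUID;

// Sketch of member id generation: "<prefix>-<UUID>", where the prefix is the
// groupInstanceId for static members and the clientId for dynamic members.
// Class and method names are hypothetical.
class MemberIds {

    static String generate(String clientId, Optional<String> groupInstanceId) {
        String prefix = groupInstanceId.orElse(clientId);
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Dynamic member: id starts with the client id.
        System.out.println(generate("consumer-1", Optional.empty()));
        // Static member: id starts with the group instance id.
        System.out.println(generate("consumer-1", Optional.of("instance-a")));
    }
}
```

The coordinator code that surrounds this step is shown next.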
private def doUnknownJoinGroup(group: GroupMetadata,
                               groupInstanceId: Option[String],
                               requireKnownMemberId: Boolean,
                               clientId: String,
                               clientHost: String,
                               rebalanceTimeoutMs: Int,
                               sessionTimeoutMs: Int,
                               protocolType: String,
                               protocols: List[(String, Array[Byte])],
                               responseCallback: JoinCallback): Unit = {
  // generate newMemberId …
  if (requireKnownMemberId) {
    // register newMemberId as a pending member, then ask the client to rejoin with it …
    responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
  } else {
    // add the member right away and prepare a rebalance …
  }
}
2.2 doJoinGroup
When the client retries with the generated member id, the coordinator creates a MemberMetadata object, adds it to the group, updates the heartbeat timers and, if needed, moves the group to PreparingRebalance. For static members the static-member cache is updated as well.
private def doJoinGroup(group: GroupMetadata,
                        memberId: String,
                        groupInstanceId: Option[String],
                        clientId: String,
                        clientHost: String,
                        rebalanceTimeoutMs: Int,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback): Unit = {
  val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
    clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
    protocolType, protocols)
  group.add(member, responseCallback)
  // schedule heartbeat, maybePrepareRebalance …
}
2.3 onCompleteJoin
After members are added the coordinator attempts to elect a leader, increments the generation id, stores group metadata and, if the group is empty, writes an empty record to __consumer_offsets. Otherwise it builds a JoinGroupResult for each member; the leader receives the full member list.
3. Client processing of the response
JoinGroupResponseHandler checks the error code. On success it moves the client state to COMPLETING_REBALANCE, enables the heartbeat thread, records the new generation, and then:
If the client is the leader it invokes onJoinLeader to compute a new partition assignment and sends a SyncGroupRequest.
If the client is a follower it directly sends a SyncGroupRequest via onJoinFollower.
4. Protocol selection and partition assignment
Each consumer configures partition.assignment.strategy (e.g. RangeAssignor, RoundRobinAssignor) as a list in order of preference. The coordinator first narrows the choice to the protocols supported by every member. Each member then votes for the first of those candidates in its own list, and the protocol with the most votes is selected. If a new member's supported protocols do not intersect with those of the existing group, the coordinator returns INCONSISTENT_GROUP_PROTOCOL.
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
Example selection:
consumer‑0 supports roundrobin, range; consumer‑1 supports range → chosen range.
consumer‑0 supports roundrobin, range, sticky; consumer‑1 supports range, sticky → chosen range.
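The two examples can be reproduced with a small selection routine. This is a sketch under assumptions, not the broker's implementation: it reads the rule as "intersect the members' lists, let each member vote for its first supported candidate, pick the candidate with the most votes", and it breaks ties by first appearance, which is an arbitrary choice made for this illustration. ProtocolSelection and select are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Sketch of assignment protocol selection by intersection plus voting.
class ProtocolSelection {

    // Each inner list is one member's protocols in order of preference.
    // Returns empty when no protocol is supported by every member.
    static Optional<String> select(List<List<String>> memberProtocols) {
        if (memberProtocols.isEmpty()) {
            return Optional.empty();
        }
        // Candidates: protocols supported by all members.
        Set<String> candidates = new LinkedHashSet<>(memberProtocols.get(0));
        for (List<String> protocols : memberProtocols) {
            candidates.retainAll(protocols);
        }
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        // Each member votes for the first candidate in its own list.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> protocols : memberProtocols) {
            for (String name : protocols) {
                if (candidates.contains(name)) {
                    votes.merge(name, 1, Integer::sum);
                    break;
                }
            }
        }
        // Most votes wins; ties go to the entry seen first (assumption).
        String best = null;
        int bestVotes = -1;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (e.getValue() > bestVotes) {
                best = e.getKey();
                bestVotes = e.getValue();
            }
        }
        return Optional.of(best);
    }

    public static void main(String[] args) {
        System.out.println(select(Arrays.asList(
                Arrays.asList("roundrobin", "range"),
                Arrays.asList("range"))));
        System.out.println(select(Arrays.asList(
                Arrays.asList("roundrobin", "range", "sticky"),
                Arrays.asList("range", "sticky"))));
    }
}
```

An empty result corresponds to the case in which the coordinator would answer with INCONSISTENT_GROUP_PROTOCOL.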
5. State transition summary
The client sends the first JoinGroup; the coordinator returns a generated member id.
The client sends a second JoinGroup with that member id.
The coordinator adds the member, moves the group to PreparingRebalance, and creates a new generation.
Depending on the member set the group state becomes CompletingRebalance or Empty.
The coordinator returns a JoinGroupResult (the leader receives the full member list).
The client receives the result, starts heartbeats, and sends a SyncGroupRequest (the leader includes the partition assignment).
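The state changes in the steps above can be checked against a small transition table. The sketch below is an assumption-laden model: the enum and method names are hypothetical, and the Stable and Dead states, which this article does not discuss, are included only to make the table complete.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch of the group state machine around the join phase.
class GroupStateSketch {

    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // States from which the target state may be entered.
    static Set<State> validPrevious(State target) {
        switch (target) {
            case PREPARING_REBALANCE:
                return EnumSet.of(State.STABLE, State.COMPLETING_REBALANCE, State.EMPTY);
            case COMPLETING_REBALANCE:
                return EnumSet.of(State.PREPARING_REBALANCE);
            case STABLE:
                return EnumSet.of(State.COMPLETING_REBALANCE);
            case EMPTY:
                return EnumSet.of(State.PREPARING_REBALANCE);
            default:
                // DEAD can be reached from any state in this model.
                return EnumSet.allOf(State.class);
        }
    }

    static boolean canTransition(State from, State to) {
        return validPrevious(to).contains(from);
    }

    // State after the join phase completes: Empty when no member remains,
    // otherwise CompletingRebalance while members send SyncGroup.
    static State afterJoinCompletes(int memberCount) {
        return memberCount == 0 ? State.EMPTY : State.COMPLETING_REBALANCE;
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.EMPTY, State.PREPARING_REBALANCE));
        System.out.println(afterJoinCompletes(2));
    }
}
```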
6. Q&A
Who is the group coordinator?
Each consumer group maps to a partition of __consumer_offsets. The broker that leads that partition is the group coordinator.
What is a leader member and how is it elected?
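The coordinator picks one member as the leader, normally the first member to join the group; if the leader leaves or fails to rejoin, another member is chosen. The leader is responsible for computing the partition assignment and sending it to the coordinator in its SyncGroupRequest.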
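One simple way to model leader selection is "first member in wins, and the next remaining member takes over if the leader is removed". The sketch below assumes that policy; LeaderElectionSketch and its methods are hypothetical names, and the broker's real bookkeeping is considerably richer.

```java
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;

// Sketch of a first-joiner leader policy with fallback on removal.
class LeaderElectionSketch {

    private final Set<String> members = new LinkedHashSet<>();
    private String leaderId; // null while the group has no leader

    void add(String memberId) {
        // The first member to join becomes the leader.
        if (leaderId == null) {
            leaderId = memberId;
        }
        members.add(memberId);
    }

    void remove(String memberId) {
        members.remove(memberId);
        // If the leader left, fall back to any remaining member (here: the oldest).
        if (memberId.equals(leaderId)) {
            leaderId = members.isEmpty() ? null : members.iterator().next();
        }
    }

    Optional<String> leader() {
        return Optional.ofNullable(leaderId);
    }

    public static void main(String[] args) {
        LeaderElectionSketch group = new LeaderElectionSketch();
        group.add("consumer-0");
        group.add("consumer-1");
        System.out.println(group.leader());
        group.remove("consumer-0");
        System.out.println(group.leader());
    }
}
```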
ShiZhen AI
Tech blogger with over 10 years of experience at leading tech firms, AI efficiency and delivery expert focusing on AI productivity. Covers tech gadgets, AI-driven efficiency, and leisure— AI leisure community. 🛰 szzdzhp001
