
Deep Dive into Kafka Consumer JoinGroupRequest Flow

This article walks through the complete Kafka consumer JoinGroupRequest lifecycle: how the client builds and sends the request, how the coordinator node for a group is determined, and how the group coordinator processes unknown and known members, elects a leader, chooses a partition assignment protocol, and transitions group states.

ShiZhen AI

1. Initiating JoinGroup request

When a consumer joins (or rejoins) a group, it builds a JoinGroupRequest in AbstractCoordinator#sendJoinGroupRequest. The method first checks that a coordinator is known, then fills in groupId, sessionTimeoutMs, memberId, the optional groupInstanceId, protocolType, the list of protocols, and rebalanceTimeoutMs. The request timeout is the larger of the client's default request timeout and rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE. The request is sent to the coordinator node returned by the preceding FindCoordinator lookup.

RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
        new JoinGroupRequestData()
            .setGroupId(rebalanceConfig.groupId)
            .setSessionTimeoutMs(rebalanceConfig.sessionTimeoutMs)
            .setMemberId(generation.memberId)
            .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
            .setProtocolType(protocolType())
            .setProtocols(metadata())
            .setRebalanceTimeoutMs(rebalanceConfig.rebalanceTimeoutMs));
    int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
        rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
                 .compose(new JoinGroupResponseHandler(generation));
}
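The timeout rule above can be worked through with concrete numbers. This is a minimal sketch rather than the client's code: the class name is hypothetical, the 5-second value for JOIN_GROUP_TIMEOUT_LAPSE is an assumption about the client constant, and the timeouts passed in main are illustrative settings.

```java
// Sketch of the JoinGroup request-timeout rule; all values are illustrative.
class JoinGroupTimeoutSketch {
    // Slack added on top of the rebalance timeout (assumed to be 5 seconds).
    static final int JOIN_GROUP_TIMEOUT_LAPSE = 5_000;

    // The request may legitimately block on the broker for the whole rebalance,
    // so the timeout must cover rebalanceTimeoutMs plus some slack.
    static int joinGroupTimeoutMs(int defaultRequestTimeoutMs, int rebalanceTimeoutMs) {
        return Math.max(defaultRequestTimeoutMs, rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
    }

    public static void main(String[] args) {
        // Hypothetical settings: 30 s request timeout, 5 min rebalance timeout.
        System.out.println(joinGroupTimeoutMs(30_000, 300_000)); // 305000
        // With a short rebalance timeout the default request timeout wins.
        System.out.println(joinGroupTimeoutMs(30_000, 10_000));  // 30000
    }
}
```

The point of taking the maximum is that a JoinGroup response is held back until the join phase finishes, so an ordinary request timeout would expire too early.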

1.2 Request parameters

The request header contains header_version, api_key (11 for JoinGroup) and api_version (schema version 7 in the Kafka version discussed here; later releases add newer versions). The body includes group_id, session_timeout_ms, member_id, optional group_instance_id, protocol_type, rebalance_timeout_ms and a list of protocols (name + serialized metadata).

1.3 Coordinator selection

The coordinator is the broker that leads the partition of __consumer_offsets whose number is abs(groupId.hashCode) % partitionCount, where partitionCount is the number of partitions of __consumer_offsets (offsets.topic.num.partitions). The client obtains the node via a FindCoordinatorRequest and then sends the JoinGroup request to that node.
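The mapping from group id to offsets-topic partition can be sketched in a few lines. This is an illustration under stated assumptions, not the broker's code: the class and method names are hypothetical, and the partition count of 50 is used only as an example value.

```java
// Sketch: which __consumer_offsets partition a group id maps to.
class CoordinatorPartitionSketch {
    // Non-negative absolute value. Math.abs(Integer.MIN_VALUE) overflows and
    // stays negative, so that single case is mapped to 0.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return abs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // "a".hashCode() is 97, and 97 % 50 is 47.
        System.out.println(partitionFor("a", 50)); // 47
    }
}
```

The leader broker of the resulting partition is then the coordinator for that group, which is why all members of one group always talk to the same broker.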

2. Coordinator handling

The entry point is KafkaApis#handleJoinGroupRequest. The coordinator validates the request, checks that session.timeout.ms lies between group.min.session.timeout.ms and group.max.session.timeout.ms, and then chooses between doUnknownJoinGroup (empty memberId) and doJoinGroup (known memberId).
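The validation and dispatch just described can be condensed into a small decision function. This is a simplified sketch: the class and method names are hypothetical, results are returned as plain strings for readability, and the bounds used in main are example values for group.min.session.timeout.ms and group.max.session.timeout.ms.

```java
// Sketch of the coordinator's first decisions for an incoming JoinGroup request.
class JoinGroupDispatchSketch {
    static String route(String memberId, int sessionTimeoutMs, int minSessionMs, int maxSessionMs) {
        // Reject session timeouts outside the broker-configured window.
        if (sessionTimeoutMs < minSessionMs || sessionTimeoutMs > maxSessionMs) {
            return "INVALID_SESSION_TIMEOUT";
        }
        // An empty member id marks a member the coordinator has not seen yet.
        return memberId.isEmpty() ? "doUnknownJoinGroup" : "doJoinGroup";
    }

    public static void main(String[] args) {
        System.out.println(route("", 10_000, 6_000, 1_800_000));           // doUnknownJoinGroup
        System.out.println(route("consumer-1-x", 10_000, 6_000, 1_800_000)); // doJoinGroup
        System.out.println(route("", 1_000, 6_000, 1_800_000));            // INVALID_SESSION_TIMEOUT
    }
}
```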

2.1 doUnknownJoinGroup

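If memberId is empty, the coordinator generates a new member id. For dynamic members the format is clientId-UUID; for static members it is groupInstanceId-UUID. When requireKnownMemberId is set (a dynamic member using a request version that supports it), the coordinator records the id as pending and returns error MEMBER_ID_REQUIRED together with the generated id, causing the client to retry with that id. Otherwise the member is added to the group directly.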

private def doUnknownJoinGroup(group: GroupMetadata,
                               groupInstanceId: Option[String],
                               requireKnownMemberId: Boolean,
                               clientId: String,
                               clientHost: String,
                               rebalanceTimeoutMs: Int,
                               sessionTimeoutMs: Int,
                               protocolType: String,
                               protocols: List[(String, Array[Byte])],
                               responseCallback: JoinCallback): Unit = {
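    // generate newMemberId …
    if (requireKnownMemberId) {
        // Remember the id as pending and hand it back so the client rejoins with it.
        group.addPendingMember(newMemberId)
        responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
    } else {
        // add the member to the group directly and prepare a rebalance …
    }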
}
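The member id format described above (clientId-UUID for dynamic members, groupInstanceId-UUID for static ones) can be sketched as follows. The class and method names are hypothetical and the snippet only mirrors the naming rule, not the coordinator's bookkeeping.

```java
import java.util.Optional;
import java.util.UUID;

// Sketch of how a coordinator-generated member id is composed.
class MemberIdSketch {
    static String generateMemberId(String clientId, Optional<String> groupInstanceId) {
        // Static members are prefixed with their instance id, dynamic ones with the client id.
        String prefix = groupInstanceId.orElse(clientId);
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Output differs on every run because of the random UUID suffix.
        System.out.println(generateMemberId("consumer-1", Optional.empty()));
        System.out.println(generateMemberId("consumer-1", Optional.of("instance-a")));
    }
}
```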

2.2 doJoinGroup

When the client retries with the generated member id, the coordinator recognizes it as a pending member, creates a MemberMetadata object, adds it to the group, schedules the member's heartbeat expiration and, if needed, transitions the group state to PreparingRebalance. For static members the static-member cache is also updated.

private def doJoinGroup(group: GroupMetadata,
                        memberId: String,
                        groupInstanceId: Option[String],
                        clientId: String,
                        clientHost: String,
                        rebalanceTimeoutMs: Int,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback): Unit = {
    val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
        clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
        protocolType, protocols)
    group.add(member, responseCallback)
    // schedule heartbeat, maybePrepareRebalance …
}

2.3 onCompleteJoin

Once the join phase ends (all expected members have joined or the rebalance timeout has expired), the coordinator makes sure the group has a leader, increments the generation id and stores the group metadata. If the group is empty it writes an empty group record to __consumer_offsets. Otherwise it builds a JoinGroupResult for each member; only the leader receives the full member list, while followers receive an empty one.
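The asymmetry between the leader's and the followers' responses can be sketched like this. It is a simplified model: the class and method names are hypothetical, members are represented by their ids only, and subscription metadata is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: which member list each member sees in its JoinGroup response.
class JoinResultSketch {
    static Map<String, List<String>> membersVisibleTo(List<String> memberIds, String leaderId) {
        Map<String, List<String>> visible = new LinkedHashMap<>();
        for (String id : memberIds) {
            // Only the leader needs the member list, because only it computes the assignment.
            visible.put(id, id.equals(leaderId)
                    ? new ArrayList<>(memberIds)
                    : Collections.<String>emptyList());
        }
        return visible;
    }

    public static void main(String[] args) {
        List<String> members = List.of("m1", "m2", "m3");
        System.out.println(membersVisibleTo(members, "m1")); // {m1=[m1, m2, m3], m2=[], m3=[]}
    }
}
```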

3. Client processing of the response

JoinGroupResponseHandler checks the error code. On success it updates the client state to COMPLETING_REBALANCE, starts the heartbeat thread, records the new generation and then:

If the client is the leader it invokes onJoinLeader to compute a new partition assignment and sends a SyncGroupRequest.

If the client is a follower it sends a SyncGroupRequest with an empty assignment via onJoinFollower and waits for the coordinator to return its share.
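The leader and follower branches differ only in what the SyncGroupRequest carries. The sketch below models that difference; the class and method names are hypothetical, and the round-robin split stands in for whatever assignor the group actually selected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: the assignment payload a member would attach to its SyncGroupRequest.
class SyncGroupPayloadSketch {
    static Map<String, List<Integer>> syncGroupPayload(
            String myMemberId, String leaderId, List<String> members, int partitionCount) {
        // Followers send nothing; they only wait for their assignment.
        if (!myMemberId.equals(leaderId)) {
            return Collections.emptyMap();
        }
        // The leader computes an assignment for everyone (simple round-robin here).
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = List.of("a", "b");
        System.out.println(syncGroupPayload("a", "a", members, 3)); // {a=[0, 2], b=[1]}
        System.out.println(syncGroupPayload("b", "a", members, 3)); // {}
    }
}
```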

4. Protocol selection and partition assignment

Each consumer configures partition.assignment.strategy (e.g. RangeAssignor, RoundRobinAssignor), listed in order of preference. The coordinator first narrows the choice to the strategies supported by all members. Each member then votes for the candidate that appears earliest in its own list, and the strategy with the most votes is selected. If a new member's supported protocols do not intersect with those of the existing group, the coordinator returns INCONSISTENT_GROUP_PROTOCOL.

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor

Example selection:

consumer‑0 supports roundrobin, range; consumer‑1 supports range → chosen range.

consumer‑0 supports roundrobin, range, sticky; consumer‑1 supports range, sticky → chosen range (the common candidates are range and sticky, and both members rank range first among them).
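The two examples above can be reproduced with a short voting sketch: intersect the members' protocol lists, let each member vote for its most preferred candidate, and pick the protocol with the most votes. The class and method names are hypothetical, the input is assumed to be non-empty, and breaking ties in favor of the protocol that received a vote first is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of protocol selection by intersection plus voting.
class ProtocolSelectionSketch {
    static String selectProtocol(List<List<String>> memberPreferences) {
        // Candidates are the protocols every member supports.
        Set<String> candidates = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> prefs : memberPreferences) {
            candidates.retainAll(prefs);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("INCONSISTENT_GROUP_PROTOCOL");
        }
        // Each member votes for the first candidate in its own preference order.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> prefs : memberPreferences) {
            for (String protocol : prefs) {
                if (candidates.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }
        // Highest vote count wins; ties go to the earliest-voted protocol (assumption).
        String winner = null;
        int best = -1;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (e.getValue() > best) {
                best = e.getValue();
                winner = e.getKey();
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        System.out.println(selectProtocol(List.of(
                List.of("roundrobin", "range"), List.of("range")))); // range
        System.out.println(selectProtocol(List.of(
                List.of("roundrobin", "range", "sticky"), List.of("range", "sticky")))); // range
    }
}
```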

5. State transition summary

The client sends the first JoinGroup; the coordinator returns a generated member id.

The client sends a second JoinGroup with that member id.

The coordinator adds the member and moves the group to PreparingRebalance; once the join phase completes it creates a new generation.

Depending on the member set the group state becomes CompletingRebalance or Empty.

The coordinator returns a JoinGroupResult (the leader receives the full member list).

The client receives the result, starts heartbeats, and sends a SyncGroupRequest (the leader includes the partition assignment).
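The coordinator-side part of this summary can be modeled as a tiny state holder. This is a deliberately reduced sketch: the class, field and method names are hypothetical, only the three states named in this section are included, and timers, errors and the sync phase are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the group state changes during the join phase.
class GroupFlowSketch {
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE }

    State state = State.EMPTY;
    int generationId = 0;
    final List<String> members = new ArrayList<>();

    // A member joining with a known id triggers a rebalance.
    void join(String memberId) {
        members.add(memberId);
        state = State.PREPARING_REBALANCE;
    }

    // When the join phase ends, a new generation starts and the state depends on membership.
    void completeJoin() {
        generationId++;
        state = members.isEmpty() ? State.EMPTY : State.COMPLETING_REBALANCE;
    }

    public static void main(String[] args) {
        GroupFlowSketch group = new GroupFlowSketch();
        group.join("consumer-1-abc");
        System.out.println(group.state); // PREPARING_REBALANCE
        group.completeJoin();
        System.out.println(group.state + " generation=" + group.generationId); // COMPLETING_REBALANCE generation=1
    }
}
```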

6. Q&A

Who is the group coordinator?

Each consumer group maps to a partition of __consumer_offsets. The broker that leads that partition is the group coordinator.

What is a leader member and how is it elected?

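The coordinator picks one member as the leader: the first member to join the group takes the role, and if the leader fails to rejoin during a rebalance the coordinator chooses another from the members that did. The leader is responsible for computing the partition assignment and sending it to the coordinator.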