How Kafka Consumer Clients Send and Manage Heartbeat Requests
The article walks through the Kafka consumer heartbeat lifecycle, detailing how the HeartbeatThread is started, paused, and used to send heartbeat and LeaveGroup requests, how the GroupCoordinator validates and processes these requests, the client’s response handling, and the resulting state transitions illustrated with diagrams.
1. Initiating Heartbeat Requests
When a consumer client starts, it creates and starts a HeartbeatThread named kafka-coordinator-heartbeat-thread|<group.id> (e.g., kafka-coordinator-heartbeat-thread|consumer0). The thread’s enabled flag is initially false, so no heartbeat is sent until the consumer joins a group.
1.1 Starting the Heartbeat Thread
After a successful JoinGroupRequest, JoinGroupResponseHandler#handle sets the client state to COMPLETING_REBALANCE and sets enabled=true on the heartbeat thread, which starts heartbeat monitoring.
1.2 Pausing the Heartbeat Thread
If the client state becomes UNJOINED or PREPARING_REBALANCE, or if the heartbeat thread encounters an exception, the thread is paused (enabled is set back to false) rather than terminated, because these states do not require periodic heartbeats to the coordinator.
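The start and pause rules above can be sketched as a small gate object. This is a simplified model, not Kafka's HeartbeatThread: the names HeartbeatGate, mayHeartbeat, and markFailed are hypothetical, and real threading (wait/notify, locking) is left out on purpose.

```java
// Hypothetical model of the enabled flag; not Kafka's actual HeartbeatThread.
enum MemberState { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

final class HeartbeatGate {
    private boolean enabled = false; // starts disabled until the member joins a group
    private boolean failed = false;  // set once the thread has hit an exception

    void enable() { enabled = true; }
    void disable() { enabled = false; }
    void markFailed() { failed = true; }
    boolean isEnabled() { return enabled; }

    /** One loop iteration: true if a heartbeat may be sent; pauses the gate otherwise. */
    boolean mayHeartbeat(MemberState state) {
        if (!enabled) {
            return false;
        }
        boolean notJoined = state == MemberState.UNJOINED
                || state == MemberState.PREPARING_REBALANCE;
        if (notJoined || failed) {
            disable(); // pause until the next successful join enables the gate again
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        HeartbeatGate gate = new HeartbeatGate();
        gate.enable(); // what a successful JoinGroup response would do
        System.out.println(gate.mayHeartbeat(MemberState.STABLE));
        System.out.println(gate.mayHeartbeat(MemberState.UNJOINED));
    }
}
```

Once the gate has paused itself, it stays paused until enable() is called again, which mirrors the idea that only a new successful join resumes heartbeating.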
1.3 Sending Heartbeat Requests
While enabled, the heartbeat thread sends a HeartbeatRequest every heartbeat.interval.ms (default 3000 ms). This value must be lower than session.timeout.ms and is usually set to about one‑third of the session timeout.
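As a rough illustration of the constraint between the two settings, the check below reads both values from a plain java.util.Properties object. The class and method names (HeartbeatConfigCheck, isValid, suggestedHeartbeatMs) are hypothetical helpers for this sketch, and the numbers in main are example values only.

```java
import java.util.Properties;

// Hypothetical helper; it only illustrates the rule that the heartbeat interval must be
// lower than the session timeout. It is not part of the Kafka client.
final class HeartbeatConfigCheck {

    /** True when heartbeat.interval.ms is strictly lower than session.timeout.ms.
     *  Both keys must be present, otherwise Long.parseLong throws NumberFormatException. */
    static boolean isValid(Properties props) {
        long heartbeatMs = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms"));
        return heartbeatMs < sessionMs;
    }

    /** Rule of thumb from the text: about one third of the session timeout. */
    static long suggestedHeartbeatMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / 3;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "30000"); // example value
        props.setProperty("heartbeat.interval.ms",
                Long.toString(suggestedHeartbeatMs(30000)));
        System.out.println("valid = " + isValid(props));
    }
}
```

Keeping the interval at a fraction of the session timeout leaves room for a few lost or delayed heartbeats before the coordinator considers the member dead.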
synchronized RequestFuture<Void> sendHeartbeatRequest() {
    log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}",
        generation.generationId, generation.memberId, coordinator);
    HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(
        new HeartbeatRequestData()
            .setGroupId(rebalanceConfig.groupId)
            .setMemberId(this.generation.memberId)
            .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
            .setGenerationId(this.generation.generationId));
    return client.send(coordinator, requestBuilder)
        .compose(new HeartbeatResponseHandler(generation));
}
1.4 Sending LeaveGroup Requests
If max.poll.interval.ms (default 300000 ms) elapses without a call to poll(), the heartbeat thread triggers a LeaveGroupRequest. If session.timeout.ms elapses without a successful heartbeat response, the client instead marks the coordinator as unknown and looks it up again. Since Kafka 2.3 the LeaveGroupRequest is sent only by dynamic members; static members leave the group only through session timeout.
public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
    RequestFuture<Void> future = null;
    if (isDynamicMember() && !coordinatorUnknown() &&
        state != MemberState.UNJOINED && generation.hasMemberId()) {
        log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
            generation.memberId, coordinator, leaveReason);
        LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
            rebalanceConfig.groupId,
            Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId))
        );
        future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));
        client.pollNoWakeup();
    }
    resetGenerationOnLeaveGroup();
    return future;
}
Only dynamic members send LeaveGroupRequest after Kafka 2.3.
The client state is reset to UNJOINED after leaving.
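The guard at the top of maybeLeaveGroup, together with the poll-interval check that triggers it, can be modeled as two pure functions. This is a sketch under assumptions: LeaveGroupDecision and both method names are hypothetical, and the exact comparison used for the deadline (>= here) is an illustrative choice rather than a statement about the client's internals.

```java
// Hypothetical model of the leave-group decision; not the Kafka client's real classes.
final class LeaveGroupDecision {
    enum MemberState { UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    /** True once at least maxPollIntervalMs has passed since the last poll(). */
    static boolean pollTimeoutExpired(long lastPollMs, long nowMs, long maxPollIntervalMs) {
        return nowMs - lastPollMs >= maxPollIntervalMs;
    }

    /** Mirrors the four conditions in the maybeLeaveGroup guard shown above. */
    static boolean shouldSendLeaveGroup(boolean dynamicMember, boolean coordinatorKnown,
                                        MemberState state, boolean hasMemberId) {
        return dynamicMember
                && coordinatorKnown
                && state != MemberState.UNJOINED
                && hasMemberId;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // example value
        boolean stalled = pollTimeoutExpired(0L, 300_001L, maxPollIntervalMs);
        boolean send = stalled
                && shouldSendLeaveGroup(true, true, MemberState.STABLE, true);
        System.out.println("send LeaveGroup = " + send);
    }
}
```

Note that in the original method the generation is reset whether or not the request is actually sent, so a static member still ends up UNJOINED locally even though no LeaveGroupRequest goes out.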
2. GroupCoordinator Processing
On the broker side, GroupCoordinator#handleHeartbeat validates the request, checks that the group and the member exist, and then responds according to the group's current state.
def handleHeartbeat(groupId: String,
                    memberId: String,
                    groupInstanceId: Option[String],
                    generationId: Int,
                    responseCallback: Errors => Unit): Unit = {
  // Validate group status
  validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
    if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) responseCallback(Errors.NONE)
    else responseCallback(error)
    return
  }
  groupManager.getGroup(groupId) match {
    case None => responseCallback(Errors.UNKNOWN_MEMBER_ID)
    case Some(group) => group.inLock {
      if (group.is(Dead)) responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
      else if (group.isStaticMemberFenced(memberId, groupInstanceId, "heartbeat"))
        responseCallback(Errors.FENCED_INSTANCE_ID)
      else if (!group.has(memberId)) responseCallback(Errors.UNKNOWN_MEMBER_ID)
      else if (generationId != group.generationId) responseCallback(Errors.ILLEGAL_GENERATION)
      else {
        group.currentState match {
          case Empty => responseCallback(Errors.UNKNOWN_MEMBER_ID)
          case CompletingRebalance | Stable =>
            val member = group.get(memberId)
            completeAndScheduleNextHeartbeatExpiration(group, member)
            responseCallback(Errors.NONE)
          case PreparingRebalance =>
            val member = group.get(memberId)
            completeAndScheduleNextHeartbeatExpiration(group, member)
            responseCallback(Errors.REBALANCE_IN_PROGRESS)
          case Dead => throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")
        }
      }
    }
  }
}
Key validation steps:
Verify the member exists in the coordinator.
If the group is Dead, the client must find a new coordinator and re‑join.
If generation IDs differ, the client must re‑join.
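When the group state is PreparingRebalance, the coordinator still resets the heartbeat expiration but replies with REBALANCE_IN_PROGRESS; a client in the STABLE state then sends a new JoinGroup request.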
For CompletingRebalance or Stable, the coordinator schedules the next heartbeat expiration based on session.timeout.ms.
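The order of these checks can be restated as a single pure function. The sketch below is a Java model of the Scala logic shown above, with assumptions: HeartbeatValidation, GroupState, and ErrorCode are hypothetical names for this example, and the static-member fencing check is deliberately left out to keep it short.

```java
// Hypothetical model of the coordinator's check order; static-member fencing is omitted.
final class HeartbeatValidation {
    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }
    enum ErrorCode { NONE, COORDINATOR_NOT_AVAILABLE, UNKNOWN_MEMBER_ID,
                     ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS }

    static ErrorCode validate(GroupState state, boolean memberKnown,
                              int requestGeneration, int groupGeneration) {
        if (state == GroupState.DEAD) {
            return ErrorCode.COORDINATOR_NOT_AVAILABLE; // client must find the coordinator again
        }
        if (!memberKnown) {
            return ErrorCode.UNKNOWN_MEMBER_ID;
        }
        if (requestGeneration != groupGeneration) {
            return ErrorCode.ILLEGAL_GENERATION;
        }
        switch (state) {
            case EMPTY:
                return ErrorCode.UNKNOWN_MEMBER_ID;
            case PREPARING_REBALANCE:
                return ErrorCode.REBALANCE_IN_PROGRESS; // heartbeat accepted, but rejoin needed
            default:
                return ErrorCode.NONE; // COMPLETING_REBALANCE or STABLE
        }
    }

    public static void main(String[] args) {
        System.out.println(validate(GroupState.STABLE, true, 5, 5));
        System.out.println(validate(GroupState.STABLE, true, 4, 5));
        System.out.println(validate(GroupState.PREPARING_REBALANCE, true, 5, 5));
    }
}
```

The ordering matters: a stale generation on a Dead group reports COORDINATOR_NOT_AVAILABLE rather than ILLEGAL_GENERATION, because the Dead check runs first.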
2.1 GroupCoordinator Timeout Task
If no heartbeat is received within session.timeout.ms, the coordinator runs its timeout task:
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
  group.inLock {
    if (group.is(Dead)) {
      info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
    } else if (isPending) {
      info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
      removePendingMemberAndUpdateGroup(group, memberId)
    } else if (!group.has(memberId)) {
      debug(s"Member $memberId has already been removed from the group.")
    } else {
      val member = group.get(memberId)
      if (!member.hasSatisfiedHeartbeat) {
        info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
        removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
      }
    }
  }
}
If the group is Dead, nothing is done.
If the member is Pending, it is removed, forcing a fresh JoinGroup.
If the member has already been removed, nothing is done. Otherwise, a member that has not satisfied its heartbeat is removed and the group metadata is updated; if the group state is Stable or CompletingRebalance, the coordinator triggers the prepareRebalance flow.
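The idea of rescheduling the expiration on every heartbeat can be pictured as a moving deadline. This is only a sketch of the concept: SessionDeadline is a hypothetical class, time is passed in explicitly, and the broker's actual delayed-operation machinery is not modeled.

```java
// Hypothetical model of a per-member session deadline; the broker's real scheduling differs.
final class SessionDeadline {
    private final long sessionTimeoutMs;
    private long deadlineMs;

    SessionDeadline(long sessionTimeoutMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.deadlineMs = nowMs + sessionTimeoutMs;
    }

    /** A heartbeat arrived: push the deadline one full session timeout into the future. */
    void onHeartbeat(long nowMs) {
        deadlineMs = nowMs + sessionTimeoutMs;
    }

    /** True when the current deadline has been reached without a newer heartbeat. */
    boolean isExpired(long nowMs) {
        return nowMs >= deadlineMs;
    }

    public static void main(String[] args) {
        SessionDeadline deadline = new SessionDeadline(10_000L, 0L); // example timeout
        deadline.onHeartbeat(9_000L);
        System.out.println(deadline.isExpired(15_000L)); // heartbeat at 9s moved the deadline to 19s
        System.out.println(deadline.isExpired(19_000L));
    }
}
```

Passing the clock in as a parameter keeps the model deterministic, which makes the expiry rule easy to reason about and to test.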
3. Client Handling of Responses
HeartbeatResponseHandler#handle processes the error code in the response:
COORDINATOR_NOT_AVAILABLE or NOT_COORDINATOR: the client searches for a new coordinator.
REBALANCE_IN_PROGRESS: if the client is STABLE, it re‑issues a JoinGroupRequest.
ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, FENCED_INSTANCE_ID: the client sets its state to UNJOINED and re‑joins the group.
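The mapping from error code to client reaction described above can be written as a lookup function. The sketch follows the article's description only: HeartbeatResponseActions, ErrorCode, and Action are hypothetical names, and details such as retry scheduling or ignoring errors from an outdated generation are not modeled.

```java
// Hypothetical model of the client's reaction to heartbeat errors, per the description above.
final class HeartbeatResponseActions {
    enum ErrorCode { NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, REBALANCE_IN_PROGRESS,
                     ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, FENCED_INSTANCE_ID }
    enum Action { CONTINUE, FIND_COORDINATOR, REJOIN, RESET_AND_REJOIN }

    static Action actionFor(ErrorCode error, boolean clientStable) {
        switch (error) {
            case NONE:
                return Action.CONTINUE;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                return Action.FIND_COORDINATOR;
            case REBALANCE_IN_PROGRESS:
                // Only a STABLE client needs to start a new join; otherwise one is already underway.
                return clientStable ? Action.REJOIN : Action.CONTINUE;
            case ILLEGAL_GENERATION:
            case UNKNOWN_MEMBER_ID:
            case FENCED_INSTANCE_ID:
                return Action.RESET_AND_REJOIN; // state goes back to UNJOINED first
            default:
                throw new IllegalArgumentException("unhandled error: " + error);
        }
    }

    public static void main(String[] args) {
        System.out.println(actionFor(ErrorCode.NOT_COORDINATOR, true));
        System.out.println(actionFor(ErrorCode.REBALANCE_IN_PROGRESS, true));
        System.out.println(actionFor(ErrorCode.UNKNOWN_MEMBER_ID, true));
    }
}
```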
4. Heartbeat Thread State Diagram
The consumer client member state transition diagram (image) shows the possible states.
The heartbeat thread runs only in the COMPLETING_REBALANCE and STABLE states.
ShiZhen AI
