Why Do Pulsar Partition Metrics Vanish After a Broker Restart? Root Cause & Fix
During a gray-scale upgrade, a vivo Pulsar deployment using the KoP (Kafka on Pulsar) protocol saw partition consumption rates drop sharply because the ZooKeeper nodes that map clients to consumer group names disappeared. This article analyzes the root cause, the reproduction steps, and three progressively refined remediation strategies.
Problem Background
During a version gray-scale upgrade, a topic using the KoP protocol showed a significant drop in consumption rate. The metrics kop_server_MESSAGE_OUT and kop_server_BYTES_OUT were still reported, but their group label was empty, so the partition-level consumption metrics for the consumer group disappeared.
Data sample:
kop_server_MESSAGE_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 3
kop_server_BYTES_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 188
Problem Analysis
1. Locate the Problem Code
The group name is looked up in requestHandler.currentConnectedGroup, a map keyed by client host. On a cache miss, the code reads the ZooKeeper path /client_group_id/host-clientId. If that ZK node is missing, the code returns an empty string as the group name.
private void handleEntries(final List<Entry> entries,
                           final TopicPartition topicPartition,
                           final FetchRequest.PartitionData partitionData,
                           final KafkaTopicConsumerManager tcm,
                           final ManagedCursor cursor,
                           final AtomicLong cursorOffset,
                           final boolean readCommitted) {
    // Get the consumer group name. clientHost and header are not parameters here;
    // they come from the enclosing fetch context. The cache is keyed by client host.
    CompletableFuture<String> groupNameFuture = requestHandler
            .getCurrentConnectedGroup()
            .computeIfAbsent(clientHost, host -> {
                CompletableFuture<String> future = new CompletableFuture<>();
                String groupIdPath = GroupIdUtils.groupIdPathFormat(host, header.clientId());
                requestHandler.getMetadataStore()
                        .get(requestHandler.getGroupIdStoredPath() + groupIdPath)
                        .thenAccept(getResultOpt -> {
                            if (getResultOpt.isPresent()) {
                                byte[] value = getResultOpt.get().getValue();
                                future.complete(new String(value == null ? new byte[0] : value, StandardCharsets.UTF_8));
                            } else {
                                // ZK node not found: the group name is empty
                                future.complete("");
                            }
                        })
                        .exceptionally(ex -> {
                            future.completeExceptionally(ex);
                            return null;
                        });
                return future;
            });
    // ... later, groupName is used to update the per-partition stats
}
The missing ZK node /client_group_id/xxx is the root cause.
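The following minimal, self-contained Java sketch shows the same cache-then-store lookup pattern in isolation. InMemoryStore, GroupNameLookup, and the exact path string are hypothetical stand-ins, not KoP or Pulsar APIs. An in-memory map replaces the metadata store, and the path layout follows the /client_group_id/host-clientId form described above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the metadata store: path -> raw node bytes.
class InMemoryStore {
    final Map<String, byte[]> nodes = new ConcurrentHashMap<>();

    // Asynchronous read shaped like a metadata-store get: empty Optional if the node is absent.
    CompletableFuture<Optional<byte[]>> get(String path) {
        return CompletableFuture.completedFuture(Optional.ofNullable(nodes.get(path)));
    }
}

// Hypothetical lookup mirroring the cache-then-store pattern in handleEntries.
class GroupNameLookup {
    final InMemoryStore store;
    // Per-host cache of group-name futures, mirroring currentConnectedGroup.
    final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    GroupNameLookup(InMemoryStore store) {
        this.store = store;
    }

    // Hypothetical path layout taken from the text: /client_group_id/host-clientId
    static String path(String host, String clientId) {
        return "/client_group_id/" + host + "-" + clientId;
    }

    CompletableFuture<String> groupName(String host, String clientId) {
        return cache.computeIfAbsent(host, h -> store.get(path(h, clientId))
                // A present node yields its UTF-8 value; a missing node yields "".
                .thenApply(opt -> opt.map(bytes -> new String(bytes, StandardCharsets.UTF_8)).orElse("")));
    }
}
```

If the node for a host exists, the lookup returns that host's group. If no node exists, it returns "", which is exactly the empty group label in the metric sample. In this sketch, computeIfAbsent also keeps that empty result cached for the host, so a node written later is not picked up for the same host.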
2. Why the ZK Node Might Be Missing
Two possibilities: the node was never written, or it was deleted. The write occurs in handleFindCoordinatorRequest:
@Override
protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,
                                            CompletableFuture<AbstractResponse> resultFuture) {
    // ... groupId is read from the request; groupIdPath is built from the
    // client host and client ID (elided)

    // Store the group name in the metadata store for the current client; used to collect consumer metrics.
    storeGroupId(groupId, groupIdPath).whenComplete((stat, ex) -> {
        if (ex != null) {
            log.warn("Store groupId failed, the groupId might already be stored.", ex);
        }
        // continue processing
    });
}
Deletion happens when the broker connection closes:
protected void close() {
    if (isActive.getAndSet(false)) {
        currentConnectedClientId.forEach(clientId -> {
            String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);
            // Delete ZK node /client_group_id/xxx
            metadataStore.delete(path, Optional.empty())
                    .whenComplete((__, ex) -> {
                        if (ex != null) {
                            log.error("Delete groupId failed. Path: [{}]", path, ex);
                        }
                    });
        });
    }
}
Thus, when a broker restarts, it closes its client connections and deletes the corresponding /client_group_id nodes. Other brokers that keep serving fetch requests from the same clients then find no node and cannot retrieve the group name.
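The following hypothetical Java sketch models the failure sequence. It is not KoP code. SharedZk stands in for ZooKeeper, and each BrokerConnection represents one broker's connection from the same client host. Its findCoordinator, groupFor, and close methods stand in for the write, lookup, and delete paths shown above.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ZooKeeper, shared by all brokers.
class SharedZk {
    final Map<String, String> nodes = new ConcurrentHashMap<>();
}

// Hypothetical model of one broker's connection handler for a single client host.
class BrokerConnection {
    final SharedZk zk;
    final String clientHost;
    final Set<String> connectedClientIds = new HashSet<>();

    BrokerConnection(SharedZk zk, String clientHost) {
        this.zk = zk;
        this.clientHost = clientHost;
    }

    static String path(String host, String clientId) {
        return "/client_group_id/" + host + "-" + clientId;
    }

    // Like storeGroupId in handleFindCoordinatorRequest: create the node only if absent.
    void findCoordinator(String clientId, String groupId) {
        connectedClientIds.add(clientId);
        zk.nodes.putIfAbsent(path(clientHost, clientId), groupId);
    }

    // Like the fetch-side lookup: a missing node becomes an empty group name.
    String groupFor(String clientId) {
        return zk.nodes.getOrDefault(path(clientHost, clientId), "");
    }

    // Like close(): delete the mapping of every client ID seen on this connection.
    void close() {
        for (String clientId : connectedClientIds) {
            zk.nodes.remove(path(clientHost, clientId));
        }
        connectedClientIds.clear();
    }
}
```

Suppose the client sends FindCoordinator to broker A while broker B serves its fetches. At first, B's groupFor returns the group. After A restarts and calls close(), B's lookup returns "" for the same client.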
Solution
Approach 1
Move the write of the group mapping from FindCoordinator to JoinGroup. However, after a client disconnects, it does not reconnect in a way that rewrites the node, so the mapping stays missing and this approach fails.
Approach 2
Keep writing in FindCoordinator, but have the GroupCoordinator trigger deletion when a consumer leaves the group or its session expires. Example deletion call:
private void removeMemberAndUpdateGroup(GroupMetadata group, MemberMetadata member) {
    group.remove(member.memberId());
    // ... other state handling ...
    // Delete the /client_group_id/xxx node for this member
    deleteClientIdGroupMapping(group, member.clientHost(), member.clientId());
}
This ensures that the mapping is deleted only when a consumer actually leaves the group, not merely when a connection closes. A shutdown hook is also added so that the mapping is not deleted while the broker itself is shutting down.
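Here is a minimal sketch of the Approach 2 rule. It assumes a hypothetical CoordinatorMappingCleaner class and uses an in-memory map in place of ZooKeeper. Deletion happens only on the member-removal path, and a flag set by a JVM shutdown hook suppresses it while the broker is stopping.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical coordinator-side cleanup for the /client_group_id mapping (Approach 2).
class CoordinatorMappingCleaner {
    final Map<String, String> zkNodes; // stand-in for the ZooKeeper nodes
    final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    CoordinatorMappingCleaner(Map<String, String> zkNodes) {
        this.zkNodes = zkNodes;
    }

    static String path(String host, String clientId) {
        return "/client_group_id/" + host + "-" + clientId;
    }

    // Set the flag when the JVM begins shutting down.
    void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> shuttingDown.set(true)));
    }

    // Called when a member leaves the group or its session expires; connection
    // close no longer calls this. Returns true if a node was deleted.
    boolean onMemberRemoved(String clientHost, String clientId) {
        if (shuttingDown.get()) {
            // The broker is stopping: members are being unloaded, not leaving, so keep the node.
            return false;
        }
        return zkNodes.remove(path(clientHost, clientId)) != null;
    }
}
```

The JVM starts registered shutdown hooks in an unspecified order and runs them concurrently. A real broker would therefore more likely set such a flag at the start of its own shutdown sequence than rely on hook ordering.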
Approach 3 – Consistency Check
Because the write and the delete are performed by different brokers (the write by whichever broker handles FindCoordinator, the delete by the group coordinator), a race can still delete a freshly written node. The final solution therefore adds a periodic consistency check that scans all groups and rewrites any missing ZK nodes:
private void checkZkGroupMapping() {
    for (GroupMetadata group : groupManager.currentGroups()) {
        for (MemberMetadata memberMetadata : group.allMemberMetadata()) {
            String clientPath = GroupIdUtils.groupIdPathFormat(memberMetadata.clientHost(), memberMetadata.clientId());
            String zkGroupClientPath = kafkaConfig.getGroupIdZooKeeperPath() + clientPath;
            metadataStore.get(zkGroupClientPath).thenAccept(resultOpt -> {
                if (!resultOpt.isPresent()) {
                    // Repair the missing node
                    metadataStore.put(zkGroupClientPath,
                                    memberMetadata.groupId().getBytes(StandardCharsets.UTF_8),
                                    Optional.empty())
                            .thenAccept(stat -> log.info("repaired clientId and group mapping: {}({})",
                                    zkGroupClientPath, memberMetadata.groupId()))
                            .exceptionally(ex -> {
                                log.warn("repair clientId and group mapping failed: {}({})",
                                        zkGroupClientPath, memberMetadata.groupId(), ex);
                                return null;
                            });
                }
            }).exceptionally(ex -> {
                log.warn("check clientId and group mapping failed: {}", zkGroupClientPath, ex);
                return null;
            });
        }
    }
}
After deploying this third-stage solution, the missing group field issue disappeared, as shown by the restored metrics charts.
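The checkZkGroupMapping snippet shows a single pass but not how that pass is scheduled. The following hypothetical Java sketch runs such a pass over an in-memory map on a ScheduledExecutorService. Member, GroupMappingChecker, and the period are assumptions. Here putIfAbsent plays the role of the get-then-put in the original.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical view of one consumer as the group coordinator sees it.
class Member {
    final String groupId;
    final String clientHost;
    final String clientId;

    Member(String groupId, String clientHost, String clientId) {
        this.groupId = groupId;
        this.clientHost = clientHost;
        this.clientId = clientId;
    }
}

// Hypothetical periodic checker that rewrites missing /client_group_id nodes (Approach 3).
class GroupMappingChecker {
    final Map<String, String> zkNodes;        // stand-in for ZooKeeper
    final Map<String, List<Member>> groups;   // coordinator state: groupId -> members

    GroupMappingChecker(Map<String, String> zkNodes, Map<String, List<Member>> groups) {
        this.zkNodes = zkNodes;
        this.groups = groups;
    }

    static String path(Member m) {
        return "/client_group_id/" + m.clientHost + "-" + m.clientId;
    }

    // One pass: recreate every mapping the coordinator knows about but the store lacks.
    // putIfAbsent makes check-and-write atomic in this in-memory model.
    int repairOnce() {
        int repaired = 0;
        for (List<Member> members : groups.values()) {
            for (Member m : members) {
                if (zkNodes.putIfAbsent(path(m), m.groupId) == null) {
                    repaired++;
                }
            }
        }
        return repaired;
    }

    // Run repairOnce periodically; the period is an assumed value, not one from the source.
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> {
            try {
                repairOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would suppress all later runs, so swallow it here.
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }
}
```

scheduleWithFixedDelay suppresses all later executions once a run throws, which is why the pass catches runtime exceptions. The trade-off of a periodic repair is that a node deleted by a race stays missing for up to one period. During that window, the affected metrics may still carry an empty group label.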
Conclusion
Through iterative analysis, reproduction, and three progressively refined solutions, the disappearance of consumer-group metrics in a trillion-scale Pulsar deployment was finally eliminated. The experience shows how hard it is to reproduce concurrency bugs in distributed systems, and how important robust coordination and periodic consistency checks are.
vivo Internet Technology
