
Why Do Pulsar Partition Metrics Vanish After a Broker Restart? Root Cause & Fix

During a gray-scale upgrade, a vivo Pulsar deployment using the KoP protocol showed a sudden drop in reported partition consumption rates: the consumer group name had vanished from its ZooKeeper node, leaving the group tag on the metrics empty. This article analyzes the root cause, walks through the steps to reproduce it, and presents three progressively refined fixes.


Problem Background

During a version gray-scale upgrade, a topic using the KoP protocol showed a significant drop in its reported consumption rate. The metrics kop_server_MESSAGE_OUT and kop_server_BYTES_OUT were still being reported, but their group tag was empty, which made the partition-level metrics for the consumer group disappear.

Data sample:

kop_server_MESSAGE_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 3
kop_server_BYTES_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 188

Problem Analysis

1. Locate the Problem Code

The group name is fetched from requestHandler.currentConnectedGroup. If that map has no entry for the client host, the code falls back to reading the ZooKeeper path /client_group_id/host-clientId. If the ZK node is missing, an empty string is returned and used as the group tag.

private void handleEntries(final List<Entry> entries,
                           final TopicPartition topicPartition,
                           final FetchRequest.PartitionData partitionData,
                           final KafkaTopicConsumerManager tcm,
                           final ManagedCursor cursor,
                           final AtomicLong cursorOffset,
                           final boolean readCommitted) {
    // Get the consumer group name for this client host.
    // clientHost and header are fields of the enclosing fetch context.
    CompletableFuture<String> groupNameFuture = requestHandler
        .getCurrentConnectedGroup()
        .computeIfAbsent(clientHost, host -> {
            CompletableFuture<String> future = new CompletableFuture<>();
            String groupIdPath = GroupIdUtils.groupIdPathFormat(host, header.clientId());
            requestHandler.getMetadataStore()
                .get(requestHandler.getGroupIdStoredPath() + groupIdPath)
                .thenAccept(getResultOpt -> {
                    if (getResultOpt.isPresent()) {
                        GetResult getResult = getResultOpt.get();
                        future.complete(new String(getResult.getValue() == null
                                ? new byte[0] : getResult.getValue(), StandardCharsets.UTF_8));
                    } else {
                        // ZK node not found: the group tag ends up empty
                        future.complete("");
                    }
                })
                .exceptionally(ex -> {
                    future.completeExceptionally(ex);
                    return null;
                });
            return future;
        });
    // ... groupName is later used to tag the per-partition stats
}

The missing ZK node /client_group_id/xxx is therefore the direct cause: whenever the lookup misses, the empty string propagates into the group tag of every subsequent metric.
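This is easy to confirm on the ZooKeeper side. A minimal check, assuming the metadata store is ZooKeeper and the default /client_group_id base path (the placeholder <host>-<clientId> must be replaced with a real child node name):

bin/zkCli.sh -server zk-host:2181
# List the stored host-clientId mappings
ls /client_group_id
# Inspect one mapping; its value should be the consumer group name
get /client_group_id/<host>-<clientId>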

2. Why the ZK Node Might Be Missing

Two possibilities: the node was never written, or it was written and later deleted. The write occurs in handleFindCoordinatorRequest:

@Override
protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,
                                            CompletableFuture<AbstractResponse> resultFuture) {
    // groupId comes from the FindCoordinator request; groupIdPath is built
    // from the client host and clientId (abridged here).
    // Store the group name in the metadata store for the current client;
    // it is used later to tag consumer metrics.
    storeGroupId(groupId, groupIdPath).whenComplete((stat, ex) -> {
        if (ex != null) {
            log.warn("Store groupId failed, the groupId might already stored.", ex);
        }
        // continue processing the request
    });
}
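storeGroupId itself is abridged above. A minimal sketch of what it plausibly does, assuming Pulsar's MetadataStore API (the body here is illustrative, not the actual KoP implementation):

private CompletableFuture<Stat> storeGroupId(String groupId, String groupIdPath) {
    // Write the group name as the value of /client_group_id/<host>-<clientId>.
    // Optional.empty() means no expected version: create or overwrite the node.
    String path = groupIdStoredPath + groupIdPath;
    return metadataStore.put(path, groupId.getBytes(StandardCharsets.UTF_8), Optional.empty());
}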

Deletion happens in close(), which runs whenever a client connection to the broker closes, including every connection held by the broker when it shuts down:

protected void close() {
    if (isActive.getAndSet(false)) {
        currentConnectedClientId.forEach(clientId -> {
            String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);
            // Delete ZK node /client_group_id/xxx
            metadataStore.delete(path, Optional.empty())
                .whenComplete((__, ex) -> {
                    if (ex != null) {
                        log.error("Delete groupId failed. Path: [{}]", path, ex);
                    }
                });
        });
    }
}

Thus, when a broker restarts, every connection it held is closed, its close() handler deletes the corresponding ZK nodes, and the brokers that take over those clients can no longer retrieve the group name.

Solution

Approach 1

Move the write of the group mapping from FindCoordinator to JoinGroup. This does not work: after a disconnect, the client does not necessarily rejoin the group, so a node deleted by close() is never rewritten.

Approach 2

Keep writing the mapping in FindCoordinator, but move deletion into the GroupCoordinator so it is triggered only when a consumer leaves the group or its session expires. Example deletion call:

private void removeMemberAndUpdateGroup(GroupMetadata group, MemberMetadata member) {
    group.remove(member.memberId());
    // ... other state handling ...
    // Delete /client_group_id/xxx node
    deleteClientIdGroupMapping(group, member.clientHost(), member.clientId());
}

This ensures that only an actual consumer departure deletes the mapping, rather than any dropped connection. A shutdown hook is also added so that a broker shutting down does not delete mappings for consumers that are merely failing over to another broker.
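The hook itself is not shown in the article; the following is a minimal sketch of how such a guard could look, assuming a volatile shuttingDown flag inside the group coordinator (the flag name and method bodies are illustrative, not KoP's actual code):

private volatile boolean shuttingDown = false;

public void registerShutdownGuard() {
    // Flip the flag before connections start closing so that the
    // deletion path is skipped during broker shutdown.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> shuttingDown = true));
}

private void deleteClientIdGroupMapping(GroupMetadata group, String clientHost, String clientId) {
    if (shuttingDown) {
        // The broker is going offline, not the consumer: keep the mapping
        // so the broker taking over the client can still resolve the group.
        return;
    }
    String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);
    metadataStore.delete(path, Optional.empty())
        .exceptionally(ex -> {
            log.error("Delete groupId failed. Path: [{}]", path, ex);
            return null;
        });
}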

Approach 3 – Consistency Check

Because the write and the delete are performed by different brokers, a race condition can still delete a freshly written node. The final solution therefore adds a periodic consistency check that scans all groups and rewrites any missing ZK nodes:

private void checkZkGroupMapping() {
    // Scan every member of every live group and repair missing ZK nodes.
    for (GroupMetadata group : groupManager.currentGroups()) {
        for (MemberMetadata memberMetadata : group.allMemberMetadata()) {
            String clientPath = GroupIdUtils.groupIdPathFormat(
                    memberMetadata.clientHost(), memberMetadata.clientId());
            String zkGroupClientPath = kafkaConfig.getGroupIdZooKeeperPath() + clientPath;
            metadataStore.get(zkGroupClientPath).thenAccept(resultOpt -> {
                if (!resultOpt.isPresent()) {
                    // Node is missing: rewrite the clientId -> group mapping
                    metadataStore.put(zkGroupClientPath,
                            memberMetadata.groupId().getBytes(StandardCharsets.UTF_8),
                            Optional.empty())
                        .thenAccept(stat -> log.info("repaired clientId and group mapping: {}({})",
                                zkGroupClientPath, memberMetadata.groupId()))
                        .exceptionally(ex -> {
                            log.warn("repaired clientId and group mapping failed: {}({})",
                                    zkGroupClientPath, memberMetadata.groupId(), ex);
                            return null;
                        });
                }
            }).exceptionally(ex -> {
                log.warn("repaired clientId and group mapping failed: {}", zkGroupClientPath, ex);
                return null;
            });
        }
    }
}
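The scheduling itself is not shown; a minimal sketch, assuming java.util.concurrent's ScheduledExecutorService and an illustrative 60-second interval (both the field name and the interval are assumptions, not KoP's actual configuration):

private final ScheduledExecutorService checkExecutor =
        Executors.newSingleThreadScheduledExecutor();

public void startZkGroupMappingCheck() {
    // Re-run the consistency check periodically; 60s is an illustrative value.
    checkExecutor.scheduleWithFixedDelay(this::checkZkGroupMapping, 60, 60, TimeUnit.SECONDS);
}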

After deploying this third solution, the empty group field no longer appeared and the partition-level consumption metrics were fully restored.

Conclusion

Through iterative analysis, reproduction, and three progressively refined solutions, the disappearance of consumer-group metrics in a trillion-message-scale Pulsar deployment was finally eliminated. The experience highlights how hard concurrency bugs in distributed systems are to reproduce, and the importance of robust coordination and periodic consistency checks.


Written by vivo Internet Technology

Sharing practical vivo Internet technology insights and salon events, plus the latest industry news and hot conferences.
