Uncovering RocketMQ Sync Lag: Why Messages Reach the Broker but Not the Consumer
RocketMQ can acknowledge a successful write to the broker while consumers see the message minutes later. This investigation traces the delay to the transient store pool and the CommitLog forwarding mechanism, both of which sit between the broker's in-memory write and the consumer queue.
Problem Overview
A data‑sync pipeline streamed MySQL binlog records to a RocketMQ cluster, then consumed the messages and indexed them into Elasticsearch. The RocketMQ consumer group showed no backlog and the binlog sync status reported no delay, yet Elasticsearch lagged by several minutes.
Root‑Cause Investigation
Running mqadmin topicStatus revealed that the broker’s system time was 19:01 while the latest write timestamp for the topic was 18:50, a gap of about 10 minutes.
The topicStatus command is provided by RocketMQ’s admin tool.
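The gap between the two readings can be computed mechanically. The sketch below is a hypothetical helper, not part of RocketMQ: it assumes the per-queue "last updated" times have already been read from the topicStatus output, and the second queue time (18:49) is invented for illustration.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.List;

// Hypothetical helper (not a RocketMQ class): measures how far the newest
// write visible in any queue of a topic trails the broker clock.
final class TopicWriteLag {
    static Duration lag(LocalTime brokerNow, List<LocalTime> queueLastUpdated) {
        if (queueLastUpdated.isEmpty()) {
            throw new IllegalArgumentException("no queue timestamps supplied");
        }
        // The topic is only as fresh as its most recently updated queue.
        LocalTime newest = queueLastUpdated.get(0);
        for (LocalTime t : queueLastUpdated) {
            if (t.isAfter(newest)) {
                newest = t;
            }
        }
        return Duration.between(newest, brokerNow);
    }

    public static void main(String[] args) {
        // 19:01 and 18:50 are the readings from the article; 18:49 is made up.
        Duration lag = lag(LocalTime.of(19, 1),
                List.of(LocalTime.of(18, 50), LocalTime.of(18, 49)));
        System.out.println("newest visible write trails broker clock by "
                + lag.toMinutes() + " min");
    }
}
```

At minute precision the readings above give 11 minutes. LocalTime keeps the example short but does not handle a gap that crosses midnight; epoch timestamps would be the safer choice in a real check.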
Understanding topicStatus Internals
The command invokes DefaultMQAdminExtImpl.examineTopicStats, which delegates to AdminBrokerProcessor.getTopicStatsInfo. The key steps are:
Obtain the minimum offset via MessageStore.getMinOffsetInQueue.
Obtain the maximum offset via MessageStore.getMaxOffsetInQueue.
Derive the last update time from the storage time of the message at maxOffset‑1.
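The three steps can be modeled with a small in-memory queue. This is a simplified sketch, not the broker's code: QueueStatsSketch and its dispatch method are hypothetical, and only the two offset method names are borrowed from the MessageStore calls listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of one consume queue. It stores only the
// store timestamp of each entry that has been forwarded to the queue.
final class QueueStatsSketch {
    private final long minOffset;
    private final List<Long> storeTimestamps = new ArrayList<>();

    QueueStatsSketch(long minOffset) {
        this.minOffset = minOffset;
    }

    // Called when an entry is forwarded from the CommitLog to this queue.
    void dispatch(long storeTimestampMillis) {
        storeTimestamps.add(storeTimestampMillis);
    }

    long getMinOffsetInQueue() {
        return minOffset;
    }

    // Max offset is exclusive: it is the offset the next entry will receive.
    long getMaxOffsetInQueue() {
        return minOffset + storeTimestamps.size();
    }

    // Last update time = store time of the entry at maxOffset - 1.
    // Returns 0 for an empty queue (a simplification chosen for this sketch).
    long lastUpdateTimestamp() {
        long max = getMaxOffsetInQueue();
        if (max <= minOffset) {
            return 0L;
        }
        return storeTimestamps.get((int) (max - 1 - minOffset));
    }
}
```

The point of the model is that the reported last update time can only be as recent as the newest entry already forwarded to the queue. Messages that the broker has accepted but not yet forwarded do not move it.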
Offset Calculation Details
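The maximum offset is derived from fileFromOffset + readPosition of the last file in the ConsumeQueue. That sum is a byte position, which ConsumeQueue divides by its fixed 20-byte entry size to obtain the logical queue offset. The readPosition is returned by MappedFile.getReadPosition: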
public int getReadPosition() {
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
Because ConsumeQueue does not enable transientStorePoolEnable, writeBuffer is null and the method returns wrotePosition. The wrotePosition is updated by the asynchronous ReputMessageService#doReput thread (which converts CommitLog entries to ConsumeQueue) and during broker startup by CommitLog#recoverAbnormally.
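To make the two branches concrete, here is a minimal stand-in for the relevant part of MappedFile. MappedFileSketch and maxReadableOffset are hypothetical names; the field names mirror the snippet above, and everything else about the real class is omitted.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the part of MappedFile discussed above.
final class MappedFileSketch {
    final long fileFromOffset;
    final ByteBuffer writeBuffer; // null when no transient store pool is used
    final AtomicInteger wrotePosition = new AtomicInteger();
    final AtomicInteger committedPosition = new AtomicInteger();

    MappedFileSketch(long fileFromOffset, ByteBuffer writeBuffer) {
        this.fileFromOffset = fileFromOffset;
        this.writeBuffer = writeBuffer;
    }

    // Same decision as the original method: without a writeBuffer everything
    // written is readable; with one, only committed bytes are readable.
    int getReadPosition() {
        return writeBuffer == null ? wrotePosition.get() : committedPosition.get();
    }

    // Byte position used as the "max offset" of the file queue.
    long maxReadableOffset() {
        return fileFromOffset + getReadPosition();
    }

    public static void main(String[] args) {
        MappedFileSketch plain = new MappedFileSketch(0L, null);
        plain.wrotePosition.set(4096);

        MappedFileSketch pooled = new MappedFileSketch(0L, ByteBuffer.allocateDirect(8192));
        pooled.wrotePosition.set(4096);    // appended to the off-heap buffer
        pooled.committedPosition.set(1024); // only part of it committed so far

        System.out.println("plain readable up to  " + plain.maxReadableOffset());
        System.out.println("pooled readable up to " + pooled.maxReadableOffset());
    }
}
```

With identical write positions, the pooled file exposes fewer readable bytes until its committed position catches up.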
CommitLog Forwarding Mechanism
RocketMQ writes messages for all topics sequentially to a shared CommitLog, then asynchronously forwards them to the ConsumeQueue and Index files via ReputMessageService. Forwarding proceeds only while isCommitLogAvailable() returns true, that is, while the reput offset is below the CommitLog's maximum readable offset. That readable offset in turn depends on whether transientStorePoolEnable is set.
If transientStorePoolEnable is true, messages first land in off‑heap memory (writeBuffer) and the broker returns a success response. A background thread, CommitRealTimeService, commits this buffer to the FileChannel at a default interval of 200 ms and updates committedPosition. Only after this commit does the message become eligible for forwarding to the consumer queue.
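The ordering described here can be simulated in a few lines. This is a deliberately simplified, single-threaded sketch: TransientStoreSketch and its tick methods are hypothetical, the real services run on their own threads with additional thresholds, and only the three position names come from the text.

```java
// Hypothetical single-threaded model of the write path with
// transientStorePoolEnable = true.
final class TransientStoreSketch {
    private long wrotePosition;     // bytes appended to the off-heap writeBuffer
    private long committedPosition; // bytes committed to the FileChannel
    private long reputFromOffset;   // bytes already forwarded to ConsumeQueue

    // Producer path: the broker acknowledges as soon as the append succeeds.
    void append(int bytes) {
        wrotePosition += bytes;
    }

    // One pass of the commit service: buffer contents reach the FileChannel.
    void commitTick() {
        committedPosition = wrotePosition;
    }

    // Forwarding can only see bytes that have been committed.
    boolean isCommitLogAvailable() {
        return reputFromOffset < committedPosition;
    }

    // One pass of the reput service.
    void reputTick() {
        if (isCommitLogAvailable()) {
            reputFromOffset = committedPosition;
        }
    }

    long ackedBytes() {
        return wrotePosition;
    }

    long visibleBytes() {
        return reputFromOffset;
    }

    public static void main(String[] args) {
        TransientStoreSketch store = new TransientStoreSketch();
        store.append(512);
        store.reputTick(); // nothing committed yet, so nothing is forwarded
        System.out.println("acked=" + store.ackedBytes() + " visible=" + store.visibleBytes());
        store.commitTick();
        store.reputTick();
        System.out.println("acked=" + store.ackedBytes() + " visible=" + store.visibleBytes());
    }
}
```

Between the append and the commit, the producer already holds a success response while the consumer side sees nothing. If either tick is delayed, that window widens.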
Identified Causes of the Delay
The CommitRealTimeService thread did not promptly flush off‑heap data to the FileChannel.
The ReputMessageService thread did not forward the flushed data to the ConsumeQueue in time.
These factors caused the broker to acknowledge a successful write while the consumer queue lagged, leading to the observed Elasticsearch delay.
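Given the three positions involved, the two causes can be told apart by simple subtraction. The classifier below is a hypothetical diagnostic sketch: the source does not say how these offsets were collected, and the tolerance parameter is an assumption added to ignore normal in-flight data.

```java
// Hypothetical diagnostic (not a RocketMQ API): decides which stage of the
// write path is behind, given three byte offsets sampled at the same moment.
final class LagStage {
    enum Stage { NONE, COMMIT, REPUT, BOTH }

    static Stage classify(long wrotePosition, long committedPosition,
                          long reputFromOffset, long toleranceBytes) {
        // Acked but not yet committed from off-heap memory to the FileChannel.
        boolean commitBehind = wrotePosition - committedPosition > toleranceBytes;
        // Committed but not yet forwarded to the ConsumeQueue.
        boolean reputBehind = committedPosition - reputFromOffset > toleranceBytes;

        if (commitBehind && reputBehind) {
            return Stage.BOTH;
        }
        if (commitBehind) {
            return Stage.COMMIT;
        }
        if (reputBehind) {
            return Stage.REPUT;
        }
        return Stage.NONE;
    }

    public static void main(String[] args) {
        // Illustrative numbers only.
        System.out.println(classify(1_000_000, 200_000, 200_000, 64 * 1024));
        System.out.println(classify(1_000_000, 1_000_000, 100_000, 64 * 1024));
    }
}
```

A COMMIT result points at the first cause above and a REPUT result at the second.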
Mitigation Steps
By migrating high‑traffic topics to a less loaded RocketMQ cluster, write pressure decreased, forwarding became near‑real‑time, and overall system latency recovered.
The internal message‑operation platform supports topic migration without restarting producer or consumer applications. Open‑source repository: https://github.com/ZTO-Express/zms
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.
