Understanding RocketMQ Flow Control Scenarios and Mitigation Strategies
This article explains the eight situations that trigger flow control in RocketMQ—four on the broker side and four on the consumer side—detailing their causes, relevant code paths, and practical measures such as broker scaling or consumer optimization to resolve them.
Hello, I am Jun. When using RocketMQ, you may encounter log entries like the following, which indicate that the broker has activated flow control:
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5
Flow control exists to keep the broker from being overloaded and possibly crashing: under certain conditions, RocketMQ throttles producer or consumer traffic.
1. Broker Flow Control
1.1 broker busy
RocketMQ writes incoming messages to the page cache first; a background thread periodically flushes the cache to disk. If the page cache becomes busy, the broker returns a system‑busy response code (code=2) to the producer.
The broker's fast-failure logic schedules a task that, after an initial 1 s delay, cleans expired requests every 10 ms, provided brokerFastFailureEnable is on. The relevant code is:
public void start() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // Only sweep the request queues when fast failure is enabled
            if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                cleanExpiredRequest();
            }
        }
    }, 1000, 10, TimeUnit.MILLISECONDS); // first run after 1000 ms, then every 10 ms
}
Two conditions make the broker return the busy code:
Page cache is busy: if the lock for writing to the CommitLog is held longer than osPageCacheBusyTimeOutMills (default 1 s), the broker treats the cache as busy and returns the busy code.
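Cleaning expired requests: if a request has waited in the send queue longer than waitTimeMillsInSendQueue (default 200 ms), it is removed from the queue and the busy code is returned. The log line at the start of this article comes from this path: the request had waited 206 ms, just over the 200 ms default.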
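The two conditions can be modelled in a few lines of plain Java. This is a minimal sketch, not the broker's actual fast-failure code: FastFailureSketch, QueuedRequest, isPageCacheBusy and cleanExpired are hypothetical names, the send queue is reduced to enqueue timestamps, and a lock start time of 0 is assumed to mean that no CommitLog write is in progress.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the broker's fast-failure sweep.
final class FastFailureSketch {
    // Defaults quoted above: osPageCacheBusyTimeOutMills and waitTimeMillsInSendQueue.
    static final long OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS = 1000;
    static final long WAIT_TIME_MILLS_IN_SEND_QUEUE = 200;

    // A queued send request; only its enqueue time matters for this check.
    static final class QueuedRequest {
        final long createTimestamp;

        QueuedRequest(long createTimestamp) {
            this.createTimestamp = createTimestamp;
        }
    }

    // Condition 1: the CommitLog write lock has been held longer than the timeout.
    // Assumption: beginTimeInLock == 0 means no write is in progress.
    static boolean isPageCacheBusy(long beginTimeInLock, long now) {
        return beginTimeInLock != 0 && now - beginTimeInLock > OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS;
    }

    // Condition 2: remove requests that waited too long, oldest first,
    // and build one busy response per removed request.
    static List<String> cleanExpired(Deque<QueuedRequest> queue, long now) {
        List<String> responses = new ArrayList<>();
        while (!queue.isEmpty()) {
            long waited = now - queue.peekFirst().createTimestamp;
            if (waited <= WAIT_TIME_MILLS_IN_SEND_QUEUE) {
                break; // FIFO queue: every later request has waited no longer
            }
            queue.pollFirst();
            responses.add(String.format(
                "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, "
                    + "period in queue: %dms, size of queue: %d",
                waited, queue.size()));
        }
        return responses;
    }

    public static void main(String[] args) {
        long now = 10_000;
        Deque<QueuedRequest> queue = new ArrayDeque<>();
        queue.add(new QueuedRequest(now - 206)); // waited past the 200 ms budget
        queue.add(new QueuedRequest(now - 50));  // still within the budget
        System.out.println("page cache busy: " + isPageCacheBusy(now - 1500, now));
        cleanExpired(queue, now).forEach(System.out::println);
        System.out.println("still queued: " + queue.size());
    }
}
```

Because the queue is first-in first-out, the sweep can stop at the first request that is still within budget instead of scanning the whole queue, which keeps a 10 ms schedule cheap.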
1.2 system busy
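In NettyRemotingAbstract#processRequestCommand, the broker first asks the request's processor whether the request should be rejected. If it should, the broker responds with the busy code and the message [REJECTREQUEST]system busy, start flow control for a while. For send requests, the rejection logic is: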
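// SendMessageProcessor
public boolean rejectRequest() {
    // Reject when the page cache is busy or no transient store buffers are left
    return this.brokerController.getMessageStore().isOSPageCacheBusy()
        || this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
A send request is therefore rejected for one of two reasons: the page cache is busy (the same check as in 1.1), or the transient store pool has run out of buffers. The second can only occur when transientStorePoolEnable is turned on.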
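The check-then-reject flow can be sketched as follows. The types DispatchSketch, StoreState, RequestProcessor, SendProcessorSketch and Response are hypothetical simplifications; only the rejection condition, the response code 2 and the response text are taken from the code and log message quoted in this section.

```java
import java.util.Optional;

// Hypothetical sketch of processor-level rejection; names are illustrative.
final class DispatchSketch {
    static final int SYSTEM_BUSY = 2; // the busy response code described in 1.1

    // Snapshot of the two store conditions the send processor consults.
    static final class StoreState {
        final boolean osPageCacheBusy;
        final boolean transientStorePoolDeficient;

        StoreState(boolean osPageCacheBusy, boolean transientStorePoolDeficient) {
            this.osPageCacheBusy = osPageCacheBusy;
            this.transientStorePoolDeficient = transientStorePoolDeficient;
        }
    }

    // Each processor may veto a request before it is handed to a thread pool.
    interface RequestProcessor {
        boolean rejectRequest();
    }

    // Same condition as SendMessageProcessor#rejectRequest above.
    static final class SendProcessorSketch implements RequestProcessor {
        private final StoreState store;

        SendProcessorSketch(StoreState store) {
            this.store = store;
        }

        @Override
        public boolean rejectRequest() {
            return store.osPageCacheBusy || store.transientStorePoolDeficient;
        }
    }

    // Minimal response: a code plus a human-readable remark.
    static final class Response {
        final int code;
        final String remark;

        Response(int code, String remark) {
            this.code = code;
            this.remark = remark;
        }
    }

    // Returns a busy response if the processor rejects; empty means "keep processing".
    static Optional<Response> precheck(RequestProcessor processor) {
        if (processor.rejectRequest()) {
            return Optional.of(new Response(SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while"));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean cacheBusy : flags) {
            for (boolean poolDeficient : flags) {
                Optional<Response> r = precheck(
                    new SendProcessorSketch(new StoreState(cacheBusy, poolDeficient)));
                System.out.println("pageCacheBusy=" + cacheBusy + ", poolDeficient=" + poolDeficient
                    + " -> " + r.map(resp -> resp.code + " " + resp.remark).orElse("accepted"));
            }
        }
    }
}
```

Putting the veto on the processor, rather than in the network layer, lets each request type decide for itself which store conditions justify shedding load.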
1.3 thread‑pool rejection
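If the broker's send-message thread pool is saturated (its queue holds 10,000 requests by default, configurable via sendThreadPoolQueueCapacity), submitting a request throws a RejectedExecutionException and the broker returns the busy code with the message [OVERLOAD]system busy, start flow control for a while. This response is sent only when the request is not a one-way RPC, because a one-way request expects no reply. The one-way check is: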
public boolean isOnewayRPC() {
    // The one-way marker is a single bit in the command's flag field
    int bits = 1 << RPC_ONEWAY;
    return (this.flag & bits) == bits;
}
1.4 message retry handling
When the broker returns code 2 (SYSTEM_BUSY), the producer does not retry, because code 2 is not among the producer's retryable response codes:
// DefaultMQProducer: response codes after which the producer retries the send
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(
    ResponseCode.TOPIC_NOT_EXIST,
    ResponseCode.SERVICE_NOT_AVAILABLE,
    ResponseCode.SYSTEM_ERROR,
    ResponseCode.NO_PERMISSION,
    ResponseCode.NO_BUYER_ID,
    ResponseCode.NOT_IN_CURRENT_UNIT
));
2. Consumer Flow Control
2.1 Cached message count exceeds threshold
If the number of messages cached in the ProcessQueue exceeds pullThresholdForQueue (default 1000), the consumer postpones the next pull request:
// DefaultMQPushConsumerImpl#pullMessage
long cachedMessageCount = processQueue.getMsgCount().get();
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    // log warning every 1000 occurrences
    return;
}
2.2 Cached message size exceeds threshold
When the total size of cached messages exceeds pullThresholdSizeForQueue (default 100 MiB), a similar delay is applied:
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // bytes to MiB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    // log warning every 1000 occurrences
    return;
}
2.3 Cached message span exceeds threshold
For non‑orderly consumption, if the offset span between the first and last cached messages exceeds consumeConcurrentlyMaxSpan (default 2000), the consumer postpones pulling:
if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        // log warning every 1000 occurrences
        return;
    }
}
2.4 Lock acquisition failure
For orderly consumption, the consumer must hold the lock on the message queue (reflected in the locked state of its ProcessQueue) before pulling. If it does not hold the lock, it postpones the next pull by a configurable interval (default 3 seconds).
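The four consumer-side checks can be combined into one decision function. This is a minimal sketch, not the client's DefaultMQPushConsumerImpl: PullFlowControlSketch, ProcessQueueSketch and pullDelay are hypothetical names, the ProcessQueue is reduced to a TreeMap of offset to body size plus a locked flag, and FLOW_CONTROL_DELAY_MILLS is assumed to be 50 ms (our understanding of PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL in the 4.x client; the article does not state the value).

```java
import java.util.TreeMap;

// Hypothetical sketch of the consumer-side pull checks from 2.1 to 2.4.
final class PullFlowControlSketch {
    // Thresholds as quoted in 2.1 to 2.4.
    static final int PULL_THRESHOLD_FOR_QUEUE = 1000;
    static final long PULL_THRESHOLD_SIZE_FOR_QUEUE_MIB = 100;
    static final long CONSUME_CONCURRENTLY_MAX_SPAN = 2000;
    static final long NOT_LOCKED_DELAY_MILLS = 3000;
    // Assumed value of the flow-control delay.
    static final long FLOW_CONTROL_DELAY_MILLS = 50;

    // Simplified ProcessQueue: cached, not-yet-consumed messages keyed by queue offset.
    static final class ProcessQueueSketch {
        final TreeMap<Long, Integer> msgTreeMap = new TreeMap<>(); // offset -> body size in bytes
        long msgSizeBytes;
        boolean locked; // orderly mode: whether the queue lock is held

        void put(long offset, int bodySize) {
            if (msgTreeMap.put(offset, bodySize) == null) {
                msgSizeBytes += bodySize;
            }
        }

        // Offset distance between the oldest and newest cached message.
        long getMaxSpan() {
            return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
        }
    }

    // Returns how long to postpone the next pull, or 0 to pull immediately.
    static long pullDelay(ProcessQueueSketch pq, boolean consumeOrderly) {
        long cachedMessageCount = pq.msgTreeMap.size();
        long cachedMessageSizeInMiB = pq.msgSizeBytes / (1024 * 1024);
        if (cachedMessageCount > PULL_THRESHOLD_FOR_QUEUE) {
            return FLOW_CONTROL_DELAY_MILLS; // 2.1: too many cached messages
        }
        if (cachedMessageSizeInMiB > PULL_THRESHOLD_SIZE_FOR_QUEUE_MIB) {
            return FLOW_CONTROL_DELAY_MILLS; // 2.2: cached bytes too large
        }
        if (!consumeOrderly) {
            if (pq.getMaxSpan() > CONSUME_CONCURRENTLY_MAX_SPAN) {
                return FLOW_CONTROL_DELAY_MILLS; // 2.3: offset span too wide
            }
        } else if (!pq.locked) {
            return NOT_LOCKED_DELAY_MILLS; // 2.4: queue lock not held
        }
        return 0;
    }

    public static void main(String[] args) {
        ProcessQueueSketch pq = new ProcessQueueSketch();
        pq.put(0, 1024);    // one slow message still cached at offset 0...
        pq.put(2500, 1024); // ...while newer messages keep arriving
        System.out.println("concurrent: " + pullDelay(pq, false) + " ms");
        System.out.println("orderly, unlocked: " + pullDelay(pq, true) + " ms");
    }
}
```

The count and size checks apply in both modes, whereas the span check applies only to concurrent consumption and the lock check only to orderly consumption. In every case the trigger is messages piling up faster than they are consumed, which is why the remedy is to speed up consumption rather than to raise the thresholds.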
3. Summary
The article identifies eight RocketMQ flow‑control scenarios—four on the broker side (broker busy, system busy, thread‑pool overload, and retry handling) and four on the consumer side (message count, size, span, and lock failure). Broker‑side control is essentially producer throttling and is best mitigated by scaling the broker, while consumer‑side control requires addressing slow consumption, such as external service latency or heavy SQL queries. By examining the logged error messages, operators can pinpoint the exact cause and apply the corresponding remedy.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
IT Services Circle
Delivering cutting-edge internet insights and practical learning resources. We're a passionate and principled IT media platform.
