RocketMQ Fast‑Failure Mechanism: Case Study, Analysis, and Optimization
This article presents a detailed case study of RocketMQ's fast‑failure mechanism, explains the root causes of a massive message‑write surge, analyzes broker thread models, and proposes topic‑level rate‑limiting solutions with concrete implementation and code examples.
1. What is RocketMQ Fast‑Failure Mechanism?
The fast‑failure mechanism in RocketMQ discards client requests that stay in the broker's queue longer than a configurable threshold (default 200 ms) to protect the broker from overload.
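The essence of the sweep can be sketched as follows. This is a simplified illustration, not RocketMQ's actual `BrokerFastFailure` code; the class, queue, and method names here are assumptions made for clarity:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a fast-failure sweep: requests that have waited in the
// send queue longer than maxWaitMillis are discarded from the head
// and (in the real broker) answered with a "broker busy" failure.
public class FastFailureSketch {

    public static final long DEFAULT_MAX_WAIT_MILLIS = 200;

    // A queued request, stamped with its arrival time.
    public static class PendingRequest {
        final long enqueueTimeMillis;
        public PendingRequest(long enqueueTimeMillis) {
            this.enqueueTimeMillis = enqueueTimeMillis;
        }
    }

    // Drop every request that has waited longer than maxWaitMillis;
    // return how many were discarded ("failed fast").
    public static int cleanExpiredRequests(Deque<PendingRequest> sendQueue,
                                           long nowMillis, long maxWaitMillis) {
        int discarded = 0;
        while (!sendQueue.isEmpty()
                && nowMillis - sendQueue.peekFirst().enqueueTimeMillis > maxWaitMillis) {
            sendQueue.pollFirst(); // real broker replies [TIMEOUT_CLEAN_QUEUE] broker busy
            discarded++;
        }
        return discarded;
    }
}
```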
2. Problems Triggered by the Mechanism
Sudden email alerts indicated production failures across many topics. Logs showed the error [TIMEOUT_CLEAN_QUEUE] broker busy, confirming that broker pressure had triggered fast‑failure.
Immediate mitigation raised the timeout threshold to 2000 ms to keep the cluster operational.
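The threshold is controlled by the broker configuration item waitTimeMillsInSendQueue (default 200 ms), so the emergency change was roughly the following (assuming the default broker.conf location):

```
# broker.conf – raise the fast-failure threshold from 200 ms to 2000 ms
waitTimeMillsInSendQueue=2000
```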
3. Investigation
3.1. Broker traffic
Message write volume spiked from ~30 k/min to 225 k/min (≈8×). Load average on the affected broker reached 10.32 on a 4‑core machine, while other brokers with similar traffic stayed below 1.
3.2. Topic causing the surge
stats.log revealed a massive write rate for the retry topic %RETRY%get-pugc-to-ai-consumer (TPS ≈ 785):

```
2022-02-08 15:40:00 INFO - [TOPIC_PUT_NUMS] [%RETRY%get-pugc-to-ai-consumer] Stats In One Minute, SUM: 47119 TPS: 785.32 AVGPT: 1.00
```
3.3. Root cause in the consumer code
The Python consumer omitted the ACK return value, so every consumed message was reported as a consumption failure and re‑queued to the retry topic:

```python
def callback(msg):
    print(msg.id, msg.body)
    # Bug: the ACK return is missing. Without
    #     return ConsumeStatus.CONSUME_SUCCESS
    # the client treats every message as a consumption failure,
    # and the broker re-queues it to the retry topic.
```
4. Conclusion of the Investigation
Improper consumer implementation generated a flood of retry messages, overloading a low‑spec broker and triggering fast‑failure, which indiscriminately rejected requests from other topics.
5. Thoughts on the Fast‑Failure Mechanism
The mechanism protects the broker but does so with coarse granularity; a single misbehaving consumer can affect the whole cluster.
6. Optimization Proposals
Isolate retry‑message handling into a dedicated thread‑pool separate from normal topic writes.
Apply fine‑grained rate‑limiting per IP or per topic to prevent a single source from overwhelming the broker.
Topic‑level rate limiting is preferred because it aligns with business usage patterns.
7. Where to Apply Rate Limiting?
Insert the limiter in the Netty handler pipeline (the "handler" layer), before the broker business thread pool (SendMessageThread), to reject excessive write requests early.
8. Rate‑Limiter Design Choices
Three common strategies were considered:
Semaphore‑based concurrency limiting.
Guava Token‑Bucket limiter.
Sliding‑window limiters (Sentinel, Hystrix).
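For comparison, the semaphore approach caps the number of in-flight requests rather than the request rate. A minimal sketch using the JDK's Semaphore (class and method names here are illustrative):

```java
import java.util.concurrent.Semaphore;

// Concurrency limiting: at most maxConcurrent requests are processed
// at once; excess requests fail immediately instead of queueing.
public class SemaphoreLimiter {
    private final Semaphore permits;

    public SemaphoreLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Returns true if the request may proceed; the caller must
    // call release() when its processing finishes.
    public boolean tryEnter() {
        return permits.tryAcquire();
    }

    public void release() {
        permits.release();
    }
}
```

Note that this bounds concurrency, not throughput: a flood of very short requests still gets through, which is one reason a token bucket fits the broker scenario better.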
Given the large number of topics, a lightweight token‑bucket limiter (modeled on Guava's RateLimiter) with custom modifications was chosen.
8.1. Problems with the Default Guava Limiter
By default, Guava's RateLimiter sleeps the calling thread when no token is available, which is unacceptable on a broker's request path.
Reserving permits also pushes the "next free ticket" time further into the future, so a single large burst can block every subsequent request for a long time.
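The inflation is easy to quantify: at a configured rate of r permits per second, reserving a burst of n permits pushes the next free ticket n/r seconds into the future. A small illustration of that arithmetic (not Guava's actual internals):

```java
// Illustrates how a token-bucket reservation inflates the
// next-free-ticket time: one oversized burst makes every
// later caller wait out the whole reservation.
public class NextTicketDemo {

    // Microseconds by which a burst of `permits` pushes out the
    // next free ticket at `qps` permits per second.
    public static long pushOutMicros(int permits, double qps) {
        return (long) (permits * (1_000_000L / qps));
    }
}
```

At 100 QPS, for example, one burst of 1000 permits defers the next free ticket by a full 10 seconds of blocking for whoever calls next.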
8.2. Modified Token‑Bucket with Circuit‑Breaker
The limiter now immediately returns a failure when tokens are exhausted and opens a short‑lived circuit‑breaker (default 1 s) before resuming normal operation.
```java
public class TokenBucketRateLimiter {

    private boolean circuitBreakerOpen;
    private long nextFreeTicketMicros;   // when the next permit becomes free
    private long lastNeedWaitMicrosecs;
    private long lastRateLimitTimestamp;
    private long circuitBreakerOpenTimeInMicros = 1_000_000; // default: 1 s

    // readMicros() and reserveAndGetWaitLength() follow Guava's
    // RateLimiter internals (omitted here).
    public boolean acquire(int permits) {
        synchronized (this) {
            long nowMicros = readMicros();
            // Close the breaker once its open interval has elapsed.
            if (circuitBreakerOpen
                    && nowMicros - nextFreeTicketMicros >= circuitBreakerOpenTimeInMicros) {
                circuitBreakerOpen = false;
            }
            if (!circuitBreakerOpen) {
                lastNeedWaitMicrosecs = reserveAndGetWaitLength(permits, nowMicros);
                // Out of tokens: open the breaker and fail immediately
                // instead of sleeping as Guava's RateLimiter would.
                if (lastNeedWaitMicrosecs > 0) {
                    circuitBreakerOpen = true;
                    lastRateLimitTimestamp = System.currentTimeMillis();
                }
            }
            return !circuitBreakerOpen;
        }
    }
}
```
8.3. Rate‑Limit Handler Implementation
```java
@ChannelHandler.Sharable
public class RateLimitHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    // One token bucket per resource (topic), created lazily.
    private final ConcurrentMap<String, TokenBucketRateLimiter> rateLimiterMap =
            new ConcurrentHashMap<>();
    private volatile boolean disabled;
    private EventExecutorGroup rateLimitEventExecutorGroup;
    private double defaultLimitQps;      // limit for normal topic writes
    private double sendMsgBackLimitQps;  // separate limit for retry (send-back) requests

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception {
        if (!ctx.executor().inEventLoop()) { ctx.fireChannelRead(cmd); return; }
        // Only rate-limit request commands; pass everything else through.
        if (disabled || cmd == null || cmd.getType() != RemotingCommandType.REQUEST_COMMAND) {
            ctx.fireChannelRead(cmd);
            return;
        }
        String resource = getResource(cmd); // e.g. the topic name (extraction omitted)
        if (resource == null) { ctx.fireChannelRead(cmd); return; }
        // Retry (send-back) traffic gets its own limit; final so the
        // lambda below can capture it.
        final double limitQps = cmd.getCode() == RequestCode.CONSUMER_SEND_MSG_BACK
                ? sendMsgBackLimitQps : defaultLimitQps;
        TokenBucketRateLimiter rateLimiter =
                rateLimiterMap.computeIfAbsent(resource, k -> new TokenBucketRateLimiter(limitQps));
        if (rateLimiter.acquire(1)) { ctx.fireChannelRead(cmd); return; }
        // Over the limit: answer SYSTEM_BUSY immediately instead of queueing.
        RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "RateLimit");
        response.setOpaque(cmd.getOpaque());
        ctx.writeAndFlush(response);
    }
}
```
9. Summary
The fast‑failure mechanism protects brokers but is too coarse; a single topic's retry surge can cripple the whole cluster. By inserting a dedicated rate‑limit handler in the Netty pipeline and using a topic‑level token‑bucket limiter with a short circuit‑breaker, the impact of abnormal traffic is isolated, improving overall stability.
10. References
RocketMQ Development Guide – https://github.com/apache/rocketmq/tree/develop/docs/cn
Guava Rate Limiting Principles – https://zhuanlan.zhihu.com/p/60979444
Sohu Tech Products
A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.