How RocketMQ Handles Concurrent and Ordered Message Consumption
This article explains RocketMQ's internal workflow for concurrent and ordered message consumption: thread-pool configuration, pull-message handling, request submission, execution steps, offset management, and expired-message cleanup. It also answers common questions about batch retries and offset ordering.
Concurrent Consumption – ConsumeMessageConcurrentlyService
consumeMessageBatchMaxSize : maximum number of messages processed per batch, default 1
consumeThreadMin : minimum consumer thread count, default 20
consumeThreadMax : maximum consumer thread count, default 20
consumeTimeout : consume timeout in minutes, default 15
Message‑consumption thread pool
The constructor creates an unbounded LinkedBlockingQueue<> and a ThreadPoolExecutor:
this.consumeRequestQueue = new LinkedBlockingQueue<>();
String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));
The queue length is Integer.MAX_VALUE, storing pending ConsumeRequest objects.
Thread names are prefixed with ConsumeMessageThread_ plus the consumer group.
Core and maximum pool sizes default to 20 and can be changed via consumeThreadMin and consumeThreadMax.
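One consequence of pairing a ThreadPoolExecutor with an unbounded queue is easier to see in isolation: the JDK only starts threads beyond the core size when the queue refuses a task, so with an unbounded queue the pool stays at its core size and extra work simply queues up. The sketch below uses only JDK classes; the class name and the small sizes (2 core, 4 max, 10 tasks) are illustrative assumptions, not RocketMQ code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RocketMQ.
class UnboundedQueuePoolDemo {

    // Returns {poolSize, queuedTasks} observed while every running task is blocked.
    static int[] observe(int coreSize, int maxSize, int taskCount) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, maxSize, 1000 * 60, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] observed = observe(2, 4, 10);
        System.out.println("pool size = " + observed[0] + ", queued = " + observed[1]);
    }
}
```

With these numbers the pool holds 2 threads and 8 queued tasks. If the same behavior applies to the consumer's executor, raising consumeThreadMax alone would not add threads, and consumeThreadMin would be the setting that matters.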
Submitting consume requests
Pulled messages are handed to consumeExecutor:
@Override
public void submitConsumeRequest(final List<MessageExt> msgs,
                                 final ProcessQueue processQueue,
                                 final MessageQueue messageQueue,
                                 final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        // single ConsumeRequest when batch size not exceeded
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // split into multiple ConsumeRequest objects (not shown)
    }
}
If the number of messages ≤ consumeMessageBatchMaxSize (default 1), a single ConsumeRequest is created.
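If the executor rejects the task with a RejectedExecutionException, submitConsumeRequestLater resubmits the request after a 5-second delay. Because the queue is unbounded, a rejection in practice means the executor is shutting down rather than that the queue has filled up.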
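The splitting branch that the snippet leaves out can be pictured as plain list partitioning: cut the pulled messages into chunks of at most consumeBatchSize and wrap each chunk in its own request. The helper below is a minimal sketch of that idea using only JDK types; BatchSplitSketch and split are hypothetical names, and the real RocketMQ loop may be written differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not RocketMQ code: splits a pulled batch into chunks.
class BatchSplitSketch {

    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize < 1) {
            throw new IllegalArgumentException("consumeBatchSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < msgs.size(); start += consumeBatchSize) {
            int end = Math.min(start + consumeBatchSize, msgs.size());
            // copy the slice so each batch is independent of the source list
            batches.add(new ArrayList<>(msgs.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // prints [[m0, m1], [m2, m3], [m4]]
        System.out.println(split(Arrays.asList("m0", "m1", "m2", "m3", "m4"), 2));
    }
}
```

Each inner list would become one ConsumeRequest, so with the default batch size of 1 every pulled message is submitted as its own task.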
ConsumeRequest execution
The run method performs:
public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    // listener result; stays null if the listener throws or returns null
    ConsumeConcurrentlyStatus status = null;
    // optional pre-hook
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
        // set fields …
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }
    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            // record when consumption started; the expired-message cleanup reads this later
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
            UtilAll.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue), e);
        hasException = true;
    }
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    // determine return type (EXCEPTION, RETURNNULL, TIME_OUT, FAILED, SUCCESS)
    // optional post-hook
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        // populate context and executeHookAfter
    }
    // update stats
    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup,
            messageQueue.getTopic(), consumeRT);
    // process result if queue still valid
    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}",
            messageQueue, msgs);
    }
}
If the processing queue is dropped, the task returns immediately.
If a ConsumeMessageHook is configured, its pre‑ and post‑hooks are executed.
The listener’s result maps to ConsumeReturnType:
EXCEPTION – listener threw an exception
RETURNNULL – listener returned null
TIME_OUT – processing exceeded consumeTimeout
FAILED – listener returned ConsumeConcurrentlyStatus.RECONSUME_LATER
SUCCESS – listener returned ConsumeConcurrentlyStatus.CONSUME_SUCCESS
Successful messages are removed from the ProcessQueue TreeMap and their offsets are updated; failed messages are sent back to the retry queue.
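The mapping listed above can be sketched as a small decision function. The two enums are stand-ins declared here so the example compiles without the RocketMQ client, and the order of the checks (null status first, then timeout, then the listener's status) is an assumption based on the list rather than a copy of the real method.

```java
// Hypothetical stand-ins for the RocketMQ enums, so the sketch compiles on its own.
enum SketchStatus { CONSUME_SUCCESS, RECONSUME_LATER }

enum SketchReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

class ReturnTypeSketch {

    // consumeTimeoutMinutes is in minutes, matching the consumeTimeout setting.
    static SketchReturnType classify(SketchStatus status, boolean hasException,
                                     long consumeRtMillis, long consumeTimeoutMinutes) {
        if (status == null) {
            // no status: either the listener threw, or it returned null
            return hasException ? SketchReturnType.EXCEPTION : SketchReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= consumeTimeoutMinutes * 60 * 1000) {
            return SketchReturnType.TIME_OUT;
        }
        return status == SketchStatus.RECONSUME_LATER
                ? SketchReturnType.FAILED
                : SketchReturnType.SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(classify(SketchStatus.RECONSUME_LATER, false, 120, 15));
    }
}
```

Under this ordering a listener that succeeds but takes longer than consumeTimeout is reported as TIME_OUT rather than SUCCESS.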
Periodic cleanup of expired messages
A scheduled task runs every consumeTimeout minutes to scan each ProcessQueue for messages whose consumption time exceeds the timeout:
public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                cleanExpireMsg();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
            }
        }
    }, this.defaultMQPushConsumer.getConsumeTimeout(),
        this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
When a message is identified as expired, the task:
Resends it to the retry queue with a back‑off level of 3 (default 10 s).
Removes it from the TreeMap, allowing subsequent offsets to be committed.
Uses the timestamp recorded at the start of ConsumeRequest execution to calculate the timeout.
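The three steps above can be modeled with a TreeMap keyed by queue offset whose values are the recorded consume-start timestamps. This is a simplified sketch under stated assumptions: the class and method names are hypothetical, the scan is assumed to inspect the lowest offset first and stop at the first message that has not expired, and the send-back to the retry queue is reduced to collecting the offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the cleanup scan, not RocketMQ code.
class ExpiredHeadScanSketch {

    // inFlight maps queue offset -> consume start timestamp in milliseconds.
    // Returns the offsets removed as expired, lowest first.
    static List<Long> removeExpired(TreeMap<Long, Long> inFlight,
                                    long nowMillis, long consumeTimeoutMinutes) {
        long timeoutMillis = consumeTimeoutMinutes * 60 * 1000;
        List<Long> expired = new ArrayList<>();
        while (!inFlight.isEmpty()) {
            Map.Entry<Long, Long> head = inFlight.firstEntry();
            if (nowMillis - head.getValue() <= timeoutMillis) {
                break; // head is still within the timeout, stop scanning
            }
            // the real task would first send the message back at delay level 3
            expired.add(head.getKey());
            inFlight.remove(head.getKey());
        }
        return expired;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> inFlight = new TreeMap<>();
        inFlight.put(100L, 0L);
        inFlight.put(101L, 0L);
        inFlight.put(102L, 600_000L); // started 10 minutes after the others
        // at minute 16 with a 15-minute timeout, prints [100, 101]
        System.out.println(removeExpired(inFlight, 960_000L, 15));
    }
}
```

Once offsets 100 and 101 leave the map, 102 becomes the lowest remaining offset, which is what lets the committed offset move forward again.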
Ordered Consumption – ConsumeMessageOrderlyService
consumeThreadMin : default 20
consumeThreadMax : default 20
consumeTimeout : default 15 minutes
Message‑consumption thread pool
Identical to the concurrent case.
Submitting consume requests
@Override
public void submitConsumeRequest(final List<MessageExt> msgs,
                                 final ProcessQueue processQueue,
                                 final MessageQueue messageQueue,
                                 final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
Ordered consumption creates a single ConsumeRequest per queue to preserve order, unlike concurrent mode which may split a batch.
Execution flow
The run method of the ConsumeRequest class inside ConsumeMessageOrderlyService contains the ordered-processing logic (code omitted for brevity).
Q&A
1. Partial success in batch consumption
Concurrent mode: If the listener returns ConsumeConcurrentlyStatus.RECONSUME_LATER, the entire batch is retried. To retry only the failed messages, call setAckIndex on the ConsumeConcurrentlyContext passed to the listener and return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; messages at positions after ackIndex are then retried.
Ordered mode: The listener returns ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT to indicate a retry. If the maximum retry count is exceeded, the batch is sent to the retry queue; otherwise the retry is delayed by suspendCurrentQueueTimeMillis (default 1000 ms).
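For the concurrent-mode case in this answer, the effect of ackIndex on a batch can be sketched as index arithmetic. The sketch assumes that positions up to and including ackIndex count as acknowledged, that a value past the end of the batch is clamped to the last position, and that RECONSUME_LATER acknowledges nothing; AckIndexSketch and indicesToRetry are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of partial acknowledgement, not RocketMQ code.
class AckIndexSketch {

    // Returns the batch positions that would be sent back for retry.
    static List<Integer> indicesToRetry(int batchSize, int ackIndex, boolean consumeSuccess) {
        int lastAcked;
        if (consumeSuccess) {
            // clamp so an oversized ackIndex acknowledges the whole batch
            lastAcked = Math.min(ackIndex, batchSize - 1);
        } else {
            lastAcked = -1; // RECONSUME_LATER: nothing is acknowledged
        }
        List<Integer> retry = new ArrayList<>();
        for (int i = lastAcked + 1; i < batchSize; i++) {
            retry.add(i);
        }
        return retry;
    }

    public static void main(String[] args) {
        // batch of 5, listener acknowledged positions 0..2: prints [3, 4]
        System.out.println(indicesToRetry(5, 2, true));
    }
}
```

Because ackIndex is a single cut point, this only expresses "everything up to here succeeded". A listener that wants to use it would process the batch in order and stop at the first failure.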
2. How concurrent consumption guarantees offset commit order
After a message is successfully processed, it is removed from the ProcessQueue TreeMap. The consumer then commits the smallest offset remaining in the TreeMap. If earlier messages are still unfinished, later offsets are not committed, so the committed offset never skips past an unprocessed message even when batches finish out of order.
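The rule is small enough to model directly. The sketch below keeps in-flight messages in a TreeMap keyed by offset and returns the value that would be committed after each removal. The class name is hypothetical, and the empty-map case (commit the highest offset seen plus one) is an assumption added to make the model complete.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of offset tracking, not RocketMQ code.
class OffsetCommitSketch {
    private final TreeMap<Long, String> inFlight = new TreeMap<>();
    private long maxOffsetSeen = -1;

    synchronized void put(long offset, String body) {
        inFlight.put(offset, body);
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    // Removes finished offsets and returns the offset that is safe to commit.
    synchronized long remove(List<Long> finished) {
        for (Long offset : finished) {
            inFlight.remove(offset);
        }
        // smallest unfinished offset, or one past the end when nothing is left
        return inFlight.isEmpty() ? maxOffsetSeen + 1 : inFlight.firstKey();
    }

    public static void main(String[] args) {
        OffsetCommitSketch queue = new OffsetCommitSketch();
        for (long offset = 100; offset <= 104; offset++) {
            queue.put(offset, "msg-" + offset);
        }
        System.out.println(queue.remove(Arrays.asList(102L, 103L))); // 100
        System.out.println(queue.remove(Arrays.asList(100L)));       // 101
        System.out.println(queue.remove(Arrays.asList(101L, 104L))); // 105
    }
}
```

In this model, finishing 102 and 103 first leaves the commit point at 100. If the consumer restarted at that moment, those two messages would be delivered again, which is why listeners should be idempotent.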
3. Enabling batch consumption
Call consumer.setConsumeMessageBatchMaxSize(N), where N is the desired batch size.
ShiZhen AI
Tech blogger with over 10 years of experience at leading tech firms, AI efficiency and delivery expert focusing on AI productivity. Covers tech gadgets, AI-driven efficiency, and leisure— AI leisure community. 🛰 szzdzhp001
