How RocketMQ Handles Concurrent and Ordered Message Consumption

This article explains RocketMQ's internal workflow for concurrent and ordered message consumption. It covers thread‑pool configuration, pull‑message handling, request submission, execution steps, offset management, and expired‑message cleanup, then answers common questions about batch retries and offset ordering.

ShiZhen AI

Concurrent Consumption – ConsumeMessageConcurrentlyService

consumeMessageBatchMaxSize : maximum number of messages processed per batch, default 1

consumeThreadMin : minimum consumer thread count, default 20

consumeThreadMax : maximum consumer thread count, default 20

consumeTimeout : consume timeout in minutes, default 15

Message‑consumption thread pool

The constructor creates an unbounded LinkedBlockingQueue<> and a ThreadPoolExecutor:

this.consumeRequestQueue = new LinkedBlockingQueue<>();
String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";
this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));

The queue is unbounded (its capacity is Integer.MAX_VALUE) and holds pending ConsumeRequest objects.

Thread names are prefixed with ConsumeMessageThread_ plus the consumer group.

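Core and maximum pool sizes both default to 20 and can be changed via consumeThreadMin and consumeThreadMax. Because a ThreadPoolExecutor only creates threads beyond the core size when its work queue is full, and this queue is unbounded, the pool in practice stays at consumeThreadMin threads.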
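The interaction between an unbounded queue and the maximum pool size is easy to check in isolation. The sketch below uses only the JDK, not RocketMQ; the class name UnboundedQueuePoolDemo and the small pool sizes (core 2, max 4) are made up for illustration. It parks ten tasks on a latch and reports how many threads the pool created and how many tasks are waiting in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only mirrors the executor setup shown above.
final class UnboundedQueuePoolDemo {

    /** Submits blocking tasks and returns {threads in pool, tasks waiting in queue}. */
    static int[] observe(int core, int max, int tasks) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(core, max, 1000 * 60, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until the snapshot is taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] snapshot = {pool.getPoolSize(), queue.size()};
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = observe(2, 4, 10);
        System.out.println("threads=" + s[0] + ", queued=" + s[1]); // threads=2, queued=8
    }
}
```

The pool never grows to the maximum of 4: the first two tasks start the core threads and the remaining eight wait in the queue.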

Submitting consume requests

Pulled messages are handed to consumeExecutor:

@Override
public void submitConsumeRequest(final List<MessageExt> msgs,
                                 final ProcessQueue processQueue,
                                 final MessageQueue messageQueue,
                                 final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        // single ConsumeRequest when batch size not exceeded
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // split into multiple ConsumeRequest objects (not shown)
    }
}

If the number of messages ≤ consumeMessageBatchMaxSize (default 1), a single ConsumeRequest is created.

If the executor rejects the task with a RejectedExecutionException, submitConsumeRequestLater resubmits the request after a 5‑second delay.
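The splitting branch is not shown in the listing above, so the following is only a generic illustration of cutting a pulled list into batches of at most consumeMessageBatchMaxSize elements. It is not RocketMQ's code; the class BatchSplitSketch and the method split are hypothetical names, and plain strings stand in for MessageExt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of RocketMQ.
final class BatchSplitSketch {

    /** Splits msgs into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> pulled = Arrays.asList("m0", "m1", "m2", "m3", "m4");
        System.out.println(split(pulled, 2)); // [[m0, m1], [m2, m3], [m4]]
    }
}
```

Each resulting batch would become its own ConsumeRequest, which is why the default batch size of 1 yields one request per message.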

ConsumeRequest execution

The run method performs:

public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
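    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    // result returned by the listener; stays null if the listener throws or returns null
    ConsumeConcurrentlyStatus status = null;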
    // optional pre‑hook
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
        // set fields …
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }
    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                UtilAll.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue), e);
        hasException = true;
    }
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    // determine return type (EXCEPTION, RETURNNULL, TIME_OUT, FAILED, SUCCESS)
    // optional post‑hook
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        // populate context and executeHookAfter
    }
    // update stats
    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup,
                    messageQueue.getTopic(), consumeRT);
    // process result if queue still valid
    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}",
                messageQueue, msgs);
    }
}

If the processing queue is dropped, the task returns immediately.

If a ConsumeMessageHook is configured, its pre‑ and post‑hooks are executed.

The listener’s result maps to ConsumeReturnType:

EXCEPTION – listener threw an exception

RETURNNULL – listener returned null

TIME_OUT – processing exceeded consumeTimeout

FAILED – listener returned ConsumeConcurrentlyStatus.RECONSUME_LATER

SUCCESS – listener returned ConsumeConcurrentlyStatus.CONSUME_SUCCESS

Successful messages are removed from the ProcessQueue TreeMap and their offsets are updated; failed messages are sent back to the retry queue.
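The mapping can be written as one small function. This is a sketch under assumptions: the enums Status and ReturnType are local stand-ins for ConsumeConcurrentlyStatus and ConsumeReturnType, and the order in which the conditions are checked (null status first, then timeout, then the listener's status) is assumed from the list above rather than copied from the listing, which elides that step.

```java
// Hypothetical stand-in types; only the decision logic is of interest.
final class ReturnTypeSketch {

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

    /**
     * @param status                what the listener returned, or null
     * @param hasException          true if the listener threw
     * @param consumeRtMillis       measured processing time in milliseconds
     * @param consumeTimeoutMinutes the consumeTimeout setting, in minutes
     */
    static ReturnType classify(Status status, boolean hasException,
                               long consumeRtMillis, long consumeTimeoutMinutes) {
        if (status == null) {
            // No status: either the listener threw or it returned null.
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= consumeTimeoutMinutes * 60 * 1000) {
            return ReturnType.TIME_OUT;
        }
        if (status == Status.RECONSUME_LATER) {
            return ReturnType.FAILED;
        }
        return ReturnType.SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(classify(null, true, 5, 15));                   // EXCEPTION
        System.out.println(classify(Status.RECONSUME_LATER, false, 5, 15)); // FAILED
    }
}
```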

Periodic cleanup of expired messages

A scheduled task runs every consumeTimeout minutes to scan each ProcessQueue for messages whose consumption time exceeds the timeout:

public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                cleanExpireMsg();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
            }
        }
    }, this.defaultMQPushConsumer.getConsumeTimeout(),
       this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}

When a message is identified as expired, the task:

Resends it to the retry queue with a back‑off level of 3 (default 10 s).

Removes it from the TreeMap, allowing subsequent offsets to be committed.

Uses the timestamp recorded at the start of ConsumeRequest execution to calculate the timeout.
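The steps above can be sketched with a plain TreeMap. This is a simplified model, not RocketMQ's implementation: the class ExpiredMessageSketch is hypothetical, the map stores offset to consume-start timestamp instead of full messages, and scanning from the smallest offset and stopping at the first non-expired entry is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of the cleanup pass; keys are queue offsets,
// values are the consume-start timestamps in milliseconds.
final class ExpiredMessageSketch {

    /** Removes expired entries from the head of the map and returns their offsets. */
    static List<Long> removeExpired(TreeMap<Long, Long> startTimeByOffset,
                                    long nowMillis, long consumeTimeoutMinutes) {
        long timeoutMillis = consumeTimeoutMinutes * 60 * 1000;
        List<Long> expired = new ArrayList<>();
        while (!startTimeByOffset.isEmpty()) {
            Long offset = startTimeByOffset.firstKey();
            long startedAt = startTimeByOffset.get(offset);
            if (nowMillis - startedAt <= timeoutMillis) {
                break; // the oldest message is still within the timeout
            }
            // A real consumer would send the message back for retry at this point.
            startTimeByOffset.remove(offset);
            expired.add(offset);
        }
        return expired;
    }

    public static void main(String[] args) {
        long now = 16L * 60 * 1000; // pretend 16 minutes have passed since time 0
        TreeMap<Long, Long> inFlight = new TreeMap<>();
        inFlight.put(100L, 0L);
        inFlight.put(101L, 0L);
        inFlight.put(102L, now - 1000);
        System.out.println(removeExpired(inFlight, now, 15)); // [100, 101]
        System.out.println(inFlight.firstKey());              // 102
    }
}
```

Once the stuck entries at offsets 100 and 101 are gone, 102 becomes the smallest remaining offset, so the committed offset can move forward again.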

Ordered Consumption – ConsumeMessageOrderlyService

consumeThreadMin : default 20

consumeThreadMax : default 20

consumeTimeout : default 15 minutes

Message‑consumption thread pool

Identical to the concurrent case.

Submitting consume requests

@Override
public void submitConsumeRequest(final List<MessageExt> msgs,
                                 final ProcessQueue processQueue,
                                 final MessageQueue messageQueue,
                                 final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

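Ordered consumption never splits a pulled batch. It submits a single ConsumeRequest that carries only the ProcessQueue and MessageQueue, and only when dispathToConsume is true; the request takes messages from the ProcessQueue in offset order when it runs. Concurrent mode, by contrast, may split a batch across several requests.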

Execution flow

The run method of ConsumeMessageOrderlyService's inner ConsumeRequest class contains the ordered‑processing logic (code omitted for brevity).

Q&A

1. Partial success in batch consumption

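Concurrent mode: If the listener returns ConsumeConcurrentlyStatus.RECONSUME_LATER, the entire batch is retried. To retry only the failed messages, call setAckIndex on the ConsumeConcurrentlyContext with the index of the last successfully consumed message and return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; messages after that index are sent back for retry.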

Ordered mode: The listener returns ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT to indicate a retry. If the maximum retry count is exceeded, the batch is sent to the retry queue; otherwise the retry is delayed by suspendCurrentQueueTimeMillis (default 1000 ms).
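For the concurrent case, the effect of the context's ackIndex on a batch can be modeled in a few lines. This is a sketch, not RocketMQ code: the class AckIndexSketch and the method retriedPositions are hypothetical, and it assumes that every position after ackIndex is retried, that ackIndex is capped at the last position on success, and that RECONSUME_LATER acknowledges nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how ackIndex selects the messages to retry.
final class AckIndexSketch {

    /** Returns the positions within the batch that would be sent back for retry. */
    static List<Integer> retriedPositions(int batchSize, boolean consumeSuccess, int ackIndex) {
        int effectiveAck;
        if (consumeSuccess) {
            // Cap ackIndex so an untouched default (a very large value) acks the whole batch.
            effectiveAck = Math.min(ackIndex, batchSize - 1);
        } else {
            effectiveAck = -1; // RECONSUME_LATER: nothing is acknowledged
        }
        List<Integer> retried = new ArrayList<>();
        for (int i = effectiveAck + 1; i < batchSize; i++) {
            retried.add(i);
        }
        return retried;
    }

    public static void main(String[] args) {
        System.out.println(retriedPositions(5, true, 2));                  // [3, 4]
        System.out.println(retriedPositions(5, true, Integer.MAX_VALUE));  // []
        System.out.println(retriedPositions(5, false, Integer.MAX_VALUE)); // [0, 1, 2, 3, 4]
    }
}
```

Because only a prefix of the batch can be acknowledged this way, a listener that relies on ackIndex would typically process the messages in list order and stop at the first failure.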

2. How concurrent consumption guarantees offset commit order

After a message is successfully processed, it is removed from the ProcessQueue TreeMap, which is keyed by queue offset. The consumer then commits the smallest offset still in the TreeMap (or, when the TreeMap is empty, the largest offset seen plus one). A message that is still being processed stays in the TreeMap, so the committed offset never advances past it, even when later messages in other batches have already finished.
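A small model makes the commit rule concrete. The class CommitOffsetSketch below is hypothetical and single-threaded (the real ProcessQueue guards its TreeMap with locks); it assumes the committable offset is the smallest offset still in flight, or the largest offset seen plus one when nothing is in flight.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.TreeMap;

// Hypothetical, single-threaded model of offset tracking in a ProcessQueue.
final class CommitOffsetSketch {

    private final TreeMap<Long, String> inFlight = new TreeMap<>();
    private long maxOffsetSeen = -1;

    /** Registers a pulled message under its queue offset. */
    void put(long offset, String body) {
        inFlight.put(offset, body);
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    /** Removes finished offsets and returns the offset that is now safe to commit. */
    long remove(Collection<Long> finished) {
        for (Long offset : finished) {
            inFlight.remove(offset);
        }
        return inFlight.isEmpty() ? maxOffsetSeen + 1 : inFlight.firstKey();
    }

    public static void main(String[] args) {
        CommitOffsetSketch queue = new CommitOffsetSketch();
        for (long offset = 0; offset < 5; offset++) {
            queue.put(offset, "msg-" + offset);
        }
        System.out.println(queue.remove(Arrays.asList(1L, 2L))); // 0: offset 0 is still in flight
        System.out.println(queue.remove(Arrays.asList(0L)));     // 3
        System.out.println(queue.remove(Arrays.asList(3L, 4L))); // 5
    }
}
```

Offsets 1 and 2 finish first, yet the committable offset stays at 0 until message 0 completes. After a restart the consumer would therefore re-read from 0, which is why concurrent consumption can redeliver messages and listeners should be idempotent.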

3. Enabling batch consumption

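Call consumer.setConsumeMessageBatchMaxSize(N), where N is the desired batch size. The batch a listener receives can never exceed the number of messages returned by one pull, which is governed by pullBatchSize (default 32).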
