How RocketMQ and DDMQ Achieve Ordered Consumption: Deep Source Code Dive
This article examines the ordered consumption mechanisms of RocketMQ and its derivative DDMQ. It walks through the source-code details of ordered sending, broker locking, and client-side processing, and compares how the two systems guarantee message order.
Introduction
This article focuses on the sequential consumption mechanisms of RocketMQ and the DDMQ implementation built on top of it, providing a detailed source‑code analysis of how ordered consumption is achieved, including ordered sending, broker‑side pulling, and client‑side processing.
RocketMQ Model
The RocketMQ pieces that matter for this discussion are the NameServer (provides routing information), the Broker (stores and forwards messages), and the MessageQueue (the physical queue). A topic contains multiple queues, and each queue holds messages of a single topic.
Queue Model
A Topic is a collection of messages; each Tag further categorizes messages within a topic. Queues under different topics are independent.
Ordered Sending
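Ordered sending is achieved by directing related messages to the same queue with a MessageQueueSelector. The SDK provides several selectors, such as SelectMessageQueueByHash and SelectMessageQueueByRandom; only a selector that maps equal arguments to the same queue, such as the hash selector, preserves order. The Maven dependency and a minimal send example follow: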
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.1</version>
</dependency>
// Ordered send example
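import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        Message message = new Message("test_topic", "test_tag", "Hello, Rocketmq!".getBytes(StandardCharsets.UTF_8));
        // The third argument is the ordering key: equal keys hash to the same queue.
        SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), "test_arg");
        System.out.println(sendResult);
        producer.shutdown();
    }
}
The send method takes three parameters: the Message, a MessageQueueSelector, and an argument used by the selector. The selector must ensure that all related messages are routed to the same queue.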
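The idea behind hash-based selection can be sketched without the SDK. The following is a minimal illustration, not the SelectMessageQueueByHash source: the class name `HashSelectorSketch`, its methods, and the string queue names are all hypothetical, and queues are represented as plain strings.

```java
import java.util.List;

/** Illustrative only: the idea behind hash-based queue selection, not SDK code. */
final class HashSelectorSketch {

    /** Maps an ordering key to a stable index in [0, queueCount). */
    static int selectIndex(Object orderingKey, int queueCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(orderingKey.hashCode(), queueCount);
    }

    /** Picks the queue that every message with the same ordering key will share. */
    static String selectQueue(List<String> queues, Object orderingKey) {
        return queues.get(selectIndex(orderingKey, queues.size()));
    }

    public static void main(String[] args) {
        List<String> queues = List.of("queue-0", "queue-1", "queue-2", "queue-3");
        // The first and third keys are equal, so they land on the same queue.
        for (String orderId : List.of("order-1001", "order-1002", "order-1001")) {
            System.out.println(orderId + " -> " + selectQueue(queues, orderId));
        }
    }
}
```

Because the index depends on the number of queues, a mapping of this kind stays stable only while the queue count is unchanged; resizing a topic can move a key to a different queue.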
Ordered Consumption in RocketMQ
In cluster consumption, only one consumer in the group pulls and consumes messages from a given queue at a time. The consumer thread must acquire a local lock on the MessageQueue and on the ProcessQueue (a local snapshot) to avoid concurrent consumption and rebalance conflicts.
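The local lock on the queue can be pictured as one lock object per queue, fetched from a table and held while a batch is consumed. The sketch below assumes that shape; `QueueLockSketch`, `fetchLockObject`, and `consumeInOrder` are hypothetical names for this illustration, and queues are identified by plain strings rather than MessageQueue instances.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative only: one local lock object per queue, shared by all consume threads. */
final class QueueLockSketch {
    private final ConcurrentMap<String, Object> lockTable = new ConcurrentHashMap<>();

    /** Returns the same lock object for the same queue name, creating it on first use. */
    Object fetchLockObject(String queueName) {
        return lockTable.computeIfAbsent(queueName, k -> new Object());
    }

    /** Runs a batch while holding the queue's lock, so batches of one queue never overlap. */
    void consumeInOrder(String queueName, Runnable batch) {
        synchronized (fetchLockObject(queueName)) {
            batch.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueLockSketch locks = new QueueLockSketch();
        int[] counter = {0};
        // Two threads consume from the same queue; the lock serializes their batches.
        Runnable worker = () -> {
            for (int i = 0; i < 1000; i++) {
                locks.consumeInOrder("queue-0", () -> counter[0]++);
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("batches processed: " + counter[0]);
    }
}
```

Locking per queue rather than globally lets different queues be consumed in parallel while each individual queue is still processed by one thread at a time.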
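// Client ordered consumption example
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
// 5.x package; in 4.x this class lives in org.apache.rocketmq.common.protocol.heartbeat
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("test_topic", "test_tag");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}
The consumer periodically locks its queues on the broker (every 20 seconds by default), which also renews the locks it already holds. If a consumer stops renewing, its broker-side lock expires and another consumer in the group can acquire the queue. The lock ensures that only one consumer processes a specific queue at a time, preserving order.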
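The lock-and-renew behavior can be modeled as a small table on the broker side. The sketch below is an assumption-laden illustration rather than broker source: `BrokerLockTableSketch`, `tryLock`, and the 60-second expiry passed in `main` are hypothetical choices for the demo, and time is supplied by the caller so the behavior is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a queue lock table where locks expire unless renewed. */
final class BrokerLockTableSketch {

    private static final class LockEntry {
        String clientId;
        long lastUpdateMillis;
    }

    private final long lockMaxLiveMillis; // assumed expiry, supplied by the caller
    private final Map<String, LockEntry> table = new HashMap<>();

    BrokerLockTableSketch(long lockMaxLiveMillis) {
        this.lockMaxLiveMillis = lockMaxLiveMillis;
    }

    /** Grants or renews the lock if the queue is free, already held by this client, or expired. */
    synchronized boolean tryLock(String queue, String clientId, long nowMillis) {
        LockEntry entry = table.get(queue);
        boolean free = entry == null;
        boolean expired = !free && nowMillis - entry.lastUpdateMillis > lockMaxLiveMillis;
        boolean mine = !free && entry.clientId.equals(clientId);
        if (!(free || expired || mine)) {
            return false; // another client holds a live lock
        }
        if (free) {
            entry = new LockEntry();
            table.put(queue, entry);
        }
        entry.clientId = clientId;
        entry.lastUpdateMillis = nowMillis;
        return true;
    }

    public static void main(String[] args) {
        BrokerLockTableSketch broker = new BrokerLockTableSketch(60_000);
        System.out.println(broker.tryLock("queue-0", "consumer-A", 0));      // A acquires the lock
        System.out.println(broker.tryLock("queue-0", "consumer-B", 10_000)); // B is refused while A's lock is live
        System.out.println(broker.tryLock("queue-0", "consumer-A", 20_000)); // A renews on its periodic tick
        System.out.println(broker.tryLock("queue-0", "consumer-B", 90_000)); // A stopped renewing, so B takes over
    }
}
```

Keeping the renewal interval well below the expiry means a healthy consumer never loses its queue, while a crashed consumer's queue becomes available again after a bounded delay.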
DDMQ Ordered Consumption
DDMQ builds a proxy layer on top of RocketMQ. The proxy uses local thread locks only; because the proxy is a single‑node service, it does not need the distributed broker lock used by RocketMQ. The workflow is:
Client pulls messages from the proxy without any local lock.
The proxy stores the pulled messages in a PullBuffer and processes them using an UpstreamJob chain.
Order is guaranteed by assigning an orderId (derived from QID, message key, or a JSON path) and placing jobs with the same orderId into a linked list protected by a ReentrantLock.
// DDMQ client pull example
// Note: the type names below are reproduced as given in the source. startConsume and
// BaseMessageProcessor are not part of the RocketMQ client; check the DDMQ consumer SDK
// for the exact consumer and configuration classes.
public static void main(String[] args) throws Exception {
    // Create consumer instance
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.startConsume(new BaseMessageProcessor() {
        @Override
        public Result process(Message msg, Context ctx) {
            // business logic
            return Result.SUCCESS;
        }
    });
}
The proxy maintains a jobOrderMap where each orderId maps to the tail of a linked list. When a job finishes, the next job in the list is released to the processing queue, ensuring strict ordering without broker-side distributed locks.
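The tail-map mechanism described above can be sketched as follows. This is a simplified model under stated assumptions, not DDMQ's UpstreamJob code: the class `OrderedJobChainSketch`, its `Job` type, and the methods `submit`, `onFinished`, and `pollReady` are hypothetical, and a single ReentrantLock guards the whole map for brevity.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative only: per-orderId job chains tracked through a tail map. */
final class OrderedJobChainSketch {

    static final class Job {
        final String orderId;
        final String payload;
        Job next; // successor with the same orderId, if any

        Job(String orderId, String payload) {
            this.orderId = orderId;
            this.payload = payload;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Job> jobOrderMap = new HashMap<>(); // orderId -> tail of its chain
    private final Queue<Job> readyQueue = new ArrayDeque<>();     // jobs allowed to run now

    /** Appends a job to its orderId chain; it is runnable only if the chain was empty. */
    void submit(Job job) {
        lock.lock();
        try {
            Job previousTail = jobOrderMap.put(job.orderId, job);
            if (previousTail == null) {
                readyQueue.add(job);
            } else {
                previousTail.next = job; // wait behind the unfinished predecessor
            }
        } finally {
            lock.unlock();
        }
    }

    /** Releases the successor of a finished job, or clears the chain if there is none. */
    void onFinished(Job job) {
        lock.lock();
        try {
            if (job.next != null) {
                readyQueue.add(job.next);
            } else {
                jobOrderMap.remove(job.orderId, job); // job was the tail, chain is now empty
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns the next runnable job, or null if every pending job is blocked. */
    Job pollReady() {
        lock.lock();
        try {
            return readyQueue.poll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        OrderedJobChainSketch chain = new OrderedJobChainSketch();
        chain.submit(new Job("order-A", "A-1"));
        chain.submit(new Job("order-A", "A-2")); // held back until A-1 finishes
        chain.submit(new Job("order-B", "B-1"));
        for (Job job = chain.pollReady(); job != null; job = chain.pollReady()) {
            System.out.println("processing " + job.payload);
            chain.onFinished(job);
        }
    }
}
```

Jobs with different orderIds never wait on each other in this model; only jobs that share an orderId are serialized, which is what allows a single proxy to keep order without a broker-side lock.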
Comparison
RocketMQ requires both broker‑side distributed locks and client‑side thread locks to guarantee order across a cluster, while DDMQ achieves the same guarantee with only proxy‑side thread locks because the proxy runs as a single node and handles ordering centrally.
Key Takeaways
Ordered sending relies on a selector that consistently maps related messages to the same queue.
RocketMQ uses broker locks (20 s interval) and client locks on MessageQueue and ProcessQueue to prevent concurrent consumption.
DDMQ simplifies the model by moving ordering logic to the proxy, using a local lock per orderId and a linked‑list job chain.
Both systems provide extensible selectors and can be customized for specific ordering keys.