Unlocking Ordered Messaging: How RocketMQ and DDMQ Ensure Sequence
This article deeply explores the sequential consumption mechanisms of RocketMQ and its derivative DDMQ, comparing their source‑code implementations, detailing producer ordering, broker locking, consumer locking, and the architectural differences that allow DDMQ to achieve ordered processing with only proxy‑side locks.
Overview
This article focuses on the sequential consumption mechanisms of RocketMQ and the DDMQ implementation built on it. It reads the source code to explain how ordered sending, broker handling, and ordered consumption are realized, and compares the two systems.
RocketMQ Model
RocketMQ consists of NameServer (provides routing information) and Broker (stores messages and serves pull requests). Topics contain queues; each queue is a FIFO line of messages.
Ordered Sending
Messages are sent to the same queue using a MessageQueueSelector. The selector can be hash‑based, random, or custom.
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.1</version>
</dependency> public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message("test_topic", "test_tag", "Hello, Rocketmq!".getBytes());
SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), "test_arg");
System.out.println(sendResult);
producer.shutdown();
}The send method takes three parameters: Message (topic, tag, body), MessageQueueSelector (chooses the queue), and an Object argument passed to the selector.
RocketMQ provides two built‑in selectors:
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode() % mqs.size();
if (value < 0) value = Math.abs(value);
return mqs.get(value);
}
} public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}Ensuring all messages of the same logical flow go to the same queue guarantees order.
Ordered Consumption in RocketMQ
In cluster mode only one consumer in a group pulls and consumes a given queue at a time. The consumer thread holds a local lock on the MessageQueue and the ProcessQueue to prevent concurrent consumption and rebalance interference.
@Override
public void run() {
if (processQueue.isDropped()) return;
Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(messageModel) || (processQueue.isLocked() && !processQueue.isLockExpired())) {
// consume messages sequentially
}
}
}The broker must grant a lock for the queue; the client periodically sends a lock‑request (default every 20 s). If the lock expires or fails, consumption is postponed.
public void lockMQPeriodically() {
if (!stopped) {
defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}DDMQ Sequential Consumption
DDMQ adds a proxy layer (carrera) between the client and RocketMQ. The proxy handles ordering using only local thread locks; no broker‑side distributed lock is needed because the proxy runs as a single node.
Message flow:
Client pulls messages from the proxy.
Proxy stores pulled messages in a PullBuffer.
Each message may carry an orderKey (QID, tag, or JSONPath). The proxy builds a per‑order linked list.
When a consumer finishes processing a message, the proxy releases the lock and enqueues the next message in the same order chain.
public Status act(UpstreamJob job) {
job.setState("PullSvr.InBuffer");
buffer.offer(job);
return Status.ASYNCHRONIZED;
}Order handling inside the proxy:
public UpstreamJob fetchJob() throws InterruptedException {
readyJobs.acquire();
UpstreamJob job = reActivationQueue.poll();
if (job != null) return job;
mainQueueLock.lock();
try {
job = mainQueue.poll();
Integer orderId = job.getOrderId();
if (orderId == null) return job; // no ordering needed
ReentrantLock orderLock = getLocks(orderId);
orderLock.lock();
// build or extend order chain
} finally {
mainQueueLock.unlock();
}
// after processing, release orderLock
orderLock.unlock();
return null;
}When a job finishes:
public void onJobFinished(UpstreamJob job) {
Integer orderId = job.getOrderId();
if (orderId != null) {
ReentrantLock orderLock = getLocks(orderId);
orderLock.lock();
try {
UpstreamJob next = job.getNext();
if (next != null) {
reActivationQueue.offer(next);
readyJobs.release();
}
} finally {
orderLock.unlock();
}
}
jobSize.decrementAndGet();
}Key Differences
RocketMQ requires a broker‑side distributed lock plus client‑side locks to guarantee that only one consumer processes a queue at a time across multiple broker replicas. DDMQ’s proxy is a single process, so a simple in‑memory lock suffices, eliminating the need for broker distributed locking.
Both systems achieve ordered consumption, but DDMQ’s architecture simplifies the locking model while preserving strict ordering guarantees.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Cognitive Technology Team
Cognitive Technology Team regularly delivers the latest IT news, original content, programming tutorials and experience sharing, with daily perks awaiting you.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
