Master RocketMQ: Transaction, Ordered, Filtered & Delayed Messaging in Java & Spring
This tutorial walks through practical implementations of RocketMQ transaction, ordered, filtered, and delayed messages in native Java and in Spring Boot. For each message type it explains the underlying concept, shows example code, and highlights how to configure producers, consumers, and listeners for reliable distributed messaging.
1. Transaction Message Implementation
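First, a native Java class, TransactionProducer, creates a TransactionMQProducer with a custom TransactionListenerImpl. It sets up a thread pool (used to answer the broker's transaction check requests), registers the listener, starts the producer, and sends a half-message, that is, a message the broker stores but does not yet deliver to consumers. If sending the half-message fails, the catch block is where rollback or compensation logic belongs. The listener defines executeLocalTransaction, which commits or rolls back based on local business logic, and checkLocalTransaction, which answers broker-initiated checks when the outcome of a transaction is still unknown.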
package com.huc.rocketmq.transaction;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("testTransactionGroup");
        // Thread pool that handles the broker's transaction check requests
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("testThread");
                    return thread;
                }
            });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        // No name server address is set here; the client falls back to its
        // environment configuration unless producer.setNamesrvAddr(...) is called.
        producer.start();

        // Topic, tag, key, body
        Message message = new Message("PayOrderSuccessTopic", "testTag", "testKey",
            "Order payment message".getBytes(RemotingHelper.DEFAULT_CHARSET));
        try {
            // Sends the half-message, then triggers executeLocalTransaction on the listener
            TransactionSendResult result = producer.sendMessageInTransaction(message, null);
            System.out.println(result);
        } catch (Exception e) {
            // handle half-message failure, rollback logic
        }
    }
}

package com.huc.rocketmq.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // Run the local business transaction here; commit only if it succeeds
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        try {
            // Called by the broker when the outcome is still unknown;
            // look up the result of the local transaction here
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

The Spring Boot version achieves the same logic with RocketMQTemplate, which sends the message through sendMessageInTransaction, and a listener class annotated with @RocketMQTransactionListener that implements RocketMQLocalTransactionListener.
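The half-message flow described in this section can be sketched without a broker. The following is a dependency-free simulation, not RocketMQ code: the class HalfMessageFlowSketch, its enums, and the resolve method are hypothetical names chosen for illustration. It assumes that an exception in the local transaction is treated as a rollback, and that an unknown state is re-checked a limited number of times. What a real broker does once the checks run out depends on its configuration and is not modelled here.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

final class HalfMessageFlowSketch {
    /** The three answers a transaction listener can give. */
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** What finally happens to the half-message in this simulation. */
    enum Outcome { DELIVERED, DISCARDED, STILL_PENDING }

    /**
     * Runs the local transaction once, then asks the check callback
     * up to maxChecks times while the state stays UNKNOWN.
     */
    static Outcome resolve(Supplier<State> executeLocal, Supplier<State> check, int maxChecks) {
        State state;
        try {
            state = executeLocal.get();
        } catch (RuntimeException e) {
            state = State.ROLLBACK; // assumption: a failed local transaction rolls back
        }
        for (int i = 0; state == State.UNKNOWN && i < maxChecks; i++) {
            state = check.get(); // stands in for a broker-initiated check
        }
        switch (state) {
            case COMMIT:
                return Outcome.DELIVERED;   // consumers may now see the message
            case ROLLBACK:
                return Outcome.DISCARDED;   // consumers never see the message
            default:
                return Outcome.STILL_PENDING;
        }
    }

    public static void main(String[] args) {
        // Local transaction succeeds immediately.
        System.out.println(resolve(() -> State.COMMIT, () -> State.UNKNOWN, 3));

        // Local transaction throws, so the half-message is rolled back.
        System.out.println(resolve(() -> { throw new IllegalStateException("db down"); },
                () -> State.COMMIT, 3));

        // Outcome unknown at first; the second check reports a commit.
        Iterator<State> answers = Arrays.asList(State.UNKNOWN, State.COMMIT).iterator();
        System.out.println(resolve(() -> State.UNKNOWN, answers::next, 3));
    }
}
```

The point of the sketch is that consumers only ever see a message whose local transaction is known to have committed, which is why checkLocalTransaction should consult durable business state instead of returning a fixed answer.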
2. Ordered Message Implementation
To guarantee order, all messages that share a key (e.g., an order ID) are sent to the same MessageQueue. The native implementation passes a MessageQueueSelector that picks the queue by computing orderId % mqs.size(). The consumer registers a MessageListenerOrderly, so each queue is consumed by one thread at a time. Ordering therefore holds per queue, not across the whole topic.
producer.send(msg, new MessageQueueSelector() {
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // arg is the orderId passed to send(...) below
        Long orderId = (Long) arg;
        long index = orderId % mqs.size();
        return mqs.get((int) index);
    }
}, orderId, new SendCallback() {
    public void onSuccess(SendResult sendResult) { System.out.println(sendResult); }
    public void onException(Throwable t) { System.out.println(t); }
});

consumer.registerMessageListener(new MessageListenerOrderly() {
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            for (MessageExt t : msgs) { /* process in order */ }
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            // Pause this queue briefly, then retry the same messages
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});

Spring Boot equivalents use RocketMQTemplate.syncSendOrderly and @RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY).
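The effect of modulo-based queue selection can be checked in isolation. The sketch below has no RocketMQ dependency; QueueSelectionSketch, selectQueue, and route are hypothetical names, and the event data is made up for illustration. It uses Math.floorMod instead of the % operator, an assumption added here so that a negative ID cannot produce a negative index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class QueueSelectionSketch {
    /** Maps an order ID to a queue index in the range [0, queueCount). */
    static int selectQueue(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    /**
     * Routes {orderId, step} events to queues in arrival order and
     * returns the content of each queue, keyed by queue index.
     */
    static Map<Integer, List<String>> route(long[][] events, int queueCount) {
        Map<Integer, List<String>> queues = new TreeMap<>();
        for (long[] event : events) {
            int index = selectQueue(event[0], queueCount);
            queues.computeIfAbsent(index, k -> new ArrayList<>())
                  .add(event[0] + "#" + event[1]);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Interleaved steps of two orders, as a producer might emit them.
        long[][] events = {
            {1001, 1}, {1002, 1}, {1001, 2}, {1002, 2}, {1001, 3}
        };
        // prints {1=[1001#1, 1001#2, 1001#3], 2=[1002#1, 1002#2]}
        System.out.println(route(events, 4));
    }
}
```

Every step of order 1001 lands in queue 1 and keeps its relative order, while order 1002 proceeds independently in queue 2. Note that the mapping changes if the number of queues changes, so messages in flight during a resize may briefly lose this guarantee.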
3. Message Filtering Implementation
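Tags and user properties allow consumers to receive only relevant messages. The producer sets a tag (e.g., "A") and optional custom properties. The consumer subscribes with a tag expression such as "A||B", which matches messages tagged A or B. Filtering on user properties requires an SQL92 expression (MessageSelector.bySql) instead of a tag expression, and the broker must have property filtering enabled.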
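Message msg = new Message(topic, "A", body);
msg.putUserProperty("name", "value");
producer.send(msg);

// Subscribe to the topic the producer sends to; accept tag A or tag B
consumer.subscribe("test", "A||B");

In Spring Boot, tags are added via MessageConst.PROPERTY_TAGS and the listener uses selectorExpression = "A||B" in the @RocketMQMessageListener annotation. RocketMQTemplate also accepts the tag as part of the destination string, in the form topic:tag.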
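How a tag expression such as "A||B" selects messages can be approximated in a few lines. This is a simplified, dependency-free sketch and not the broker's actual matching code; TagFilterSketch and matches are hypothetical names. It assumes that an empty expression or "*" accepts every message and that tags are compared by exact string equality after trimming.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

final class TagFilterSketch {
    /** Returns true if a message carrying the given tag passes the expression. */
    static boolean matches(String expression, String tag) {
        // Assumption: no expression, or "*", means "subscribe to everything".
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        if (tag == null) {
            return false; // an untagged message cannot match a specific tag
        }
        // Split "A||B" into the set {A, B}; the pipes are regex metacharacters.
        Set<String> accepted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return accepted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("A||B", "A")); // true
        System.out.println(matches("A||B", "C")); // false
        System.out.println(matches("*", "C"));    // true
    }
}
```

A tag expression is only a set-membership test, which is why anything richer, such as comparing a numeric user property, needs the SQL92 style of filtering.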
4. Delayed Message Implementation
Delayed messages postpone consumption. The native API sets a delay level on the message instead of an arbitrary duration (e.g., level 3 equals 10 seconds in the default level table). The producer sends the message as usual, and the consumer receives it only after the configured delay has elapsed.
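The relationship between a delay level and its duration can be made explicit with a small lookup. The sketch below is independent of the RocketMQ client; DelayLevelSketch and delayFor are hypothetical names. It assumes the broker still uses the default level table "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; a broker configured with a different table would give different results. Rejecting out-of-range levels with an exception is a simplification of this sketch, not a statement about broker behavior.

```java
import java.time.Duration;

final class DelayLevelSketch {
    // Assumption: the default level table; levels are 1-based.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Translates a 1-based delay level into the duration it stands for. */
    static Duration delayFor(int level) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > levels.length) {
            throw new IllegalArgumentException("level must be 1.." + levels.length);
        }
        String token = levels[level - 1];
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default:  throw new IllegalStateException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(delayFor(3).getSeconds());  // 10
        System.out.println(delayFor(5).getSeconds());  // 60
        System.out.println(delayFor(18).toHours());    // 2
    }
}
```

A helper like this is mainly useful for documentation and validation on the producer side, for example to log the intended delay next to the level number.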
Message msg = new Message(topic, body);
msg.setDelayTimeLevel(3); // 10 seconds
producer.send(msg);

Spring Boot provides rocketMQTemplate.syncSend(topic, message, timeout, delayLevel), where the last argument specifies the delay level.

Message<DemoMessage> message = MessageBuilder.withPayload(new DemoMessage().setId(id)).build();
// 30-second send timeout, delay level 3
rocketMQTemplate.syncSend(DemoMessage.TOPIC, message, 30*1000, 3);

These patterns illustrate how RocketMQ can handle reliable transaction processing, strict ordering, efficient filtering, and time-based delivery, both with raw Java APIs and with the higher-level Spring Boot integration.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
