Master RocketMQ: Transaction, Ordered, Filtered & Delayed Messaging in Java & Spring

This tutorial walks through practical implementations of RocketMQ transaction, ordered, filtered, and delayed messages in native Java and Spring Boot. It explains the underlying concepts, shows code for each message type, and highlights how to configure producers, consumers, and listeners for reliable distributed messaging.

Programmer DD

1. Transaction Message Implementation

First, a native Java producer creates a TransactionMQProducer with a custom TransactionListenerImpl. The producer sets up a thread pool for handling the broker's transaction checks, registers the listener, starts, and sends a half‑message. If sending the half‑message fails, the catch block is the place to roll back any local state. The listener defines executeLocalTransaction, which commits or rolls back the message based on the result of the local business logic, and checkLocalTransaction, which answers broker‑initiated checks for half‑messages whose outcome is still unknown.

// File: TransactionProducer.java
package com.huc.rocketmq.transaction;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("testTransactionGroup");
        // Assumption: a name server is reachable at localhost:9876; adjust to your environment.
        producer.setNamesrvAddr("localhost:9876");
        // Thread pool that serves the broker's transaction check requests.
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("testThread");
                    return thread;
                }
            });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message message = new Message("PayOrderSuccessTopic", "testTag", "testKey",
            "Order payment message".getBytes(StandardCharsets.UTF_8));
        try {
            // Sends the half-message; once it is stored, the client calls executeLocalTransaction.
            TransactionSendResult result = producer.sendMessageInTransaction(message, null);
            System.out.println(result);
        } catch (Exception e) {
            // The half-message was not sent: roll back or compensate local state here.
        }
        // The producer is left running so it can answer broker check requests.
    }
}

// File: TransactionListenerImpl.java
package com.huc.rocketmq.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    // Runs after the half-message is stored; the return value commits or rolls back the message.
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            // Execute the local business transaction here.
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // Called when the broker asks about a half-message that was never committed or rolled back.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        try {
            // Look up the result of the local transaction here.
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}
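
The listener above returns COMMIT_MESSAGE unconditionally, so it does not show how checkLocalTransaction could find out what really happened. One possible approach, sketched here with the standard library only, is to record each local outcome under a transaction ID and look it up when the broker asks. The TxState enum is a stand-in that mirrors the three values of RocketMQ's LocalTransactionState; the LocalTxLog class, its method names, and the transaction IDs are hypothetical and not part of the original article or the RocketMQ API.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for RocketMQ's LocalTransactionState so the sketch compiles without the client library.
enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical helper: remembers the outcome of each local transaction by ID.
final class LocalTxLog {
    private final Map<String, TxState> outcomes = new ConcurrentHashMap<>();

    // What executeLocalTransaction could delegate to: run the business action and record the result.
    TxState execute(String txId, Callable<Boolean> businessAction) {
        TxState state;
        try {
            state = businessAction.call() ? TxState.COMMIT_MESSAGE : TxState.ROLLBACK_MESSAGE;
        } catch (Exception e) {
            // Any failure in the local work rolls the message back.
            state = TxState.ROLLBACK_MESSAGE;
        }
        outcomes.put(txId, state);
        return state;
    }

    // What checkLocalTransaction could delegate to: IDs with no recorded outcome stay UNKNOW.
    TxState check(String txId) {
        return outcomes.getOrDefault(txId, TxState.UNKNOW);
    }

    public static void main(String[] args) {
        LocalTxLog log = new LocalTxLog();
        System.out.println(log.execute("tx-1", () -> true));
        System.out.println(log.execute("tx-2", () -> { throw new IllegalStateException("payment failed"); }));
        System.out.println(log.check("tx-3"));
    }
}
```

In a real listener the map would normally be replaced by a durable store, such as a table written in the same database transaction as the business data, because an in-memory map is lost when the producer restarts.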

The Spring Boot version uses RocketMQTemplate to send the transactional message and a TransactionListenerImpl annotated with @RocketMQTransactionListener to achieve the same commit, rollback, and check logic.

2. Ordered Message Implementation

To guarantee order, all messages that share a key (e.g., an order ID) are sent to the same MessageQueue. The native implementation provides a MessageQueueSelector that selects the queue by computing orderId % mqs.size(). The consumer registers a MessageListenerOrderly so that each queue is consumed by one thread at a time.

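import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
// Fragment (producer, consumer, msg, orderId defined elsewhere). Producer side: one order, one queue.
producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Long orderId = (Long) arg;
        // floorMod keeps the index non-negative even for a negative ID.
        int index = (int) Math.floorMod(orderId, (long) mqs.size());
        return mqs.get(index);
    }
}, orderId, new SendCallback() {
    public void onSuccess(SendResult sendResult) { System.out.println(sendResult); }
    public void onException(Throwable t) { t.printStackTrace(); }
});
// Consumer side: MessageListenerOrderly consumes each queue with one thread at a time.
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            for (MessageExt t : msgs) { /* process in order */ }
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            // Pause this queue briefly, then retry the same messages.
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});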
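
The queue choice made by the selector can be tried in isolation. The following is a minimal stand-alone sketch, assuming four queues represented as plain strings rather than RocketMQ MessageQueue objects; the class name QueueSelectionSketch, the method selectIndex, and the sample order IDs are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the selector logic, independent of the RocketMQ client.
final class QueueSelectionSketch {

    // Maps an order ID to a queue index in [0, queueCount).
    // floorMod is used so that a negative ID cannot produce a negative index.
    static int selectIndex(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        long[] orderIds = {1001L, 1002L, 1001L, 1005L};
        for (long orderId : orderIds) {
            // Repeated IDs always land on the same queue.
            System.out.println(orderId + " -> " + queues.get(selectIndex(orderId, queues.size())));
        }
    }
}
```

Both messages for order 1001 map to the same queue, which is what preserves their relative order. Different orders can share a queue (1001 and 1005 do here), so ordering is guaranteed per queue, not across the whole topic.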

The Spring Boot equivalents are RocketMQTemplate.syncSendOrderly, which takes a hash key that determines the queue, and @RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY) on the consumer side.

3. Message Filtering Implementation

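Tags and user properties let consumers receive only the messages relevant to them. The producer sets a tag (e.g., "A") and optional custom properties. The consumer subscribes with a tag expression such as "A||B", which matches messages tagged A or B. Filtering on user properties requires an SQL92 selector (MessageSelector.bySql) instead of a tag expression, and property filtering must be enabled on the broker.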

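// Producer: tag "A" plus a custom user property.
Message msg = new Message(topic, "A", body);
msg.putUserProperty("name", "value");
producer.send(msg);
// Consumer: receive messages tagged A or B (the topic is assumed to be "test" here).
consumer.subscribe("test", "A||B");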
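
To make the meaning of a tag expression concrete, here is a small stand-alone sketch of the matching rule: "*" or an empty expression accepts every tag, and otherwise the expression is split on "||" into a set of accepted tags. It only illustrates the semantics under that assumption and is not the broker's actual filtering code; the class TagFilterSketch and its matches method are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of tag-expression semantics; not RocketMQ's implementation.
final class TagFilterSketch {

    // Returns true if a message carrying `tag` would pass the subscription `expression`.
    static boolean matches(String expression, String tag) {
        if (expression == null) {
            return true;
        }
        String trimmed = expression.trim();
        if (trimmed.isEmpty() || "*".equals(trimmed)) {
            return true; // subscribe to all tags
        }
        Set<String> accepted = new HashSet<>();
        for (String part : trimmed.split("\\|\\|")) {
            String candidate = part.trim();
            if (!candidate.isEmpty()) {
                accepted.add(candidate);
            }
        }
        return accepted.contains(tag);
    }

    public static void main(String[] args) {
        for (String tag : new String[] {"A", "B", "C"}) {
            System.out.println("A||B accepts " + tag + ": " + matches("A||B", tag));
        }
    }
}
```

Because a tag expression is only a list of alternatives, anything richer (ranges, comparisons on properties) needs the SQL-style selector instead.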

In Spring Boot, tags are added via the MessageConst.PROPERTY_TAGS header (rocketmq-spring also accepts a tag appended to the destination in the form topic:tag), and the listener sets selectorExpression = "A||B" in the @RocketMQMessageListener annotation.

4. Delayed Message Implementation

Delayed messages postpone consumption. The native API sets a delay level on the message; each level maps to a fixed delay in the broker's level table (with the default table, level 3 equals 10 seconds). The producer sends the message, and the consumer receives it after the configured delay.

Message msg = new Message(topic, body);
msg.setDelayTimeLevel(3); // 10 seconds
producer.send(msg);
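
The mapping from level to delay can be worked out from the broker's level table. The sketch below assumes the default messageDelayLevel setting ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"); a broker can be configured with a different table, so the string should be treated as an assumption. The class DelayLevelSketch and its methods are hypothetical helpers, not RocketMQ APIs.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that turns a delay level into a duration using a level table.
final class DelayLevelSketch {
    // Assumed default broker table; level 1 is the first entry.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses entries such as "10s", "5m", "2h" into durations.
    static List<Duration> parse(String levels) {
        List<Duration> table = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': table.add(Duration.ofSeconds(amount)); break;
                case 'm': table.add(Duration.ofMinutes(amount)); break;
                case 'h': table.add(Duration.ofHours(amount)); break;
                case 'd': table.add(Duration.ofDays(amount)); break;
                default: throw new IllegalArgumentException("Unknown unit in " + token);
            }
        }
        return table;
    }

    // Looks up a 1-based level; this sketch rejects levels outside the table.
    static Duration delayFor(int level) {
        List<Duration> table = parse(DEFAULT_LEVELS);
        if (level < 1 || level > table.size()) {
            throw new IllegalArgumentException("Level out of range: " + level);
        }
        return table.get(level - 1);
    }

    public static void main(String[] args) {
        System.out.println("Level 3 delays delivery by " + delayFor(3).getSeconds() + " seconds");
    }
}
```

With this table, setDelayTimeLevel(3) corresponds to the third entry, 10s, which matches the comment in the snippet above.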

Spring Boot provides rocketMQTemplate.syncSend(topic, message, timeout, delayLevel), where timeout is the send timeout in milliseconds and the last argument specifies the delay level.

// 30 * 1000 is the send timeout in milliseconds; 3 is the delay level.
Message<DemoMessage> message = MessageBuilder.withPayload(new DemoMessage().setId(id)).build();
rocketMQTemplate.syncSend(DemoMessage.TOPIC, message, 30 * 1000, 3);

These patterns illustrate how RocketMQ can handle reliable transaction processing, strict ordering, efficient filtering, and time‑based delivery, both with raw Java APIs and with the higher‑level Spring Boot integration.

