
Mastering RocketMQ with Spring Boot: From Simple to Transactional Messaging

This guide shows how to integrate RocketMQ 4.8.0 into a Spring Boot 2.4.12 application. It covers dependency setup and configuration, then sending and receiving normal, ordered, cluster/broadcast, and transactional messages, with code snippets and execution results for each pattern.

Spring Full-Stack Practical Cases

Dependencies

<code><dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency></code>

Configuration

<code>server:
  port: 8080
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq</code>

Normal Message

Sending code:

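<code>@Resource
private RocketMQTemplate rocketMQTemplate;

public void send(String message) {
    // Destination format is "topic:tag". convertAndSend wraps the payload in a
    // Message itself, so pass the raw payload rather than a pre-built Message.
    rocketMQTemplate.convertAndSend("test-topic:tag2", message);
}</code>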

Receiving code:

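<code>import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// Subscribes to test-topic and accepts only messages tagged tag1 or tag2.
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}</code>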
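
The sender above addresses `test-topic:tag2`, while the listener subscribes with `tag1 || tag2`. To make that relationship concrete, here is a minimal plain-Java sketch of how a `topic:tag` destination and an OR-style tag selector fit together. `TagFilterSketch` and its methods are hypothetical names invented for this illustration; RocketMQ does the real filtering itself, and this model only mirrors the documented `topicName:tags` and `tagA || tagB` formats.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper modelling "topic:tag" destinations and "a || b" selectors.
// Illustration only: this is not how RocketMQ implements filtering internally.
final class TagFilterSketch {

    // Splits "topic:tag" into {topic, tag}; the tag is empty when no ':' is present.
    static String[] splitDestination(String destination) {
        int idx = destination.indexOf(':');
        if (idx < 0) {
            return new String[] {destination, ""};
        }
        return new String[] {destination.substring(0, idx), destination.substring(idx + 1)};
    }

    // Returns true when the tag is accepted by a selector such as "tag1 || tag2" or "*".
    static boolean matches(String selectorExpression, String tag) {
        String expr = selectorExpression.trim();
        if (expr.equals("*")) {
            return true; // "*" subscribes to every tag
        }
        Set<String> accepted = new HashSet<>();
        for (String part : expr.split("\\|\\|")) {
            accepted.add(part.trim());
        }
        return accepted.contains(tag);
    }

    public static void main(String[] args) {
        String[] parts = splitDestination("test-topic:tag2");
        System.out.println("topic=" + parts[0] + ", tag=" + parts[1]); // topic=test-topic, tag=tag2
        System.out.println("delivered=" + matches("tag1 || tag2", parts[1])); // delivered=true
    }
}
```

A message sent to `test-topic:tag3` would fail the same check, which is why it would never reach `ConsumerListener`.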

Ordered Message

Sending code (orderly):

<code>public void sendOrder(String topic, String message, String tags, int id) {
    // Messages that share the same hash key ("order-" + id) are routed to the same queue.
    rocketMQTemplate.asyncSendOrderly(topic + ":" + tags,
        MessageBuilder.withPayload(message).build(),
        "order-" + id,
        new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("msg-id: " + sendResult.getMsgId() + ": " + message + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
}</code>

Receiving code (ordered mode):

<code>@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // The thread name shows which consumer thread handled the message.
        System.out.println(Thread.currentThread().getName() + " received Order message: " + message);
    }
}</code>

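In ORDERLY mode, each queue is consumed by one thread at a time, so messages are processed in the order they were stored in that queue. Because the producer routes all messages with the same hash key (here "order-" + id) to the same queue, order is guaranteed per key, not across the whole topic.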
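
The per-queue guarantee depends on the producer mapping a hash key to a stable queue. The sketch below models that idea in plain Java, assuming a simple "hash code modulo queue count" rule; the exact formula RocketMQ uses may differ between versions, and `OrderedRoutingSketch` is a hypothetical name used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of hash-key routing; not RocketMQ's actual selector code.
final class OrderedRoutingSketch {

    // Assumed rule: the same key always maps to the same queue index.
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    // Each input entry is {hashKey, body}; returns queue index -> bodies in arrival order.
    static Map<Integer, List<String>> route(List<String[]> keyedMessages, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (String[] keyed : keyedMessages) {
            int queue = selectQueue(keyed[0], queueCount);
            queues.computeIfAbsent(queue, k -> new ArrayList<>()).add(keyed[1]);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String[]> messages = Arrays.asList(
            new String[] {"order-1", "order-1:created"},
            new String[] {"order-2", "order-2:created"},
            new String[] {"order-1", "order-1:paid"},
            new String[] {"order-1", "order-1:shipped"});
        // All "order-1" bodies land in one queue, in the order they were sent.
        System.out.println(route(messages, 4));
    }
}
```

Two different keys may still share a queue, which is harmless: a single consuming thread per queue preserves the relative order of each key's messages.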

Cluster / Broadcast Message Mode

Sending code (shared for both modes):

<code>public void send(String topic, String message, String tags) {
    rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
}</code>

Cluster consumer (load‑balanced):

<code>// CLUSTERING: instances in the same consumer group share the messages between them.
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1 received message: " + message);
    }
}</code>

Broadcast consumer (each instance receives all messages):

<code>// BROADCASTING: the same listener with only messageModel changed; every instance gets every message.
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1 received message: " + message);
    }
}</code>
[Figure: Cluster message result]
[Figure: Broadcast message result]
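
The difference between the two modes can be modelled without a broker. The following sketch is a simplified simulation: in clustering mode it hands each message to exactly one instance (round-robin is used here as a stand-in, whereas RocketMQ actually allocates whole queues to instances), and in broadcasting mode it hands every message to every instance. `MessageModelSketch` is a hypothetical name for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the two delivery models; not RocketMQ code.
final class MessageModelSketch {

    enum Model { CLUSTERING, BROADCASTING }

    // Returns, per consumer instance, the list of messages it would receive.
    static List<List<String>> deliver(Model model, List<String> messages, int instances) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            received.add(new ArrayList<>());
        }
        for (int m = 0; m < messages.size(); m++) {
            if (model == Model.BROADCASTING) {
                // Every instance sees every message.
                for (List<String> inbox : received) {
                    inbox.add(messages.get(m));
                }
            } else {
                // Exactly one instance sees each message (round-robin stand-in).
                received.get(m % instances).add(messages.get(m));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println(deliver(Model.CLUSTERING, messages, 2));   // [[m1, m3], [m2, m4]]
        System.out.println(deliver(Model.BROADCASTING, messages, 2)); // [[m1, m2, m3, m4], [m1, m2, m3, m4]]
    }
}
```

To observe the same effect with the real listeners, start two instances of the application and compare their console output in each mode.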

Transactional Message

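RocketMQ defines three transaction statuses. In rocketmq-spring they are exposed as the RocketMQLocalTransactionState values COMMIT, ROLLBACK, and UNKNOWN:

CommitTransaction (COMMIT): the transaction is committed and the message can be consumed.

RollbackTransaction (ROLLBACK): the transaction is rolled back and the message is discarded.

Unknown (UNKNOWN): the state is not yet decided, so the broker will later query the producer to determine the final state.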

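Transactional messaging follows a two‑phase process. First, the producer sends a half‑message, which the broker stores but does not yet deliver to consumers, and then executes its local transaction. Second, based on the local outcome, the producer tells the broker to commit or roll back. If the broker does not receive a commit or rollback within a timeout, it calls back the producer's check method (checkLocalTransaction) to resolve the state.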
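
The flow described above can be sketched as a small in-memory state machine. This is a simplified model, assuming a broker that keeps undecided half-messages in a pending list and exposes only committed ones; `TxFlowSketch` and its members are hypothetical names, and timeouts and retries are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical in-memory model of the half-message flow; not RocketMQ code.
final class TxFlowSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Messages that consumers are allowed to see.
    final List<String> visible = new ArrayList<>();
    // Half-messages whose fate is still undecided.
    final List<String> pending = new ArrayList<>();

    // Store the half-message, run the local transaction, then apply its outcome.
    void send(String message, Supplier<State> localTransaction) {
        pending.add(message); // stored, but not consumable yet
        apply(message, localTransaction.get());
    }

    // Check-back: ask the producer again about a message that is still pending.
    void check(String message, Supplier<State> checker) {
        if (pending.contains(message)) {
            apply(message, checker.get());
        }
    }

    private void apply(String message, State state) {
        if (state == State.UNKNOWN) {
            return; // stays pending until a later check resolves it
        }
        pending.remove(message);
        if (state == State.COMMIT) {
            visible.add(message); // ROLLBACK simply discards the message
        }
    }

    public static void main(String[] args) {
        TxFlowSketch broker = new TxFlowSketch();
        broker.send("a", () -> State.COMMIT);
        broker.send("b", () -> State.ROLLBACK);
        broker.send("c", () -> State.UNKNOWN);
        System.out.println("visible=" + broker.visible + " pending=" + broker.pending); // visible=[a] pending=[c]
        broker.check("c", () -> State.COMMIT);
        System.out.println("visible=" + broker.visible + " pending=" + broker.pending); // visible=[a, c] pending=[]
    }
}
```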

Sending a transactional message:

<code>@Resource
private RocketMQTemplate rocketMQTemplate;

public void sendTx(String topic, Long id, String tags) {
    rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags,
        MessageBuilder.withPayload(new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
            // BID: unique business ID for this transaction, read back by the listener.
            .setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
            .build(),
        // Extra argument handed to executeLocalTransaction as "arg".
        UUID.randomUUID().toString().replaceAll("-", ""));
}</code>

Producer transaction listener:

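<code>import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message; // Spring's Message, not RocketMQ's
import javax.annotation.Resource;

@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
    private final JsonMapper jsonMapper = new JsonMapper();

    @Resource
    private BusinessService bs;

    // Runs the local transaction after the half-message has been sent.
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String id = (String) msg.getHeaders().get("BID");
            Users users = jsonMapper.readValue((byte[]) msg.getPayload(), Users.class);
            System.out.println("Message content: " + users + "\targument: " + arg + "\tunique transaction ID: " + id);
            bs.save(users, new UsersLog(users.getId(), id));
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    // Called by the broker when the transaction state is still unresolved.
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String id = (String) msg.getHeaders().get("BID");
        System.out.println("Checking whether a record with ID " + id + " exists");
        UsersLog usersLog = bs.queryUsersLog(id);
        if (usersLog == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}</code>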

Consumer for transactional messages:

<code>@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {
    @Override
    public void onMessage(Users users) {
        System.out.println("TX received message: " + users);
    }
}</code>

Service layer with @Transactional:

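<code>@Transactional
public boolean save(Users users, UsersLog usersLog) {
    usersRepository.save(users);
    usersLogRepository.save(usersLog);
    // Simulated failure: id 1 forces a rollback of both inserts.
    if (users.getId() == 1) {
        throw new RuntimeException("Data error");
    }
    return true;
}

// Used by checkLocalTransaction: a log row for this BID means the transaction committed.
public UsersLog queryUsersLog(String bid) {
    return usersLogRepository.findByBid(bid);
}</code>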
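
The check-back is only reliable because the log row is written in the same local transaction as the business row, so the log exists if and only if the business data was committed. The sketch below models that all-or-nothing behavior in memory, using staged copies as a stand-in for a database transaction. `TxLogCheckSketch` and its fields are hypothetical and reduce `Users` and `UsersLog` to simple map entries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the service layer; not the article's repositories.
final class TxLogCheckSketch {

    private Map<Long, String> users = new HashMap<>();    // user id -> name
    private Map<String, Long> usersLog = new HashMap<>(); // BID -> user id

    // Mimics the @Transactional save: both rows are written, or neither is.
    boolean save(long id, String name, String bid) {
        Map<Long, String> stagedUsers = new HashMap<>(users);
        Map<String, Long> stagedLog = new HashMap<>(usersLog);
        stagedUsers.put(id, name);
        stagedLog.put(bid, id);
        if (id == 1) {
            // Simulated failure: the staged writes are dropped, like a rollback.
            throw new IllegalStateException("Data error");
        }
        users = stagedUsers;
        usersLog = stagedLog;
        return true;
    }

    // Mimics checkLocalTransaction: a log row is the evidence of a committed transaction.
    String check(String bid) {
        return usersLog.containsKey(bid) ? "COMMIT" : "ROLLBACK";
    }

    public static void main(String[] args) {
        TxLogCheckSketch service = new TxLogCheckSketch();
        service.save(2, "alice", "bid-2");
        try {
            service.save(1, "bob", "bid-1");
        } catch (IllegalStateException e) {
            // expected: id 1 always fails in this sketch
        }
        System.out.println("bid-2 -> " + service.check("bid-2")); // bid-2 -> COMMIT
        System.out.println("bid-1 -> " + service.check("bid-1")); // bid-1 -> ROLLBACK
    }
}
```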

Controller endpoint to trigger the transaction:

<code>@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id") Long id) {
    ps.sendTx("tx-topic", id, "tag10");
    return "send transaction success";
}</code>
[Figure: Transaction execution log]

The logs show that the local transaction completes before the consumer receives the message: the half‑message stays invisible until the producer returns COMMIT, which confirms the two‑phase behavior.
