Mastering RocketMQ with Spring Boot: From Simple to Transactional Messaging

This guide demonstrates how to integrate RocketMQ 4.8.0 into a Spring Boot 2.4.12 application, covering dependency setup, configuration, sending and receiving normal, ordered, cluster/broadcast, and transactional messages, complete with code snippets and execution results to help developers implement reliable messaging patterns.

Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Mastering RocketMQ with Spring Boot: From Simple to Transactional Messaging

Dependencies

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

Configuration

server:
  port: 8080
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq

Normal Message

Sending code:

@Resource
private RocketMQTemplate rocketMQTemplate;

public void send(String message) {
    rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build());
}

Receiving code:

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("接收到消息:" + message);
    }
}

Ordered Message

Sending code (orderly):

public void sendOrder(String topic, String message, String tags, int id) {
    rocketMQTemplate.asyncSendOrderly(topic + ":" + tags,
        MessageBuilder.withPayload(message).build(),
        "order-" + id,
        new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
}

Receiving code (ordered mode):

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message);
    }
}

In ORDERLY mode, each queue is processed by a single thread, guaranteeing message order.

Cluster / Broadcast Message Mode

Sending code (shared for both modes):

public void send(String topic, String message, String tags) {
    rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
}

Cluster consumer (load‑balanced):

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1接收到消息:" + message);
    }
}

Broadcast consumer (each instance receives all messages):

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1接收到消息:" + message);
    }
}
Cluster message result
Cluster message result
Broadcast message result
Broadcast message result

Transactional Message

RocketMQ defines three transaction statuses:

CommitTransaction : the transaction is committed and the message can be consumed.

RollbackTransaction : the transaction is rolled back and the message is discarded.

Unknown : the broker will later query the producer to determine the final state.

Transactional messaging follows a two‑phase process: first the producer sends a half‑message and executes a local transaction; then, based on the local outcome, it commits or rolls back. If the broker does not receive a commit/rollback within a timeout, it triggers a check callback.

Sending a transactional message:

@Resource
private RocketMQTemplate rocketMQTemplate;

public void sendTx(String topic, Long id, String tags) {
    rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags,
        MessageBuilder.withPayload(new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
            .setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
            .build(),
        UUID.randomUUID().toString().replaceAll("-", ""));
}

Producer transaction listener:

@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
    @Resource
    private BusinessService bs;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String id = (String) msg.getHeaders().get("BID");
            Users users = new JsonMapper().readValue((byte[]) msg.getPayload(), Users.class);
            System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id);
            bs.save(users, new UsersLog(users.getId(), id));
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String id = (String) msg.getHeaders().get("BID");
        System.out.println("执行查询ID为:" + id + " 的数据是否存在");
        UsersLog usersLog = bs.queryUsersLog(id);
        if (usersLog == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

Consumer for transactional messages:

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {
    @Override
    public void onMessage(Users users) {
        System.out.println("TX接收到消息:" + users);
    }
}

Service layer with @Transactional:

@Transactional
public boolean save(Users users, UsersLog usersLog) {
    usersRepository.save(users);
    usersLogRepository.save(usersLog);
    if (users.getId() == 1) {
        throw new RuntimeException("数据错误");
    }
    return true;
}

public UsersLog queryUsersLog(String bid) {
    return usersLogRepository.findByBid(bid);
}

Controller endpoint to trigger the transaction:

@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id") Long id) {
    ps.sendTx("tx-topic", id, "tag10");
    return "send transaction success";
}
Transaction execution log
Transaction execution log

The logs show that the local transaction completes before the consumer receives the message, confirming the two‑phase commit behavior.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaSpring BootRocketMQClusterMessagingtransactionalbroadcast
Spring Full-Stack Practical Cases
Written by

Spring Full-Stack Practical Cases

Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.