Mastering RocketMQ with Spring Boot: From Simple to Transactional Messaging
This guide demonstrates how to integrate RocketMQ 4.8.0 into a Spring Boot 2.4.12 application, covering dependency setup, configuration, sending and receiving normal, ordered, cluster/broadcast, and transactional messages, complete with code snippets and execution results to help developers implement reliable messaging patterns.
Dependencies
<code><dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency></code>Configuration
<code>server:
port: 8080
---
rocketmq:
nameServer: localhost:9876
producer:
group: demo-mq</code>Normal Message
Sending code:
<code>@Resource
private RocketMQTemplate rocketMQTemplate;
public void send(String message) {
rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build());
}</code>Receiving code:
<code>@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("接收到消息:" + message);
}
}</code>Ordered Message
Sending code (orderly):
<code>public void sendOrder(String topic, String message, String tags, int id) {
rocketMQTemplate.asyncSendOrderly(topic + ":" + tags,
MessageBuilder.withPayload(message).build(),
"order-" + id,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}</code>Receiving code (ordered mode):
<code>@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message);
}
}</code>In ORDERLY mode, each queue is processed by a single thread, guaranteeing message order.
Cluster / Broadcast Message Mode
Sending code (shared for both modes):
<code>public void send(String topic, String message, String tags) {
rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
}</code>Cluster consumer (load‑balanced):
<code>@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("ConsumerBroadListener1接收到消息:" + message);
}
}</code>Broadcast consumer (each instance receives all messages):
<code>@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("ConsumerBroadListener1接收到消息:" + message);
}
}</code>Transactional Message
RocketMQ defines three transaction statuses:
CommitTransaction : the transaction is committed and the message can be consumed.
RollbackTransaction : the transaction is rolled back and the message is discarded.
Unknown : the broker will later query the producer to determine the final state.
Transactional messaging follows a two‑phase process: first the producer sends a half‑message and executes a local transaction; then, based on the local outcome, it commits or rolls back. If the broker does not receive a commit/rollback within a timeout, it triggers a check callback.
Sending a transactional message:
<code>@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendTx(String topic, Long id, String tags) {
rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags,
MessageBuilder.withPayload(new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
.setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
.build(),
UUID.randomUUID().toString().replaceAll("-", ""));
}</code>Producer transaction listener:
<code>@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
@Resource
private BusinessService bs;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String id = (String) msg.getHeaders().get("BID");
Users users = new JsonMapper().readValue((byte[]) msg.getPayload(), Users.class);
System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id);
bs.save(users, new UsersLog(users.getId(), id));
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String id = (String) msg.getHeaders().get("BID");
System.out.println("执行查询ID为:" + id + " 的数据是否存在");
UsersLog usersLog = bs.queryUsersLog(id);
if (usersLog == null) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}</code>Consumer for transactional messages:
<code>@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {
@Override
public void onMessage(Users users) {
System.out.println("TX接收到消息:" + users);
}
}</code>Service layer with @Transactional:
<code>@Transactional
public boolean save(Users users, UsersLog usersLog) {
usersRepository.save(users);
usersLogRepository.save(usersLog);
if (users.getId() == 1) {
throw new RuntimeException("数据错误");
}
return true;
}
public UsersLog queryUsersLog(String bid) {
return usersLogRepository.findByBid(bid);
}</code>Controller endpoint to trigger the transaction:
<code>@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id") Long id) {
ps.sendTx("tx-topic", id, "tag10");
return "send transaction success";
}</code>The logs show that the local transaction completes before the consumer receives the message, confirming the two‑phase commit behavior.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.