Mastering RocketMQ with Spring Boot: From Simple to Transactional Messaging
This guide demonstrates how to integrate RocketMQ 4.8.0 into a Spring Boot 2.4.12 application, covering dependency setup, configuration, sending and receiving normal, ordered, cluster/broadcast, and transactional messages, complete with code snippets and execution results to help developers implement reliable messaging patterns.
Dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>Configuration
server:
port: 8080
---
rocketmq:
nameServer: localhost:9876
producer:
group: demo-mqNormal Message
Sending code:
@Resource
private RocketMQTemplate rocketMQTemplate;
public void send(String message) {
rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build());
}Receiving code:
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("接收到消息:" + message);
}
}Ordered Message
Sending code (orderly):
public void sendOrder(String topic, String message, String tags, int id) {
rocketMQTemplate.asyncSendOrderly(topic + ":" + tags,
MessageBuilder.withPayload(message).build(),
"order-" + id,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}Receiving code (ordered mode):
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message);
}
}In ORDERLY mode, each queue is processed by a single thread, guaranteeing message order.
Cluster / Broadcast Message Mode
Sending code (shared for both modes):
public void send(String topic, String message, String tags) {
rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
}Cluster consumer (load‑balanced):
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("ConsumerBroadListener1接收到消息:" + message);
}
}Broadcast consumer (each instance receives all messages):
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("ConsumerBroadListener1接收到消息:" + message);
}
}Transactional Message
RocketMQ defines three transaction statuses:
CommitTransaction : the transaction is committed and the message can be consumed.
RollbackTransaction : the transaction is rolled back and the message is discarded.
Unknown : the broker will later query the producer to determine the final state.
Transactional messaging follows a two‑phase process: first the producer sends a half‑message and executes a local transaction; then, based on the local outcome, it commits or rolls back. If the broker does not receive a commit/rollback within a timeout, it triggers a check callback.
Sending a transactional message:
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendTx(String topic, Long id, String tags) {
rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags,
MessageBuilder.withPayload(new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
.setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
.build(),
UUID.randomUUID().toString().replaceAll("-", ""));
}Producer transaction listener:
@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
@Resource
private BusinessService bs;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String id = (String) msg.getHeaders().get("BID");
Users users = new JsonMapper().readValue((byte[]) msg.getPayload(), Users.class);
System.out.println("消息内容:" + users + "\t参与数据:" + arg + "\t本次事务的唯一编号:" + id);
bs.save(users, new UsersLog(users.getId(), id));
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String id = (String) msg.getHeaders().get("BID");
System.out.println("执行查询ID为:" + id + " 的数据是否存在");
UsersLog usersLog = bs.queryUsersLog(id);
if (usersLog == null) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}Consumer for transactional messages:
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {
@Override
public void onMessage(Users users) {
System.out.println("TX接收到消息:" + users);
}
}Service layer with @Transactional:
@Transactional
public boolean save(Users users, UsersLog usersLog) {
usersRepository.save(users);
usersLogRepository.save(usersLog);
if (users.getId() == 1) {
throw new RuntimeException("数据错误");
}
return true;
}
public UsersLog queryUsersLog(String bid) {
return usersLogRepository.findByBid(bid);
}Controller endpoint to trigger the transaction:
@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id") Long id) {
ps.sendTx("tx-topic", id, "tag10");
return "send transaction success";
}The logs show that the local transaction completes before the consumer receives the message, confirming the two‑phase commit behavior.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
