Master RocketMQ with Spring Boot: From Simple to Transactional Messaging
This guide walks through integrating RocketMQ 4.8.0 with Spring Boot 2.3.9, covering dependencies, configuration, code examples for normal, ordered, cluster/broadcast, and transactional messages, and demonstrates testing results with detailed explanations of each messaging mode.
Environment
Spring Boot 2.3.9.RELEASE with RocketMQ 4.8.0.
Dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
Configuration
server:
  port: 8080
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq
Normal Message
Sending
/** Template used to publish messages; injected by the container. */
@Resource
private RocketMQTemplate rocketMQTemplate;

/**
 * Publishes a normal message to topic {@code test-topic} with tag {@code tag2}.
 *
 * <p>The raw text is handed to {@code convertAndSend} as the payload. That method performs the
 * payload conversion and message wrapping itself, so pre-building a {@code Message} here would
 * make the {@code Message} object itself the payload to be converted.
 *
 * @param message text to publish
 */
public void send(String message) {
    rocketMQTemplate.convertAndSend("test-topic:tag2", message);
}
Receiving
/**
 * Listener bound to topic {@code test-topic} in consumer group {@code consumer01-group},
 * registered with the selector expression {@code tag1 || tag2}.
 */
@Component
@RocketMQMessageListener(
        topic = "test-topic",
        consumerGroup = "consumer01-group",
        selectorExpression = "tag1 || tag2")
public class ConsumerListener implements RocketMQListener<String> {

    /**
     * Prints the delivered payload to standard output.
     *
     * @param message payload of the delivered message
     */
    @Override
    public void onMessage(String message) {
        final String line = "Received message: " + message;
        System.out.println(line);
    }
}
Ordered Message
Sending
/** Template used to publish messages; injected by the container. */
@Resource
private RocketMQTemplate rocketMQTemplate;

/**
 * Asynchronously sends a message through {@code asyncSendOrderly}, using
 * {@code "order-" + id} as the hash key.
 *
 * <p>On success the message id, the payload and the id of the queue the message went to are
 * printed to standard error; on failure the stack trace is printed.
 *
 * @param topic   destination topic
 * @param message payload text
 * @param tags    tag appended to the topic as {@code topic:tags}
 * @param id      value used to build the hash key
 */
public void sendOrder(String topic, String message, String tags, int id) {
    final String destination = topic + ":" + tags;
    final String hashKey = "order-" + id;

    final SendCallback callback = new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            final String msgId = sendResult.getMsgId();
            final int queueId = sendResult.getMessageQueue().getQueueId();
            System.err.println("msg-id: " + msgId + ": " + message + "\tqueueId: " + queueId);
        }

        @Override
        public void onException(Throwable e) {
            e.printStackTrace();
        }
    };

    rocketMQTemplate.asyncSendOrderly(
            destination,
            MessageBuilder.withPayload(message).build(),
            hashKey,
            callback);
}
Consumer
/**
 * Listener bound to topic {@code order-topic} in consumer group {@code consumer02-group},
 * registered with selector expression {@code tag3 || tag4} and
 * {@code ConsumeMode.ORDERLY}.
 */
@Component
@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "consumer02-group",
        selectorExpression = "tag3 || tag4",
        consumeMode = ConsumeMode.ORDERLY)
public class ConsumerOrderListener implements RocketMQListener<String> {

    /**
     * Prints the name of the consuming thread followed by the payload.
     *
     * @param message payload of the delivered message
     */
    @Override
    public void onMessage(String message) {
        final String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " Received Order message: " + message);
    }
}
ConsumeMode.ORDERLY indicates a single queue and a single thread for ordered processing.
Cluster and Broadcast Modes
Cluster mode sending
/** Template used to publish messages; injected by the container. */
@Resource
private RocketMQTemplate rocketMQTemplate;

/**
 * Sends a message built from the given text to {@code topic:tags}.
 *
 * @param topic   destination topic
 * @param message payload text
 * @param tags    tag appended to the topic
 */
public void send(String topic, String message, String tags) {
    final String destination = topic + ":" + tags;
    rocketMQTemplate.send(destination, MessageBuilder.withPayload(message).build());
}
Cluster consumer
/**
 * Listener bound to topic {@code broad-topic} in consumer group {@code consumer03-group},
 * registered with selector expression {@code tag6 || tag7} and
 * {@code MessageModel.CLUSTERING}.
 */
@Component
@RocketMQMessageListener(
        topic = "broad-topic",
        consumerGroup = "consumer03-group",
        selectorExpression = "tag6 || tag7",
        messageModel = MessageModel.CLUSTERING)
public class ConsumerBroadListener implements RocketMQListener<String> {

    /**
     * Prints the delivered payload to standard output.
     *
     * @param message payload of the delivered message
     */
    @Override
    public void onMessage(String message) {
        final String line = "ConsumerBroadListener1 received: " + message;
        System.out.println(line);
    }
}
Broadcast consumer
/**
 * Listener bound to topic {@code broad-topic} in consumer group {@code consumer03-group},
 * registered with selector expression {@code tag6 || tag7} and
 * {@code MessageModel.BROADCASTING}.
 */
@Component
@RocketMQMessageListener(
        topic = "broad-topic",
        consumerGroup = "consumer03-group",
        selectorExpression = "tag6 || tag7",
        messageModel = MessageModel.BROADCASTING)
public class ConsumerBroadListener implements RocketMQListener<String> {

    /**
     * Prints the delivered payload to standard output.
     *
     * @param message payload of the delivered message
     */
    @Override
    public void onMessage(String message) {
        final String line = "ConsumerBroadListener1 received: " + message;
        System.out.println(line);
    }
}
Test results (cluster mode shows load‑balanced consumption, broadcast mode shows each instance receiving all messages):
Transactional Message
RocketMQ defines three transaction statuses:
CommitTransaction : commit the transaction, consumer can consume the message.
RollbackTransaction : rollback, the message is deleted and cannot be consumed.
Unknown : intermediate state, requires a status check.
The two‑phase process consists of a normal send/commit phase and a compensation flow.
Normal transaction send and commit
Producer sends a half‑message (cannot be consumed).
Server acknowledges the half‑message.
Local transaction executes.
Based on the local result, the producer commits or rolls back.
Compensation flow
If the server does not receive a commit/rollback status for a long time, it sends a check request to the producer.
The producer checks the local transaction status.
According to the check result, it commits or rolls back.
Sending code
/** Template used to publish messages; injected by the container. */
@Resource
private RocketMQTemplate rocketMQTemplate;

/**
 * Sends a transactional message carrying a new {@code Users} payload to {@code topic:tags}.
 *
 * <p>Three independent dash-free random UUID strings are generated: one for the second
 * {@code Users} constructor argument, one stored in the {@code BID} header, and one passed as
 * the extra argument to {@code sendMessageInTransaction}.
 *
 * @param topic destination topic
 * @param id    identifier passed to the {@code Users} constructor
 * @param tags  tag appended to the topic
 */
public void sendTx(String topic, Long id, String tags) {
    final String destination = topic + ":" + tags;
    final Users payload = new Users(id, UUID.randomUUID().toString().replace("-", ""));
    final String bid = UUID.randomUUID().toString().replace("-", "");
    final String arg = UUID.randomUUID().toString().replace("-", "");

    rocketMQTemplate.sendMessageInTransaction(
            destination,
            MessageBuilder.withPayload(payload).setHeader("BID", bid).build(),
            arg);
}
Producer transaction listener
/**
 * Transaction listener that runs the local transaction for a half-message and answers
 * later status checks by looking up the log record written during that transaction.
 */
@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {

    /** Shared mapper; built once instead of on every local-transaction execution. */
    private static final JsonMapper JSON_MAPPER = new JsonMapper();

    @Resource
    private BusinessService bs;

    /**
     * Deserializes the payload into {@code Users} and saves it together with a
     * {@code UsersLog} keyed by the {@code BID} header.
     *
     * @param msg half-message; its payload is read as a JSON byte array
     * @param arg extra argument supplied by the sender (only printed here)
     * @return {@code COMMIT} when the save completes, {@code ROLLBACK} when any exception occurs
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String id = (String) msg.getHeaders().get("BID");
            Users users = JSON_MAPPER.readValue((byte[]) msg.getPayload(), Users.class);
            System.out.println("Message: " + users + "\tData: " + arg + "\tTx ID: " + id);
            bs.save(users, new UsersLog(users.getId(), id));
        } catch (Exception e) {
            // Any failure (deserialization or save) results in the half-message being rolled back.
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    /**
     * Answers a transaction status check by looking up the log record for the {@code BID}
     * header.
     *
     * @param msg message whose transaction status is being checked
     * @return {@code COMMIT} if a log record exists for the header value, otherwise
     *         {@code ROLLBACK}
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String id = (String) msg.getHeaders().get("BID");
        System.out.println("Checking if ID " + id + " exists");
        UsersLog usersLog = bs.queryUsersLog(id);
        if (usersLog == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}
Consumer
/**
 * Listener bound to topic {@code tx-topic} in consumer group {@code consumer05-group},
 * registered with selector expression {@code tag10}; payloads are delivered as {@code Users}.
 */
@Component
@RocketMQMessageListener(
        topic = "tx-topic",
        consumerGroup = "consumer05-group",
        selectorExpression = "tag10")
public class ConsumerTxListener implements RocketMQListener<Users> {

    /**
     * Prints the delivered {@code Users} object to standard output.
     *
     * @param users payload of the delivered message
     */
    @Override
    public void onMessage(Users users) {
        final String line = "TX received: " + users;
        System.out.println(line);
    }
}
Service layer
/**
 * Saves the user and its log record, then fails when the user's id equals 1.
 *
 * <p>The exception is raised only after both saves have been issued — presumably to
 * demonstrate a rollback of the surrounding transaction.
 *
 * @param users    user record to save
 * @param usersLog log record to save alongside it
 * @return {@code true} when no exception is thrown
 * @throws RuntimeException with message {@code "Data error"} when the user's id equals 1
 */
@Transactional
public boolean save(Users users, UsersLog usersLog) {
    usersRepository.save(users);
    usersLogRepository.save(usersLog);

    final boolean rejected = users.getId() == 1;
    if (!rejected) {
        return true;
    }
    throw new RuntimeException("Data error");
}
/**
 * Looks up the log record for the given business id via {@code findByBid}.
 *
 * @param bid business id to search for
 * @return whatever the repository returns for that id
 */
public UsersLog queryUsersLog(String bid) {
    final UsersLog found = usersLogRepository.findByBid(bid);
    return found;
}
Controller
/**
 * Handles {@code GET /tx/{id}} by sending a transactional message to topic
 * {@code tx-topic} with tag {@code tag10}.
 *
 * @param id identifier taken from the request path
 * @return a fixed confirmation string
 */
@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id") Long id) {
    final String topic = "tx-topic";
    final String tag = "tag10";
    ps.sendTx(topic, id, tag);
    return "send transaction success";
}
Testing shows that after the local transaction commits, the consumer receives the message, and rollback scenarios prevent consumption.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
