Mastering RocketMQ with Spring Boot: From Simple to Transactional Messaging
This guide shows how to integrate RocketMQ 4.8.0 into a Spring Boot 2.3.9 application. It covers dependency setup, configuration, and sending and receiving plain, ordered, clustered, broadcast, and transactional messages, with a code example for each.
Dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
Configuration
server:
  port: 8080
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq
Plain Message
Sending a simple message:
@Resource
private RocketMQTemplate rocketMQTemplate;

public void send(String message) {
    // The destination format is "topic:tag". convertAndSend builds the Message
    // from the payload itself, so the String is passed directly.
    rocketMQTemplate.convertAndSend("test-topic:tag2", message);
}
Receiving the message:
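import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// Subscribes to test-topic and accepts messages tagged tag1 or tag2.
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
Ordered Message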
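Ordered messaging depends on every message that shares a hash key landing in the same queue. The sketch below illustrates that idea in plain Java, without the RocketMQ client. The class name, the selectQueue helper, and the queue count of 4 are assumptions made for illustration, and the modulo rule is a simplified model rather than the client's actual selector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class HashKeyRoutingSketch {

    // Hypothetical helper: maps a hash key to a queue index in [0, queueCount).
    // floorMod keeps the index non-negative even when hashCode() is negative.
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        Map<Integer, List<String>> queues = new LinkedHashMap<>();

        // Two orders, three steps each; the hash key mirrors the "order-" + id convention.
        for (int id = 1; id <= 2; id++) {
            for (int step = 1; step <= 3; step++) {
                String hashKey = "order-" + id;
                int queueId = selectQueue(hashKey, queueCount);
                queues.computeIfAbsent(queueId, k -> new ArrayList<>())
                      .add(hashKey + "/step-" + step);
            }
        }

        // All steps of one order appear in one queue, in the order they were sent.
        queues.forEach((queueId, messages) ->
                System.out.println("queue " + queueId + " -> " + messages));
    }
}
```

Because the mapping is deterministic, the steps of a single order stay in one queue, and a consumer that reads that queue with one thread sees them in sending order.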
Sending an ordered message:
public void sendOrder(String topic, String message, String tags, int id) {
    // Messages that share the same hash key ("order-" + id) are routed to the same queue.
    rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),
            "order-" + id, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
}
Receiving ordered messages (with ConsumeMode.ORDERLY, each queue is consumed by one thread at a time, which preserves per-queue order):
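import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // The thread name shows which consumer thread handled the message.
        System.out.println(Thread.currentThread().getName() + " received ordered message: " + message);
    }
}
Cluster and Broadcast Modes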
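The two modes differ in who receives each message. In clustering mode, each message goes to one consumer instance in the group. In broadcasting mode, every instance receives every message. The following plain-Java sketch models that difference. The class, the deliver helper, and the round-robin assignment are assumptions for illustration; the real client distributes load by allocating queues, not individual messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MessageModelSketch {

    enum Model { CLUSTERING, BROADCASTING }

    // Hypothetical helper: returns, per consumer instance, the messages it would receive.
    static List<List<String>> deliver(Model model, int instances, List<String> messages) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            received.add(new ArrayList<>());
        }
        for (int m = 0; m < messages.size(); m++) {
            String message = messages.get(m);
            if (model == Model.BROADCASTING) {
                // Every instance gets its own copy.
                for (List<String> inbox : received) {
                    inbox.add(message);
                }
            } else {
                // Round-robin stands in for the group's load balancing.
                received.get(m % instances).add(message);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("CLUSTERING:   " + deliver(Model.CLUSTERING, 2, messages));
        System.out.println("BROADCASTING: " + deliver(Model.BROADCASTING, 2, messages));
    }
}
```

With two instances, clustering splits the four messages between them, while broadcasting hands all four to each instance.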
Sending is the same for both modes; the mode is chosen on the consumer side:
public void send(String topic, String message, String tags) {
    rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
}
Clustered consumer (MessageModel.CLUSTERING):
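import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// CLUSTERING: each message is consumed by only one instance in the consumer group.
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1 received message: " + message);
    }
}
Broadcast consumer (MessageModel.BROADCASTING), an alternative to the clustered listener with the same class name and group: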
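import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// BROADCASTING: every instance in the consumer group receives every message.
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1 received message: " + message);
    }
}
Transactional Messages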
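RocketMQ defines three transaction states: CommitTransaction (the message is committed and becomes deliverable to consumers), RollbackTransaction (the message is discarded), and Unknown (the broker has to ask the producer for the outcome). In rocketmq-spring these correspond to RocketMQLocalTransactionState.COMMIT, ROLLBACK, and UNKNOWN.
Transactional messaging is a two-phase process: the producer sends a half message that consumers cannot see yet, executes the local transaction, and then commits or rolls back the message. If the broker does not receive the final state, it invokes the producer's check callback (checkLocalTransaction) to obtain it.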
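The two-phase flow described above can be sketched as a small in-memory model. This is an illustration only, not RocketMQ code: the class, the State enum, and the send helper are hypothetical, and the broker's repeated check-backs are reduced to a single call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class HalfMessageFlowSketch {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Hypothetical helper: returns the messages visible to consumers once the flow ends.
    static List<String> send(String body, Supplier<State> localTransaction, Supplier<State> check) {
        List<String> visibleToConsumers = new ArrayList<>();

        // Phase 1: the half message is stored but not yet visible to consumers.
        String halfMessage = body;

        // Phase 2: run the local transaction; a failure is treated as a rollback.
        State state;
        try {
            state = localTransaction.get();
        } catch (RuntimeException e) {
            state = State.ROLLBACK;
        }

        // No final state yet: the broker asks the producer (modeled as one check call).
        if (state == State.UNKNOWN) {
            state = check.get();
        }

        // Only a commit makes the half message deliverable.
        if (state == State.COMMIT) {
            visibleToConsumers.add(halfMessage);
        }
        return visibleToConsumers;
    }

    public static void main(String[] args) {
        System.out.println(send("m1", () -> State.COMMIT, () -> State.ROLLBACK));
        System.out.println(send("m2", () -> State.ROLLBACK, () -> State.COMMIT));
        System.out.println(send("m3", () -> State.UNKNOWN, () -> State.COMMIT));
    }
}
```

In this model a message only reaches consumers when the local transaction or the later check reports a commit, which is the guarantee the listener code in this section relies on.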
Sending a transactional message:
public void sendTx(String topic, Long id, String tags) {
    rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags,
            MessageBuilder.withPayload(new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
                    // BID identifies this transaction; the listener reads it back from the headers.
                    .setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
                    .build(),
            // The last argument is passed to executeLocalTransaction as arg.
            UUID.randomUUID().toString().replaceAll("-", ""));
}
Producer transaction listener:
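import javax.annotation.Resource;

import com.fasterxml.jackson.databind.json.JsonMapper;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
    @Resource
    private BusinessService bs;

    // Runs after the half message is sent; the return value commits or rolls back the message.
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String id = (String) msg.getHeaders().get("BID");
            Users users = new JsonMapper().readValue((byte[]) msg.getPayload(), Users.class);
            System.out.println("Message payload: " + users + "\targument: " + arg + "\ttransaction ID: " + id);
            // Saves the user and a log row keyed by the transaction ID in one local transaction.
            bs.save(users, new UsersLog(users.getId(), id));
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    // Called by the broker when it has not received a final state for the half message.
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String id = (String) msg.getHeaders().get("BID");
        System.out.println("Checking whether a record exists for ID: " + id);
        // The log row exists only if the local transaction committed.
        UsersLog usersLog = bs.queryUsersLog(id);
        if (usersLog == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}
Consumer for transactional messages: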
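import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

// Receives only committed messages; the payload is converted to Users.
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {
    @Override
    public void onMessage(Users users) {
        System.out.println("TX received message: " + users);
    }
}
Service with @Transactional: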
@Transactional
public boolean save(Users users, UsersLog usersLog) {
    usersRepository.save(users);
    usersLogRepository.save(usersLog);
    // Simulated failure: for id 1 the exception rolls back both saves,
    // so the listener returns ROLLBACK and the message is never delivered.
    if (users.getId() == 1) {
        throw new RuntimeException("Invalid data");
    }
    return true;
}

public UsersLog queryUsersLog(String bid) {
    return usersLogRepository.findByBid(bid);
}
Controller endpoint to trigger the transaction:
@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id") Long id) {
    // ps is the injected bean that declares sendTx(topic, id, tags); its declaration is not shown.
    ps.sendTx("tx-topic", id, "tag10");
    return "send transaction success";
}
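The Users and UsersLog types used in the transactional example are not shown in the article. The sketch below is one possible shape for them, inferred only from how they are constructed and queried above. The field names name, userId, and bid are assumptions, persistence annotations are omitted, and in a real project the two types would be top-level classes rather than nested ones.

```java
final class TxModelSketch {

    // Hypothetical payload type. The no-arg constructor and accessors are what
    // JSON mappers such as Jackson typically expect when deserializing.
    public static class Users {
        private Long id;
        private String name; // assumed field name

        public Users() { }

        public Users(Long id, String name) {
            this.id = id;
            this.name = name;
        }

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        @Override
        public String toString() {
            return "Users{id=" + id + ", name='" + name + "'}";
        }
    }

    // Hypothetical log row: its presence tells the check callback that the
    // local transaction for the given business ID (bid) was committed.
    public static class UsersLog {
        private Long userId; // assumed field name
        private String bid;  // assumed field name, matching findByBid(bid)

        public UsersLog() { }

        public UsersLog(Long userId, String bid) {
            this.userId = userId;
            this.bid = bid;
        }

        public Long getUserId() { return userId; }
        public void setUserId(Long userId) { this.userId = userId; }
        public String getBid() { return bid; }
        public void setBid(String bid) { this.bid = bid; }
    }

    public static void main(String[] args) {
        Users users = new Users(2L, "demo");
        UsersLog log = new UsersLog(users.getId(), "bid-123");
        System.out.println(users + " / log for user " + log.getUserId() + ", bid " + log.getBid());
    }
}
```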
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
