How to Achieve Reliable Message Consistency with Spring Boot & RocketMQ
This article explains the principle of reliable message eventual consistency using Spring Boot 2.3.9 and RocketMQ 4.8.0, detailing the transaction flow, broker handling of commit/rollback, project structure, code implementation for producers and consumers, and testing procedures.
Reliable Message Consistency Principle
Environment: Spring Boot 2.3.9 + RocketMQ 4.8.0
Execution Flow
Producer sends a Prepare message to the broker.
After the Prepare message is successfully sent, the local transaction starts.
If the local transaction succeeds, the producer returns commit; otherwise it returns rollback (determined by the developer in the transaction callback).
The producer sends the commit or rollback result to the broker, which leads to two possible scenarios:
1. Broker receives commit/rollback: If commit is received, the broker treats the whole transaction as successful and delivers the message to the consumer. If rollback is received, the broker assumes the local transaction failed and deletes the half message without delivering it.
2. Broker does not receive a confirmation (e.g., the producer crashes before reporting the result, or the callback returns UNKNOWN): The broker periodically checks the local transaction result by calling back to the producer. If the check returns commit, the broker treats the transaction as successful and delivers the message; if it returns rollback, the broker deletes the half message; if it returns UNKNOWN, the broker keeps retrying. The retry interval and count are configurable.
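The two scenarios above can be sketched as a small simulation. This is not RocketMQ code: the class `HalfMessageFlow`, the `resolve` method, and the `maxChecks` parameter are hypothetical names used only for illustration, and the sketch assumes that a message still in the UNKNOWN state after the last check is discarded.

```java
import java.util.function.Supplier;

/** Simulates how a broker might resolve a half (Prepare) message. Illustration only. */
final class HalfMessageFlow {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    enum Outcome { DELIVERED, DISCARDED }

    /**
     * @param reported  state the producer reported after running the local transaction
     * @param checkBack stands in for the producer's checkLocalTransaction callback
     * @param maxChecks hypothetical cap on the number of check-back attempts
     */
    static Outcome resolve(TxState reported, Supplier<TxState> checkBack, int maxChecks) {
        TxState state = reported;
        // Scenario 2: no definite answer yet, so the broker asks the producer again.
        for (int attempt = 0; state == TxState.UNKNOWN && attempt < maxChecks; attempt++) {
            state = checkBack.get();
        }
        // Only an explicit COMMIT makes the message visible to consumers.
        // Assumption: a message that is still UNKNOWN after maxChecks is discarded.
        return state == TxState.COMMIT ? Outcome.DELIVERED : Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(TxState.COMMIT, () -> TxState.UNKNOWN, 3));
        System.out.println(resolve(TxState.ROLLBACK, () -> TxState.COMMIT, 3));
        System.out.println(resolve(TxState.UNKNOWN, () -> TxState.COMMIT, 3));
        System.out.println(resolve(TxState.UNKNOWN, () -> TxState.UNKNOWN, 3));
    }
}
```

The check-back supplier is consulted only when the reported state is UNKNOWN, which mirrors the split between scenario 1 and scenario 2.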
Project Structure
Parent project with two child modules: account-manager and integral-manager.
Dependencies
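<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- MySQL driver used by the datasource configuration; version managed by Spring Boot -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
Account Sub‑module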
Configuration (application.yml)
server:
  port: 8081
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: pack-mq
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/account?serverTimezone=GMT%2B8
    username: root
    password: ******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1
Entity Classes
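// Account table
@Entity
@Table(name = "t_account")
public class Account {
    @Id
    private Long id;
    private String name;

    // Accessors are needed for JSON (de)serialization and by AccountService
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
// Business log table (used for deduplication)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
    @Id
    private Long txid;
    private Date createTime;

    protected AccountLog() {} // no-arg constructor required by JPA

    public AccountLog(Long txid, Date createTime) {
        this.txid = txid;
        this.createTime = createTime;
    }
}
DAO Interfaces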
public interface AccountRepository extends JpaRepository<Account, Long> {}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {}
Service Layer
@Service
public class AccountService {
    @Resource
    private AccountRepository accountRepository;
    @Resource
    private AccountLogRepository accountLogRepository;

    // Save business data and a corresponding log in one transaction for later verification
    @Transactional
    public boolean register(Account account) {
        accountRepository.save(account);
        AccountLog accountLog = new AccountLog(account.getId(), new Date());
        accountLogRepository.save(accountLog);
        return true;
    }

    // Returns the log row for the given ID, or null if no committed transaction wrote one
    public AccountLog existsTxId(Long txid) {
        return accountLogRepository.findById(txid).orElse(null);
    }
}
Message Sending
@Service
public class ProducerMessageService {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public String sendTx(String topic, String tags, Account account) {
        String uuid = UUID.randomUUID().toString().replace("-", "");
        // Destination format is "topic:tags"; the last argument is handed to executeLocalTransaction as arg
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
                topic + ":" + tags,
                MessageBuilder.withPayload(account).setHeader("tx_id", uuid).build(),
                uuid);
        return result.getSendStatus().name();
    }
}
Producer Transaction Listener
@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
    @Resource
    private AccountService accountService;

    // Runs the local transaction once the half message has been sent successfully
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Account account = new JsonMapper().readValue((byte[]) msg.getPayload(), Account.class);
            accountService.register(account);
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    // Called back by the broker when it has not received a commit or rollback
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        try {
            Account account = new JsonMapper().readValue((byte[]) msg.getPayload(), Account.class);
            System.out.println("Checking if ID " + account.getId() + " exists");
            AccountLog accountLog = accountService.existsTxId(account.getId());
            // No log row: the local transaction has not committed (yet), so ask the broker to check again
            if (accountLog == null) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}
Controller
@RestController
@RequestMapping("/accounts")
public class AccountController {
    @Resource
    private ProducerMessageService messageService;

    @PostMapping("/send")
    public Object sendMessage(@RequestBody Account account) {
        return messageService.sendTx("tx-topic", "mks", account);
    }
}
Integral Sub‑module
Entity
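@Entity
@Table(name = "t_integral")
public class Integral {
    @Id
    private Long id;
    private Integer score;
    private Long accountId;

    protected Integral() {} // no-arg constructor required by JPA

    public Integral(Long id, Integer score, Long accountId) {
        this.id = id;
        this.score = score;
        this.accountId = accountId;
    }
}
DAO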
public interface IntegralRepository extends JpaRepository<Integral, Long> {}
Service
@Service
public class IntegralService {
    @Resource
    private IntegralRepository integralRepository;

    @Transactional
    public Integral saveIntegral(Integral integral) {
        return integralRepository.save(integral);
    }
}
Message Listener
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {
    @Resource
    private IntegralService integralService;

    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(String message) {
        System.out.println("Integral received message: " + message);
        try {
            Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class);
            // Jackson maps JSON numbers to Integer or Long depending on size, so go through Number
            Long accountId = ((Number) jsonMap.get("id")).longValue();
            // Demo values: fixed primary key 1 and 1000 points
            integralService.saveIntegral(new Integral(1L, 1000, accountId));
        } catch (Exception e) {
            // Rethrowing marks consumption as failed so that the message is redelivered
            throw new RuntimeException(e);
        }
    }
}
Testing
Start both sub‑modules separately.
Initial Database Tables
Postman Test
Account module
Integral module
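The Postman request can also be issued from plain Java. The sketch below assumes Java 11 or later (for `java.net.http`), the port 8081 and the `/accounts/send` path from the Account sub‑module, and a sample payload with the `id` and `name` fields of `Account`. The class name `SendAccountRequest` and the sample values are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Sends one account to the Account sub-module's /accounts/send endpoint. */
final class SendAccountRequest {

    /** Builds the POST request; the name is assumed to need no JSON escaping. */
    static HttpRequest build(long id, String name) {
        String json = "{\"id\":" + id + ",\"name\":\"" + name + "\"}";
        return HttpRequest.newBuilder(URI.create("http://localhost:8081/accounts/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the Account sub-module to be running locally.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(build(1L, "demo"), HttpResponse.BodyHandlers.ofString());
        // The controller returns the send status name as the response body.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```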
When the Account sub‑module encounters an error in its local transaction, the transaction rolls back and the half message is deleted, so the Integral sub‑module never receives the message.
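The rollback case can be reasoned about without a broker. The following sketch replaces the two tables with in-memory maps and shows why the check-back can rely on the log row: the business row and the log row are written together or not at all. All names here (`LocalTxLogSketch`, the maps, the validation rule that rejects an empty name) are assumptions made for the example and are not part of the project.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the account service and its transaction listener. */
final class LocalTxLogSketch {

    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<Long, String> accounts = new HashMap<>();  // stands in for t_account
    private final Map<Long, Long> accountLogs = new HashMap<>(); // stands in for t_account_log

    /** Mirrors executeLocalTransaction: COMMIT on success, ROLLBACK on any failure. */
    TxState executeLocalTransaction(long id, String name) {
        try {
            register(id, name);
            return TxState.COMMIT;
        } catch (RuntimeException e) {
            return TxState.ROLLBACK;
        }
    }

    /** Writes both rows or neither, mimicking a single @Transactional method. */
    private void register(long id, String name) {
        if (name == null || name.isEmpty()) {
            // Hypothetical failure: thrown before anything is written.
            throw new IllegalArgumentException("name is required");
        }
        accounts.put(id, name);
        accountLogs.put(id, System.currentTimeMillis());
    }

    /** Mirrors checkLocalTransaction: the log row is the proof of a committed transaction. */
    TxState checkLocalTransaction(long id) {
        return accountLogs.containsKey(id) ? TxState.COMMIT : TxState.UNKNOWN;
    }

    public static void main(String[] args) {
        LocalTxLogSketch sketch = new LocalTxLogSketch();
        System.out.println(sketch.executeLocalTransaction(1L, "demo"));
        System.out.println(sketch.checkLocalTransaction(1L));
        System.out.println(sketch.executeLocalTransaction(2L, ""));
        System.out.println(sketch.checkLocalTransaction(2L));
    }
}
```

For the failed registration the check-back keeps answering UNKNOWN, so a lost ROLLBACK reply never turns into a delivered message.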
End of tutorial.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
