Ensuring Reliable Message Consistency with Spring Boot & RocketMQ
This guide explains how to achieve reliable message consistency in a Spring Boot 2.3.9 application using RocketMQ 4.8.0 by outlining the transactional message flow, broker handling, project structure, code implementation, and testing procedures.
Reliable Message Consistency Principle
Environment: Spring Boot 2.3.9 + RocketMQ 4.8.0
Execution Flow
Producer sends a Prepare message to the broker.
After the Prepare message is successfully sent, the local transaction starts.
If the local transaction succeeds, the producer returns COMMIT ; if it fails, it returns ROLLBACK (decided by the developer in the transaction callback).
The producer sends the COMMIT or ROLLBACK result to the broker.
1. If the broker receives COMMIT, it treats the whole transaction as successful and delivers the message to the consumer. 2. If the broker receives ROLLBACK, it deletes the half message and does not deliver it. 3. If the broker does not receive any confirmation (e.g., the local transaction crashes and returns UNKNOWN ), the broker periodically checks the transaction status. If the check returns COMMIT, the broker delivers the message; if it returns ROLLBACK or remains UNKNOWN , the broker keeps retrying until a final result is obtained.
Project Structure
Dependencies
<code><dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
</code>Account Sub‑module
Configuration (application.yml)
<code>server:
port: 8081
---
rocketmq:
nameServer: localhost:9876
producer:
group: pack-mq
---
spring:
jpa:
generateDdl: false
hibernate:
ddlAuto: update
openInView: true
show-sql: true
---
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/account?serverTimezone=GMT%2B8
username: root
password: ******
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimumIdle: 10
maximumPoolSize: 200
autoCommit: true
idleTimeout: 30000
poolName: MasterDatabookHikariCP
maxLifetime: 1800000
connectionTimeout: 30000
connectionTestQuery: SELECT 1
</code>Entity Classes
<code>// User table
@Entity
@Table(name = "t_account")
public class Account {
@Id
private Long id;
private String name;
}
// Business log table (used for deduplication)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
@Id
private Long txid;
private Date createTime;
}
</code>DAO Interfaces
<code>public interface AccountRepository extends JpaRepository<Account, Long> {}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {}
</code>Service Layer
<code>@Resource
private AccountRepository accountRepository;
@Resource
private AccountLogRepository accountLogRepository;
// Save business data and a corresponding log for later verification
@Transactional
public boolean register(Account account) {
accountRepository.save(account);
AccountLog accountLog = new AccountLog(account.getId(), new Date());
accountLogRepository.save(accountLog);
return true;
}
public AccountLog existsTxId(Long txid) {
return accountLogRepository.findById(txid).orElse(null);
}
</code>Sending Transactional Messages
<code>@Resource
private RocketMQTemplate rocketMQTemplate;
public String sendTx(String topic, String tags, Account account) {
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
topic + ":" + tags,
MessageBuilder.withPayload(account).setHeader("tx_id", uuid).build(),
uuid);
return result.getSendStatus().name();
}
</code>Producer Transaction Listener
<code>@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
@Resource
private AccountService accountService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Account account = new JsonMapper().readValue((byte[]) msg.getPayload(), Account.class);
accountService.register(account);
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
Account account = new JsonMapper().readValue((byte[]) msg.getPayload(), Account.class);
AccountLog accountLog = accountService.existsTxId(account.getId());
if (accountLog == null) {
return RocketMQLocalTransactionState.UNKNOWN;
}
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.UNKNOWN;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
</code>Controller
<code>@RestController
@RequestMapping("/accounts")
public class AccountController {
@Resource
private ProducerMessageService messageService;
@PostMapping("/send")
public Object sendMessage(@RequestBody Account account) {
return messageService.sendTx("tx-topic", "mks", account);
}
}
</code>Integral Sub‑module
Entity
<code>@Entity
@Table(name = "t_integral")
public class Integral {
@Id
private Long id;
private Integer score;
private Long acccountId;
}
</code>DAO
<code>public interface IntegralRepository extends JpaRepository<Integral, Long> {}
</code>Service
<code>@Resource
private IntegralRepository integralRepository;
@Transactional
public Integral saveIntegral(Integral integral) {
return integralRepository.save(integral);
}
</code>Message Listener
<code>@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {
@Resource
private IntegralService integralService;
@Override
public void onMessage(String message) {
System.out.println("Integral received message: " + message);
try {
Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class);
Integer id = (Integer) jsonMap.get("id");
integralService.saveIntegral(new Integral(1L, 1000, id + 0L));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
</code>Testing
Start both sub‑modules, initialize the database tables (see diagram), and use Postman to invoke the /accounts/send endpoint. When the Account module's local transaction fails, the transaction is rolled back and the half message is deleted, so the Integral module never receives the message.
All components work together to guarantee eventual consistency of messages across services.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.