
Ensuring Reliable Message Consistency with Spring Boot & RocketMQ

This guide explains the principle and step‑by‑step implementation of reliable transactional messaging using Spring Boot 2.3.9 and RocketMQ 4.8.0. It covers the execution flow, broker handling, project structure, code examples for entities, repositories, services, and listeners, and a short test.

Spring Full-Stack Practical Cases

Reliable Message Consistency Principle

Environment: Spring Boot 2.3.9 + RocketMQ 4.8.0

Execution Process

The producer sends a Prepare message (also called a half message) to the broker. The broker stores it, but it is not yet visible to consumers.

After the Prepare message is acknowledged, the local transaction starts.

If the local transaction succeeds, the producer returns commit; otherwise it returns rollback (decided by the developer in the transaction callback).

The producer sends the resulting commit or rollback to the broker, which leads to two possible broker behaviours:

1. Broker receives commit / rollback: on commit, the broker treats the whole transaction as successful and delivers the message to the consumer. On rollback, the broker deletes the half message and does not deliver it.

2. Broker does not receive a confirmation (e.g., the producer crashes before reporting the result, or the callback returns UNKNOWN): the broker periodically calls back to the producer to check the transaction status. If the check returns commit, the message is delivered; if it returns rollback, the half message is removed; if it remains UNKNOWN, the broker keeps checking until the retry limit is reached and then, by default, discards the half message. The check interval and retry count are configurable on the broker.
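The broker-side decision described above can be modelled without any RocketMQ dependency. The following is a minimal sketch, not the broker's actual implementation: the class `HalfMessageFlow`, its enums, and the `checkBack` supplier are hypothetical names introduced only for illustration.

```java
import java.util.function.Supplier;

/** Framework-free model of how a half message might be resolved (illustrative only). */
final class HalfMessageFlow {

    /** The three states a producer can report. */
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** What finally happens to the half message. */
    enum Outcome { DELIVERED, DISCARDED }

    /**
     * @param firstReport state reported right after the local transaction
     * @param checkBack   hypothetical stand-in for the broker's status check on the producer
     * @param maxChecks   number of check-backs attempted before giving up
     */
    static Outcome resolve(TxState firstReport, Supplier<TxState> checkBack, int maxChecks) {
        TxState state = firstReport;
        // Keep asking the producer only while the state is still undecided.
        for (int attempt = 0; state == TxState.UNKNOWN && attempt < maxChecks; attempt++) {
            state = checkBack.get();
        }
        // Only an explicit COMMIT makes the message visible to consumers.
        return state == TxState.COMMIT ? Outcome.DELIVERED : Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(TxState.COMMIT, () -> TxState.UNKNOWN, 3));
        System.out.println(resolve(TxState.UNKNOWN, () -> TxState.COMMIT, 3));
        System.out.println(resolve(TxState.UNKNOWN, () -> TxState.UNKNOWN, 3));
    }
}
```

The sketch treats "retry limit reached" the same as rollback, which matches the default behaviour described above; a real broker performs the checks on a timer rather than in a loop.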

Project Structure

The project uses a parent‑child Maven layout with two sub‑modules: account‑manager and integral‑manager.

Dependencies

<code>&lt;dependency&gt;
  &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
  &lt;artifactId&gt;spring-boot-starter-data-jpa&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
  &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
  &lt;artifactId&gt;spring-boot-starter-web&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
  &lt;groupId&gt;org.apache.rocketmq&lt;/groupId&gt;
  &lt;artifactId&gt;rocketmq-spring-boot-starter&lt;/artifactId&gt;
  &lt;version&gt;2.2.0&lt;/version&gt;
&lt;/dependency&gt;</code>

Account Module

Configuration file

<code>server:
  port: 8081
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: pack-mq
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/account?serverTimezone=GMT%2B8
    username: root
    password: "******"  # masked placeholder; quoted because a bare * starts a YAML alias
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1</code>

Entity classes

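<code>// Account (user) table
@Entity
@Table(name = "t_account")
public class Account {
    @Id
    private Long id;
    private String name;
    // no-arg constructor, getters and setters omitted (needed by JPA and Jackson)
}

// Business log table (used for deduplication)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
    @Id
    private Long txid;
    private Date createTime;

    protected AccountLog() {} // required by JPA

    // Used by the service when it records a completed registration
    public AccountLog(Long txid, Date createTime) {
        this.txid = txid;
        this.createTime = createTime;
    }
    // getters and setters omitted
}</code>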

DAO interfaces

<code>public interface AccountRepository extends JpaRepository<Account, Long> {}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {}</code>

Service methods

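<code>@Service
public class AccountService {

    @Resource
    private AccountRepository accountRepository;
    @Resource
    private AccountLogRepository accountLogRepository;

    // Saves the account and its log row in one local transaction: both commit or neither does
    @Transactional
    public boolean register(Account account) {
        accountRepository.save(account);
        AccountLog accountLog = new AccountLog(account.getId(), new Date());
        accountLogRepository.save(accountLog);
        return true;
    }

    // Returns the log row for the given txid, or null if no committed transaction recorded it
    public AccountLog existsTxId(Long txid) {
        return accountLogRepository.findById(txid).orElse(null);
    }
}</code>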
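The log table can only answer a later status check reliably because the account row and the log row are written together. The sketch below imitates that all-or-nothing write by hand with in-memory maps; `RegisterSketch`, its maps, and the `failBeforeLog` flag are hypothetical stand-ins for the JPA repositories and for what `@Transactional` provides in the real service.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory imitation of register(): both rows are written or neither is. */
final class RegisterSketch {

    // Stand-ins for t_account (id -> name) and t_account_log (txid -> create time).
    private final Map<Long, String> accounts = new HashMap<>();
    private final Map<Long, Long> accountLogs = new HashMap<>();

    /**
     * @param failBeforeLog simulates an exception between the two writes
     * @return true if both rows were stored, false if the write was undone
     */
    boolean register(long id, String name, boolean failBeforeLog) {
        accounts.put(id, name);
        try {
            if (failBeforeLog) {
                throw new IllegalStateException("simulated failure");
            }
            accountLogs.put(id, System.currentTimeMillis());
            return true;
        } catch (RuntimeException e) {
            accounts.remove(id); // manual rollback of the first write
            return false;
        }
    }

    /** True only when the registration fully completed. */
    boolean existsTxId(long txid) {
        return accountLogs.containsKey(txid);
    }

    boolean hasAccount(long id) {
        return accounts.containsKey(id);
    }
}
```

Because a log row never exists without its account row, a lookup by transaction id is enough to decide whether the local transaction committed.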

Message sending

<code>@Service
public class ProducerMessageService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public String sendTx(String topic, String tags, Account account) {
        // Dash-free UUID used as the transaction id
        String uuid = UUID.randomUUID().toString().replace("-", "");
        // Destination format is "topic:tags"; the last argument is handed to executeLocalTransaction as arg
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            topic + ":" + tags,
            MessageBuilder.withPayload(account).setHeader("tx_id", uuid).build(),
            uuid);
        return result.getSendStatus().name();
    }
}</code>
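The two strings that sendTx assembles, the destination and the transaction id, can be checked in isolation. This is a small sketch using only the JDK; `TxMessageKeys` and its method names are hypothetical helpers, not part of rocketmq-spring.

```java
import java.util.UUID;

/** Helpers that mirror how sendTx builds its destination and transaction id. */
final class TxMessageKeys {

    /** Joins topic and tags as "topic:tags"; returns the bare topic when no tag is given. */
    static String destination(String topic, String tags) {
        return (tags == null || tags.isEmpty()) ? topic : topic + ":" + tags;
    }

    /** A random UUID with the dashes removed: 32 lowercase hex characters. */
    static String newTxId() {
        return UUID.randomUUID().toString().replace("-", "");
    }
}
```

For the controller call shown later in this guide, `destination("tx-topic", "mks")` yields `tx-topic:mks`, which is the value passed as the first argument of sendMessageInTransaction.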

Producer transaction listener

<code>@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {

    // JsonMapper is thread-safe once configured, so one shared instance is enough
    private static final JsonMapper JSON = new JsonMapper();

    @Resource
    private AccountService accountService;

    // Called after the half message is acknowledged; runs the local transaction
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Account account = JSON.readValue((byte[]) msg.getPayload(), Account.class);
            accountService.register(account);
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    // Called by the broker's status check when no commit / rollback arrived
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        try {
            Account account = JSON.readValue((byte[]) msg.getPayload(), Account.class);
            // The log row is written in the same transaction as the account, so its presence proves the commit
            AccountLog accountLog = accountService.existsTxId(account.getId());
            if (accountLog == null) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}</code>
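The listener's two decisions can be unit-tested without a broker by separating them from the RocketMQ types. The sketch below assumes a hypothetical `TxDecisions` class with its own `State` enum in place of RocketMQLocalTransactionState, and a `LongPredicate` in place of the log-table lookup.

```java
import java.util.function.LongPredicate;

/** Framework-free mirror of the listener's decision logic (illustrative only). */
final class TxDecisions {

    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Runs the local transaction and maps success or failure to a state. */
    static State execute(Runnable localTransaction) {
        try {
            localTransaction.run();
            return State.COMMIT;
        } catch (RuntimeException e) {
            return State.ROLLBACK;
        }
    }

    /** Answers a status check: COMMIT if the log row exists, otherwise UNKNOWN. */
    static State check(long txid, LongPredicate logExists) {
        try {
            return logExists.test(txid) ? State.COMMIT : State.UNKNOWN;
        } catch (RuntimeException e) {
            // For example the database is unreachable: let the broker ask again later.
            return State.UNKNOWN;
        }
    }
}
```

Returning UNKNOWN rather than ROLLBACK when the log row is missing is the same choice the listener above makes: the local transaction may still be in flight, so the broker is asked to check again instead of dropping the message immediately.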

Controller

<code>@RestController
@RequestMapping("/accounts")
public class AccountController {

    @Resource
    private ProducerMessageService messageService;

    @PostMapping("/send")
    public Object sendMessage(@RequestBody Account account) {
        return messageService.sendTx("tx-topic", "mks", account);
    }
}</code>

Integral Module

Entity

<code>@Entity
@Table(name = "t_integral")
public class Integral {
    @Id
    private Long id;
    private Integer score;
    private Long accountId;
    // constructors, getters and setters omitted
}</code>

DAO

<code>public interface IntegralRepository extends JpaRepository<Integral, Long> {}</code>

Service

<code>@Resource
private IntegralRepository integralRepository;

@Transactional
public Integral saveIntegral(Integral integral) {
    return integralRepository.save(integral);
}</code>

Message listener

<code>@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {

    private static final JsonMapper JSON = new JsonMapper();

    @Resource
    private IntegralService integralService;

    @Override
    public void onMessage(String message) {
        System.out.println("Integral received message: " + message);
        try {
            Map<String, Object> jsonMap = JSON.readValue(message, Map.class);
            // Jackson maps the id to Integer or Long depending on its size, so go through Number
            Long accountId = ((Number) jsonMap.get("id")).longValue();
            // Demo only: the fixed primary key 1L means every message overwrites the same row
            integralService.saveIntegral(new Integral(1L, 1000, accountId));
        } catch (Exception e) {
            // Rethrowing marks the consumption as failed so the message is redelivered
            throw new RuntimeException(e);
        }
    }
}</code>
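RocketMQ delivers messages at least once, so the same account message can reach the consumer more than once. One way to keep the score from being granted twice is to key the stored record by account id. The sketch below is a hypothetical in-memory version of that idea, with a map standing in for t_integral; it is not how the listener above stores its data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Grants the initial score at most once per account, even if a message is redelivered. */
final class IdempotentIntegralConsumer {

    // Stand-in for t_integral, keyed by account id (a hypothetical design choice).
    private final Map<Long, Integer> scoreByAccount = new ConcurrentHashMap<>();

    /** @return true if the score was granted, false if this account was already processed */
    boolean onMessage(Map<String, Object> payload) {
        Object rawId = payload.get("id");
        if (!(rawId instanceof Number)) {
            throw new IllegalArgumentException("payload has no numeric id: " + payload);
        }
        // Works whether the JSON parser produced an Integer or a Long.
        long accountId = ((Number) rawId).longValue();
        // putIfAbsent is atomic, so concurrent duplicates cannot both win.
        return scoreByAccount.putIfAbsent(accountId, 1000) == null;
    }

    int size() {
        return scoreByAccount.size();
    }
}
```

In a database-backed version the same effect would typically come from a unique constraint on the account id column, with a duplicate-key failure treated as "already processed".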

Testing

Start both sub‑modules, initialize the database tables, and use Postman to send a POST request to /accounts/send with an Account JSON body. When the Account module’s local transaction fails, the transaction rolls back, the half message is deleted, and the Integral module never receives the message.
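As an alternative to Postman, the same request can be sent from plain Java. This is a sketch under a few assumptions: Java 11 or later (for java.net.http), the Account module listening on port 8081 as in the configuration above, and a JSON body whose `id` and `name` fields match the Account entity. The class name `SendAccountRequest` is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Builds and sends the POST request that triggers the transactional message. */
final class SendAccountRequest {

    /** Builds the request; the name is inserted as-is, so it must not contain quotes or backslashes. */
    static HttpRequest build(String baseUrl, long id, String name) {
        String json = "{\"id\":" + id + ",\"name\":\"" + name + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/accounts/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** Sends the request and returns the response body (the send status name). */
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

With both modules running, calling `send(build("http://localhost:8081", 1L, "demo"))` should return the send status, and the Integral module's console should show the received message shortly afterwards.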
