How to Achieve Reliable Message Consistency with Spring Boot & RocketMQ

This article explains the principle of reliable message eventual consistency using Spring Boot 2.3.9 and RocketMQ 4.8.0, detailing the transaction flow, broker handling of commit/rollback, project structure, code implementation for producers and consumers, and testing procedures.

Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
How to Achieve Reliable Message Consistency with Spring Boot & RocketMQ

Reliable Message Consistency Principle

Environment: Spring Boot 2.3.9 + RocketMQ 4.8.0

Execution Flow

Producer sends a Prepare message to the broker.

After the Prepare message is successfully sent, the local transaction starts.

If the local transaction succeeds, the producer returns commit; otherwise it returns rollback (determined by the developer in the transaction callback).

The producer sends the commit or rollback result to the broker, which leads to two possible scenarios:

1. Broker receives commit/rollback: If commit is received, the broker treats the whole transaction as successful and delivers the message to the consumer. If rollback is received, the broker assumes the local transaction failed and deletes the half message, not delivering it. 2. Broker does not receive a confirmation (e.g., the local transaction crashes and returns UNKNOWN ): The broker periodically checks the local transaction result. If the check returns commit, the broker treats the transaction as successful; if it returns UNKNOWN , the broker keeps retrying. The retry interval and count are configurable.

Project Structure

Parent project with two child modules: account-manager and integral-manager.

Dependencies

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

Account Sub‑module

Configuration (application.yml)

server:
  port: 8081
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: pack-mq
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
---
spring:
  datasource:
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/account?serverTimezone=GMT%2B8
    username: root
    password: ******
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      minimumIdle: 10
      maximumPoolSize: 200
      autoCommit: true
      idleTimeout: 30000
      poolName: MasterDatabookHikariCP
      maxLifetime: 1800000
      connectionTimeout: 30000
      connectionTestQuery: SELECT 1

Entity Classes

// User table
@Entity
@Table(name = "t_account")
public class Account {
    @Id
    private Long id;
    private String name;
}

// Business log table (used for deduplication)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
    @Id
    private Long txid;
    private Date createTime;
}

DAO Interfaces

public interface AccountRepository extends JpaRepository<Account, Long> {}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {}

Service Layer

@Resource
private AccountRepository accountRepository;
@Resource
private AccountLogRepository accountLogRepository;

// Save business data and a corresponding log for later verification
@Transactional
public boolean register(Account account) {
    accountRepository.save(account);
    AccountLog accountLog = new AccountLog(account.getId(), new Date());
    accountLogRepository.save(accountLog);
    return true;
}

public AccountLog existsTxId(Long txid) {
    return accountLogRepository.findById(txid).orElse(null);
}

Message Sending

@Resource
private RocketMQTemplate rocketMQTemplate;

public String sendTx(String topic, String tags, Account account) {
    String uuid = UUID.randomUUID().toString().replaceAll("-", "");
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
        topic + ":" + tags,
        MessageBuilder.withPayload(account).setHeader("tx_id", uuid).build(),
        uuid);
    return result.getSendStatus().name();
}

Producer Transaction Listener

@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
    @Resource
    private AccountService accountService;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Account account = new JsonMapper().readValue((byte[]) msg.getPayload(), Account.class);
            accountService.register(account);
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        try {
            Account account = new JsonMapper().readValue((byte[]) msg.getPayload(), Account.class);
            System.out.println("Checking if ID " + account.getId() + " exists");
            AccountLog accountLog = accountService.existsTxId(account.getId());
            if (accountLog == null) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

Controller

@RestController
@RequestMapping("/accounts")
public class AccountController {
    @Resource
    private ProducerMessageService messageService;

    @PostMapping("/send")
    public Object sendMessage(@RequestBody Account account) {
        return messageService.sendTx("tx-topic", "mks", account);
    }
}

Integral Sub‑module

Entity

@Entity
@Table(name = "t_integral")
public class Integral {
    @Id
    private Long id;
    private Integer score;
    private Long acccountId;
}

DAO

public interface IntegralRepository extends JpaRepository<Integral, Long> {}

Service

@Resource
private IntegralRepository integralRepository;

@Transactional
public Integral saveIntegral(Integral integral) {
    return integralRepository.save(integral);
}

Message Listener

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {
    @Resource
    private IntegralService integralService;

    @Override
    public void onMessage(String message) {
        System.out.println("Integral received message: " + message);
        try {
            Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class);
            Integer id = (Integer) jsonMap.get("id");
            integralService.saveIntegral(new Integral(1L, 1000, id + 0L));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

Testing

Start both sub‑modules separately.

Initial Database Tables

Postman Test

Account module

Integral module

When the Account sub‑module encounters an error in its local transaction, the transaction rolls back and the half message is deleted, so the Integral sub‑module never receives the message.

End of tutorial.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaMicroservicesRocketMQSpringBooteventual consistency
Spring Full-Stack Practical Cases
Written by

Spring Full-Stack Practical Cases

Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.