Implementing Transactional Messages with Apache RocketMQ

This article explains how to use Apache RocketMQ's 2PC-based transactional messaging feature, covering the overall workflow, key concepts such as half messages and compensation, and providing complete Java/Spring Boot code examples for producers, transaction listeners, and consumers.

Full-Stack Internet Architecture
Full-Stack Internet Architecture
Full-Stack Internet Architecture
Implementing Transactional Messages with Apache RocketMQ

When choosing a distributed messaging solution, support for transactional messages is a critical factor, and currently only Apache RocketMQ provides the most complete implementation. This article walks through the process of implementing RocketMQ transactional messages.

Since version 4.3.0, RocketMQ supports distributed transaction messages by adopting a two‑phase commit (2PC) model and adding a compensation mechanism to handle time‑outs or failures in the second phase.

RocketMQ Transaction Flow Overview

The transaction process consists of two main stages: (1) normal transaction sending and committing, and (2) the compensation flow for transaction information.

Normal transaction send and commit:

Producer sends a half message to the MQ server (the half message cannot be consumed by consumers).

Server acknowledges the half message as successfully stored.

Producer executes the local business transaction.

Based on the local transaction result, the producer issues a Commit or Rollback operation.

Compensation flow:

If the MQ server does not receive the local transaction status for a long time, it initiates a confirmation check request to the producer.

The producer checks the local transaction status upon receiving the check request.

According to the check result, the server performs Commit or Rollback. This stage handles cases where the producer fails to send Commit/Rollback due to timeout or error.

Key Points of RocketMQ Transactional Messages

The half message is invisible to consumers because its topic is changed to RMQ_SYS_TRANS_HALF_TOPIC, which consumers do not subscribe to.

If the producer crashes after sending a Commit/Rollback, the server will retry the check up to 15 times; after that it defaults to Rollback.

Transaction status can be TransactionStatus.CommitTransaction, TransactionStatus.RollbackTransaction, or TransactionStatus.Unknown.

Code Implementation

1. Add the RocketMQ Spring Boot starter dependency:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

2. Configure RocketMQ in application.yml (or properties):

rocketmq:
  name-server: xxx.xx.x.xx:9876;xxx.xx.x.xx:9876
  producer:
    group: cloud-group

3. Send a half message from the order service:

@Override
public void delete(String orderNo) {
    Order order = orderMapper.selectByNo(orderNo);
    if (order != null && CloudConstant.VALID_STATUS.equals(order.getStatus())) {
        String transactionId = UUID.randomUUID().toString();
        rocketMQTemplate.sendMessageInTransaction(
            "add-amount",
            MessageBuilder.withPayload(
                UserAddMoneyDTO.builder()
                    .userCode(order.getAccountCode())
                    .amount(order.getAmount())
                    .build()
            )
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .setHeader("order_id", order.getId())
            .build(),
            order);
    }
}

The sendMessageInTransaction method takes three parameters: destination topic, the message payload, and an argument object.

4. Implement the transaction listener:

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {
    private final OrderService orderService;
    private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("Executing local transaction");
        MessageHeaders headers = message.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer orderId = Integer.valueOf((String) headers.get("order_id"));
        try {
            orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        log.info("Checking local transaction, transactionId:{}", transactionId);
        QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("transaction_id", transactionId);
        RocketmqTransactionLog logEntry = rocketMqTransactionLogMapper.selectOne(queryWrapper);
        return logEntry != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
    }
}

The executeLocalTransaction method performs the business operation and writes a transaction log; the checkLocalTransaction method queries the log to decide whether to commit or rollback.

5. Consumer side listener for the commit messages:

@Slf4j
@Service
@RocketMQMessageListener(topic = "add-amount", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddUserAmountListener implements RocketMQListener<UserAddMoneyDTO> {
    private final AccountMapper accountMapper;

    @Override
    public void onMessage(UserAddMoneyDTO dto) {
        log.info("received message: {}", dto);
        accountMapper.increaseAmount(dto.getUserCode(), dto.getAmount());
        log.info("add money success");
    }
}

6. Testing steps include creating an order record, invoking the delete‑order API, sending the half message, simulating a crash (e.g., taskkill /pid 19748 -t -f), restarting the order service to trigger the MQ server’s check, and finally verifying that the consumer processes the committed message and updates the user balance.

Conclusion

Implementing transactional messages with RocketMQ involves understanding the two‑phase commit workflow, handling half messages, and providing compensation logic; once the process is clear, the corresponding Java/Spring Boot code can be written to ensure message consistency across distributed services.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Distributed SystemsJavaSpring BootRocketMQ2PC
Full-Stack Internet Architecture
Written by

Full-Stack Internet Architecture

Introducing full-stack Internet architecture technologies centered on Java

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.