Understanding RocketMQ Transaction Messages with Code Examples

This article explains RocketMQ's transaction message feature: its core concepts, its execution flow, and complete Java code examples of an order service and a product service that show how a distributed transaction is implemented and verified.

Architecture Digest

RocketMQ's transaction messages tie the delivery of a message to the outcome of the producer's local transaction, which makes this kind of distributed transaction straightforward to implement.

1. Related Concepts

RocketMQ extends its basic messaging model with two concepts for transaction messages:

Half (Prepare) Message – a special message that cannot be consumed until the broker receives a second‑phase confirmation from the producer.

Message Status Check – a query the broker sends to the producer when the second‑phase confirmation is lost or times out, asking for the state of the local transaction (Commit or Rollback).

2. Execution Process

The official flow diagram breaks the process into the following steps:

Producer sends a Half Message to the Broker.

Broker acknowledges the Half Message.

Producer executes the local transaction.

Producer sends a second‑phase confirmation (Commit or Rollback). The broker forwards committed messages to consumers or discards rolled‑back ones.

If the broker does not receive the confirmation in time, it initiates a status‑check.

Producer processes the check and returns the local transaction result.

Broker performs Commit or Rollback based on the check result.
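The steps above can be sketched as a toy in-memory model. This is a minimal illustration, not RocketMQ code: `ToyBroker`, `TxState`, and every method name are hypothetical, and the real broker stores half messages in an internal topic rather than a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model of the half-message flow; it does not use the RocketMQ client.
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

class ToyBroker {
    // Half messages wait here, invisible to consumers, keyed by message key.
    private final Map<String, String> halfMessages = new HashMap<>();
    // Messages that consumers are allowed to see.
    private final List<String> deliverable = new ArrayList<>();

    // Steps 1-2: store the half message and acknowledge it.
    boolean sendHalf(String key, String body) {
        halfMessages.put(key, body);
        return true;
    }

    // Step 4 (or step 7 after a check): apply the producer's decision.
    void endTransaction(String key, TxState state) {
        if (state == TxState.UNKNOWN) {
            return; // keep the half message pending until a later check
        }
        String body = halfMessages.remove(key);
        if (body != null && state == TxState.COMMIT) {
            deliverable.add(body);
        }
    }

    // Steps 5-7: ask the producer about every half message still pending.
    void checkPending(Function<String, TxState> producerCheck) {
        // Iterate over a copy because endTransaction modifies the map.
        for (String key : new ArrayList<>(halfMessages.keySet())) {
            endTransaction(key, producerCheck.apply(key));
        }
    }

    List<String> deliverable() {
        return deliverable;
    }

    int pendingCount() {
        return halfMessages.size();
    }
}
```

In this model a lost confirmation simply leaves the entry in `halfMessages`, so a later `checkPending` call resolves it, which is the role the status check plays in the real flow.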

3. Code Example

The following example simulates two micro‑services: an Order Service that sends a transaction message and a Product Service that consumes it to reduce inventory.

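// Imports for the Apache RocketMQ 4.x Java client
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @Author: ZhangShenao
 * @Date: 2019/3/27 16:44
 * @Description: Uses a RocketMQ transaction message: the order service sends the message, places the order locally, and notifies the product service to reduce inventory
 */
public class OrderService {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer();
        producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
        producer.setProducerGroup(RocketMQConstants.TRANSACTION_PRODUCER_GROUP);
        // Custom thread pool for transaction work.
        // The factory must pass the Runnable to the Thread, otherwise the submitted task never runs.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(20), (Runnable r) -> new Thread(r, "Order Transaction Message Thread"));
        producer.setExecutorService(executor);
        // Register the transaction listener
        producer.setTransactionListener(new OrderTransactionListener());
        producer.start();
        System.err.println("OrderService Start");
        for (int i = 0; i < 10; i++) {
            String orderId = UUID.randomUUID().toString();
            String payload = "Place order, orderId: " + orderId;
            String tags = "Tag";
            // orderId doubles as the message key, so the listener can recover it during a status check
            Message message = new Message(RocketMQConstants.TRANSACTION_TOPIC_NAME, tags, orderId, payload.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Send the transaction message; orderId is passed as the argument to executeLocalTransaction
            TransactionSendResult result = producer.sendMessageInTransaction(message, orderId);
            System.err.println("Sent transaction message, result: " + result);
        }
    }
}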
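OrderService refers to a `RocketMQConstants` class that the article does not show. A minimal sketch of what it might contain follows; the field names come from the listings, but every value is an assumption for a local test setup (9876 is the NameServer's default port).

```java
// Hypothetical constants holder; all values below are assumptions, not taken from the article.
final class RocketMQConstants {
    // A NameServer listens on port 9876 by default; the host is assumed to be local.
    static final String NAMESRV_ADDR = "127.0.0.1:9876";
    static final String TRANSACTION_PRODUCER_GROUP = "transaction_producer_group";
    static final String TRANSACTION_CONSUMER_GROUP = "transaction_consumer_group";
    static final String TRANSACTION_TOPIC_NAME = "transaction_topic";

    private RocketMQConstants() {
        // constants only, no instances
    }
}
```

The class is package-private here, so it would need to sit in the same package as the services, or be made public.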

The transaction listener implements the local transaction execution and status‑check logic.

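import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @Author: ZhangShenao
 * @Date: 2019/3/27 16:50
 * @Description: Transaction listener for order messages
 */
public class OrderTransactionListener implements TransactionListener {
    // Local transaction outcomes keyed by orderId; kept in memory for the demo only
    private static final Map<String, Boolean> results = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // arg is the orderId passed to sendMessageInTransaction
        String orderId = (String) arg;
        // Record the result of the local transaction
        boolean success = persistTransactionResult(orderId);
        System.err.println("Order service executed local order transaction, orderId: " + orderId + ", result: " + success);
        return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // The message key was set to the orderId when the message was built
        String orderId = msg.getKeys();
        System.err.println("Transaction status check, orderId: " + orderId);
        return Boolean.TRUE.equals(results.get(orderId)) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    // Simulates a local transaction whose outcome depends on the parity of the orderId hash
    private boolean persistTransactionResult(String orderId) {
        boolean success = Math.abs(Objects.hash(orderId)) % 2 == 0;
        results.put(orderId, success);
        return success;
    }
}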
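The listener's `checkLocalTransaction` returns `ROLLBACK_MESSAGE` whenever the map has no entry for the order. A check can also arrive while the local transaction is still running, or after a producer restart has emptied the in-memory map, and in those cases the outcome is simply not known yet. RocketMQ's `LocalTransactionState` has a third value, `UNKNOW`, which leaves the half message pending so the broker can check again later. The sketch below shows that three-way decision in plain Java; `CheckState` is a stand-in for `LocalTransactionState` so the code compiles without the client jar, and `TransactionOutcomeStore` is a hypothetical helper, not part of the article's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for RocketMQ's LocalTransactionState (same constant names).
enum CheckState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical helper that records outcomes and answers status checks.
class TransactionOutcomeStore {
    private final Map<String, Boolean> results = new ConcurrentHashMap<>();

    // Called once the local transaction has finished.
    void record(String orderId, boolean success) {
        results.put(orderId, success);
    }

    // Called from the status-check callback.
    CheckState check(String orderId) {
        // ConcurrentHashMap rejects null keys, so guard before the lookup.
        Boolean success = (orderId == null) ? null : results.get(orderId);
        if (success == null) {
            // No recorded outcome: do not guess, let the broker ask again.
            return CheckState.UNKNOW;
        }
        return success ? CheckState.COMMIT_MESSAGE : CheckState.ROLLBACK_MESSAGE;
    }
}
```

Whether a missing record should mean "unknown" or "rolled back" depends on how outcomes are persisted; with a durable store written in the same database transaction as the order, a missing record after the transaction has ended can reasonably be treated as a rollback.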

The Product Service sets up a consumer and a listener to handle committed messages.

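import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @Author: ZhangShenao
 * @Date: 2019/3/27 17:09
 * @Description: Uses a RocketMQ transaction message: the product service receives the order message and reduces inventory locally once the message has been committed
 */
public class ProductService {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
        consumer.setConsumerGroup(RocketMQConstants.TRANSACTION_CONSUMER_GROUP);
        // Subscribe to every tag on the transaction topic
        consumer.subscribe(RocketMQConstants.TRANSACTION_TOPIC_NAME, "*");
        consumer.registerMessageListener(new ProductListener());
        consumer.start();
        System.err.println("ProductService Start");
    }
}

// Package-private so both classes can share one source file
class ProductListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // Only committed messages reach this listener
        Optional.ofNullable(msgs).orElse(Collections.emptyList()).forEach(m -> {
            String orderId = m.getKeys();
            System.err.println("Received order message, orderId: " + orderId + ", product service reduces inventory");
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}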
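The listener above only logs the inventory reduction. RocketMQ delivers messages at least once, so a real handler may see the same orderId more than once and should not reduce inventory twice. The sketch below shows one way to make the step idempotent. `IdempotentInventory` is a hypothetical class, and the in-memory set is an assumption made for brevity; a production service would more likely rely on a unique key in its database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical inventory handler that ignores repeated deliveries of the same orderId.
class IdempotentInventory {
    // orderIds that have already reduced inventory
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    private final AtomicInteger stock;

    IdempotentInventory(int initialStock) {
        this.stock = new AtomicInteger(initialStock);
    }

    // Returns true if this call reduced inventory, false if orderId was already handled.
    boolean reduceFor(String orderId) {
        // Set.add is atomic here and returns false for a duplicate.
        if (!processed.add(orderId)) {
            return false;
        }
        stock.decrementAndGet();
        return true;
    }

    int stock() {
        return stock.get();
    }
}
```

Inside `consumeMessage`, the logging line would be replaced by a call such as `inventory.reduceFor(m.getKeys())`, where `inventory` is a shared instance of this class.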

Running both services shows that only orders whose local transaction succeeds trigger the product service to reduce inventory.
