Mastering RocketMQ Transactional Messages for Distributed Consistency

This article explains how RocketMQ’s transactional message feature enables distributed transaction final consistency by using half messages, message check‑back, and defined transaction states, and provides complete Java code samples for producers, listeners, and consumer idempotency handling.

Programmer DD
Programmer DD
Programmer DD
Mastering RocketMQ Transactional Messages for Distributed Consistency

RocketMQ, an Alibaba‑open‑source high‑performance message middleware, supports distributed transaction final consistency through its transactional message capability.

Principle Overview

RocketMQ offers XA‑like distributed transaction support via transactional messages.

The producer first sends a half message that is not immediately deliverable; it remains in a temporary state until the broker receives a second‑phase confirmation.

If the second‑phase confirmation is lost (e.g., network glitch or producer restart), the broker periodically scans for long‑standing half messages and initiates a message check‑back to query the producer for the final status (Commit or Rollback).

These steps constitute the sending process (steps 1‑4) and the check‑back process (steps 5‑6).

How to Use

The transaction status in RocketMQ includes:

TransactionStatus.CommitTransaction : the message is committed and can be consumed.

TransactionStatus.RollbackTransaction : the broker deletes the half message; it cannot be consumed.

TransactionStatus.Unknown : the broker will perform a check‑back to determine the final state.

To send a transactional message, create a TransactionMQProducer, set a unique producer group, configure an executor service, and register a TransactionListener that implements both executeLocalTransaction and checkLocalTransaction. Then start the producer and send messages with producer.sendMessageInTransaction(msg, null).

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = {"TagA","TagB","TagC","TagD","TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

Implement the TransactionListener as follows:

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (status != null) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

Consumers must ensure idempotent processing based on business parameters.

Appendix: RocketMQ Transaction Message Implementation Details

RocketMQ uses two internal topics—Half Topic for storing prepare messages and Operation Topic for storing the corresponding commit/rollback messages. The broker compares offsets between these topics to identify uncommitted or timed‑out transactions and triggers check‑backs.

The design abstracts transaction handling into service layers, allowing users to focus solely on local transaction execution and status reporting while the broker manages two‑phase commit, timeout detection, and high availability.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaMessage QueueRocketMQDistributed TransactionsTransactional Messaging
Programmer DD
Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.