How RocketMQ Implements Distributed Transactional Messaging
This article explains the design and implementation of RocketMQ transactional messages, covering a complete order‑creation example, the producer’s transaction listener, the broker’s handling of half messages, transaction checks, and the final commit or rollback process.
Interview questions often ask how RocketMQ transactions work; understanding both usage and underlying design is essential for mastering the framework.
1. Transactional Message Use Case
The example demonstrates an order‑creation service that inserts a record into a database and sends a message, requiring both operations to be atomic.
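// Spring imports and the OrderDao / CreateOrderRequest types are omitted for brevity.
import javax.annotation.PostConstruct;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class CreateOrderService {
    @Autowired private OrderDao orderDao;
    @Autowired private ExecutorService executorService;
    private TransactionMQProducer producer;

    // Create and start the transactional producer once the bean is constructed.
    @PostConstruct
    public void init() throws MQClientException {
        TransactionListener transactionListener = createTransactionListener();
        producer = new TransactionMQProducer("myGroup");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
    }

    @PUT @RequestMapping(...)
    public boolean createOrder(@RequestBody CreateOrderRequest request) throws MQClientException {
        Message msg = createMessage(request);
        // The request is passed as "arg" and handed back to executeLocalTransaction.
        SendResult sendResult = producer.sendMessageInTransaction(msg, request);
        return sendResult.getSendStatus() == SendStatus.SEND_OK;
    }

    private TransactionListener createTransactionListener() {
        return new TransactionListener() {
            // Runs the local transaction after the half message has been sent.
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                CreateOrderRequest request = (CreateOrderRequest) arg;
                try {
                    orderDao.createOrderInDB(request);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Throwable t) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            // Called when the broker checks back on an unresolved transaction.
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String orderId = msg.getUserProperty("orderId");
                return orderDao.isOrderIdExistsInDB(orderId)
                        ? LocalTransactionState.COMMIT_MESSAGE
                        : LocalTransactionState.UNKNOW;
            }
        };
    }
}
The createOrder() method builds a message and sends it as a transactional message. The TransactionListener defines two callbacks: executeLocalTransaction, which writes the order to the database, and checkLocalTransaction, which the broker's transaction check invokes to find out whether the order exists. The check reads the order ID from the message's "orderId" user property, so createMessage() must set that property.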
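The contract between the two callbacks can be tried out without a broker. The following is a minimal sketch in plain Java, assuming an in-memory stand-in for the DAO; TxState, InMemoryOrderDao, and OrderListenerSketch are hypothetical names for illustration and are not RocketMQ types.

```java
import java.util.HashSet;
import java.util.Set;

// Toy stand-in for LocalTransactionState; not a RocketMQ type.
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

// Hypothetical in-memory replacement for the article's OrderDao.
class InMemoryOrderDao {
    private final Set<String> orderIds = new HashSet<>();

    // Throws if the order already exists, mimicking a unique-key violation.
    void createOrder(String orderId) {
        if (!orderIds.add(orderId)) {
            throw new IllegalStateException("duplicate order " + orderId);
        }
    }

    boolean exists(String orderId) {
        return orderIds.contains(orderId);
    }
}

class OrderListenerSketch {
    private final InMemoryOrderDao dao;

    OrderListenerSketch(InMemoryOrderDao dao) {
        this.dao = dao;
    }

    // Mirrors executeLocalTransaction: the database write decides the outcome.
    TxState execute(String orderId) {
        try {
            dao.createOrder(orderId);
            return TxState.COMMIT;
        } catch (RuntimeException e) {
            return TxState.ROLLBACK;
        }
    }

    // Mirrors checkLocalTransaction: only the database is consulted.
    TxState check(String orderId) {
        return dao.exists(orderId) ? TxState.COMMIT : TxState.UNKNOWN;
    }
}
```

Note that check() answers UNKNOWN rather than ROLLBACK when the order is missing: at check time the local transaction may simply not have finished yet, so the safe answer is "ask again later".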
2. How the Producer Sends Transactional Messages
The core API is DefaultMQProducerImpl#sendMessageInTransaction. It marks the message as a half‑message, sends it to the broker, then invokes either a user‑provided LocalTransactionExecuter or the registered TransactionListener to determine the local transaction outcome. The method finally calls endTransaction to inform the broker of the commit or rollback decision.
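The three steps just described (send the half message, run the local transaction, report the outcome) can be modeled in a few lines of plain Java. This is only a sketch under stated assumptions: RecordingBroker and TxProducerSketch are hypothetical names, the "broker" merely records what it receives, and the choice to leave the state as UNKNOWN when the local transaction throws is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Toy stand-in for LocalTransactionState; not a RocketMQ type.
enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

// Records what a broker would receive; a stand-in, not a RocketMQ class.
class RecordingBroker {
    final List<String> events = new ArrayList<>();

    void storeHalf(String body) {
        events.add("HALF:" + body);
    }

    void endTransaction(String body, LocalState state) {
        events.add(state + ":" + body);
    }
}

class TxProducerSketch {
    private final RecordingBroker broker;

    TxProducerSketch(RecordingBroker broker) {
        this.broker = broker;
    }

    LocalState sendInTransaction(String body, Supplier<LocalState> localTx) {
        broker.storeHalf(body);                // step 1: half message goes out first
        LocalState state = LocalState.UNKNOWN; // default if the callback fails
        try {
            state = localTx.get();             // step 2: run the local transaction
        } catch (RuntimeException e) {
            // Assumption of this sketch: stay UNKNOWN and let a later check decide.
        }
        broker.endTransaction(body, state);    // step 3: report the outcome
        return state;
    }
}
```

The ordering is the point of the design: because the half message is stored before the local transaction runs, the broker always has a record it can later check, even if the producer crashes between steps 2 and 3.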
// Simplified excerpt of DefaultMQProducerImpl#sendMessageInTransaction;
// validation, logging, and error handling are left out.
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
    // mark as half-message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // send to broker
    SendResult sendResult = this.send(msg);
    // execute local transaction
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    if (localTransactionExecuter != null) {
        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
    } else if (transactionListener != null) {
        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
    }
    // notify broker
    this.endTransaction(sendResult, localTransactionState, null);
    // build result
    TransactionSendResult result = new TransactionSendResult();
    result.setSendStatus(sendResult.getSendStatus());
    result.setTransactionId(sendResult.getTransactionId());
    result.setLocalTransactionState(localTransactionState);
    return result;
}
3. Broker-Side Processing
The broker does not store a half-message in the topic the producer addressed. Instead it writes the message to a special internal topic, RMQ_SYS_TRANS_HALF_TOPIC, on queue 0. Consumers do not subscribe to this topic, so a transactional message cannot be consumed before its transaction is committed.
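Because the message is redirected, the broker has to remember where it was originally headed so that it can be delivered on commit. A minimal sketch of that idea follows, assuming the original destination is kept in the message's own properties; ToyMessage, HalfMessageSketch, and the property keys REAL_TOPIC and REAL_QID are hypothetical names chosen for this illustration, not the broker's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Toy message with just the fields this sketch needs.
class ToyMessage {
    String topic;
    int queueId;
    final Map<String, String> properties = new HashMap<>();

    ToyMessage(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

class HalfMessageSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Property keys are hypothetical names chosen for this sketch.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    // Redirect the message to the half topic, remembering its real destination.
    static void toHalf(ToyMessage m) {
        m.properties.put(REAL_TOPIC, m.topic);
        m.properties.put(REAL_QUEUE_ID, String.valueOf(m.queueId));
        m.topic = HALF_TOPIC;
        m.queueId = 0;
    }

    // Undo the redirect so the message can be delivered to its real topic.
    static void restore(ToyMessage m) {
        m.topic = m.properties.remove(REAL_TOPIC);
        m.queueId = Integer.parseInt(m.properties.remove(REAL_QUEUE_ID));
    }
}
```

Keeping the destination inside the message itself means the half topic needs no side table: whatever component later commits the message has everything it needs in the stored record.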
Key broker components:
TransactionalMessageBridge – handles half-message storage (putHalfMessage) and parsing (parseHalfMessageInner).
TransactionalMessageCheckService – runs a timer that scans the half-message queue and triggers transaction checks.
During a check, the broker sends an RPC request to the producer (AbstractTransactionalMessageCheckListener#sendCheckMessage). The producer replies with the transaction state, and the broker decides whether to commit, roll back, or continue checking.
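The scan-and-check cycle can be pictured with a small in-memory model. This is a sketch under simplifying assumptions: the producer's answer is obtained synchronously through a function call, whereas the real check is a remote request, and CheckServiceSketch and CheckResult are hypothetical names rather than broker classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Toy stand-in for the state a producer reports during a check.
enum CheckResult { COMMIT, ROLLBACK, UNKNOWN }

class CheckServiceSketch {
    final List<String> halfQueue = new ArrayList<>(); // unresolved half messages
    final List<String> delivered = new ArrayList<>(); // visible to consumers

    // One scan pass: ask the producer about every pending half message.
    void scanOnce(Function<String, CheckResult> producerCheck) {
        Iterator<String> it = halfQueue.iterator();
        while (it.hasNext()) {
            String msg = it.next();
            switch (producerCheck.apply(msg)) {
                case COMMIT:
                    delivered.add(msg); // commit: make it consumable
                    it.remove();
                    break;
                case ROLLBACK:
                    it.remove();        // rollback: never delivered
                    break;
                default:
                    break;              // UNKNOWN: keep it for the next scan
            }
        }
    }
}
```

In this model a timer would call scanOnce() periodically; messages answered with UNKNOWN simply stay in the queue and are asked about again on the next pass.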
4. Commit or Rollback
The producer's commit or rollback decision is handled on the broker by EndTransactionProcessor#processRequest. If the decision is commit, the broker copies the half-message from the half-message queue to its target topic and queue, where consumers can finally see it. If the decision is rollback, the half-message is discarded and never delivered. In both cases the transaction is then marked as finished, so the check service no longer asks about it.
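The commit and rollback paths can be sketched as a toy broker that keeps unresolved half messages by transaction ID. Everything here is an assumption for illustration: EndTransactionSketch, its Half record, and the boolean commit flag are hypothetical, and real brokers persist messages in a commit log rather than in maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EndTransactionSketch {
    // A stored half message remembers its intended destination.
    static final class Half {
        final String realTopic;
        final String body;

        Half(String realTopic, String body) {
            this.realTopic = realTopic;
            this.body = body;
        }
    }

    final Map<String, Half> halfStore = new HashMap<>();      // txId -> half message
    final Map<String, List<String>> topics = new HashMap<>(); // consumer-visible topics

    void putHalf(String txId, String realTopic, String body) {
        halfStore.put(txId, new Half(realTopic, body));
    }

    // Returns false when the txId is unknown or was already resolved.
    boolean endTransaction(String txId, boolean commit) {
        Half half = halfStore.remove(txId);
        if (half == null) {
            return false;
        }
        if (commit) {
            // Commit: copy the message to the topic the producer addressed.
            topics.computeIfAbsent(half.realTopic, t -> new ArrayList<>()).add(half.body);
        }
        // Rollback: the half message is dropped without ever being delivered.
        return true;
    }
}
```

Removing the entry on both paths is what makes a second endTransaction call for the same transaction a no-op in this model, which matters because a late check reply can arrive after the original decision.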
5. Summary
RocketMQ implements distributed transactions using a two‑phase commit model: producers send half‑messages, the broker stores them in a hidden queue, and a periodic check service ensures eventual consistency by invoking the producer’s transaction listener. This mechanism solves the consistency problem between local database operations and message publishing.
JavaEdge
First‑line development experience at multiple leading tech firms; now a software architect at a Shanghai state‑owned enterprise and founder of Programming Yanxuan. Nearly 300k followers online; expertise in distributed system design, AIGC application development, and quantitative finance investing.
