Mastering RocketMQ Transaction Messages: Guarantees for Distributed Consistency
This article explains how TDMQ RocketMQ's transactional messaging works, covering the underlying concepts, three‑phase processing flow, key terminology, source‑code details, practical Java examples, and important considerations for building reliable distributed transactions.
Background
In distributed systems, maintaining data consistency across services is a core challenge. TDMQ RocketMQ, built on Apache RocketMQ, provides a transactional message mechanism that ensures eventual consistency between local database operations and message delivery.
Transactional Message Concept
A transactional message binds a two‑phase commit with a local transaction, guaranteeing that the message and the local operation either both succeed or both fail. A typical e‑commerce points‑redeem scenario illustrates the need for atomicity across order creation, point deduction, inventory update, and shipping notification.
Three‑Phase Process
Phase 1 – Send Transactional Message (Half Message) The producer sends a message that is stored as a Half Message in the system topic RMQ_SYS_TRANS_HALF_TOPIC. The message is invisible to downstream consumers.
Phase 2 – Execute Local Transaction After the half message is persisted, the producer executes the local transaction (e.g., create order, reserve points). If the transaction succeeds, the producer sends a Commit request; otherwise it sends a Rollback.
Phase 3 – Downstream Consumption On Commit, the broker restores the original topic and delivers the message to downstream services. On Rollback, the broker discards the message. If delivery fails, RocketMQ retries and eventually moves the message to a dead‑letter queue.
Key Terminology
Half Message : A message stored with a special flag that makes it invisible until the second‑phase decision.
OP Message : System message that records the commit/rollback status of a half message.
Real Topic / Half Topic / OP Topic : Real Topic is the business topic; Half Topic ( RMQ_SYS_TRANS_HALF_TOPIC) stores half messages; OP Topic ( MQ_SYS_TRANS_OP_HALF_TOPIC) stores OP messages.
Transactional Message Processing Flow
The producer sends a half message, the broker persists it and returns success. The producer then runs the local transaction and sends a second‑phase Commit or Rollback. The broker processes the request, either delivering the message to the real topic or discarding it. If the broker does not receive a decision within a configurable timeout, it initiates an active transaction check.
Half Message Implementation
Half messages are persisted in RMQ_SYS_TRANS_HALF_TOPIC. The original topic and queue ID are saved in message properties. When a Commit arrives, the broker restores these properties and delivers the message; on Rollback it simply deletes the half message (logically via an OP message).
Two‑Phase Commit Code (Java)
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter exec, Object arg) throws MQClientException {
TransactionListener listener = this.getCheckListener();
if (exec == null && listener == null) {
throw new MQClientException("tranExecutor is null", null);
}
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, "DELAY");
}
Validators.checkMessage(msg, this.defaultMQProducer);
MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
SendResult sendResult = this.send(msg);
LocalTransactionState state = LocalTransactionState.UNKNOW;
try {
if (exec != null) {
state = exec.executeLocalTransactionBranch(msg, arg);
} else {
state = listener.executeLocalTransaction(msg, arg);
}
} catch (Throwable e) {
// handle exception
}
this.endTransaction(msg, sendResult, state, null);
TransactionSendResult result = new TransactionSendResult();
result.setSendStatus(sendResult.getSendStatus());
result.setMessageQueue(sendResult.getMessageQueue());
result.setMsgId(sendResult.getMsgId());
result.setTransactionId(sendResult.getTransactionId());
result.setLocalTransactionState(state);
return result;
}End Transaction (Second‑Phase)
public void endTransaction(final Message msg, final SendResult sendResult,
final LocalTransactionState state, final Throwable exception)
throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
String transactionId = sendResult.getTransactionId();
EndTransactionRequestHeader header = new EndTransactionRequestHeader();
header.setTransactionId(transactionId);
header.setCommitLogOffset(MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()).getOffset());
switch (state) {
case COMMIT_MESSAGE:
header.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
header.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
default:
header.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
}
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, header, remark,
this.defaultMQProducer.getSendMsgTimeout());
}Transaction Check (Active Rollback)
If the broker does not receive a decision, it runs a periodic task (default every 60 seconds, up to 15 retries) that scans the half‑message queues. For each half message without a corresponding OP record and whose retention exceeds the transaction timeout, the broker sends a check request to the producer. The producer’s TransactionChecker returns COMMIT or ROLLBACK based on the local transaction status.
Practical Java Example (RocketMQ 5.x client)
public class ProducerTransactionMessageDemo {
private static boolean executeLocalTransaction() { return true; }
private static boolean checkTransactionStatus(String orderId) { return true; }
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
String accessKey = "your-ak";
String secretKey = "your-sk";
SessionCredentialsProvider cred = new StaticSessionCredentialsProvider(accessKey, secretKey);
String endpoints = "https://your-endpoints";
ClientConfiguration cfg = ClientConfiguration.newBuilder()
.setEndpoints(endpoints).enableSsl(false).setCredentialProvider(cred).build();
String topic = "tran_topic";
TransactionChecker checker = msg -> {
String orderId = msg.getProperties().get("orderId");
return checkTransactionStatus(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
};
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(cfg).setTopics(topic).setTransactionChecker(checker).build();
Transaction tx = producer.beginTransaction();
Message message = provider.newMessageBuilder()
.setTopic(topic).setTag("tagA").setKeys("key-123")
.addProperty("orderId", "0001")
.setBody("This is a transaction message".getBytes(StandardCharsets.UTF_8)).build();
SendReceipt receipt = producer.send(message, tx);
if (executeLocalTransaction()) { tx.commit(); } else { tx.rollback(); }
}
}Important Considerations
The topic must be created with type TRANSACTION ; otherwise the broker rejects the message.
Transactional messages do not support delayed delivery; any delay property is cleared automatically.
If the local transaction is slow, the producer should return UNKNOWN during the check to avoid premature rollback.
Configure transactionCheckInterval and transactionCheckMax according to business latency requirements.
Conclusion
RocketMQ’s transactional messaging guarantees distributed consistency through half‑message storage, a two‑phase commit, and an active transaction‑check mechanism. Proper handling of idempotency, timeout settings, and topic type constraints enables reliable use of transactional messages in complex micro‑service architectures.
Tencent Cloud Middleware
Official account of Tencent Cloud Middleware. Focuses on microservices, messaging middleware and other cloud‑native technology trends, publishing product updates, case studies, and technical insights. Regularly hosts tech salons to share effective solutions.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
