RocketMQ Transactional, Batch, and Delayed Message Mechanisms – Detailed Explanation and Source Code
This article explains the principles and implementation of RocketMQ's transactional messages, batch messages, and delayed messages, describing the message flow, compensation mechanisms, broker and producer handling, and providing detailed source code snippets to illustrate each process.
This article follows on from the earlier dissection of the message‑pulling black box and explores the principles and implementations of three important RocketMQ features: transactional messages, batch messages, and delayed messages.
Transactional Messages – RocketMQ implements distributed transactions by using the message middleware itself as the local‑transaction table. The producer first sends a half (prepare) message, which the broker persists; the producer then sends a Commit or Rollback depending on the local transaction result. The broker makes the message visible to consumers only after receiving a Commit; on a Rollback it discards the half message.
RocketMQ’s transaction model differs from Kafka’s exactly‑once semantics, which are aimed at stream processing.
The transaction flow consists of four steps:
1. Client sends a half message and marks it with MessageConst.PROPERTY_TRANSACTION_PREPARED.
2. Broker persists the half message, replaces its topic with a special internal topic, and stores it like a normal message.
3. Producer executes the local transaction, then reports the result (Commit/Rollback) to the broker via RequestCode.END_TRANSACTION.
4. Broker processes the request in EndTransactionProcessor, either committing the half message to the real topic or deleting it (by writing a marker to the OP topic).
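The four steps can be modelled with a small in-memory toy. This is only a sketch of the visibility rule, not RocketMQ's broker code: the class ToyTransactionBroker and all of its methods are hypothetical names introduced for illustration, and persistence, the internal topic, and the OP topic are reduced to two collections.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of the half-message flow; hypothetical, not RocketMQ code. */
class ToyTransactionBroker {
    // Half messages parked out of consumers' sight: id -> {realTopic, body}.
    private final Map<Long, String[]> halfMessages = new HashMap<>();
    // Messages visible to consumers, stored as "topic:body".
    private final List<String> visible = new ArrayList<>();
    private long nextId = 0;

    /** Steps 1-2: store the half message and remember its real topic. */
    long sendHalf(String realTopic, String body) {
        long id = nextId++;
        halfMessages.put(id, new String[] {realTopic, body});
        return id;
    }

    /** Steps 3-4: apply the producer's Commit (true) or Rollback (false). */
    boolean endTransaction(long id, boolean commit) {
        String[] half = halfMessages.remove(id);
        if (half == null) {
            return false; // unknown id, or already resolved
        }
        if (commit) {
            visible.add(half[0] + ":" + half[1]);
        }
        return true;
    }

    List<String> visibleMessages() {
        return new ArrayList<>(visible);
    }

    int pendingHalfCount() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        ToyTransactionBroker broker = new ToyTransactionBroker();
        long first = broker.sendHalf("orders", "create-1");
        long second = broker.sendHalf("orders", "create-2");
        broker.endTransaction(first, true);   // local transaction succeeded
        broker.endTransaction(second, false); // local transaction failed
        System.out.println(broker.visibleMessages()); // prints [orders:create-1]
    }
}
```

A half message that is never resolved simply stays in the pending map here; the real broker handles that case with the compensation check.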
If the broker does not receive a transaction result within a configured timeout, it initiates a compensation check. The broker periodically scans half messages via TransactionalMessageServiceImpl.check() and builds a removeMap of half messages that already have corresponding OP messages, which need no further work. For each remaining half message it decides, based on the immunity time and the retry limit, whether to discard the message, wait until a later round, or send a check request back to the producer.
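The per-message decision in that scan can be sketched as a pure function. This is a simplified model under stated assumptions: ToyTransactionCheck, the Action enum, and the parameter names are hypothetical, and the real check() method also handles file-retention limits and offset bookkeeping that are left out here.

```java
/** Toy model of the per-message decision in the check scan; hypothetical names. */
class ToyTransactionCheck {
    enum Action { SKIP_RESOLVED, DISCARD, WAIT, CHECK_PRODUCER }

    /**
     * @param hasOpMessage   true if an OP message already resolved this half message
     * @param checkTimes     how many times the producer has been asked so far
     * @param maxCheckTimes  retry limit before the half message is given up
     * @param ageMillis      time since the half message was stored
     * @param immunityMillis grace period during which no check is sent
     */
    static Action decide(boolean hasOpMessage, int checkTimes, int maxCheckTimes,
                         long ageMillis, long immunityMillis) {
        if (hasOpMessage) {
            return Action.SKIP_RESOLVED; // already committed or rolled back
        }
        if (checkTimes >= maxCheckTimes) {
            return Action.DISCARD; // retry limit reached
        }
        if (ageMillis < immunityMillis) {
            return Action.WAIT; // producer may still be running its local transaction
        }
        return Action.CHECK_PRODUCER; // ask the producer for the transaction state
    }

    public static void main(String[] args) {
        // A 10-second-old half message with a 6-second immunity window and no result yet.
        System.out.println(decide(false, 0, 15, 10_000L, 6_000L)); // prints CHECK_PRODUCER
    }
}
```

Keeping the decision free of side effects makes the ordering of the rules easy to see: a resolved message is never rechecked, and the retry limit wins over the immunity window.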
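A simplified excerpt of the scan loop:

long startTime = System.currentTimeMillis();
long i = halfOffset; // current offset in the half-message queue
while (true) {
    // stop once this round exceeds its time budget
    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
        break;
    }
    if (removeMap.containsKey(i)) {
        // an OP message exists, so this half message is already resolved: skip it
        removeMap.remove(i);
    } else {
        // no result yet: need to check the transaction state
    }
    i++;
}

The producer side receives a CHECK_TRANSACTION_STATE request, invokes the configured TransactionCheckListener or TransactionListener, and sends the local transaction state back to the broker.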
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
// default to UNKNOW so that an unanswered check leaves the half message pending
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
if (transactionCheckListener != null) {
    // legacy check API
    localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
    // newer check API
    localTransactionState = transactionListener.checkLocalTransaction(message);
}
// report the result back to the broker
this.processTransactionState(localTransactionState, group, exception);

Batch Messages – RocketMQ allows sending multiple messages in a single request to reduce network overhead. All messages in a batch must share the same topic and waitStoreMsgOK flag, and delayed messages cannot be batched. The client validates these constraints, builds a MessageBatch, encodes each message, concatenates the byte arrays, and sends them with RequestCode.SEND_BATCH_MESSAGE. On the broker side, the batch is split back into individual messages for storage.
List<Message> messageList = new ArrayList<>(messages.size());
Message first = null;
for (Message message : messages) {
    // delayed messages cannot be batched
    if (message.getDelayTimeLevel() > 0) {
        throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
    }
    if (first == null) {
        first = message;
    } else if (!first.getTopic().equals(message.getTopic())) {
        // every message in a batch must target the same topic
        throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
    } else if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
        // ... and carry the same waitStoreMsgOK flag
        throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should be the same");
    }
    messageList.add(message);
}
MessageBatch messageBatch = new MessageBatch(messageList);

After validation, the batch is encoded by concatenating each message’s byte representation, then sent to the broker, where it is decomposed back into single messages for persistence.
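To see why concatenated messages can be split apart again, consider a minimal round trip in which each message is prefixed with its length. This is a deliberately simplified sketch: ToyBatchCodec and its 4-byte length prefix are assumptions made for illustration, and RocketMQ's real per-message encoding carries more fields than a length and a body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy length-prefixed batch codec; hypothetical, not RocketMQ's wire format. */
class ToyBatchCodec {
    /** Encodes one body as a 4-byte length followed by its UTF-8 bytes. */
    static byte[] encodeOne(String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Client side: encode every message, then concatenate the byte arrays. */
    static byte[] encodeBatch(List<String> bodies) {
        List<byte[]> encoded = new ArrayList<>(bodies.size());
        int totalSize = 0;
        for (String body : bodies) {
            byte[] one = encodeOne(body);
            encoded.add(one);
            totalSize += one.length;
        }
        byte[] all = new byte[totalSize];
        int pos = 0;
        for (byte[] one : encoded) {
            System.arraycopy(one, 0, all, pos, one.length);
            pos += one.length;
        }
        return all;
    }

    /** Broker side: walk the buffer and split it back into single messages. */
    static List<String> decodeBatch(byte[] all) {
        List<String> bodies = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(all);
        while (buf.hasRemaining()) {
            byte[] payload = new byte[buf.getInt()];
            buf.get(payload);
            bodies.add(new String(payload, StandardCharsets.UTF_8));
        }
        return bodies;
    }

    public static void main(String[] args) {
        byte[] wire = encodeBatch(Arrays.asList("a", "bc"));
        System.out.println(wire.length);       // prints 11
        System.out.println(decodeBatch(wire)); // prints [a, bc]
    }
}
```

Because every message is self-delimiting, the receiver needs no separate index of where one message ends and the next begins.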
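The concatenation on the client is a plain array copy:

// total length of all encoded messages
int totalSize = 0;
for (byte[] bytes : encodedMessages) {
    totalSize += bytes.length;
}
byte[] allBytes = new byte[totalSize];
int pos = 0;
for (byte[] bytes : encodedMessages) {
    // append this message right after the previous one
    System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
    pos += bytes.length;
}

Delayed Messages – RocketMQ supports fixed delay levels. When a message’s delayTimeLevel is greater than zero, the broker rewrites its topic to the internal schedule topic and stores it in the queue corresponding to the delay level, keeping the original topic and queue ID in message properties. The ScheduleMessageService maintains a timer task for each level, periodically checking the queue and delivering messages whose delay has expired back to their original topic.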
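The mapping from a delay level to a duration and a queue can be sketched as follows. The level string below is assumed to be the commonly documented broker default (messageDelayLevel), and ToyDelayLevels is a hypothetical helper written for this illustration; only the name delayLevel2QueueId comes from the source excerpt, and its body here is a sketch of the level-minus-one mapping.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy parser for a delay-level table; hypothetical helper, not RocketMQ code. */
class ToyDelayLevels {
    // Assumption: the default level table, 18 levels from 1 second to 2 hours.
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Parses "1s 5s ..." into a map of 1-based level -> delay in milliseconds. */
    static Map<Integer, Long> parse(String levels) {
        Map<String, Long> unitMillis = new HashMap<>();
        unitMillis.put("s", 1000L);
        unitMillis.put("m", 60_000L);
        unitMillis.put("h", 3_600_000L);
        unitMillis.put("d", 86_400_000L);

        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            String unit = part.substring(part.length() - 1);
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            Long factor = unitMillis.get(unit);
            if (factor == null) {
                throw new IllegalArgumentException("unknown time unit in: " + part);
            }
            table.put(i + 1, amount * factor); // levels start at 1
        }
        return table;
    }

    /** Each level gets its own queue in the schedule topic: level N -> queue N - 1. */
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse(DEFAULT_LEVELS);
        System.out.println(table.get(3));          // prints 10000
        System.out.println(delayLevel2QueueId(3)); // prints 2
    }
}
```

Giving every level its own queue means all messages in one queue share the same delay, so they expire in the order they were stored.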
The topic rewrite on the broker:

if (msg.getDelayTimeLevel() > 0) {
    // redirect the message to the internal schedule topic, one queue per delay level
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    // remember the real destination so the message can be restored later
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

The service schedules a DeliverDelayedMessageTimerTask for each level; when the countdown reaches zero, the delayed message is reconstructed and written to the real topic.
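The deliver-or-reschedule behaviour of one level's task can be simulated deterministically by passing the current time in as a parameter. This is a sketch under stated assumptions: ToyDelayQueue is a hypothetical class, the method name executeOnTimeup is borrowed from the source only as a label, and the real task reads a consume queue and reschedules itself on a timer instead of returning a countdown.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy single-level delay queue; hypothetical, not RocketMQ's ScheduleMessageService. */
class ToyDelayQueue {
    private static final class Entry {
        final long deliverAt;
        final String body;

        Entry(long deliverAt, String body) {
            this.deliverAt = deliverAt;
            this.body = body;
        }
    }

    private final long delayMillis;
    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();

    ToyDelayQueue(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Stores a message that arrived at time 'now'; it becomes due after the level's delay. */
    void store(long now, String body) {
        queue.addLast(new Entry(now + delayMillis, body));
    }

    /**
     * Delivers every due message from the head of the queue.
     * Returns the countdown in milliseconds until the next message is due,
     * or -1 if the queue is empty.
     */
    long executeOnTimeup(long now) {
        while (!queue.isEmpty()) {
            long countdown = queue.peekFirst().deliverAt - now;
            if (countdown > 0) {
                return countdown; // not due yet: the caller would reschedule after this long
            }
            delivered.add(queue.pollFirst().body); // due: write back to the real topic
        }
        return -1;
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        ToyDelayQueue level = new ToyDelayQueue(5000L);
        level.store(0L, "a");
        level.store(2000L, "b");
        System.out.println(level.executeOnTimeup(5000L)); // prints 2000
        System.out.println(level.delivered());            // prints [a]
    }
}
```

The scan can stop at the first message that is not yet due because, within a single level, messages stored in arrival order are also in delivery order.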
A condensed view of the task body:

public void executeOnTimeup() {
    // load the consume queue for this delay level
    // iterate over its messages; for each one:
    long countdown = deliverTimestamp - now;
    if (countdown <= 0) {
        // delay has expired: write the message back to its real topic
        PutMessageResult result = writeMessageStore.putMessage(msgInner);
    } else {
        // not due yet: reschedule this level's task for when it is
        timer.schedule(new DeliverDelayedMessageTimerTask(delayLevel, nextOffset), countdown);
    }
}

Overall, the article walks through how RocketMQ handles transactional integrity, batch efficiency, and delayed delivery, with source‑level code excerpts that illustrate each critical step.
IT Architects Alliance
Discussion and exchange on system, internet, large‑scale distributed, high‑availability, and high‑performance architectures, as well as big data, machine learning, AI, and architecture adjustments with internet technologies. Includes real‑world large‑scale architecture case studies. Open to architects who have ideas and enjoy sharing.