How RocketMQ Handles Transactional Messages: From Producer to Broker
This article explains the internal workflow of RocketMQ transactional messages, covering how producers send half messages, how brokers process commit or rollback requests, the storage mechanisms for transaction state, the periodic transaction check logic, and the differences between file‑system and database implementations.
Overview
This summary focuses on the implementation of transactional messages in RocketMQ 4.x. The core flow consists of a producer sending a half (prepared) message, the broker storing the half message, the producer executing a local transaction, and finally the broker committing or rolling back the message based on the transaction result.
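The four steps can be made concrete with a toy in-memory model. This is only a sketch under stated assumptions: `HalfMessageFlowSketch`, `ToyBroker`, and `LocalState` are hypothetical names invented for illustration, not RocketMQ classes, and the real broker persists half messages in the CommitLog rather than a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Toy model of the half-message flow; every name here is hypothetical. */
final class HalfMessageFlowSketch {

    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    /** Stand-in for the broker: half messages are stored but hidden until committed. */
    static final class ToyBroker {
        private final Map<Long, String> halfMessages = new HashMap<>();
        private final List<String> visibleToConsumers = new ArrayList<>();
        private long nextOffset = 0;

        /** Steps 1 and 2: store the half message and return its offset. */
        long storeHalf(String body) {
            long offset = nextOffset++;
            halfMessages.put(offset, body);
            return offset;
        }

        /** Step 4: resolve the half message according to the local transaction result. */
        void endTransaction(long offset, LocalState state) {
            if (state == LocalState.UNKNOWN) {
                return; // stays pending until a later check resolves it
            }
            String body = halfMessages.remove(offset);
            if (body != null && state == LocalState.COMMIT) {
                visibleToConsumers.add(body);
            }
        }

        List<String> visible() { return visibleToConsumers; }

        int pendingCount() { return halfMessages.size(); }
    }

    /** Runs all four steps for one message; the supplier stands in for the local transaction. */
    static LocalState send(ToyBroker broker, String body, Supplier<LocalState> localTx) {
        long offset = broker.storeHalf(body);
        LocalState state;
        try {
            state = localTx.get(); // step 3: execute the local transaction
        } catch (RuntimeException e) {
            state = LocalState.UNKNOWN; // outcome not known yet
        }
        broker.endTransaction(offset, state);
        return state;
    }
}
```

A committed message ends up in `visible()`, a rolled-back one disappears, and one whose local transaction threw stays pending, which is the case the transaction check described later has to resolve.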
Sending Transactional Messages
Producer side
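The producer marks the message as a half message by setting the PROPERTY_TRANSACTION_PREPARED property and sends it. If the send returns SEND_OK, the producer executes the local transaction; if the send returns a flush or slave failure status, the local transaction is skipped and the state is set to ROLLBACK_MESSAGE. The producer then notifies the broker of the resulting state (commit, rollback, or unknown) through endTransaction.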
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter tranExecuter,
        final Object arg) throws MQClientException {
    if (tranExecuter == null) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    // send half message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,
            this.defaultMQProducer.getProducerGroup());
    SendResult sendResult;
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    // execute local transaction
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK:
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                if (localTransactionState == null) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }
                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                localException = e;
            }
            break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
    // end transaction: COMMIT or ROLLBACK
    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    // build result
    TransactionSendResult result = new TransactionSendResult();
    result.setSendStatus(sendResult.getSendStatus());
    result.setMessageQueue(sendResult.getMessageQueue());
    result.setMsgId(sendResult.getMsgId());
    result.setQueueOffset(sendResult.getQueueOffset());
    result.setTransactionId(sendResult.getTransactionId());
    result.setLocalTransactionState(localTransactionState);
    return result;
}
Broker end‑transaction handling
When the broker receives the end‑transaction request, it looks up the half message in the CommitLog, creates a new message with the appropriate commit/rollback flag, and stores it.
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    MessageExt msgExt = this.brokerController.getMessageStore()
            .lookMessageByOffset(requestHeader.getCommitLogOffset());
    if (msgExt != null) {
        MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
                requestHeader.getCommitOrRollback()));
        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
        msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            msgInner.setBody(null);
        }
        PutMessageResult putMessageResult = this.brokerController.getMessageStore()
                .putMessage(msgInner);
        if (putMessageResult != null) {
            response.setCode(ResponseCode.SUCCESS);
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("find prepared transaction message failed");
    }
    return response;
}
ConsumeQueue generation
Only after a transaction is committed does the broker create a ConsumeQueue entry for the message, making it visible to consumers.
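The visibility rule can be expressed in terms of the transaction bits of a message's sysFlag. The sketch below is an illustration, not broker source: the class name `TransactionFlagSketch` and the method `isVisibleToConsumers` are hypothetical, and the constant values are an assumption meant to mirror `MessageSysFlag` (two bits, shifted left by two).

```java
/** Sketch of the sysFlag transaction bits; values are assumed to mirror MessageSysFlag. */
final class TransactionFlagSketch {
    static final int TRANSACTION_NOT_TYPE = 0;             // ordinary, non-transactional message
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // half message
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // also serves as the two-bit mask

    /** Extracts only the transaction bits from a sysFlag. */
    static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** Clears the transaction bits and sets them to the given type, keeping all other bits. */
    static int resetTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }

    /** Hypothetical predicate: only ordinary and committed messages get a ConsumeQueue entry. */
    static boolean isVisibleToConsumers(int sysFlag) {
        int type = getTransactionValue(sysFlag);
        return type == TRANSACTION_NOT_TYPE || type == TRANSACTION_COMMIT_TYPE;
    }
}
```

Under these assumptions a half message and a rolled-back message are both stored in the CommitLog but never indexed, so consumers cannot pull them.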
Transaction Check (Check‑Back)
Broker‑initiated check (file‑system implementation, v3.1.4)
RocketMQ uses three components to manage transaction state:
TransactionStateService – manages the lifecycle of transaction records.
TranStateTable – a MappedFileQueue that stores one fixed‑size record per half message. Each record is 24 bytes (8 + 4 + 4 + 4 + 4) and contains:
offset (Long) – physical position in the CommitLog.
size (Int) – message length.
timestamp (Int) – store time in seconds.
producerGroupHash (Int) – hash of the producer group.
state (Int) – transaction state (prepared, commit, rollback).
TranRedoLog – a redo log that records every write to TranStateTable so that the table can be rebuilt after an abnormal broker shutdown.
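The record layout listed above can be sketched with a plain `ByteBuffer`. The five fields add up to 8 + 4 + 4 + 4 + 4 = 24 bytes, with the state in the last four. The class and method names below are hypothetical, and the `PREPARED` and `COMMIT` values are assumptions standing in for the corresponding `MessageSysFlag` constants.

```java
import java.nio.ByteBuffer;

/** Sketch of one TranStateTable record; names and state values are illustrative assumptions. */
final class TranStateRecordSketch {
    static final int RECORD_SIZE = Long.BYTES + 4 * Integer.BYTES; // offset + size + timestamp + hash + state
    static final int STATE_POS = RECORD_SIZE - Integer.BYTES;      // state occupies the last 4 bytes
    static final int PREPARED = 0x1 << 2; // assumed value of the prepared flag
    static final int COMMIT = 0x2 << 2;   // assumed value of the commit flag

    /** Serializes one record in field order: offset, size, timestamp, group hash, state. */
    static byte[] encode(long clOffset, int size, int timestampSeconds, int groupHash, int state) {
        ByteBuffer buf = ByteBuffer.allocate(RECORD_SIZE);
        buf.putLong(clOffset).putInt(size).putInt(timestampSeconds).putInt(groupHash).putInt(state);
        return buf.array();
    }

    /** Reads the state of the record at the given index within a table of back-to-back records. */
    static int readState(byte[] table, int recordIndex) {
        return ByteBuffer.wrap(table).getInt(recordIndex * RECORD_SIZE + STATE_POS);
    }

    /** Overwrites only the state field in place, leaving the other fields untouched. */
    static void writeState(byte[] table, int recordIndex, int state) {
        ByteBuffer.wrap(table).putInt(recordIndex * RECORD_SIZE + STATE_POS, state);
    }
}
```

Because every record has the same size, a record's position is simply its index times the record size, which is what makes the in-place state update cheap.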
When a half message is stored, the broker appends a record to TranStateTable via appendPreparedTransaction and updates the redo log.
public boolean appendPreparedTransaction(final long clOffset,
        final int size,
        final int timestamp,
        final int groupHashCode) {
    MappedFile mapedFile = this.tranStateTable.getLastMapedFile();
    if (mapedFile == null) {
        log.error("appendPreparedTransaction: create mapedfile error.");
        return false;
    }
    if (mapedFile.getWrotePostion() == 0) {
        this.addTimerTask(mapedFile);
    }
    this.byteBufferAppend.position(0);
    this.byteBufferAppend.limit(TSStoreUnitSize);
    this.byteBufferAppend.putLong(clOffset);
    this.byteBufferAppend.putInt(size);
    this.byteBufferAppend.putInt(timestamp);
    this.byteBufferAppend.putInt(groupHashCode);
    this.byteBufferAppend.putInt(MessageSysFlag.TransactionPreparedType);
    return mapedFile.appendMessage(this.byteBufferAppend.array());
}
public boolean updateTransactionState(final long tsOffset,
        final long clOffset,
        final int groupHashCode,
        final int state) {
    SelectMapedBufferResult result = this.findTransactionBuffer(tsOffset);
    if (result != null) {
        try {
            result.getByteBuffer().putInt(TS_STATE_POS, state);
        } catch (Exception e) {
            log.error("updateTransactionState exception", e);
        } finally {
            result.release();
        }
        return true;
    }
    return false;
}
Transaction‑check timer
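Each MappedFile in TranStateTable gets its own timer task when its first record is written. The task first runs after 60 s and then repeats at the configured checkTransactionMessageTimerInterval. On a master broker with checking enabled, it scans the file, selects the records that are still in the prepared state and whose age is at least checkTransactionMessageAtleastInterval, and asks a producer of the corresponding group to confirm the final state.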
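The selection rule can be isolated from the timer plumbing. The following is a minimal sketch, assuming the 24-byte record layout described earlier (timestamp at byte 12, state at byte 20); `CheckScanSketch` and `findRecordsToCheck` are hypothetical names, and the `PREPARED` value is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Sketch of the scan rule only; the real task also dispatches a check for every hit. */
final class CheckScanSketch {
    static final int RECORD_SIZE = 24;
    static final int TIMESTAMP_POS = 12;  // after offset (8 bytes) and size (4 bytes)
    static final int STATE_POS = 20;      // last 4 bytes of the record
    static final int PREPARED = 0x1 << 2; // assumed value of the prepared flag

    /** Returns the byte positions of prepared records whose age is at least minAgeMillis. */
    static List<Integer> findRecordsToCheck(ByteBuffer table, long nowMillis, long minAgeMillis) {
        List<Integer> due = new ArrayList<>();
        for (int pos = 0; pos + RECORD_SIZE <= table.limit(); pos += RECORD_SIZE) {
            if (table.getInt(pos + STATE_POS) != PREPARED) {
                continue; // already committed or rolled back
            }
            long ageMillis = nowMillis - table.getInt(pos + TIMESTAMP_POS) * 1000L;
            if (ageMillis < minAgeMillis) {
                break; // records are appended in time order, so the rest are younger still
            }
            due.add(pos);
        }
        return due;
    }
}
```

The early `break` relies on append order: once one prepared record is too young, every later record in the file is too. The listing that follows is the timer task itself, which applies the same rule and calls gotoCheck for each hit.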
private void addTimerTask(final MappedFile mf) {
    this.timer.scheduleAtFixedRate(new TimerTask() {
        private final MappedFile mapedFile = mf;
        private final TransactionCheckExecuter transactionCheckExecuter =
                TransactionStateService.this.defaultMessageStore.getTransactionCheckExecuter();
        private final long checkTransactionMessageAtleastInterval =
                TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
                        .getCheckTransactionMessageAtleastInterval();
        private final boolean slave = TransactionStateService.this.defaultMessageStore
                .getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
        @Override
        public void run() {
            if (slave) return;
            if (!TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
                    .isCheckTransactionMessageEnable()) return;
            try {
                SelectMapedBufferResult result = mapedFile.selectMapedBuffer(0);
                if (result != null) {
                    long preparedMessageCountInThisMapedFile = 0;
                    for (int i = 0; i < result.getSize(); i += TSStoreUnitSize) {
                        result.getByteBuffer().position(i);
                        long clOffset = result.getByteBuffer().getLong();
                        int msgSize = result.getByteBuffer().getInt();
                        int timestamp = result.getByteBuffer().getInt();
                        int groupHashCode = result.getByteBuffer().getInt();
                        int tranType = result.getByteBuffer().getInt();
                        if (tranType != MessageSysFlag.TransactionPreparedType) continue;
                        long diff = System.currentTimeMillis() - timestamp * 1000L;
                        if (diff < checkTransactionMessageAtleastInterval) break;
                        preparedMessageCountInThisMapedFile++;
                        try {
                            transactionCheckExecuter.gotoCheck(groupHashCode,
                                    getTranStateOffset(i), clOffset, msgSize);
                        } catch (Exception e) {
                            tranlog.warn("gotoCheck Exception", e);
                        }
                    }
                    result.release();
                }
            } catch (Exception e) {
                log.error("check transaction timer task Exception", e);
            }
        }
    }, 1000 * 60,
            TransactionStateService.this.defaultMessageStore.getMessageStoreConfig()
                    .getCheckTransactionMessageTimerInterval());
}
Broker sends check request to producer
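The broker randomly picks a channel of any producer in the same producer group, loads the half message from the CommitLog, builds a CheckTransactionStateRequestHeader carrying the CommitLog offset and the TranStateTable offset, and sends the request together with the message. If no producer of the group is connected, or the message can no longer be found, the check is skipped for this round.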
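A consequence of the random pick is that the producer instance answering the check need not be the one that sent the half message. The sketch below illustrates the selection with a hypothetical registry keyed by group hash; `ProducerPickSketch` and its methods are invented for this example, and channels are represented by plain strings rather than Netty channels.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical registry of producer channels, keyed by the hash of the producer group name. */
final class ProducerPickSketch {
    private final Map<Integer, List<String>> channelsByGroupHash = new HashMap<>();
    private final Random random;

    ProducerPickSketch(Random random) {
        this.random = random;
    }

    /** Registers one connected producer channel under its group. */
    void register(String producerGroup, String channelId) {
        channelsByGroupHash
                .computeIfAbsent(producerGroup.hashCode(), k -> new ArrayList<>())
                .add(channelId);
    }

    /** Returns a random channel of the group, or null if no producer of that group is connected. */
    String pickRandomly(int producerGroupHash) {
        List<String> channels = channelsByGroupHash.get(producerGroupHash);
        if (channels == null || channels.isEmpty()) {
            return null;
        }
        return channels.get(random.nextInt(channels.size()));
    }
}
```

Since any member of the group may be asked, the answer to a check should be derived from state that all instances can read, such as the business database, rather than from memory local to the original sender.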
public void gotoCheck(int producerGroupHashCode,
        long tranStateTableOffset,
        long commitLogOffset,
        int msgSize) {
    ClientChannelInfo clientChannelInfo = this.brokerController.getProducerManager()
            .pickProducerChannelRandomly(producerGroupHashCode);
    if (clientChannelInfo == null) {
        log.warn("check a producer transaction state, but not find any channel of this group[{}]", producerGroupHashCode);
        return;
    }
    SelectMapedBufferResult msgResult = this.brokerController.getMessageStore()
            .selectOneMessageByOffset(commitLogOffset, msgSize);
    if (msgResult == null) {
        log.warn("check a producer transaction state, but not find message by commitLogOffset: {}, msgSize: {}",
                commitLogOffset, msgSize);
        return;
    }
    CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
    requestHeader.setCommitLogOffset(commitLogOffset);
    requestHeader.setTranStateTableOffset(tranStateTableOffset);
    this.brokerController.getBroker2Client()
            .checkProducerTransactionState(clientChannelInfo.getChannel(), requestHeader, msgResult);
}
Producer receives check request
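Upon receiving the check request, the producer decodes the half message that the broker attached to the request (the producer keeps no local message store), determines the outcome of the local transaction by re‑running the logic or, preferably, by consulting a recorded result, and replies with COMMIT, ROLLBACK, or UNKNOWN through a one‑way end‑transaction request.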
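One way to answer a check without re-running business logic is to record the outcome of each local transaction and look it up later. The sketch below assumes such a record exists. `CheckAnswerSketch`, `recordOutcome`, and the in-memory map are hypothetical; in practice the record would live in the same database as the business data. The numeric flag values are assumptions mirroring `MessageSysFlag`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of answering a transaction check from recorded outcomes; all names are hypothetical. */
final class CheckAnswerSketch {

    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Assumed flag values, mirroring the transaction bits of MessageSysFlag.
    static final int NOT_TYPE = 0;
    static final int COMMIT_TYPE = 0x2 << 2;
    static final int ROLLBACK_TYPE = 0x3 << 2;

    // Stand-in for a durable "transaction log" table keyed by a business key.
    private final Map<String, Boolean> outcomes = new ConcurrentHashMap<>();

    /** Called by the local transaction once its outcome is final. */
    void recordOutcome(String businessKey, boolean committed) {
        outcomes.put(businessKey, committed);
    }

    /** Answers a broker check: no record means the outcome is not known yet. */
    LocalState check(String businessKey) {
        Boolean committed = outcomes.get(businessKey);
        if (committed == null) {
            return LocalState.UNKNOW;
        }
        return committed ? LocalState.COMMIT_MESSAGE : LocalState.ROLLBACK_MESSAGE;
    }

    /** Maps the three-valued answer to the flag sent in the end-transaction request. */
    static int toCommitOrRollback(LocalState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return ROLLBACK_TYPE;
            default:
                return NOT_TYPE; // unknown: leave the half message pending
        }
    }
}
```

The design choice worth noting is the third case: an unknown outcome is reported as unknown and not collapsed into a rollback, so a transaction that is still in flight is not discarded prematurely.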
// Simplified sketch of the producer-side handling (not the verbatim client source)
public void checkTransactionState(final String brokerAddr,
        final MessageExt msg,
        final CheckTransactionStateRequestHeader header) {
    // Ask the application for the outcome of the local transaction
    LocalTransactionState state = LocalTransactionState.UNKNOW;
    try {
        state = this.transactionCheckListener.checkLocalTransactionState(msg);
    } catch (Throwable e) {
        log.error("checkLocalTransactionState exception", e);
    }
    // Build the end-transaction request from the offsets carried in the check request
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(header.getTransactionId());
    requestHeader.setCommitLogOffset(header.getCommitLogOffset());
    requestHeader.setTranStateTableOffset(header.getTranStateTableOffset());
    switch (state) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        default:
            // Unknown outcome: do not roll back, let the broker check again later
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
    }
    // Send one-way end-transaction request back to broker
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(
            brokerAddr, requestHeader, null, this.defaultMQProducer.getSendMsgTimeout());
}
Version differences
Starting with RocketMQ 3.1.5 (and in the 4.0 series) the transaction state is persisted in a relational database instead of the file‑system. The database schema mirrors the fields of TranStateTable (offset, producerGroup, state, etc.), and the redo log is no longer required because the database guarantees durability.
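In both implementations the overall semantics remain the same: the broker stores half messages, the producer executes a local transaction, the broker periodically checks unresolved half messages, and each half message is finally committed or rolled back. As a result, a message becomes visible to consumers only if its local transaction committed. This guarantees consistency between the local transaction and the send, not exactly‑once delivery, so consumers should still be prepared to handle duplicates.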
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
