Implementing Distributed Transactions with RocketMQ Transaction Messages
This article explains the concept of distributed transactions, presents typical micro‑service scenarios that generate them, and demonstrates a complete solution using RocketMQ transaction messages together with SQL table definitions and Java code for producers, transaction listeners, and consumers to achieve eventual consistency.
The article begins by defining distributed transactions as operations that span multiple services or databases in a micro‑service architecture, such as user registration, order creation, or bank transfers, which require remote coordination to maintain data integrity.
Typical scenarios that generate distributed transactions are highlighted: (1) micro‑service calls across JVM processes, and (2) a monolithic application accessing multiple database instances, both leading to cross‑service or cross‑database transaction requirements.
To solve these challenges, the author notes that strong consistency is impossible under CAP, so systems aim for eventual consistency using two‑phase or three‑phase commit, with practical implementations relying on RocketMQ transaction messages or the SEATA framework.
RocketMQ basic usage is introduced, emphasizing that transaction messages are the core mechanism for handling distributed transactions in the presented solution.
SQL scripts for creating the necessary tables are provided:
CREATE TABLE `abc_person` (
`user_id` int(11) NOT NULL COMMENT '用户编号',
`name` varchar(20) DEFAULT '' COMMENT '用户名称',
`id_card` varchar(20) DEFAULT NULL COMMENT '身份证号',
`banlance` decimal(10,2) DEFAULT NULL COMMENT '余额',
`mobile` varchar(12) DEFAULT '' COMMENT '手机号',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
`delete_flg` char(1) DEFAULT '0' COMMENT '删除状态',
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户表';
CREATE TABLE `transfer_detail` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '明细ID',
`user_id` int(11) NOT NULL DEFAULT '0' COMMENT '用户ID',
`money` decimal(10,2) DEFAULT '0.00' COMMENT '转账金额',
`msg_id` varchar(50) DEFAULT '' COMMENT '消息ID',
`delete_flg` char(1) DEFAULT '0' COMMENT '是否删除状态',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='消息发送表';Another set of tables for the second database is also defined with similar fields.
The producer code shows a Spring MVC endpoint that validates the source account, builds a TransferDto , and sends a half‑message to RocketMQ using rocketMQTemplate.sendMessageInTransaction . A unique distributed ID generated by Snowflake ensures idempotent consumption.
@PostMapping("abcToHx")
public String abcToHx(String mobile, BigDecimal transferMoney) {
AbcPerson abcPerson = userService.getByMobile(mobile);
if (ObjectUtil.isNotEmpty(abcPerson) && ObjectUtil.isNotEmpty(transferMoney)
&& abcPerson.getBanlance().doubleValue() > transferMoney.doubleValue()) {
TransferDto transferDto = new TransferDto();
transferDto.setMobile(mobile);
transferDto.setMoney(transferMoney);
transferDto.setUserId(abcPerson.getUserId());
transferDto.setDistributedId(snowFlakeUtil.snowflakeId());
String destination = "transfer-topic:toHx";
Message message = MessageBuilder.withPayload(JSON.toJSONString(transferDto)).build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(destination, message, null);
log.warn("发送半消息:" + message + ",响应内容:" + result);
return "SUCCESS";
}
return "FAIL";
}The local transaction executor parses the message, calls userService.transferMoney , and returns COMMIT or ROLLBACK based on the outcome. The checkLocalTransaction method re‑checks the database to decide the final state.
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String jsonStr = new String((byte[]) msg.getPayload());
TransferDto transferDto = JSON.parseObject(jsonStr, TransferDto.class);
boolean flag = userService.transferMoney(transferDto.getUserId(), transferDto.getMoney(), transferDto.getDistributedId().toString());
if (flag) {
log.warn("executeLocalTransaction本地事务执行完成,提交:" + JSON.toJSONString(msg));
return RocketMQLocalTransactionState.COMMIT;
} else {
log.warn("executeLocalTransaction 本地事务执行失败,ROLLBACK");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String jsonStr = new String((byte[]) msg.getPayload());
TransferDto transferDto = JSON.parseObject(jsonStr, TransferDto.class);
TransferDetail transferDetail = transferDetailService.getByMsgId(transferDto.getDistributedId().toString());
if (ObjectUtil.isNotEmpty(transferDetail)) {
log.warn("本地事务执行完成,提交:" + JSON.toJSONString(msg));
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}The consumer implements RocketMQListener , obtains the message, builds a Redis lock key using the distributed ID, and processes the transfer. If the lock cannot be acquired or an exception occurs, the message is not considered consumed, ensuring idempotency.
@Override
public void onMessage(MessageExt message) {
String key = null;
String value = null;
try {
String msgId = message.getMsgId();
TransferDto transferDto = JSON.parseObject(new String(message.getBody()), TransferDto.class);
key = "mq:" + transferDto.getDistributedId();
value = Thread.currentThread().getId() + ":" + System.currentTimeMillis();
boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 1, TimeUnit.HOURS);
if (flag) {
try {
flag = userService.transferMoney(transferDto.getMobile(), transferDto.getMoney());
if (!flag) {
throw new RuntimeException("没有添加金额成功,抛出异常");
}
log.warn("成功消费");
} catch (Exception e) {
redisTemplate.watch(key);
redisTemplate.multi();
String lockValue = (String) redisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(lockValue) && lockValue == value) {
redisTemplate.delete(key);
}
redisTemplate.exec();
throw new RuntimeException("释放分布式锁,因消费失败,故抛出异常");
}
} else {
throw new RuntimeException("未拿到锁,不进行消费");
}
} catch (Exception e) {
log.warn("消费异常");
throw new RuntimeException("消费异常");
}
}Verification steps show the data in the two databases before and after invoking the producer endpoint, confirming that the money is deducted from the source account, added to the target account, and the transaction detail records are persisted, demonstrating a successful distributed transaction flow.
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.