Combining RocketMQ Transaction Messages, Local Message Table, and XXL‑Job for 10‑100k QPS Distributed Transactions
This article presents a detailed design for a high‑concurrency (10‑100k QPS) distributed transaction solution that integrates RocketMQ transactional messages, an eBay‑style local message table, and XXL‑Job reconciliation to achieve eventual consistency while handling failures, retries, and data‑explosion challenges.
CAP Conflict and Need for AP
At 100k QPS ("10 W", i.e. 10 × 10,000), the classic interview question exposes the fundamental conflict between strong consistency (CP) and high availability (AP). The CAP theorem says a distributed system cannot guarantee consistency, availability, and partition tolerance all at once. Network partitions cannot be ruled out, so partition tolerance is mandatory, and the real choice during a partition is between consistency and availability. A high-concurrency transaction path therefore gives up strong consistency and relies on eventual consistency.
Classic eBay Local Message Table
The eBay‑style local message table follows the BASE model (Basically Available, Soft state, Eventually consistent) to provide reliable eventual consistency. A dedicated table records each business message and its status. The message row and the business data are written in the same local database transaction, so either both are committed or neither is.
id – bigint primary key
msg_type – varchar, message type
biz_id – varchar, business identifier
content – text, message body
state – varchar, status (e.g., PENDING, CONSUMED)
create_time – datetime
update_time – datetime
Producer Side (Order Service)
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * Order service that creates an order and publishes an "order created" event atomically,
 * using a RocketMQ transactional message together with a local message table.
 *
 * <p>Flow: {@link #createOrder} sends a half message; RocketMQ then runs the local
 * transaction ({@link #executeLocalTransaction}), which writes the order row and the
 * {@code message_log} row in one database transaction. If the broker never learns the
 * outcome it calls back {@link #checkLocalTransaction}.
 *
 * <p>NOTE(review): RocketMQ invokes these callbacks through a {@code TransactionListener}
 * registered on the producer; that wiring is not shown here and must call this bean
 * through its Spring proxy so {@code @Transactional} takes effect.
 */
@Service
public class OrderServiceImpl implements OrderService {

    /** Topic consumed by the downstream (inventory, notice) services. */
    private static final String ORDER_TOPIC = "OrderTopic";

    /**
     * The single tag carried by the order-created message. A RocketMQ message has exactly
     * one tag and tag filtering is an exact match; {@code "A||B"} is only meaningful in a
     * consumer's subscription expression, so every consumer must list this tag in its
     * selector.
     */
    private static final String ORDER_CREATED_TAG = "ORDER_CREATED";

    /** Delay before the first reconciliation pass for a freshly created order. */
    private static final long FIRST_RECONCILE_DELAY_MINUTES = 5;

    @Resource
    private TransactionMQProducer transactionMQProducer;
    @Resource
    private OrderMapper orderMapper;
    @Resource
    private MessageLogMapper messageLogMapper;

    /**
     * Creates an order by sending a transactional message whose local transaction
     * persists the order and its message-log row.
     *
     * @param dto the order request
     * @throws IllegalStateException if the local transaction reported a rollback
     * @throws Exception if sending the transactional message fails
     */
    @Override
    public void createOrder(OrderCreateDTO dto) throws Exception {
        String orderId = generateOrderId();
        TOrder order = TOrder.builder()
                .orderId(orderId)
                .userId(dto.getUserId())
                .skuId(dto.getSkuId())
                .quantity(dto.getQuantity())
                .orderStatus("CREATED")
                .build();
        // Serialize the order rather than the request DTO: the request has no orderId yet,
        // and consumers key their idempotency records on it.
        String msgContent = JSON.toJSONString(order);
        Message message = new Message(
                ORDER_TOPIC,
                ORDER_CREATED_TAG,
                orderId, // message key must be a String
                msgContent.getBytes(StandardCharsets.UTF_8)
        );
        TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(message, order);
        // Without this check a rolled-back local transaction looks like a successful order.
        if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
            throw new IllegalStateException("Local transaction rolled back for order " + orderId);
        }
    }

    /**
     * Local-transaction callback: inserts the order and its {@code message_log} row in one
     * database transaction.
     *
     * @param order   the order to persist
     * @param message the half message being committed or rolled back
     * @return {@code COMMIT_MESSAGE} when both rows were written, otherwise
     *         {@code ROLLBACK_MESSAGE} (and the database transaction is rolled back)
     */
    @Transactional(rollbackFor = Exception.class)
    public LocalTransactionState executeLocalTransaction(TOrder order, Message message) {
        try {
            orderMapper.insert(order);
            Date nextReconcileTime = new Date(
                    System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(FIRST_RECONCILE_DELAY_MINUTES));
            MessageLog log = MessageLog.builder()
                    .orderId(order.getOrderId())
                    // Message has no msgId before it is stored; the transaction id is the
                    // client-side unique id RocketMQ assigns to the half message.
                    .rocketmqMsgId(message.getTransactionId())
                    .messageContent(new String(message.getBody(), StandardCharsets.UTF_8))
                    .businessType("ORDER_CREATE")
                    .msgStatus("INIT")
                    .reconcileStatus("PENDING")
                    .retryCount(0)
                    .nextReconcileTime(nextReconcileTime)
                    .build();
            messageLogMapper.insert(log);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // The exception is swallowed so RocketMQ receives an explicit ROLLBACK; without
            // marking rollback-only, Spring would commit whatever was written before the failure
            // (e.g. an order row with no message-log row).
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    /**
     * Broker check-back: decides the fate of a half message whose outcome was not reported.
     *
     * @param orderId the order id carried by the half message
     * @return {@code COMMIT_MESSAGE} if the message-log row exists, else {@code ROLLBACK_MESSAGE}
     */
    public LocalTransactionState checkLocalTransaction(String orderId) {
        MessageLog log = messageLogMapper.selectByOrderId(orderId);
        // The log row is written in the same database transaction as the order, so its mere
        // presence proves the local transaction committed, whatever status it has since reached.
        return log != null ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    /**
     * Builds an order id from the current millisecond and a random 4-digit suffix.
     *
     * <p>NOTE(review): at ~100k QPS about 100 ids share each millisecond and only 9000
     * suffixes exist, so collisions are likely and will be rejected by the unique key on
     * {@code t_order.order_id}; a Snowflake-style generator is advisable.
     */
    private String generateOrderId() {
        return "ORDER_" + System.currentTimeMillis() + RandomUtils.nextInt(1000, 9999);
    }
}
Consumer Side (Inventory Service)
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
 * Inventory-side consumer of the order-created event on {@code OrderTopic}.
 *
 * <p>Deducts stock at most once per order, using {@code inventory_deduct_log} (unique on
 * {@code order_id}) as the idempotency record, and reports the outcome to the order
 * service's message table through {@link MessageLogFeignClient}.
 *
 * <p>Retry contract (rocketmq-spring {@link RocketMQListener}): returning normally
 * acknowledges the message; throwing makes RocketMQ redeliver it later.
 */
@Component
@RocketMQMessageListener(
        topic = "OrderTopic",
        // Tag filtering is an exact match on the message's single tag; "A||B" is only valid
        // as subscription syntax, never as a message tag. The expression must list the tag
        // the producer actually sets on the order-created event.
        selectorExpression = "ORDER_CREATED || INVENTORY_DEDUCT",
        consumerGroup = "inventory_consumer_group")
public class InventoryConsumer implements RocketMQListener<MessageExt> {
    @Resource
    private InventoryMapper inventoryMapper;
    @Resource
    private InventoryDeductLogMapper deductLogMapper;
    @Resource
    private MessageLogFeignClient messageLogFeignClient;

    /**
     * Deducts stock for one order message.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>already deducted: re-reports CONSUMED and acknowledges;</li>
     *   <li>unknown SKU or insufficient stock: records FAIL, reports FAIL and acknowledges
     *       (a business failure left to reconciliation, not to broker retries);</li>
     *   <li>success: deducts, records SUCCESS, reports CONSUMED;</li>
     *   <li>any technical error: propagates, so the database transaction rolls back and the
     *       message is redelivered.</li>
     * </ul>
     *
     * @param messageExt the delivered message; its body is the order JSON (UTF-8)
     * @throws IllegalArgumentException if the payload lacks orderId, skuId or quantity
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void onMessage(MessageExt messageExt) {
        String msgContent = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        OrderCreateDTO dto = JSON.parseObject(msgContent, OrderCreateDTO.class);
        if (dto == null || dto.getOrderId() == null || dto.getSkuId() == null || dto.getQuantity() == null) {
            // Fail loudly rather than writing rows with null keys; after the broker's retries the
            // message ends up in the dead-letter queue.
            throw new IllegalArgumentException(
                    "Order message " + messageExt.getMsgId() + " is missing orderId/skuId/quantity");
        }
        String orderId = dto.getOrderId();
        Long skuId = dto.getSkuId();
        Integer quantity = dto.getQuantity();

        InventoryDeductLog existLog = deductLogMapper.selectByOrderId(orderId);
        if (existLog != null && "SUCCESS".equals(existLog.getDeductStatus())) {
            // Duplicate delivery of an already-applied deduction: only re-report the status.
            messageLogFeignClient.updateMsgStatus(orderId, "CONSUMED");
            return;
        }

        Inventory inventory = inventoryMapper.selectBySkuId(skuId);
        if (inventory == null || inventory.getStock() < quantity) {
            // insertOrUpdate: a FAIL/PROCESSING row may already exist for this order (unique key).
            deductLogMapper.insertOrUpdate(InventoryDeductLog.builder()
                    .orderId(orderId)
                    .skuId(skuId)
                    .deductQuantity(quantity)
                    .deductStatus("FAIL")
                    .build());
            messageLogFeignClient.updateMsgStatus(orderId, "FAIL");
            return;
        }

        // NOTE(review): read-modify-write; concurrent consumers on the same SKU can lose updates.
        // At high QPS an atomic "UPDATE ... SET stock = stock - ? WHERE sku_id = ? AND stock >= ?"
        // (or SELECT ... FOR UPDATE) is needed.
        inventory.setStock(inventory.getStock() - quantity);
        inventoryMapper.updateById(inventory);
        deductLogMapper.insertOrUpdate(InventoryDeductLog.builder()
                .orderId(orderId)
                .skuId(skuId)
                .deductQuantity(quantity)
                .deductStatus("SUCCESS")
                .build());
        // NOTE(review): this remote call runs inside the local transaction; if it throws, the
        // deduction is rolled back and the message redelivered.
        messageLogFeignClient.updateMsgStatus(orderId, "CONSUMED");
    }

    /**
     * Returns the deduction status recorded for an order.
     *
     * @param orderId the order id
     * @return the recorded deduct status, or {@code "NOT_PROCESSED"} when no log row exists
     */
    public String queryDeductStatus(String orderId) {
        InventoryDeductLog log = deductLogMapper.selectByOrderId(orderId);
        return log == null ? "NOT_PROCESSED" : log.getDeductStatus();
    }

    /**
     * Placeholder retry hook for PROCESSING/FAIL deductions.
     *
     * <p>NOTE(review): currently always reports success without re-attempting anything;
     * callers treat {@code true} as "retry accepted".
     */
    public boolean retryDeduct(String orderId) {
        // Simplified retry logic for PROCESSING/FAIL states
        return true;
    }
}
Normal Process Flow
User places an order → createOrder sends a RocketMQ transaction message.
RocketMQ receives a half‑message and invokes executeLocalTransaction.
On success the order and local‑message‑log are persisted (msg_status=INIT, reconcile_status=PENDING) and COMMIT_MESSAGE is returned.
RocketMQ then delivers the message to downstream services.
Inventory and notice services consume the message, perform their business logic, and update the message status to CONSUMED.
After 5 minutes an XXL‑Job task reconciles the statuses of Service A (order), Service B (inventory) and Service C (notice). If all are successful the reconcile status is set to SUCCESS, completing the loop.
Exception Scenarios (Without Reconciliation)
Local transaction failure : order or message‑log insertion fails → ROLLBACK_MESSAGE, RocketMQ discards the message.
Broker timeout : no commit/rollback received → broker triggers checkLocalTransaction, which reads the message‑log and returns COMMIT_MESSAGE if the local transaction succeeded.
Consumer failure : consumer returns RECONSUME_LATER, RocketMQ retries (default 16 times) then moves the message to a dead‑letter queue.
XXL‑Job Reconciliation Mechanism
Core Goal
Detect and fix consistency gaps that the basic flow cannot cover, such as successful consumption without status update, network‑induced delivery loss, or partial business execution.
Task Configuration
Frequency: every 5 minutes, matching next_reconcile_time.
Sharding: split by the tail of order_id (e.g., 10 shards) to support million‑order reconciliation.
Timeout per shard: 30 seconds; timeout marks the record for retry.
Dependencies: status‑query and retry APIs of order, inventory, and notice services.
Reconciliation Logic (Code)
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * XXL-Job task that reconciles order, inventory and notice status for pending message-log
 * records and applies retry / escalation rules.
 *
 * <p>Each record ends a pass in exactly one of these states:
 * <ul>
 *   <li>SUCCESS: the order is cancelled, or all downstream services report success;</li>
 *   <li>RETRY: retry count incremented and {@code next_reconcile_time} pushed out;</li>
 *   <li>FAIL: retries exhausted, and a manual work order was created.</li>
 * </ul>
 */
@Component
public class OrderReconcileJob {

    /** Automatic retries allowed before a record is escalated to a manual work order. */
    private static final int MAX_RETRY = 5;

    @Resource
    private MessageLogMapper messageLogMapper;
    @Resource
    private OrderService orderService;
    @Resource
    private InventoryFeignClient inventoryFeignClient;
    @Resource
    private NoticeFeignClient noticeFeignClient;
    @Resource
    private ReconcileWorkOrderMapper workOrderMapper;

    /**
     * Reconciles this shard's due records. A failure on one record only reschedules that
     * record; it does not abort the shard.
     */
    @XxlJob("orderReconcileJob")
    public void execute() throws Exception {
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        // NOTE(review): the third argument is presumably the retry ceiling applied in the
        // mapper's SQL; confirm against the mapper.
        List<MessageLog> pendingLogs = messageLogMapper.selectPendingReconcile(
                shardIndex, shardTotal, MAX_RETRY, new Date()
        );
        if (pendingLogs == null || pendingLogs.isEmpty()) {
            XxlJobHelper.log("No pending data for shard {}", shardIndex);
            return;
        }
        for (MessageLog log : pendingLogs) {
            String orderId = log.getOrderId();
            try {
                XxlJobHelper.log("Reconciling order {}", orderId);
                reconcile(log);
            } catch (Exception e) {
                Date next = minutesFromNow(10);
                messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, next, "RETRY");
                XxlJobHelper.log("Reconcile error for order {}: {}, next try at {}", orderId, e.getMessage(), next);
            }
        }
        // handleSuccess takes one plain message string (no {} placeholder expansion).
        XxlJobHelper.handleSuccess("Shard " + shardIndex + " completed, processed " + pendingLogs.size() + " records");
    }

    /** Compares the three services' status for one record and records the outcome. */
    private void reconcile(MessageLog log) {
        String orderId = log.getOrderId();
        String orderStatus = orderService.queryOrderStatus(orderId);
        // A cancelled order needs no downstream work, so it counts as reconciled.
        if ("CANCELED".equals(orderStatus)) {
            messageLogMapper.updateReconcileStatus(orderId, "SUCCESS");
            return;
        }
        String inventoryStatus = inventoryFeignClient.queryDeductStatus(orderId);
        String noticeStatus = noticeFeignClient.queryNoticeStatus(orderId);

        // Any live order status (CREATED, PAID, SHIPPED, FINISHED) is acceptable: orders are
        // often paid before the first reconciliation pass.
        boolean orderExists = orderStatus != null;
        if (orderExists && "SUCCESS".equals(inventoryStatus) && "SUCCESS".equals(noticeStatus)) {
            messageLogMapper.updateReconcileStatus(orderId, "SUCCESS");
            return;
        }
        if ("PROCESSING".equals(inventoryStatus) || "PROCESSING".equals(noticeStatus)) {
            retryLaterOrEscalate(log, 10, inventoryStatus, noticeStatus);
            return;
        }
        if ("FAIL".equals(inventoryStatus) || "FAIL".equals(noticeStatus)) {
            retryFailedDownstream(log, inventoryStatus, noticeStatus);
            return;
        }
        // Anything else (order not found, NOT_PROCESSED, unknown status) may indicate a lost
        // delivery; check again later instead of leaving the record untouched forever.
        retryLaterOrEscalate(log, 10, inventoryStatus, noticeStatus);
    }

    /** Triggers the downstream retry APIs for FAIL statuses, or escalates if retries are exhausted. */
    private void retryFailedDownstream(MessageLog log, String inventoryStatus, String noticeStatus) {
        String orderId = log.getOrderId();
        if (log.getRetryCount() >= MAX_RETRY) {
            raiseWorkOrder(orderId, inventoryStatus, noticeStatus);
            return;
        }
        boolean invRetry = !"FAIL".equals(inventoryStatus) || inventoryFeignClient.retryDeduct(orderId);
        boolean noticeRetry = !"FAIL".equals(noticeStatus) || noticeFeignClient.retrySend(orderId);
        // Back off longer when a retry call itself was rejected.
        long delayMinutes = (invRetry && noticeRetry) ? 5 : 30;
        messageLogMapper.updateRetryInfo(orderId, log.getRetryCount() + 1, minutesFromNow(delayMinutes), "RETRY");
    }

    /** Schedules another pass after {@code delayMinutes}, or escalates once retries are exhausted. */
    private void retryLaterOrEscalate(MessageLog log, long delayMinutes, String inventoryStatus, String noticeStatus) {
        if (log.getRetryCount() >= MAX_RETRY) {
            raiseWorkOrder(log.getOrderId(), inventoryStatus, noticeStatus);
            return;
        }
        messageLogMapper.updateRetryInfo(
                log.getOrderId(), log.getRetryCount() + 1, minutesFromNow(delayMinutes), "RETRY");
    }

    /** Creates a manual work order and marks the record's reconciliation as FAIL. */
    private void raiseWorkOrder(String orderId, String inventoryStatus, String noticeStatus) {
        workOrderMapper.insert(ReconcileWorkOrder.builder()
                .orderId(orderId)
                // Include the order id: several work orders can be raised in the same millisecond.
                .workOrderNo("RECONCILE_" + orderId + "_" + System.currentTimeMillis())
                .faultDesc("Order " + orderId + ": inventory=" + inventoryStatus + ", notice=" + noticeStatus + ", retries exhausted")
                .workOrderStatus("PENDING")
                .createTime(new Date())
                .build());
        messageLogMapper.updateReconcileStatus(orderId, "FAIL");
    }

    /** Returns the instant {@code minutes} from now. */
    private static Date minutesFromNow(long minutes) {
        return new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(minutes));
    }
}
Key Guarantees
Idempotency : Reconciliation scans only records with status PENDING or RETRY, avoiding duplicate processing.
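Retry Strategy: Tiered back‑off reduces pressure on downstream services. The delay depends on the outcome: the next attempt is scheduled 5 min after a retry call is accepted, 10 min while a downstream service is still PROCESSING or after an unexpected error, and 30 min when the retry call itself is rejected.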
Manual Fallback : After five automatic retries a work order is created for human intervention (e.g., manual inventory compensation).
Data Cleanup : Records with SUCCESS older than 30 days are archived to message_log_hist to keep the primary table lightweight.
Message‑Table Scaling Strategies
Index Optimization : Add indexes on status fields to accelerate filtering.
Pagination : Perform batch pagination instead of a single massive scan.
Multithreaded Segment Queries : Split the table by business identifier or numeric range (e.g., 0‑1000, 1001‑2000) and process each segment in parallel.
Sharding : For very large tables adopt database sharding to distribute load.
Advantages and Disadvantages of the eBay Local Message Table
Advantages
Low implementation cost; provides reliable message delivery and eventual consistency.
Used on its own, the table needs no broker‑side transaction check‑back (such as RocketMQ's checkLocalTransaction), which reduces business intrusion.
Can be further decoupled with annotations for near‑zero code intrusion in some scenarios.
Disadvantages
Couples the message table with business data, making it hard to scale independently.
Relies on database I/O; under extreme concurrency the disk‑bound nature becomes a performance bottleneck.
When the table grows large, scanning becomes slow and can cause reconciliation latency.
System Architecture Overview
The solution consists of four services and one middleware:
Order Service (producer) : Generates orders, writes the local message record, and sends a RocketMQ transaction message.
Inventory Service (consumer) : Subscribes to the order topic, deducts stock, records its own log, and updates the message status.
Notice Service (consumer) : Sends notifications and updates the message status.
RocketMQ : Acts as the transaction coordinator; after receiving a commit from the producer it delivers the message to downstream consumers.
XXL‑Job : Periodic reconciliation job that compares order, inventory and notice statuses, applies retry logic, and creates manual work orders when automatic retries are exhausted.
Database Design
-- Order service local message table (message_log)
CREATE TABLE message_log (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  -- VARCHAR, not BIGINT: order ids are strings such as 'ORDER_<millis><rand>' (see t_order.order_id)
  order_id VARCHAR(64) NOT NULL COMMENT 'Order ID',
  rocketmq_msg_id VARCHAR(64) DEFAULT NULL COMMENT 'RocketMQ message ID',
  message_content TEXT NOT NULL COMMENT 'JSON payload',
  business_type VARCHAR(32) NOT NULL COMMENT 'e.g., ORDER_CREATE',
  msg_status ENUM('INIT','SENT','CONSUMED','FAIL') DEFAULT 'INIT' COMMENT 'Message lifecycle',
  reconcile_status ENUM('PENDING','SUCCESS','FAIL','RETRY') DEFAULT 'PENDING' COMMENT 'Reconciliation state',
  retry_count TINYINT DEFAULT 0 COMMENT 'Retry count (max 5)',
  next_reconcile_time DATETIME NOT NULL COMMENT 'Next reconciliation trigger',
  create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  UNIQUE KEY uk_order_id_business_type (order_id, business_type),
  -- Serves the reconciliation scan: filter by status, then by due time.
  KEY idx_reconcile_status_time (reconcile_status, next_reconcile_time)
);
-- Simplified order table
CREATE TABLE t_order (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  order_id VARCHAR(64) NOT NULL COMMENT 'Unique order number',
  user_id BIGINT NOT NULL,
  sku_id BIGINT NOT NULL,
  quantity INT NOT NULL,
  order_status ENUM('CREATED','PAID','SHIPPED','FINISHED','CANCELED') DEFAULT 'CREATED',
  create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  UNIQUE KEY uk_order_id (order_id)
);
-- Inventory deduction log (supports idempotency: at most one row per order)
CREATE TABLE inventory_deduct_log (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  order_id VARCHAR(64) NOT NULL COMMENT 'Order ID',
  sku_id BIGINT NOT NULL,
  deduct_quantity INT NOT NULL,
  deduct_status ENUM('SUCCESS','FAIL','PROCESSING') DEFAULT 'PROCESSING',
  create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
  UNIQUE KEY uk_order_id (order_id)
);
Full Exception Handling Overview
Local transaction failure : Order or message‑log insertion fails → ROLLBACK_MESSAGE, RocketMQ discards the message.
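Broker timeout / transaction check: If RocketMQ does not receive a commit/rollback, it invokes checkLocalTransaction. The message log is written in the same database transaction as the order, so a present log means the local transaction committed. In that case COMMIT_MESSAGE is returned and the half message becomes visible and is delivered to consumers; a missing log yields ROLLBACK_MESSAGE.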
Consumer failure : Consumer returns RECONSUME_LATER; RocketMQ retries up to 16 times then moves the message to a dead‑letter queue.
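Successful consumption without status update: The reconciliation job queries each downstream service directly. If the order, inventory, and notice services all report success, it marks the record's reconcile status SUCCESS, even though msg_status was never set to CONSUMED.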
Downstream business failure (e.g., insufficient stock) : Consumer records FAIL; reconciliation retries up to 5 times then generates a manual work order.
Tech Freedom Circle
Crazy Maker Circle (Tech Freedom Architecture Circle): a community of tech enthusiasts, experts, and high‑performance fans. Many top‑level masters, architects, and hobbyists have achieved tech freedom; another wave of go‑getters are hustling hard toward tech freedom.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
