Implementation of Order Service with Distributed Transactions, Locks, and Asynchronous Processing in Java
This article explains how to build a robust order service using Seata distributed transactions, Redisson distributed locks, CompletableFuture for asynchronous operations, token-based duplicate‑submission prevention, and RabbitMQ delayed queues for automatic order cancellation, with full Java code examples.
Introduction
Order service involves many aspects such as distributed transactions and distributed locks; for example, cancelling an unpaid order after timeout, preventing duplicate submissions, and avoiding overselling.
Enabling distributed transactions ensures data consistency and integrity across multiple services.
Using distributed locks guarantees that only one operation succeeds at a time, preventing concurrency issues.
Order Flow (Key Parts Only)
Order Confirmation
public OrderConfirmVO confirmOrder(Long skuId) {
Long memberId = SecurityUtils.getMemberId();
// Resolve HttpServletRequest data loss in child threads
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
RequestContextHolder.setRequestAttributes(attributes, true);
// Fetch order items asynchronously
CompletableFuture
> getOrderItemsFuture = CompletableFuture.supplyAsync(
() -> this.getOrderItems(skuId, memberId), threadPoolExecutor)
.exceptionally(ex -> { log.error("Failed to get order items: {}", ex.toString()); return null; });
// Fetch member address asynchronously
CompletableFuture
> getMemberAddressFuture = CompletableFuture.supplyAsync(() -> {
Result
> result = memberFeignClient.listMemberAddresses(memberId);
if (Result.isSuccess(result)) { return result.getData(); }
return null;
}, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to get addresses for memberId {} : {}", memberId, ex.toString()); return null; });
// Generate unique token to prevent duplicate submission
CompletableFuture
generateOrderTokenFuture = CompletableFuture.supplyAsync(() -> {
String orderToken = this.generateTradeNo(memberId);
redisTemplate.opsForValue().set(OrderConstants.ORDER_TOKEN_PREFIX + orderToken, orderToken);
return orderToken;
}, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to generate order token ."); return null; });
// Wait for all async tasks
CompletableFuture.allOf(getOrderItemsFuture, getMemberAddressFuture, generateOrderTokenFuture).join();
OrderConfirmVO orderConfirmVO = new OrderConfirmVO();
orderConfirmVO.setOrderItems(getOrderItemsFuture.join());
orderConfirmVO.setAddresses(getMemberAddressFuture.join());
orderConfirmVO.setOrderToken(generateOrderTokenFuture.join());
log.info("Order confirm response for skuId {}: {}", skuId, orderConfirmVO);
return orderConfirmVO;
}Prevent Duplicate Order Submission
A unique token is generated to avoid duplicate submissions. The token is stored in Redis with a key prefix and later validated via a Lua script.
private String generateTradeNo(Long memberId) {
// Pad memberId to 5 digits, keep last 5 if longer
String userIdFilledZero = String.format("%05d", memberId);
String fiveDigitsUserId = userIdFilledZero.substring(userIdFilledZero.length() - 5);
// Prefix "wxo" helps identify order source
return System.currentTimeMillis() + RandomUtil.randomNumbers(3) + fiveDigitsUserId;
}The token key is "ORDER_TOKEN_PREFIX + token" and the value is the token itself.
During order submission, a Lua script checks the key; if it exists, the key is deleted and the submission proceeds, otherwise an assertion blocks the duplicate request.
Extension: Using CompletableFuture improves response speed, reduces blocking, and isolates exception handling for each async task.
Custom thread pool configuration is provided for these async operations.
@Configuration
@Slf4j
public class ThreadPoolConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
int cpuCoreSize = Runtime.getRuntime().availableProcessors();
log.info("Current CPU cores: {}", cpuCoreSize);
int corePoolSize = cpuCoreSize + 1; // for compute‑intensive tasks
return new ThreadPoolExecutor(
corePoolSize,
2 * corePoolSize,
30,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new NamedThreadFactory("order")
);
}
}Order Submission
@GlobalTransactional
public String submitOrder(OrderSubmitForm submitForm) {
log.info("Order submit params: {}", JSONUtil.toJsonStr(submitForm));
String orderToken = submitForm.getOrderToken();
// 1. Prevent duplicate submission using Lua script
String lockAcquireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Long lockAcquired = this.redisTemplate.execute(
new DefaultRedisScript<>(lockAcquireScript, Long.class),
Collections.singletonList(OrderConstants.ORDER_TOKEN_PREFIX + orderToken),
orderToken);
Assert.isTrue(lockAcquired != null && lockAcquired.equals(1L), "Duplicate order submission, please refresh and retry");
// 2. Validate order items (price, availability)
List
orderItems = submitForm.getOrderItems();
List
skuIds = orderItems.stream().map(OrderSubmitForm.OrderItem::getSkuId).collect(Collectors.toList());
List
skuList = skuFeignClient.getSkuInfoList(skuIds);
for (OrderSubmitForm.OrderItem item : orderItems) {
SkuInfoDTO skuInfo = skuList.stream().filter(s -> s.getId().equals(item.getSkuId())).findFirst().orElse(null);
Assert.isTrue(skuInfo != null, "Product ({}) is off‑shelf or deleted");
Assert.isTrue(item.getPrice().compareTo(skuInfo.getPrice()) == 0, "Product ({}) price changed, please refresh", item.getSkuName());
}
// 3. Lock stock
List
lockedSkuList = orderItems.stream()
.map(i -> new LockedSkuDTO(i.getSkuId(), i.getQuantity(), i.getSkuSn()))
.collect(Collectors.toList());
boolean lockStockResult = skuFeignClient.lockStock(orderToken, lockedSkuList);
Assert.isTrue(lockStockResult, "Order submission failed: unable to lock stock!");
// 4. Create order
boolean result = this.saveOrder(submitForm);
log.info("order ({}) create result:{}", orderToken, result);
return orderToken;
}Note: This method uses Seata for distributed transactions and Redisson for distributed locks.
Lua Script Details
KEYS[1] = OrderConstants.ORDER_TOKEN_PREFIX + orderToken ARGV[1] = orderToken The script returns 1L if the key exists and is deleted, otherwise 0.
Product Validation
Extract SKU IDs from the submitted form.
Fetch SKU details via Feign remote call.
Verify each SKU exists and its price matches the submitted price.
Prepare a list of LockedSkuDTO objects for stock locking.
Lock Stock Method
@Transactional
public boolean lockStock(String orderToken, List
lockedSkuList) {
Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkuList), "Order ({}) contains no items", orderToken);
for (LockedSkuDTO lockedSku : lockedSkuList) {
Long skuId = lockedSku.getSkuId();
RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + skuId);
try {
lock.lock();
Integer quantity = lockedSku.getQuantity();
boolean lockResult = this.update(new LambdaUpdateWrapper
()
.setSql("locked_stock = locked_stock + " + quantity)
.eq(PmsSku::getId, lockedSku.getSkuId())
.apply("stock - locked_stock >= {0}", quantity));
Assert.isTrue(lockResult, "Product ({}) insufficient stock", lockedSku.getSkuSn());
} finally {
if (lock.isLocked()) { lock.unlock(); }
}
}
redisTemplate.opsForValue().set(ProductConstants.LOCKED_SKUS_PREFIX + orderToken, lockedSkuList);
return true;
}Unlock Stock (Extension)
public boolean unlockStock(String orderSn) {
List
lockedSkus = (List
) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn);
log.info("Releasing locked stock for order ({}): {}", orderSn, JSONUtil.toJsonStr(lockedSkus));
if (CollectionUtil.isEmpty(lockedSkus)) { return true; }
for (LockedSkuDTO lockedSku : lockedSkus) {
RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId());
try {
lock.lock();
this.update(new LambdaUpdateWrapper
()
.setSql("locked_stock = locked_stock - " + lockedSku.getQuantity())
.eq(PmsSku::getId, lockedSku.getSkuId()));
} finally {
if (lock.isLocked()) { lock.unlock(); }
}
}
redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn);
return true;
}Deduct Stock (Extension)
public boolean deductStock(String orderSn) {
List
lockedSkus = (List
) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn);
log.info("Order ({}) payment succeeded, deducting stock: {}", orderSn, JSONUtil.toJsonStr(lockedSkus));
Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkus), "Deduct stock failed: order ({}) has no items");
for (LockedSkuDTO lockedSku : lockedSkus) {
RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId());
try {
lock.lock();
this.update(new LambdaUpdateWrapper
()
.setSql("stock = stock - " + lockedSku.getQuantity())
.setSql("locked_stock = locked_stock - " + lockedSku.getQuantity())
.eq(PmsSku::getId, lockedSku.getSkuId()));
} finally {
if (lock.isLocked()) { lock.unlock(); }
}
}
redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn);
return true;
}Create Order
private boolean saveOrder(OrderSubmitForm submitForm) {
OmsOrder order = orderConverter.form2Entity(submitForm);
order.setStatus(OrderStatusEnum.UNPAID.getValue());
order.setMemberId(SecurityUtils.getMemberId());
order.setSource(submitForm.getOrderSource().getValue());
boolean result = this.save(order);
Long orderId = order.getId();
if (result) {
List
orderItemEntities = orderItemConverter.item2Entity(submitForm.getOrderItems());
orderItemEntities.forEach(item -> item.setOrderId(orderId));
orderItemService.saveBatch(orderItemEntities);
// Send delayed message for automatic cancellation
rabbitTemplate.convertAndSend("order.exchange", "order.close.delay", submitForm.getOrderToken());
}
return result;
}Order Timeout Cancellation (RabbitMQ Delayed Queue)
@Component
@Slf4j
public class OrderRabbitConfig {
private static final String ORDER_CLOSE_DELAY_QUEUE = "order.close.delay.queue";
private static final String ORDER_EXCHANGE = "order.exchange";
private static final String ORDER_CLOSE_DELAY_ROUTING_KEY = "order.close.delay";
private static final String ORDER_CLOSE_QUEUE = "order.close.queue";
private static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange";
private static final String ORDER_CLOSE_ROUTING_KEY = "order.close";
@Bean
public Exchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); }
@Bean
public Exchange orderDlxExchange() { return new DirectExchange(ORDER_DLX_EXCHANGE, true, false); }
@Bean
public Queue orderDelayQueue() {
Map
args = new HashMap<>();
args.put("x-dead-letter-exchange", ORDER_DLX_EXCHANGE);
args.put("x-dead-letter-routing-key", ORDER_CLOSE_ROUTING_KEY);
args.put("x-message-ttl", 10 * 1000L); // 10 seconds for demo
return new Queue(ORDER_CLOSE_DELAY_QUEUE, true, false, false, args);
}
@Bean
public Binding orderDelayQueueBinding() {
return new Binding(ORDER_CLOSE_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EXCHANGE, ORDER_CLOSE_DELAY_ROUTING_KEY, null);
}
@Bean
public Queue orderCloseQueue() {
log.info("Dead‑letter queue (order.close.queue) created");
return new Queue(ORDER_CLOSE_QUEUE, true, false, false);
}
@Bean
public Binding orderCloseQueueBinding() {
return new Binding(ORDER_CLOSE_QUEUE, Binding.DestinationType.QUEUE, ORDER_DLX_EXCHANGE, ORDER_CLOSE_ROUTING_KEY, null);
}
}Order Close Listener
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderCloseListener {
private final OrderService orderService;
private final RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "order.close.queue")
public void closeOrder(String orderSn, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("Order ({}) timed out, automatically closing", orderSn);
try {
boolean closeOrderResult = orderService.closeOrder(orderSn);
log.info("Close result: {}", closeOrderResult);
if (closeOrderResult) {
// Release stock after successful close
rabbitTemplate.convertAndSend("stock.exchange", "stock.unlock", orderSn);
} else {
channel.basicAck(deliveryTag, false);
}
} catch (Exception e) {
try { channel.basicReject(deliveryTag, true); }
catch (IOException ex) { log.error("Order ({}) close failed: {}", orderSn, ex.getMessage()); }
}
}
}Order Service Close Method
public boolean closeOrder(String orderSn) {
return this.update(new LambdaUpdateWrapper
()
.eq(OmsOrder::getOrderSn, orderSn)
.eq(OmsOrder::getStatus, OrderStatusEnum.UNPAID.getValue())
.set(OmsOrder::getStatus, OrderStatusEnum.CANCELED.getValue()));
}Stock Release Listener
@Component
@Slf4j
@RequiredArgsConstructor
public class StockReleaseListener {
private final SkuService skuService;
private static final String STOCK_UNLOCK_QUEUE = "stock.unlock.queue";
private static final String STOCK_EXCHANGE = "stock.exchange";
private static final String STOCK_UNLOCK_ROUTING_KEY = "stock.unlock";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = STOCK_UNLOCK_QUEUE, durable = "true"),
exchange = @Exchange(value = STOCK_EXCHANGE),
key = {STOCK_UNLOCK_ROUTING_KEY}
), ackMode = "MANUAL")
@RabbitHandler
public void UnlockStock(String orderSn, Message message, Channel channel) {
log.info("Order {} cancelled, releasing stock", orderSn);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
skuService.unlockStock(orderSn);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try { channel.basicAck(deliveryTag, true); }
catch (IOException ex) { log.error("Order {} close failed: {}", orderSn, ex.getMessage()); }
}
}
}Order Payment (Illustration)
Java Captain
Focused on Java technologies: SSM, the Spring ecosystem, microservices, MySQL, MyCat, clustering, distributed systems, middleware, Linux, networking, multithreading; occasionally covers DevOps tools like Jenkins, Nexus, Docker, ELK; shares practical tech insights and is dedicated to full‑stack Java development.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.