Implementing Ordered Messages and Dead Letter Queues with Spring Cloud and RocketMQ
This article explains why message ordering is critical for order‑state flows, demonstrates how to use RocketMQ's ordered messaging and dead‑letter queue features in a Spring Cloud project, shows the full project structure, provides runnable code, testing steps, performance numbers, and common pitfalls.
Goal
Demonstrate ordered messages and dead‑letter queue (DLQ) handling in a Spring Cloud application using RocketMQ.
Ordered Message Scenario
┌─────────────────────────────────────────────────────────────────┐
│                    Ordered Message Scenario                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Order status flow:                                             │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐      │
│  │  Order   │ → │ Payment  │ → │ Shipping │ → │ Complete │      │
│  │ Created  │   │   Paid   │   │ Shipped  │   │   Done   │      │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘      │
│                                                                 │
│  ❌ If messages are out of order, business logic fails.         │
│  ✅ Solution: route all messages of the same order to the same  │
│     queue so they are consumed FIFO.                            │
└─────────────────────────────────────────────────────────────────┘
Ordered Message vs Normal Message
Queue selection : Normal – round‑robin or random; Ordered – route by business key.
Consume order : Normal – not guaranteed; Ordered – FIFO guaranteed within each queue.
Performance : Normal – high; Ordered – lower, because each queue is locked and consumed serially.
Applicable scenarios : Normal – logging, monitoring; Ordered – order processing, inventory, transaction flows.
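To make "route by business key" concrete, here is a minimal sketch in plain Java with no RocketMQ dependency. It models queue selection as the key's hash modulo the queue count, which is a simplification of what a hash-based queue selector does. The class name QueueRoutingSketch, the methods selectQueue and route, and the queue count of 4 are illustrative assumptions, not RocketMQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of hash-based queue selection; this is not RocketMQ code.
class QueueRoutingSketch {

    // Map a business key to a queue index in the range [0, queueCount).
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    // Group events by the queue they would be routed to, preserving send order.
    // Each event is a two-element array: {orderId, eventType}.
    static Map<Integer, List<String>> route(List<String[]> events, int queueCount) {
        Map<Integer, List<String>> queues = new TreeMap<>();
        for (String[] event : events) {
            String orderId = event[0];
            String eventType = event[1];
            queues.computeIfAbsent(selectQueue(orderId, queueCount), q -> new ArrayList<>())
                  .add(orderId + ":" + eventType);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Events of two orders, interleaved in send order.
        List<String[]> events = List.of(
                new String[] {"1001", "ORDER_CREATED"},
                new String[] {"1002", "ORDER_CREATED"},
                new String[] {"1001", "ORDER_PAID"},
                new String[] {"1002", "ORDER_PAID"},
                new String[] {"1001", "ORDER_SHIPPED"});
        route(events, 4).forEach((queue, messages) ->
                System.out.println("queue " + queue + " -> " + messages));
    }
}
```

Whatever queue an order lands in, all of its events share that queue and keep their send order, which is the property ordered consumption relies on. Note that in this model the mapping depends on the queue count, so changing the number of queues can move a key to a different queue.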
Dead‑Letter Queue Overview
┌─────────────────────────────────────────────────────────────────┐
│                     Dead‑Letter Queue Flow                      │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐          │
│  │ Normal  │ → │ Consume │ → │  Retry  │ → │  Retry  │          │
│  │ Message │   │ Failed  │   │ Failed  │   │ Exhaust │          │
│  └─────────┘   └─────────┘   └────┬────┘   └────┬────┘          │
│                                   │             │               │
│                                   ▼             ▼               │
│                  ┌───────────────────────────────┐              │
│                  │   Dead‑Letter Queue (DLQ)     │              │
│                  │   • Manual handling           │              │
│                  │   • Data analysis             │              │
│                  │   • Compensation mechanisms   │              │
│                  └───────────────────────────────┘              │
└─────────────────────────────────────────────────────────────────┘
Project Structure
spring-cloud-mq-ep04/
├── order-producer/                 # Ordered‑message producer
│   ├── pom.xml
│   └── src/main/java/.../producer/
│       ├── ProducerApplication.java
│       ├── config/RocketMQConfig.java
│       ├── controller/OrderController.java
│       └── service/OrderEventProducer.java
├── order-consumer/                 # Ordered‑message consumer + DLQ
│   ├── pom.xml
│   └── src/main/java/.../consumer/
│       ├── ConsumerApplication.java
│       ├── listener/OrderEventListener.java
│       ├── listener/DlqEventListener.java
│       └── service/OrderProcessService.java
└── docker-compose.yml
Producer Module (Ordered Messages)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.teaching</groupId>
    <artifactId>order-producer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.4</version>
        <relativePath/>
    </parent>
    <properties>
        <java.version>17</java.version>
        <!-- The source used 2.2.3. 2.3.0 is assumed here because earlier starters register their
             auto-configuration only through spring.factories, which Spring Boot 3 no longer reads. -->
        <rocketmq.version>2.3.0</rocketmq.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>
application.yml
server:
  port: 8081
spring:
  application:
    name: order-producer
rocketmq:
  name-server: localhost:9876
  producer:
    group: order-sequential-producer
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
logging:
  level:
    com.teaching: DEBUG
OrderEventProducer.java
package com.teaching.producer.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.UUID;

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventProducer {

    private static final String SEQUENTIAL_TOPIC = "order-sequential-topic";

    private final RocketMQTemplate rocketMQTemplate;

    /** Send ordered message (route by orderId) */
    public void sendOrderEvent(Long orderId, String eventType, String data) {
        OrderEvent event = OrderEvent.builder()
                .messageId(UUID.randomUUID().toString())
                .orderId(orderId)
                .eventType(eventType)
                .data(data)
                // Sent as ISO-8601 text so it matches the String field on the consumer side
                .timestamp(LocalDateTime.now().toString())
                .build();
        Message<OrderEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader("ORDER_ID", String.valueOf(orderId))
                .build();
        // ★ Key: use orderId as hashKey so messages of the same order go to the same queue
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(
                SEQUENTIAL_TOPIC,
                message,
                String.valueOf(orderId) // hashKey
        );
        log.info("Ordered message sent successfully: orderId={}, eventType={}, queueId={}, msgId={}",
                orderId, eventType, sendResult.getMessageQueue().getQueueId(), sendResult.getMsgId());
    }

    @lombok.Data
    @lombok.Builder
    @lombok.NoArgsConstructor
    @lombok.AllArgsConstructor
    public static class OrderEvent {
        private String messageId;
        private Long orderId;
        private String eventType;
        private String data;
        private String timestamp;
    }
}
OrderController.java
package com.teaching.producer.controller;

import com.teaching.producer.service.OrderEventProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/api/order")
@RequiredArgsConstructor
public class OrderController {

    private final OrderEventProducer orderEventProducer;

    /** Send a single order event */
    @PostMapping("/event")
    public Map<String, Object> sendOrderEvent(@RequestParam Long orderId,
                                              @RequestParam String eventType,
                                              @RequestParam String data) {
        orderEventProducer.sendOrderEvent(orderId, eventType, data);
        Map<String, Object> result = new HashMap<>();
        result.put("code", 200);
        result.put("message", "Ordered message sent successfully");
        result.put("orderId", orderId);
        result.put("eventType", eventType);
        return result;
    }

    /** Simulate order flow by sending four events sequentially */
    @PostMapping("/flow/{orderId}")
    public Map<String, Object> simulateOrderFlow(@PathVariable Long orderId) {
        orderEventProducer.sendOrderEvent(orderId, "ORDER_CREATED", "Order created");
        orderEventProducer.sendOrderEvent(orderId, "ORDER_PAID", "Order paid");
        orderEventProducer.sendOrderEvent(orderId, "ORDER_SHIPPED", "Order shipped");
        orderEventProducer.sendOrderEvent(orderId, "ORDER_COMPLETED", "Order completed");
        Map<String, Object> result = new HashMap<>();
        result.put("code", 200);
        result.put("message", "Order flow messages sent");
        result.put("orderId", orderId);
        return result;
    }
}
Consumer Module (Ordered Consumption + DLQ)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.teaching</groupId>
    <artifactId>order-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.4</version>
        <relativePath/>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <!-- The source used 2.2.3. 2.3.0 is assumed here because earlier starters register their
                 auto-configuration only through spring.factories, which Spring Boot 3 no longer reads. -->
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>
application.yml
server:
  port: 8082
spring:
  application:
    name: order-consumer
rocketmq:
  name-server: localhost:9876
logging:
  level:
    com.teaching: DEBUG
OrderEventListener.java (ordered consumption + DLQ handling)
package com.teaching.consumer.listener;

import com.teaching.consumer.service.OrderProcessService;
// Reuse the service's event type so the listener and the service agree on one class
import com.teaching.consumer.service.OrderProcessService.OrderEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = "order-sequential-topic",
        consumerGroup = "order-sequential-consumer-group",
        consumeMode = ConsumeMode.ORDERLY,
        messageModel = MessageModel.CLUSTERING,
        maxReconsumeTimes = 3,
        // Delay levels pace retries in concurrent mode; in ORDERLY mode the queue is
        // paused for suspendCurrentQueueTimeMillis between attempts instead.
        delayLevelWhenNextConsume = 2
)
public class OrderEventListener implements RocketMQListener<OrderEvent> {

    private final OrderProcessService orderProcessService;

    @Override
    public void onMessage(OrderEvent event) {
        log.info("Received ordered message: orderId={}, eventType={}, messageId={}",
                event.getOrderId(), event.getEventType(), event.getMessageId());
        try {
            orderProcessService.processOrderEvent(event);
            log.info("Message processed successfully: orderId={}, eventType={}",
                    event.getOrderId(), event.getEventType());
        } catch (Exception e) {
            log.error("Message processing failed: orderId={}, eventType={}",
                    event.getOrderId(), event.getEventType(), e);
            // Throw to trigger RocketMQ retry → eventually DLQ
            throw new RuntimeException("Message processing failed", e);
        }
    }
}
OrderProcessService.java (business logic + failure recording)
package com.teaching.consumer.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Slf4j
public class OrderProcessService {

    // Simulated order status store
    private final Map<Long, String> orderStatus = new ConcurrentHashMap<>();
    // Record of failed messages for later analysis
    private final Map<String, Integer> failedMessages = new ConcurrentHashMap<>();

    public void processOrderEvent(OrderEvent event) {
        Long orderId = event.getOrderId();
        String eventType = event.getEventType();
        log.info("Processing order event: orderId={}, eventType={}, data={}",
                orderId, eventType, event.getData());
        String currentStatus = orderStatus.get(orderId);
        if ("ORDER_CREATED".equals(eventType)) {
            if (currentStatus == null) {
                orderStatus.put(orderId, "CREATED");
                log.info("Order created successfully: orderId={}", orderId);
            } else {
                log.warn("Order already exists, skip creation: orderId={}, currentStatus={}", orderId, currentStatus);
            }
        } else if ("ORDER_PAID".equals(eventType)) {
            if ("CREATED".equals(currentStatus)) {
                orderStatus.put(orderId, "PAID");
                log.info("Order paid successfully: orderId={}", orderId);
            } else {
                log.error("Invalid order state for payment: orderId={}, currentStatus={}", orderId, currentStatus);
                throw new RuntimeException("Order state invalid for payment");
            }
        } else if ("ORDER_SHIPPED".equals(eventType)) {
            if ("PAID".equals(currentStatus)) {
                orderStatus.put(orderId, "SHIPPED");
                log.info("Order shipped successfully: orderId={}", orderId);
            } else {
                throw new RuntimeException("Order state invalid for shipping");
            }
        } else if ("ORDER_COMPLETED".equals(eventType)) {
            if ("SHIPPED".equals(currentStatus)) {
                orderStatus.put(orderId, "COMPLETED");
                log.info("Order completed: orderId={}", orderId);
            } else {
                throw new RuntimeException("Order state invalid for completion");
            }
        } else {
            // Unknown event types are logged and ignored rather than retried
            log.warn("Unknown event type, ignored: orderId={}, eventType={}", orderId, eventType);
        }
    }

    @lombok.Data
    public static class OrderEvent {
        private String messageId;
        private Long orderId;
        private String eventType;
        private String data;
        private String timestamp;
    }
}
DlqEventListener.java (DLQ consumer)
package com.teaching.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "%DLQ%order-sequential-consumer-group",
        consumerGroup = "dlq-consumer-group"
)
public class DlqEventListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        // A dead-lettered message carries the original body (the OrderEvent JSON),
        // so it is read raw here instead of being mapped to a separate DTO.
        String originalMessage = new String(message.getBody(), StandardCharsets.UTF_8);
        log.warn("Received DLQ message: msgId={}, reconsumeTimes={}, originalMessage={}",
                message.getMsgId(), message.getReconsumeTimes(), originalMessage);
        // 1. Record to DB for manual handling
        // 2. Send alert (e.g., DingTalk, email)
        // 3. Write to compensation table for scheduled retry
        sendAlert(message.getMsgId(), originalMessage);
    }

    private void sendAlert(String msgId, String originalMessage) {
        log.info("Sending DLQ alert: msgId={}, body={}, please handle manually", msgId, originalMessage);
        // Integration with alert service would go here
    }
}
Testing & Verification
Start RocketMQ: docker-compose up -d
Run producer: cd order-producer && mvn spring-boot:run
Run consumer: cd order-consumer && mvn spring-boot:run
Test ordered flow:
curl -X POST "http://localhost:8081/api/order/flow/1001"
The consumer logs should show the four events processed in FIFO order.
Test disorder (trigger DLQ):
curl -X POST "http://localhost:8081/api/order/event?orderId=2001&eventType=ORDER_PAID&data=payment"
Order 2001 was never created, so the consumer rejects ORDER_PAID, retries, and the message eventually lands in the DLQ.
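The disorder test can be reasoned about with a small stand-alone model. The sketch below is plain Java, not RocketMQ code: the class RetryToDlqSketch and its methods are hypothetical names, and it assumes one initial delivery followed by up to maxReconsumeTimes retries before a message is dead-lettered. It applies the same four transitions as OrderProcessService, except that unknown event types fail instead of being ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Stand-alone model of "fail, retry, then dead-letter"; this is not RocketMQ code.
class RetryToDlqSketch {

    // Order statuses in the only sequence the flow allows.
    private static final List<String> FLOW = List.of("CREATED", "PAID", "SHIPPED", "COMPLETED");

    final Map<Long, String> orderStatus = new HashMap<>();
    final List<String> deadLetters = new ArrayList<>();

    // Apply one event; throws if the order is not in the state the event expects.
    void apply(long orderId, String eventType) {
        int step = eventType.startsWith("ORDER_")
                ? FLOW.indexOf(eventType.substring("ORDER_".length()))
                : -1;
        if (step < 0) {
            throw new IllegalArgumentException("Unknown event type: " + eventType);
        }
        String current = orderStatus.get(orderId);
        if (step == 0 && current != null) {
            return; // duplicate creation is skipped
        }
        String required = (step == 0) ? null : FLOW.get(step - 1);
        if (!Objects.equals(current, required)) {
            throw new IllegalStateException(
                    "Order " + orderId + " is " + current + ", cannot apply " + eventType);
        }
        orderStatus.put(orderId, FLOW.get(step));
    }

    // Deliver a message, retrying on failure; returns the number of delivery attempts.
    int consume(long orderId, String eventType, int maxReconsumeTimes) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                apply(orderId, eventType);
                return attempts;
            } catch (RuntimeException e) {
                // Assumed model: first delivery plus maxReconsumeTimes retries, then DLQ.
                if (attempts > maxReconsumeTimes) {
                    deadLetters.add(orderId + ":" + eventType);
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        RetryToDlqSketch consumer = new RetryToDlqSketch();
        int attempts = consumer.consume(2001L, "ORDER_PAID", 3);
        System.out.println("attempts=" + attempts + ", deadLetters=" + consumer.deadLetters);
    }
}
```

In this model the out-of-order ORDER_PAID for order 2001 can never succeed, because no retry changes the order's state, so it is delivered four times and then dead-lettered. That is why such messages need manual handling or compensation rather than more retries.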
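Message Retry Configuration (RocketMQ)
Parameter                    Description               Default
maxReconsumeTimes            Maximum retry attempts    16 in concurrent mode; unlimited in ordered mode
delayLevelWhenNextConsume    Retry delay level         0 (broker-controlled; the first retry then uses level 3)
delayLevelWhenNextConsume applies to concurrent consumption. In ordered mode the consumer pauses the queue for suspendCurrentQueueTimeMillis (1000 ms by default) and then redelivers the same message.
Delay levels, from the broker's default messageDelayLevel setting:
Level    Interval
1        1s
2        5s
3        10s
4        30s
5        1m
6        2m
7        3m
8        4m
9        5m
10       6m
11       7m
12       8m
13       9m
14       10m
15       20m
16       30m
17       1h
18       2h
DLQ Handling Strategies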
-- Dead-letter record table
CREATE TABLE `dead_letter_record` (
    `id` BIGINT NOT NULL AUTO_INCREMENT,
    `order_id` BIGINT NOT NULL,
    `message_id` VARCHAR(100) NOT NULL,
    `event_type` VARCHAR(50) NOT NULL,
    `original_message` TEXT,
    `fail_reason` VARCHAR(500),
    `retry_count` INT DEFAULT 0,
    `status` TINYINT DEFAULT 0 COMMENT '0-pending 1-processed 2-ignored',
    `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
    `process_time` DATETIME,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

// Compensation task (periodic retry of DLQ records).
// deadLetterRepository, retryService, and the DeadLetter entity are application classes not shown in this article.
@Scheduled(cron = "0 0/5 * * * ?") // every 5 minutes
public void retryDeadLetters() {
    List<DeadLetter> letters = deadLetterRepository.findByStatus(0); // 0 = pending
    for (DeadLetter letter : letters) {
        try {
            // Re-send or compensate
            retryService.retry(letter);
            letter.setStatus(1); // 1 = processed
            letter.setProcessTime(LocalDateTime.now());
            deadLetterRepository.save(letter);
        } catch (Exception e) {
            log.error("Compensation failed: {}", letter.getId(), e);
        }
    }
}
Performance Comparison (Ordered vs Normal Messages)
Scenario                        Normal QPS    Ordered QPS    Difference
1 producer, 1 consumer          100,000       60,000         ~40% lower
Multiple producers/consumers    300,000       150,000        ~50% lower
Explanation: Ordered messages require a lock to guarantee FIFO order, which introduces overhead and results in lower throughput compared with normal messages.
Common Pitfalls & Solutions
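Pitfall 1: Ordered messages are not ordered
Cause: Messages are sent with a method other than syncSendOrderly, or messages of the same order are sent with different hashKey values and therefore land in different queues.
Solution: Use one stable business key (here, orderId) as the hashKey for every message of the same order, and consume with ConsumeMode.ORDERLY.
Pitfall 2: DLQ not created
Cause: The DLQ topic (%DLQ% followed by the consumer group name) is created automatically only after a message of that consumer group has exhausted its retries.
Solution: Trigger at least one such failure, as in the disorder test, before looking for the DLQ topic.
Pitfall 3: Retry count configuration ineffective
Cause: maxReconsumeTimes is left unset or set somewhere other than the listener. In ordered mode an unset value means a failing message is retried without limit and never reaches the DLQ.
Solution: Set maxReconsumeTimes explicitly inside the @RocketMQMessageListener annotation.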
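Pitfalls 2 and 3 both come down to conventions that are easy to get wrong by hand. The following sketch captures them as two small helpers in plain Java. The class and method names are hypothetical, and the defaults encoded here (a limit of -1 meaning 16 retries in concurrent mode and no limit in ordered mode, and the %DLQ% topic prefix) reflect the RocketMQ push consumer's documented behaviour as understood at the time of writing, so verify them against the client version you run.

```java
// Helper sketch for DLQ naming and retry limits; names are illustrative, not RocketMQ API.
class RetryNamingSketch {

    enum Mode { CONCURRENTLY, ORDERLY }

    // A consumer group's dead-letter topic is the group name prefixed with "%DLQ%".
    static String dlqTopic(String consumerGroup) {
        return "%DLQ%" + consumerGroup;
    }

    // Check that a DLQ listener's topic actually matches the group whose failures it should see.
    static boolean listensToDlqOf(String listenerTopic, String consumerGroup) {
        return dlqTopic(consumerGroup).equals(listenerTopic);
    }

    // Resolve the retry limit that applies when maxReconsumeTimes is left at -1 (unset).
    static int effectiveMaxReconsumeTimes(Mode mode, int configured) {
        if (configured != -1) {
            return configured;
        }
        return mode == Mode.ORDERLY ? Integer.MAX_VALUE : 16;
    }

    public static void main(String[] args) {
        System.out.println(dlqTopic("order-sequential-consumer-group"));
        System.out.println(effectiveMaxReconsumeTimes(Mode.ORDERLY, 3));
    }
}
```

A check like listensToDlqOf is useful in a unit test: renaming the consumer group without updating the DLQ listener's topic would otherwise fail silently, with dead letters accumulating in a topic nobody reads.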
Coder Trainee
Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.
