Implementing Ordered Messages and Dead Letter Queues with Spring Cloud and RocketMQ

This article explains why message ordering is critical for order‑state flows, demonstrates how to use RocketMQ's ordered messaging and dead‑letter queue features in a Spring Cloud project, shows the full project structure, provides runnable code, testing steps, performance numbers, and common pitfalls.

Coder Trainee

Goal

Demonstrate ordered messages and dead‑letter queue (DLQ) handling in a Spring Cloud application using RocketMQ.

Ordered Message Scenario

┌────────────────────────────────────────────────────────────────┐
│                    Ordered Message Scenario                    │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│   Order status flow:                                           │
│   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐    │
│   │ Order    │ → │ Payment  │ → │ Shipping │ → │ Complete │    │
│   │ Created  │   │ Paid     │   │ Shipped  │   │ Done     │    │
│   └──────────┘   └──────────┘   └──────────┘   └──────────┘    │
│                                                                │
│   ❌ If messages are out of order, business logic fails.       │
│   ✅ Solution: route all messages of the same order to the     │
│      same queue so they are consumed FIFO.                     │
└────────────────────────────────────────────────────────────────┘

Ordered Message vs Normal Message

Queue selection : Normal – round‑robin or random; Ordered – route by business key.

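Consume order : Normal – not guaranteed; Ordered – FIFO within each queue, and therefore for every business key routed to that queue.

Performance : Normal – high; Ordered – lower, because each queue is locked and its messages are consumed one at a time.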

Applicable scenarios : Normal – logging, monitoring; Ordered – order processing, inventory, transaction flows.
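Routing by business key can be pictured as hashing the key and taking the result modulo the number of queues. The following is a minimal sketch of that idea, not RocketMQ's actual selector code; the class name, method name, and the queue count of 4 are assumptions made for illustration.

```java
import java.util.List;

/** Illustrative only: hash-based queue selection, not RocketMQ's actual source. */
final class QueueRoutingSketch {

    /** Maps a hash key to a queue index in [0, queueCount). */
    static int selectQueueIndex(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // hypothetical number of queues for the topic
        for (String event : List.of("ORDER_CREATED", "ORDER_PAID", "ORDER_SHIPPED")) {
            // Every event of order 1001 uses the same key, so it gets the same index
            System.out.println(event + " -> queue " + selectQueueIndex("1001", queueCount));
        }
    }
}
```

Because the index depends only on the key and the queue count, all events of one order share a queue as long as the number of queues does not change between sends.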

Dead‑Letter Queue Overview

┌────────────────────────────────────────────────────────────────┐
│                     Dead‑Letter Queue Flow                     │
├────────────────────────────────────────────────────────────────┤
│   ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐        │
│   │ Normal  │ → │ Consume │ → │ Retry   │ → │ Retry   │        │
│   │ Message │   │ Failed  │   │ Failed  │   │ Exhaust │        │
│   └─────────┘   └─────────┘   └────┬────┘   └────┬────┘        │
│                                    │             │             │
│                                    ▼             ▼             │
│                     ┌───────────────────────────────┐          │
│                     │   Dead‑Letter Queue (DLQ)     │          │
│                     │ • Manual handling             │          │
│                     │ • Data analysis               │          │
│                     │ • Compensation mechanisms     │          │
│                     └───────────────────────────────┘          │
└────────────────────────────────────────────────────────────────┘
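The flow in the diagram can be modeled in a few lines: a message is delivered, redelivered on failure, and parked once the retry budget is used up. This is a simplified in-memory sketch and not the RocketMQ client API; the class and method names are hypothetical, and it assumes one initial delivery plus at most maxReconsumeTimes redeliveries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Simplified in-memory model of retry exhaustion; not the RocketMQ client API. */
final class RetryToDlqSketch {
    final List<String> deadLetters = new ArrayList<>();
    private final int maxReconsumeTimes;

    RetryToDlqSketch(int maxReconsumeTimes) {
        this.maxReconsumeTimes = maxReconsumeTimes;
    }

    /**
     * Delivers the message until the handler succeeds or retries run out.
     * Returns the number of delivery attempts that were made.
     */
    int deliver(String message, Predicate<String> handler) {
        int attempts = 0;
        // One initial delivery plus at most maxReconsumeTimes redeliveries
        while (attempts <= maxReconsumeTimes) {
            attempts++;
            if (handler.test(message)) {
                return attempts; // consumed successfully, nothing goes to the DLQ
            }
        }
        deadLetters.add(message); // retries exhausted: park the message
        return attempts;
    }

    public static void main(String[] args) {
        RetryToDlqSketch broker = new RetryToDlqSketch(3);
        int attempts = broker.deliver("ORDER_PAID:2001", msg -> false); // handler always fails
        System.out.println("attempts=" + attempts + ", deadLetters=" + broker.deadLetters);
    }
}
```

A handler that succeeds on any attempt ends the loop early, so only messages that fail every attempt reach the dead-letter list.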

Project Structure

spring-cloud-mq-ep04/
├── order-producer/          # Ordered‑message producer
│   ├── pom.xml
│   └── src/main/java/.../producer/
│       ├── ProducerApplication.java
│       ├── config/RocketMQConfig.java
│       ├── controller/OrderController.java
│       └── service/OrderEventProducer.java
├── order-consumer/          # Ordered‑message consumer + DLQ
│   ├── pom.xml
│   └── src/main/java/.../consumer/
│       ├── ConsumerApplication.java
│       ├── listener/OrderEventListener.java
│       ├── listener/DlqEventListener.java
│       └── service/OrderProcessService.java
└── docker-compose.yml

Producer Module (Ordered Messages)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.teaching</groupId>
    <artifactId>order-producer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.4</version>
        <relativePath/>
    </parent>
    <properties>
        <java.version>17</java.version>
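        <!-- Starter 2.2.3 predates Spring Boot 3's auto-configuration format. If RocketMQTemplate is not
             auto-configured, import RocketMQAutoConfiguration explicitly or upgrade to 2.3.0 or later. -->
        <rocketmq.version>2.2.3</rocketmq.version>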
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>

application.yml

server:
  port: 8081
spring:
  application:
    name: order-producer
rocketmq:
  name-server: localhost:9876
  producer:
    group: order-sequential-producer
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
logging:
  level:
    com.teaching: DEBUG

OrderEventProducer.java

package com.teaching.producer.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.UUID;

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventProducer {
    private final RocketMQTemplate rocketMQTemplate;
    private static final String SEQUENTIAL_TOPIC = "order-sequential-topic";

    /** Send ordered message (route by orderId) */
    public void sendOrderEvent(Long orderId, String eventType, String data) {
        OrderEvent event = OrderEvent.builder()
                .messageId(UUID.randomUUID().toString())
                .orderId(orderId)
                .eventType(eventType)
                .data(data)
                .timestamp(LocalDateTime.now())
                .build();
        Message<OrderEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader("ORDER_ID", String.valueOf(orderId))
                .build();
        // ★ Key: use orderId as hashKey so messages of the same order go to the same queue
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(
                SEQUENTIAL_TOPIC,
                message,
                String.valueOf(orderId)  // hashKey
        );
        log.info("Ordered message sent successfully: orderId={}, eventType={}, queueId={}, msgId={}",
                orderId, eventType, sendResult.getMessageQueue().getQueueId(), sendResult.getMsgId());
    }

    @lombok.Data
    @lombok.Builder
    @lombok.NoArgsConstructor
    @lombok.AllArgsConstructor
    public static class OrderEvent {
        private String messageId;
        private Long orderId;
        private String eventType;
        private String data;
        private LocalDateTime timestamp;
    }
}

OrderController.java

package com.teaching.producer.controller;

import com.teaching.producer.service.OrderEventProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/api/order")
@RequiredArgsConstructor
public class OrderController {
    private final OrderEventProducer orderEventProducer;

    @PostMapping("/event")
    public Map<String, Object> sendOrderEvent(@RequestParam Long orderId,
                                              @RequestParam String eventType,
                                              @RequestParam String data) {
        orderEventProducer.sendOrderEvent(orderId, eventType, data);
        Map<String, Object> result = new HashMap<>();
        result.put("code", 200);
        result.put("message", "Ordered message sent successfully");
        result.put("orderId", orderId);
        result.put("eventType", eventType);
        return result;
    }

    /** Simulate order flow by sending four events sequentially */
    @PostMapping("/flow/{orderId}")
    public Map<String, Object> simulateOrderFlow(@PathVariable Long orderId) {
        orderEventProducer.sendOrderEvent(orderId, "ORDER_CREATED", "Order created");
        orderEventProducer.sendOrderEvent(orderId, "ORDER_PAID", "Order paid");
        orderEventProducer.sendOrderEvent(orderId, "ORDER_SHIPPED", "Order shipped");
        orderEventProducer.sendOrderEvent(orderId, "ORDER_COMPLETED", "Order completed");
        Map<String, Object> result = new HashMap<>();
        result.put("code", 200);
        result.put("message", "Order flow messages sent");
        result.put("orderId", orderId);
        return result;
    }
}

Consumer Module (Ordered Consumption + DLQ)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.teaching</groupId>
    <artifactId>order-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.4</version>
        <relativePath/>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>

application.yml

server:
  port: 8082
spring:
  application:
    name: order-consumer
rocketmq:
  name-server: localhost:9876
logging:
  level:
    com.teaching: DEBUG

OrderEventListener.java (ordered consumption + DLQ handling)

package com.teaching.consumer.listener;

import com.teaching.consumer.service.OrderProcessService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = "order-sequential-topic",
        consumerGroup = "order-sequential-consumer-group",
        consumeMode = ConsumeMode.ORDERLY,
        messageModel = MessageModel.CLUSTERING,
        // After 3 failed retries the message moves to %DLQ%order-sequential-consumer-group
        maxReconsumeTimes = 3,
        // Applies to concurrent consumption; in ORDERLY mode a failed message is retried in place
        delayLevelWhenNextConsume = 2
)
public class OrderEventListener implements RocketMQListener<OrderProcessService.OrderEvent> {
    // The payload type is OrderProcessService.OrderEvent so that the listener and the
    // service share one class instead of two incompatible nested copies.
    private final OrderProcessService orderProcessService;

    @Override
    public void onMessage(OrderProcessService.OrderEvent event) {
        log.info("Received ordered message: orderId={}, eventType={}, messageId={}",
                event.getOrderId(), event.getEventType(), event.getMessageId());
        try {
            orderProcessService.processOrderEvent(event);
            log.info("Message processed successfully: orderId={}, eventType={}",
                    event.getOrderId(), event.getEventType());
        } catch (Exception e) {
            log.error("Message processing failed: orderId={}, eventType={}",
                    event.getOrderId(), event.getEventType(), e);
            // Throw to trigger RocketMQ retry → eventually DLQ
            throw new RuntimeException("Message processing failed", e);
        }
    }
}

OrderProcessService.java (business logic + failure recording)

package com.teaching.consumer.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Slf4j
public class OrderProcessService {
    // Simulated order status store
    private final Map<Long, String> orderStatus = new ConcurrentHashMap<>();
    // Record of failed messages for later analysis
    private final Map<String, Integer> failedMessages = new ConcurrentHashMap<>();

    public void processOrderEvent(OrderEvent event) {
        Long orderId = event.getOrderId();
        String eventType = event.getEventType();
        log.info("Processing order event: orderId={}, eventType={}, data={}",
                orderId, eventType, event.getData());
        String currentStatus = orderStatus.get(orderId);
        if ("ORDER_CREATED".equals(eventType)) {
            if (currentStatus == null) {
                orderStatus.put(orderId, "CREATED");
                log.info("Order created successfully: orderId={}", orderId);
            } else {
                log.warn("Order already exists, skip creation: orderId={}, currentStatus={}", orderId, currentStatus);
            }
        } else if ("ORDER_PAID".equals(eventType)) {
            if ("CREATED".equals(currentStatus)) {
                orderStatus.put(orderId, "PAID");
                log.info("Order paid successfully: orderId={}", orderId);
            } else {
                log.error("Invalid order state for payment: orderId={}, currentStatus={}", orderId, currentStatus);
                throw new RuntimeException("Order state invalid for payment");
            }
        } else if ("ORDER_SHIPPED".equals(eventType)) {
            if ("PAID".equals(currentStatus)) {
                orderStatus.put(orderId, "SHIPPED");
                log.info("Order shipped successfully: orderId={}", orderId);
            } else {
                throw new RuntimeException("Order state invalid for shipping");
            }
        } else if ("ORDER_COMPLETED".equals(eventType)) {
            if ("SHIPPED".equals(currentStatus)) {
                orderStatus.put(orderId, "COMPLETED");
                log.info("Order completed: orderId={}", orderId);
            } else {
                throw new RuntimeException("Order state invalid for completion");
            }
        }
    }

    @lombok.Data
    public static class OrderEvent {
        private String messageId;
        private Long orderId;
        private String eventType;
        private String data;
        private String timestamp;
    }
}
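The if/else chain in processOrderEvent can also be read as a transition table: each event type requires one status and produces the next. The sketch below expresses that table directly, under the assumption that a redelivered copy of the most recently applied event should be treated as a harmless duplicate instead of an error. That duplicate handling is an addition for illustration and is not part of the original service; the class and enum names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the order status flow as a lookup table; names mirror the article's statuses. */
final class OrderStateMachineSketch {
    /** Result of applying one event to an order. */
    enum Outcome { APPLIED, DUPLICATE }

    // eventType -> {required current status, resulting status}; "" stands for "no order yet"
    private static final Map<String, String[]> TRANSITIONS = Map.of(
            "ORDER_CREATED", new String[] {"", "CREATED"},
            "ORDER_PAID", new String[] {"CREATED", "PAID"},
            "ORDER_SHIPPED", new String[] {"PAID", "SHIPPED"},
            "ORDER_COMPLETED", new String[] {"SHIPPED", "COMPLETED"});

    private final Map<Long, String> orderStatus = new ConcurrentHashMap<>();

    Outcome apply(long orderId, String eventType) {
        String[] rule = TRANSITIONS.get(eventType);
        if (rule == null) {
            throw new IllegalArgumentException("Unknown event type: " + eventType);
        }
        String current = orderStatus.getOrDefault(orderId, "");
        if (current.equals(rule[1])) {
            // Only a repeat of the most recently applied event is recognized here
            return Outcome.DUPLICATE;
        }
        if (!current.equals(rule[0])) {
            // Out-of-order event: throwing lets the caller trigger a retry
            throw new IllegalStateException(
                    "Cannot apply " + eventType + " while status is '" + current + "'");
        }
        orderStatus.put(orderId, rule[1]);
        return Outcome.APPLIED;
    }

    /** Returns the current status, or null if the order is unknown. */
    String statusOf(long orderId) {
        return orderStatus.get(orderId);
    }
}
```

The read-then-write in apply is not atomic on its own; the sketch relies on ordered consumption delivering the events of one order one at a time.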

DlqEventListener.java (DLQ consumer)

package com.teaching.consumer.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "%DLQ%order-sequential-consumer-group",
        consumerGroup = "dlq-consumer-group"
)
public class DlqEventListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        // A dead-lettered message carries the original payload (the OrderEvent JSON),
        // so read the raw body instead of mapping it to a separate DTO.
        String originalMessage = new String(message.getBody(), StandardCharsets.UTF_8);
        // ORDER_ID is the header the producer sets; it is assumed to arrive as a user property
        String orderId = message.getUserProperty("ORDER_ID");
        log.warn("Received DLQ message: orderId={}, msgId={}, reconsumeTimes={}, originalMessage={}",
                orderId, message.getMsgId(), message.getReconsumeTimes(), originalMessage);
        // 1. Record to DB for manual handling
        // 2. Send alert (e.g., DingTalk, email)
        // 3. Write to compensation table for scheduled retry
        sendAlert(orderId);
    }

    private void sendAlert(String orderId) {
        log.info("Sending DLQ alert: orderId={}, please handle manually", orderId);
        // Integration with alert service would go here
    }
}
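The DLQ listener's topic follows a naming convention: the %DLQ% prefix followed by the name of the consumer group whose messages were dead-lettered. A small helper, sketched here with hypothetical class and method names, keeps that convention in one place so the topic string is not assembled by hand.

```java
/** Naming helper for dead-letter topics; a sketch, not part of the RocketMQ client. */
final class DlqTopicNames {
    /** Prefix used in the DLQ listener's topic attribute. */
    static final String DLQ_PREFIX = "%DLQ%";

    /** Builds the DLQ topic name for a consumer group. */
    static String dlqTopicFor(String consumerGroup) {
        if (consumerGroup == null || consumerGroup.isBlank()) {
            throw new IllegalArgumentException("consumerGroup must not be blank");
        }
        return DLQ_PREFIX + consumerGroup;
    }

    /** True if the topic has the DLQ prefix followed by a group name. */
    static boolean isDlqTopic(String topic) {
        return topic != null
                && topic.startsWith(DLQ_PREFIX)
                && topic.length() > DLQ_PREFIX.length();
    }

    /** Recovers the consumer group name from a DLQ topic name. */
    static String consumerGroupOf(String dlqTopic) {
        if (!isDlqTopic(dlqTopic)) {
            throw new IllegalArgumentException("Not a DLQ topic: " + dlqTopic);
        }
        return dlqTopic.substring(DLQ_PREFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(dlqTopicFor("order-sequential-consumer-group"));
    }
}
```

Annotation attributes need compile-time constants, so in the listener itself the topic would still be written as a literal or as a constant expression built from DLQ_PREFIX.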

Testing & Verification

Start RocketMQ:

docker-compose up -d

Run the producer:

cd order-producer && mvn spring-boot:run

Run the consumer:

cd order-consumer && mvn spring-boot:run

Test the ordered flow:

curl -X POST "http://localhost:8081/api/order/flow/1001"

– observe FIFO processing in the consumer logs.

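Test out‑of‑order delivery (trigger DLQ):

curl -X POST "http://localhost:8081/api/order/event?orderId=2001&eventType=ORDER_PAID&data=paid"

– order 2001 was never created, so the consumer rejects the payment event, retries up to maxReconsumeTimes (3), and the message then lands in the DLQ.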

Message Retry Configuration (RocketMQ)

Parameter                   Description               Default
maxReconsumeTimes           Maximum retry attempts    -1 (16 in concurrent mode; unlimited in ORDERLY mode)
delayLevelWhenNextConsume   Retry delay level         0 (broker decides; concurrent mode only)

Level   Interval
1       1s
2       5s
3       10s
4       30s
5       1m
6       2m
7       3m
8       4m
9       5m
10      6m
11      7m
12      8m
13      9m
14      10m
15      20m
16      30m
17      1h
18      2h
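The level-to-interval mapping comes from the broker's messageDelayLevel setting, a space-separated string of durations. The sketch below parses such a string and looks up a level, assuming the stock default value of 18 levels; the class and method names are hypothetical, and a broker with a customized setting would yield different intervals.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Parses a delay-level string such as the broker's messageDelayLevel setting. */
final class DelayLevelsSketch {
    // Assumed to match the broker's default messageDelayLevel; verify against your broker config
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Returns the delay for each level; list index 0 corresponds to level 1. */
    static List<Duration> parse(String levels) {
        List<Duration> result = new ArrayList<>();
        for (String token : levels.trim().split("\\s+")) {
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': result.add(Duration.ofSeconds(amount)); break;
                case 'm': result.add(Duration.ofMinutes(amount)); break;
                case 'h': result.add(Duration.ofHours(amount)); break;
                case 'd': result.add(Duration.ofDays(amount)); break;
                default: throw new IllegalArgumentException("Unknown unit in: " + token);
            }
        }
        return result;
    }

    /** Looks up the delay for a 1-based level in the default table. */
    static Duration delayFor(int level) {
        List<Duration> all = parse(DEFAULT_LEVELS);
        if (level < 1 || level > all.size()) {
            throw new IllegalArgumentException("Level out of range: " + level);
        }
        return all.get(level - 1);
    }

    public static void main(String[] args) {
        // delayLevelWhenNextConsume = 2 in the listener corresponds to this delay
        System.out.println("level 2 = " + delayFor(2));
    }
}
```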

DLQ Handling Strategies

-- Dead‑letter record table
CREATE TABLE `dead_letter_record` (
    `id` BIGINT NOT NULL AUTO_INCREMENT,
    `order_id` BIGINT NOT NULL,
    `message_id` VARCHAR(100) NOT NULL,
    `event_type` VARCHAR(50) NOT NULL,
    `original_message` TEXT,
    `fail_reason` VARCHAR(500),
    `retry_count` INT DEFAULT 0,
    `status` TINYINT DEFAULT 0 COMMENT '0‑pending 1‑processed 2‑ignored',
    `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
    `process_time` DATETIME,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
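
// Compensation task (periodic retry of DLQ records).
// DeadLetter, deadLetterRepository, and retryService are application classes that are not shown here;
// @Scheduled only fires when scheduling is enabled with @EnableScheduling.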
@Scheduled(cron = "0 0/5 * * * ?")
public void retryDeadLetters() {
    List<DeadLetter> letters = deadLetterRepository.findByStatus(0);
    for (DeadLetter letter : letters) {
        try {
            // Re‑send or compensate
            retryService.retry(letter);
            letter.setStatus(1);
            letter.setProcessTime(LocalDateTime.now());
            deadLetterRepository.save(letter);
        } catch (Exception e) {
            log.error("Compensation failed: {}", letter.getId(), e);
        }
    }
}
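The scheduled task above retries every pending record on each run, so a record that can never succeed would be retried forever. One way to bound that is sketched below with an in-memory stand-in for the dead_letter_record table. The status codes follow the table's column comment, while the class names and the rule of marking a record as ignored after a fixed number of failures are assumptions made for illustration.

```java
import java.util.List;
import java.util.function.Predicate;

/** In-memory stand-in for the dead_letter_record table and its retry job. */
final class DeadLetterCompensationSketch {
    static final int PENDING = 0;
    static final int PROCESSED = 1;
    static final int IGNORED = 2;

    /** One row of the table, reduced to the columns the job touches. */
    static final class DeadLetterRecord {
        final long orderId;
        int retryCount;
        int status = PENDING;

        DeadLetterRecord(long orderId) {
            this.orderId = orderId;
        }
    }

    /**
     * Retries every pending record once. A record is marked IGNORED after
     * maxRetries failed attempts so the job does not loop on it forever.
     */
    static void retryPending(List<DeadLetterRecord> records,
                             Predicate<DeadLetterRecord> retryAction,
                             int maxRetries) {
        for (DeadLetterRecord record : records) {
            if (record.status != PENDING) {
                continue; // already processed or given up on
            }
            boolean succeeded;
            try {
                succeeded = retryAction.test(record);
            } catch (RuntimeException e) {
                succeeded = false; // one bad record must not stop the whole run
            }
            if (succeeded) {
                record.status = PROCESSED;
            } else if (++record.retryCount >= maxRetries) {
                record.status = IGNORED; // hand over to manual handling
            }
        }
    }
}
```

Each call to retryPending corresponds to one run of the scheduled job, so a record that keeps failing leaves the pending set after maxRetries runs.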

Performance Comparison (Ordered vs Normal Messages)

Scenario                       Normal QPS   Ordered QPS   Ordered throughput drop
1 producer, 1 consumer         100,000      60,000        ~40%
Multiple producers/consumers   300,000      150,000       ~50%

Explanation: To guarantee FIFO order, an ordered consumer locks each queue and processes its messages one at a time, so parallelism is capped by the number of queues and throughput is lower than with normal messages.
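The throughput cost follows from how ordering is enforced: work on one queue is serialized, and only different queues run in parallel. The sketch below models this with one single-threaded executor per queue. It is a simplified stand-in, since RocketMQ uses queue locks and not dedicated threads, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Models ordered consumption: one worker per queue, queues run in parallel. */
final class PerQueueLaneSketch {
    private final List<ExecutorService> lanes = new ArrayList<>();
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());

    PerQueueLaneSketch(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            // A single-thread executor runs its tasks strictly in submission order
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Routes the event to the lane chosen by orderId and records it when processed. */
    void submit(long orderId, String eventType) {
        int lane = Math.floorMod(Long.hashCode(orderId), lanes.size());
        lanes.get(lane).execute(() -> processed.add(orderId + ":" + eventType));
    }

    /** Stops accepting work, waits for queued tasks, and returns the processing log. */
    List<String> drain() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                if (!lane.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("lane did not finish in time");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        synchronized (processed) {
            return new ArrayList<>(processed);
        }
    }
}
```

Events of one order always appear in submission order in the log, while events of different orders may interleave. Adding threads beyond the number of lanes would not increase throughput in this model.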

Common Pitfalls & Solutions

Pitfall 1: Ordered messages are not ordered

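Cause: The producer does not send with syncSendOrderly, messages of the same order are sent with different hashKeys, or the consumer does not set consumeMode = ConsumeMode.ORDERLY.

Solution: Send every message of a business key (here, orderId) with the same hashKey via syncSendOrderly, and consume with ConsumeMode.ORDERLY.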

Pitfall 2: DLQ not created

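Cause: The broker creates the DLQ topic (%DLQ% followed by the consumer group name) automatically, but only after a message of that consumer group has exhausted its retries. Until then the topic does not exist.

Solution: Trigger a failure that exhausts the retries, for example the out‑of‑order test above, then confirm that %DLQ%order-sequential-consumer-group exists before expecting DlqEventListener to receive anything.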

Pitfall 3: Retry count configuration ineffective

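Cause: maxReconsumeTimes is left at its default (16 retries in concurrent mode, effectively unlimited in ORDERLY mode), or the retry interval is tuned through delayLevelWhenNextConsume, which has no effect in ORDERLY mode.

Solution: Set maxReconsumeTimes explicitly inside the @RocketMQMessageListener annotation. For ordered consumers, adjust the retry interval with suspendCurrentQueueTimeMillis.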
