How to Use RocketMQ with Spring Cloud Stream: A Beginner’s Guide
This tutorial explains why microservices need a message queue, outlines RocketMQ’s core concepts, shows how to set up RocketMQ with Docker Compose, integrates it into Spring Boot projects with detailed producer and consumer code, provides testing steps, and lists common pitfalls and solutions.
1. Why a Message Queue?
Microservice calls that rely on synchronous RPC (e.g., Feign) suffer from tight coupling, poor performance, and inability to handle traffic spikes because each service must wait for downstream responses.
Problems
Severe coupling – a downstream failure blocks the upstream service.
Performance – in a sequential call chain, total latency is the sum of every service's latency.
Traffic spikes – sudden peaks can overwhelm downstream services.
No buffering – the system cannot smooth burst traffic.
Benefits After Introducing a Message Queue
Decoupling – the producer does not need to know who consumes the message.
Peak‑shaving – the queue buffers traffic and smooths spikes.
Asynchrony – the producer can return immediately after sending.
Reliability – messages are persisted on the broker, so a consumer outage does not lose them and failed deliveries can be retried.
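The decoupling and peak-shaving points can be illustrated with a toy, in-process sketch. It is not RocketMQ code: a `LinkedBlockingQueue` stands in for the broker, and the class name `QueueBufferDemo`, the method `runBurst`, and the 2 ms processing time are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy illustration of buffering a traffic burst; the queue stands in for a broker. */
final class QueueBufferDemo {

    /** Enqueues a burst of events at once and lets a slower consumer drain them. */
    static List<String> runBurst(int burstSize) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        // Consumer: handles events at its own pace, independent of the producer.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < burstSize; i++) {
                    String event = queue.take(); // blocks until an event is available
                    Thread.sleep(2);             // simulated slow downstream work
                    processed.add(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: hands the whole burst to the queue without waiting for processing.
        long start = System.nanoTime();
        for (int i = 0; i < burstSize; i++) {
            queue.add("order-" + i);
        }
        long enqueueMicros = (System.nanoTime() - start) / 1_000;
        System.out.println("Producer enqueued " + burstSize + " events in "
                + enqueueMicros + " microseconds");

        try {
            consumer.join(); // wait only so the demo can report the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> processed = runBurst(50);
        System.out.println("Consumer processed " + processed.size() + " events");
    }
}
```

The producer loop finishes long before the consumer does, which is the asynchrony benefit in miniature. A real broker adds what this sketch lacks: persistence, delivery across processes, and retries.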
2. Core Concepts of RocketMQ
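Producer: the application that sends messages.
Consumer: the application that receives and processes messages.
Topic: a logical channel used to categorize messages; producers send to a topic and consumers subscribe to it.
Message: the payload being transferred, plus optional metadata such as tags and keys.
Broker: the server that stores messages and delivers them to consumers.
NameServer: the lightweight routing registry where brokers register and where clients look up which broker serves a topic.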
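How these roles cooperate can be sketched with a toy in-memory model. This is only an illustration of the lookup-then-send flow, not how RocketMQ is implemented: the classes `Broker` and `NameServer` below are hypothetical, and real brokers register themselves with the NameServer and split each topic into several queues.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of the RocketMQ roles; every class here is hypothetical. */
final class RocketMqRolesSketch {

    /** Stores messages per topic and hands them out in arrival order. */
    static final class Broker {
        final String name;
        private final Map<String, Queue<String>> topics = new HashMap<>();

        Broker(String name) {
            this.name = name;
        }

        void store(String topic, String body) {
            topics.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(body);
        }

        /** Returns the next message for the topic, or null if none is waiting. */
        String poll(String topic) {
            Queue<String> queue = topics.get(topic);
            return queue == null ? null : queue.poll();
        }
    }

    /** Routing registry: maps a topic to the broker that serves it. */
    static final class NameServer {
        private final Map<String, Broker> routes = new HashMap<>();

        void register(String topic, Broker broker) {
            routes.put(topic, broker);
        }

        Broker lookup(String topic) {
            return routes.get(topic);
        }
    }

    public static void main(String[] args) {
        NameServer nameServer = new NameServer();
        nameServer.register("order-topic", new Broker("broker-a"));

        // Producer side: find the broker for the topic, then send to it.
        nameServer.lookup("order-topic").store("order-topic", "order 1001 created");

        // Consumer side: find the same broker, then fetch the message.
        String received = nameServer.lookup("order-topic").poll("order-topic");
        System.out.println("Consumer received: " + received);
    }
}
```

Note that neither side references the other: the producer and consumer only share the topic name and the NameServer address.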
3. Environment Setup
Use Docker Compose to launch a NameServer, a Broker, and the RocketMQ Dashboard.
version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:5.1.4
    container_name: rmqnamesrv
    ports:
      - "9876:9876"
    command: sh mqnamesrv
    networks:
      - rocketmq-network
  broker:
    image: apache/rocketmq:5.1.4
    container_name: rmqbroker
    ports:
      - "10911:10911"
      - "10912:10912"
    environment:
      - NAMESRV_ADDR=namesrv:9876
    volumes:
      - ./broker.conf:/home/rocketmq/rocketmq-5.1.4/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf
    depends_on:
      - namesrv
    networks:
      - rocketmq-network
  dashboard:
    image: apache/rocketmq-dashboard:latest
    container_name: rmqdashboard
    ports:
      - "8088:8080"
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq-network
networks:
  rocketmq-network:
    driver: bridge
Broker configuration (broker.conf) must contain the correct host IP, e.g.:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
brokerIP1 = 192.168.1.100
4. Spring Boot Integration
Project Structure
spring-cloud-mq-ep01/
├── mq-producer/ # message producer
│ ├── pom.xml
│ └── src/main/java/.../producer/
│ ├── ProducerApplication.java
│ ├── config/RocketMQConfig.java
│ ├── controller/MessageController.java
│ └── service/OrderEventProducer.java
├── mq-consumer/ # message consumer
│ ├── pom.xml
│ └── src/main/java/.../consumer/
│ ├── ConsumerApplication.java
│ ├── config/RocketMQConfig.java
│ └── listener/OrderEventListener.java
└── docker-compose.yml
Parent POM Dependencies
<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>
  </parent>
  <properties>
    <java.version>17</java.version>
    <rocketmq.version>2.2.3</rocketmq.version>
  </properties>
  <modules>
    <module>mq-producer</module>
    <module>mq-consumer</module>
  </modules>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq.version}</version>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>
Producer Module
Key configuration (application.yml):
server:
  port: 8081
spring:
  application:
    name: mq-producer
rocketmq:
  name-server: localhost:9876
  producer:
    group: order-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
Producer entry point:
package com.teaching.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
        System.out.println("✅ Message producer started successfully!");
    }
}
OrderEventProducer demonstrates synchronous, asynchronous, and delayed sending using RocketMQTemplate and MessageBuilder:
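// Methods of OrderEventProducer. rocketMQTemplate is an injected RocketMQTemplate,
// ORDER_TOPIC is the topic-name constant, and log is an SLF4J logger.
// SendCallback, SendResult and SendStatus come from org.apache.rocketmq.client.producer.

// Synchronous send: blocks until the broker responds or the send fails with an exception.
public void sendOrderCreatedMessage(Long orderId, Long userId, Long productId, Integer quantity) {
    OrderMessage messageBody = new OrderMessage(orderId, userId, productId, quantity, "CREATED");
    Message<OrderMessage> message = MessageBuilder.withPayload(messageBody)
            .setHeader("messageId", UUID.randomUUID().toString())
            .build();
    rocketMQTemplate.syncSend(ORDER_TOPIC, message);
    log.info("Message sent successfully: orderId={}", orderId);
}

// Asynchronous send: returns immediately; the result arrives through the callback.
// SendCallback declares two methods, so it cannot be written as a lambda.
public void sendOrderCreatedMessageAsync(Long orderId, Long userId, Long productId, Integer quantity) {
    OrderMessage messageBody = new OrderMessage(orderId, userId, productId, quantity, "CREATED");
    rocketMQTemplate.asyncSend(ORDER_TOPIC, messageBody, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                log.info("Async message sent successfully: orderId={}", orderId);
            } else {
                log.error("Async message send failed: orderId={}, status={}", orderId, sendResult.getSendStatus());
            }
        }

        @Override
        public void onException(Throwable e) {
            log.error("Async message send failed: orderId={}", orderId, e);
        }
    });
}

// Delayed send: the last two arguments are the send timeout in milliseconds and the delay level.
public void sendDelayMessage(Long orderId, int delayLevel) {
    OrderMessage messageBody = new OrderMessage(orderId, 1L, 1L, 1, "DELAY");
    rocketMQTemplate.syncSend(ORDER_TOPIC, MessageBuilder.withPayload(messageBody).build(), 3000, delayLevel);
    log.info("Delayed message sent: orderId={}, delayLevel={}", orderId, delayLevel);
}
REST controller exposes three endpoints for normal, async, and delayed messages.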
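The delayLevel argument used for delayed sending is an index into the broker's delay table, not a duration. The sketch below maps a level to its delay, assuming the broker keeps RocketMQ's default `messageDelayLevel` setting (a broker can override it in broker.conf). The class `DelayLevels` and method `delayFor` are hypothetical helpers, not part of any RocketMQ API.

```java
import java.time.Duration;

/** Hypothetical helper that translates a RocketMQ delay level into a duration. */
final class DelayLevels {

    /** RocketMQ's default messageDelayLevel value; assumed unchanged on the broker. */
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Returns the delay for a 1-based level from the default table. */
    static Duration delayFor(int level) {
        String[] entries = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > entries.length) {
            throw new IllegalArgumentException(
                    "delayLevel must be between 1 and " + entries.length);
        }
        String entry = entries[level - 1];
        long amount = Long.parseLong(entry.substring(0, entry.length() - 1));
        char unit = entry.charAt(entry.length() - 1);
        switch (unit) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("Unknown unit in " + entry);
        }
    }

    public static void main(String[] args) {
        System.out.println("Level 3 delays by " + delayFor(3).getSeconds() + " seconds");
    }
}
```

With the default table, level 3 corresponds to 10 seconds and level 18 (the highest) to 2 hours.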
Consumer Module
Consumer configuration (application.yml) mirrors the producer’s name‑server address.
server:
  port: 8082
spring:
  application:
    name: mq-consumer
rocketmq:
  name-server: localhost:9876
Listener implementation processes the incoming map, logs the payload, and simulates business handling with a short sleep.
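@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group", selectorExpression = "*")
public class OrderEventListener implements RocketMQListener<Map<String, Object>> {
    // log is assumed to be an SLF4J logger (for example via Lombok's @Slf4j)
    @Override
    public void onMessage(Map<String, Object> message) {
        log.info("Received order message: {}", message);
        // JSON numbers may arrive as Integer or Long, so convert through Number
        // instead of casting directly to Long (which can throw ClassCastException).
        Long orderId = ((Number) message.get("orderId")).longValue();
        // ... extract other fields ...
        log.info("Processing order event: orderId={}, userId={}, productId={}, quantity={}, status={}",
                orderId, userId, productId, quantity, status);
        try {
            Thread.sleep(100); // simulate business handling
            log.info("Order event processed: orderId={}", orderId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error("Processing failed", e);
        }
    }
}
5. Testing and Verification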
Start the infrastructure and applications:
# Start RocketMQ stack
cd spring-cloud-mq-ep01
docker-compose up -d
# Start producer
cd mq-producer
mvn spring-boot:run
# Start consumer
cd ../mq-consumer
mvn spring-boot:run
Send test messages with curl:
# Normal order message
curl -X POST "http://localhost:8081/api/message/order?orderId=1001&userId=1&productId=100&quantity=2"
# Asynchronous message
curl -X POST "http://localhost:8081/api/message/order/async?orderId=1002"
# Delayed message (delayLevel=3 ≈ 10 s)
curl -X POST "http://localhost:8081/api/message/order/delay?orderId=1003&delayLevel=3"
Consumer logs show receipt and processing, e.g.:
2025-06-01 10:30:15.123 INFO Received order message: {orderId=1001, userId=1, productId=100, quantity=2, status=CREATED}
2025-06-01 10:30:15.124 INFO Processing order event: orderId=1001, userId=1, productId=100, quantity=2, status=CREATED
2025-06-01 10:30:15.225 INFO Order event processed: orderId=1001
6. Common Issues and Pitfalls
Pitfall 1 – Broker IP Misconfiguration
Symptom: producer cannot connect to the broker.
Solution: set the correct brokerIP1 in broker.conf.
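The broker advertises brokerIP1 to clients through the NameServer, so it must be an address the producer and consumer can actually reach; inside Docker the broker would otherwise advertise its container-internal IP. As a rough aid for choosing the value, the sketch below lists the host's non-loopback IPv4 addresses using only the JDK. The class name `BrokerIpCandidates` is hypothetical, and which of the printed addresses is the right one still depends on your network.

```java
import java.io.UncheckedIOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

/** Hypothetical helper: lists host addresses that could be used as brokerIP1. */
final class BrokerIpCandidates {

    /** Returns "interface -> address" entries for every active, non-loopback IPv4 address. */
    static List<String> candidateAddresses() {
        List<String> result = new ArrayList<>();
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics == null) {
                return result; // no network interfaces reported
            }
            for (NetworkInterface nic : Collections.list(nics)) {
                if (!nic.isUp() || nic.isLoopback()) {
                    continue; // skip inactive interfaces and 127.0.0.1
                }
                for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                    if (address instanceof Inet4Address) {
                        result.add(nic.getName() + " -> " + address.getHostAddress());
                    }
                }
            }
        } catch (SocketException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        candidateAddresses().forEach(System.out::println);
    }
}
```

After changing brokerIP1, restart the broker container so it registers the new address with the NameServer.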
Pitfall 2 – Message Send Timeout
Symptom: messages fail to send.
Solution: increase rocketmq.producer.send-message-timeout, e.g. to 5000 ms.
rocketmq:
  producer:
    send-message-timeout: 5000
Pitfall 3 – Consumer Receives No Messages
Checklist:
Verify the topic exists.
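Ensure the consumer subscribes to the same topic (and a matching tag expression) that the producer sends to; the consumer group name is independent of the producer group.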
Confirm the NameServer address is correct.
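A quick way to work through the address item of this checklist is to test whether the ports are reachable from the machine running the application. The sketch below uses a plain TCP connect; the class `PortCheck` and its method are hypothetical helpers, and ports 9876 and 10911 are the ones published in the Docker Compose file above. A successful connect only shows that something is listening, not that it is a healthy RocketMQ process.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        System.out.println("NameServer localhost:9876 reachable: "
                + isReachable("localhost", 9876, 1000));
        System.out.println("Broker localhost:10911 reachable: "
                + isReachable("localhost", 10911, 1000));
    }
}
```

If the NameServer is reachable but the broker is not, revisit Pitfall 1, since clients connect to whatever address the broker registered.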
7. Next Episode Preview
The upcoming article, "Spring Cloud Messaging (Part 2): Spring Cloud Stream Unified Programming Model", will cover the abstraction layer provided by Spring Cloud Stream, binder configuration, unified programming concepts, and multi‑broker switching.
Coder Trainee
Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.
