Spring Cloud Message‑Driven Part 5: High‑Availability RocketMQ Deployment & Message Tracing
This tutorial walks through deploying a highly available RocketMQ cluster with Docker Compose, configuring master‑slave brokers, enabling message tracing, integrating Prometheus‑Grafana monitoring, setting up Spring Boot HA properties, applying performance tweaks, validating failover, and troubleshooting common issues.
1. Objective
The article aims to deploy a RocketMQ cluster with high availability and to enable end‑to‑end message tracing.
2. High‑Availability Architecture Deployment
2.1 Docker Compose Cluster Configuration
# docker-compose-cluster.yml
version: '3.8'
services:
# NameServer cluster
namesrv1:
image: apache/rocketmq:5.1.4
container_name: rmqnamesrv1
ports:
- "9876:9876"
command: sh mqnamesrv
networks:
- rocketmq-cluster
namesrv2:
image: apache/rocketmq:5.1.4
container_name: rmqnamesrv2
ports:
- "9877:9876"
command: sh mqnamesrv
networks:
- rocketmq-cluster
# Broker master node
broker-a-master:
image: apache/rocketmq:5.1.4
container_name: rmqbroker-a-master
ports:
- "10911:10911"
- "10912:10912"
environment:
- NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
volumes:
- ./broker-master.conf:/home/rocketmq/rocketmq-5.1.4/conf/broker.conf
command: sh mqbroker -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf
depends_on:
- namesrv1
- namesrv2
networks:
- rocketmq-cluster
# Broker slave node
broker-a-slave:
image: apache/rocketmq:5.1.4
container_name: rmqbroker-a-slave
ports:
- "10921:10911"
- "10922:10912"
environment:
- NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
volumes:
- ./broker-slave.conf:/home/rocketmq/rocketmq-5.1.4/conf/broker.conf
command: sh mqbroker -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf
depends_on:
- namesrv1
- namesrv2
networks:
- rocketmq-cluster
# RocketMQ Dashboard
dashboard:
image: apache/rocketmq-dashboard:latest
container_name: rmqdashboard
ports:
- "8088:8080"
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv1:9876;namesrv2:9876
depends_on:
- namesrv1
- namesrv2
networks:
- rocketmq-cluster
# Prometheus
prometheus:
image: prom/prometheus:latest
container_name: rmq-prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
networks:
- rocketmq-cluster
# Grafana
grafana:
image: grafana/grafana:latest
container_name: rmq-grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
networks:
- rocketmq-cluster
networks:
rocketmq-cluster:
driver: bridge
2.2 Broker Configuration
# broker-master.conf
brokerClusterName = RocketMQCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
# Storage paths
storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog
# Enable message tracing on the broker.
# The broker-side keys are traceTopicEnable / msgTraceTopicName;
# enableMsgTrace and customizedTraceTopic are client-side settings.
traceTopicEnable = true
msgTraceTopicName = RMQ_SYS_TRACE_TOPIC
# Host IP (replace with your server IP)
brokerIP1 = 192.168.1.100
# broker-slave.conf
brokerClusterName = RocketMQCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog
# Enable message tracing on the broker (broker-side keys, not the client-side names)
traceTopicEnable = true
msgTraceTopicName = RMQ_SYS_TRACE_TOPIC
brokerIP1 = 192.168.1.100
2.3 Prometheus Configuration
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'rocketmq-namesrv'
static_configs:
- targets: ['namesrv1:9876', 'namesrv2:9876']
labels:
component: namesrv
- job_name: 'rocketmq-broker'
static_configs:
- targets: ['broker-a-master:10911', 'broker-a-slave:10911']
labels:
component: broker
3. Spring Boot High‑Availability Configuration
# application.yml (HA settings)
rocketmq:
name-server: 192.168.1.100:9876;192.168.1.100:9877
producer:
group: ha-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
enable-msg-trace: true
customized-trace-topic: RMQ_SYS_TRACE_TOPIC
consumer:
enable-msg-trace: true
customized-trace-topic: RMQ_SYS_TRACE_TOPIC
4. Message Tracing Implementation
4.1 Producer Configuration
// ProducerTraceConfig.java
package com.teaching.config;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ProducerTraceConfig {
/**
 * Creates and starts a producer that records message traces.
 *
 * <p>Tracing is switched on through the {@code DefaultMQProducer} constructor
 * (producer group, trace flag, trace topic). {@code enable-msg-trace} and
 * {@code customized-trace-topic} are rocketmq-spring property names and are
 * not available as setters on the client class.
 *
 * @return the started producer, pointed at both NameServers
 * @throws Exception if the producer fails to start
 */
@Bean
public DefaultMQProducer traceProducer() throws Exception {
    // Second argument enables tracing; third is the topic trace records are written to.
    DefaultMQProducer producer =
            new DefaultMQProducer("trace-producer-group", true, "RMQ_SYS_TRACE_TOPIC");
    producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
    producer.start();
    return producer;
}
}
4.2 Consumer Configuration
// ConsumerTraceConfig.java
package com.teaching.config;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerTraceConfig {
/**
 * Creates and starts a push consumer that records message traces.
 *
 * <p>Tracing is enabled through the {@code DefaultMQPushConsumer} constructor
 * (consumer group, trace flag, trace topic). The listener is declared with an
 * explicit type because {@code registerMessageListener} is overloaded for
 * concurrent and orderly listeners, and an untyped lambda matches both.
 *
 * @return the started consumer, subscribed to every tag of {@code trace-topic}
 * @throws Exception if the subscription or start-up fails
 */
@Bean
public DefaultMQPushConsumer traceConsumer() throws Exception {
    DefaultMQPushConsumer consumer =
            new DefaultMQPushConsumer("trace-consumer-group", true, "RMQ_SYS_TRACE_TOPIC");
    consumer.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
    consumer.subscribe("trace-topic", "*");

    org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently listener =
            (messages, context) -> {
                // Consumption logic
                // Return an explicit status: a null result is handled as a failed
                // consume and the batch is scheduled for redelivery.
                return org.apache.rocketmq.client.consumer.listener
                        .ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            };
    consumer.registerMessageListener(listener);

    consumer.start();
    return consumer;
}
}
4.3 Trace Query Service
// TraceQueryService.java
package com.teaching.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageTrace;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@Slf4j
public class TraceQueryService {
/**
 * Looks up the trace records of a single message and logs one line per record.
 *
 * <p>A short-lived admin client is created for each call. It is shut down in a
 * {@code finally} block so that a failed start or a failed query does not leak
 * the client. Failures are logged and not rethrown.
 *
 * @param msgId ID of the message whose trace should be queried
 */
public void queryTrace(String msgId) {
    DefaultMQAdminExt admin = new DefaultMQAdminExt();
    admin.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
    try {
        admin.start();
        // Query message trace by ID
        // NOTE(review): confirm that queryMessageTraceByMsgId and MessageTrace exist in
        // the rocketmq-tools version on the classpath; neither is defined in this file.
        List<MessageTrace> traces = admin.queryMessageTraceByMsgId(msgId);
        for (MessageTrace trace : traces) {
            log.info("Trace: msgId={}, topic={}, tags={}, storeHost={}, storeTime={}, consumeGroup={}, consumeStatus={}, retryTimes={}",
                    trace.getMsgId(), trace.getTopic(), trace.getTags(),
                    trace.getStoreHost(), trace.getStoreTimestamp(),
                    trace.getConsumerGroup(), trace.getConsumeStatus(), trace.getReconsumeTimes());
        }
    } catch (Exception e) {
        log.error("Failed to query trace", e);
    } finally {
        // Always release the admin client, including when start() or the query threw.
        admin.shutdown();
    }
}
}
5. Monitoring and Alert Configuration
5.1 Custom Metrics
// RocketMQMetrics.java
package com.teaching.metrics;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class RocketMQMetrics {
// Logger used by the methods below; the class carries no @Slf4j annotation,
// so it has to be declared explicitly.
private static final org.slf4j.Logger log =
        org.slf4j.LoggerFactory.getLogger(RocketMQMetrics.class);

private final MeterRegistry meterRegistry;
private final DefaultMQPushConsumer consumer;

/**
 * Stores the collaborators and registers the gauges.
 *
 * @param meterRegistry registry the gauges are published to
 * @param consumer      consumer whose backlog and thread count are reported
 */
public RocketMQMetrics(MeterRegistry meterRegistry, DefaultMQPushConsumer consumer) {
    this.meterRegistry = meterRegistry;
    this.consumer = consumer;
    initMetrics();
}

/** Registers the backlog and consumer-thread gauges, both read from this instance. */
private void initMetrics() {
    // Register backlog metric
    meterRegistry.gauge("rocketmq.consumer.backlog", this, RocketMQMetrics::getBacklog);
    // Register consumer thread pool size metric
    meterRegistry.gauge("rocketmq.consumer.threads", this, RocketMQMetrics::getConsumerThreadCount);
}

/**
 * Logs the current backlog and raises an alert when it exceeds 10000 messages.
 * Scheduled with a fixed delay of 30000 ms between runs.
 */
@Scheduled(fixedDelay = 30000)
public void collectMetrics() {
    try {
        long backlog = getBacklog();
        log.info("Backlog: {}", backlog);
        if (backlog > 10000) {
            sendAlert("Message backlog alert", "Current backlog: " + backlog);
        }
    } catch (Exception e) {
        log.error("Metric collection failed", e);
    }
}

/**
 * Reads the consumer backlog from the client.
 *
 * @return the backlog, or {@code 0} when the client call fails
 */
private long getBacklog() {
    // Retrieve backlog (requires actual MQ API call)
    // NOTE(review): "consumer-group" and "topic" are placeholders, and the call chain
    // must be checked against the client API of the RocketMQ version in use.
    try {
        return consumer.getDefaultMQPushConsumerImpl()
                .getRebalanceImpl()
                .getmQClientFactory()
                .getMQClientAPIImpl()
                .getConsumerStatus("consumer-group", "topic");
    } catch (MQClientException e) {
        // Still report 0 so the gauge keeps working, but log the failure
        // so a broken lookup is not mistaken for an empty queue.
        log.warn("Failed to read consumer backlog; reporting 0", e);
        return 0L;
    }
}

/** @return the core pool size reported by the consumer's message service */
private int getConsumerThreadCount() {
    return consumer.getDefaultMQPushConsumerImpl()
            .getConsumeMessageService()
            .getCorePoolSize();
}

/**
 * Emits an alert. Currently this only writes a warning to the log.
 *
 * @param title   short alert title
 * @param content alert details
 */
private void sendAlert(String title, String content) {
    // Send DingTalk/email alert
    log.warn("{}: {}", title, content);
}
}
5.2 DingTalk Alert Service
// DingTalkAlertService.java
package com.teaching.alert;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.Map;
@Service
public class DingTalkAlertService {
@Value("${dingtalk.webhook:}")
private String webhook;
private final RestTemplate restTemplate = new RestTemplate();
public void sendAlert(String title, String message, String level) {
if (webhook.isEmpty()) return;
Map<String, Object> request = new HashMap<>();
request.put("msgtype", "markdown");
Map<String, String> markdown = new HashMap<>();
markdown.put("title", title);
String color = switch (level) {
case "ERROR" -> "🔴";
case "WARN" -> "🟡";
default -> "🔵";
};
markdown.put("text", String.format(
"## %s %s
**Message:** %s
**Level:** %s
**Time:** %s",
color, title, message, level, new java.util.Date()));
request.put("markdown", markdown);
restTemplate.postForObject(webhook, request, String.class);
}
}
6. Performance Tuning
6.1 Broker Tuning (production recommendation)
# broker.conf (production settings)
osPageCacheBusyTimeout = 1000
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
flushDiskType = ASYNC_FLUSH
brokerRole = ASYNC_MASTER
# Size in bytes of each commit-log / consume-queue file.
# The keys are spelled "mapped"; unrecognised keys are not applied.
mappedFileSizeCommitLog = 1073741824
mappedFileSizeConsumeQueue = 300000
enableCalcFilterBitMap = true
6.2 JVM Tuning
# JVM options
JAVA_OPTS="-Xms4g -Xmx4g -Xmn2g \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:+PrintGCDetails \
-XX:+PrintGCDateStamps \
-Xloggc:/opt/logs/rocketmq/gc.log"
6.3 Producer Optimization
// ProducerOptimizeConfig.java
package com.teaching.config;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ProducerOptimizeConfig {
/**
 * Builds a {@code RocketMQTemplate} backed by a tuned, trace-enabled producer.
 *
 * <p>Send timeout, retry count and tracing are properties of the underlying
 * {@code DefaultMQProducer}, so they are configured there before the producer
 * is handed to the template.
 *
 * @return a template whose producer uses a 2000 ms send timeout, two retries on
 *         send failure, and message tracing
 */
@Bean
public RocketMQTemplate optimizedRocketMQTemplate() {
    // Tracing is enabled through the constructor (group, trace flag, trace topic).
    DefaultMQProducer producer =
            new DefaultMQProducer("optimized-group", true, "RMQ_SYS_TRACE_TOPIC");
    // A producer cannot start without a NameServer address; same cluster as the other beans.
    producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
    producer.setSendMsgTimeout(2000);
    producer.setRetryTimesWhenSendFailed(2);

    RocketMQTemplate template = new RocketMQTemplate();
    template.setProducer(producer);
    return template;
}
}
7. High‑Availability Verification
7.1 Start the Cluster
docker-compose -f docker-compose-cluster.yml up -d
# Verify NameServer cluster
curl http://localhost:9876/
curl http://localhost:9877/
# Verify broker master‑slave status
docker exec -it rmqbroker-a-master sh
mqadmin clusterList -n namesrv1:9876
7.2 Failover Test
# Stop the master broker
docker stop rmqbroker-a-master
# Verify that production and consumption continue (should switch to slave)
7.3 View Message Trace
Open the RocketMQ Dashboard at http://localhost:8088, navigate to "Message Trace", input a Message ID, and view the full lifecycle including send time, store time, consume time, and retry count.
8. Common Operational Commands
# View cluster status
mqadmin clusterList -n namesrv1:9876
# List topics
mqadmin topicList -n namesrv1:9876
# Consumer group status
mqadmin consumerProgress -n namesrv1:9876 -g consumer-group
# View backlog
mqadmin consumerProgress -n namesrv1:9876 -g consumer-group -s
# Reset consumption offset to a point in time (-t is the topic, -s the timestamp in epoch milliseconds)
mqadmin resetOffsetByTime -n namesrv1:9876 -g consumer-group -t topic-name -s 1735660800000
# Get broker configuration
mqadmin getBrokerConfig -n namesrv1:9876 -b broker-a
9. Common Issues and Solutions
Issue 1: Message trace not recorded
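Ensure message tracing is enabled on both the broker and the client (for example, enable-msg-trace: true in application.yml), and that the trace topic name is identical on both sides.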
Issue 2: Master‑slave sync delay
Monitor the sync lag using mqadmin clusterList and adjust broker parameters if needed.
Issue 3: Message backlog alerts
Typical remedies:
Increase the number of consumer instances.
Raise the consumer thread pool size.
Optimize consumption logic.
10. Series Summary
The five‑part "Message‑Driven" series covered RocketMQ basics, Spring Cloud Stream integration, transaction messages, ordered messages, and finally high‑availability deployment with tracing and monitoring. The author invites readers to suggest topics for the next series.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Coder Trainee
Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
