Spring Cloud Message‑Driven Part 5: High‑Availability RocketMQ Deployment & Message Tracing

This tutorial walks through deploying a highly available RocketMQ cluster with Docker Compose, configuring master‑slave brokers, enabling message tracing, integrating Prometheus‑Grafana monitoring, setting up Spring Boot HA properties, applying performance tweaks, validating failover, and troubleshooting common issues.

Coder Trainee

1. Objective

The article aims to deploy a RocketMQ cluster with high availability and to enable end‑to‑end message tracing.

2. High‑Availability Architecture Deployment

2.1 Docker Compose Cluster Configuration

# docker-compose-cluster.yml
version: '3.8'
services:
  # NameServer cluster
  namesrv1:
    image: apache/rocketmq:5.1.4
    container_name: rmqnamesrv1
    ports:
      - "9876:9876"
    command: sh mqnamesrv
    networks:
      - rocketmq-cluster

  namesrv2:
    image: apache/rocketmq:5.1.4
    container_name: rmqnamesrv2
    ports:
      - "9877:9876"
    command: sh mqnamesrv
    networks:
      - rocketmq-cluster

  # Broker master node
  broker-a-master:
    image: apache/rocketmq:5.1.4
    container_name: rmqbroker-a-master
    ports:
      - "10911:10911"
      - "10912:10912"
    environment:
      - NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
    volumes:
      - ./broker-master.conf:/home/rocketmq/rocketmq-5.1.4/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq-cluster

  # Broker slave node
  broker-a-slave:
    image: apache/rocketmq:5.1.4
    container_name: rmqbroker-a-slave
    ports:
      - "10921:10911"
      - "10922:10912"
    environment:
      - NAMESRV_ADDR=namesrv1:9876;namesrv2:9876
    volumes:
      - ./broker-slave.conf:/home/rocketmq/rocketmq-5.1.4/conf/broker.conf
    command: sh mqbroker -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq-cluster

  # RocketMQ Dashboard
  dashboard:
    image: apache/rocketmq-dashboard:latest
    container_name: rmqdashboard
    ports:
      - "8088:8080"
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv1:9876;namesrv2:9876
    depends_on:
      - namesrv1
      - namesrv2
    networks:
      - rocketmq-cluster

  # Prometheus
  prometheus:
    image: prom/prometheus:latest
    container_name: rmq-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - rocketmq-cluster

  # Grafana
  grafana:
    image: grafana/grafana:latest
    container_name: rmq-grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    networks:
      - rocketmq-cluster

networks:
  rocketmq-cluster:
    driver: bridge

2.2 Broker Configuration

# broker-master.conf
brokerClusterName = RocketMQCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true

# Storage paths
storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog

# Enable the system trace topic on the broker
traceTopicEnable = true
msgTraceTopicName = RMQ_SYS_TRACE_TOPIC

# Host IP (replace with your server IP)
brokerIP1 = 192.168.1.100

# broker-slave.conf
brokerClusterName = RocketMQCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true

storePathRootDir = /home/rocketmq/store
storePathCommitLog = /home/rocketmq/store/commitlog

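traceTopicEnable = true
msgTraceTopicName = RMQ_SYS_TRACE_TOPIC

# Host IP (replace with your server IP). The broker registers brokerIP1 plus its
# listenPort (10911 by default) with the NameServer, so when master and slave share
# one host IP, give the slave its own listenPort and publish that same port in Docker Compose.
brokerIP1 = 192.168.1.100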

2.3 Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s

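# NOTE: 9876 and 10911 are RocketMQ remoting ports and do not serve Prometheus
# metrics over HTTP. In practice, point these targets at a metrics exporter
# (for example the separate rocketmq-exporter project) instead.
scrape_configs:
  - job_name: 'rocketmq-namesrv'
    static_configs:
      - targets: ['namesrv1:9876', 'namesrv2:9876']
        labels:
          component: namesrv

  - job_name: 'rocketmq-broker'
    static_configs:
      - targets: ['broker-a-master:10911', 'broker-a-slave:10911']
        labels:
          component: broker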

3. Spring Boot High‑Availability Configuration

# application.yml (HA settings)
rocketmq:
  name-server: 192.168.1.100:9876;192.168.1.100:9877
  producer:
    group: ha-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    enable-msg-trace: true
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC
  consumer:
    enable-msg-trace: true
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC

4. Message Tracing Implementation

4.1 Producer Configuration

// ProducerTraceConfig.java
package com.teaching.config;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProducerTraceConfig {

    // shutdown() is called when the Spring context closes
    @Bean(destroyMethod = "shutdown")
    public DefaultMQProducer traceProducer() throws Exception {
        // The second and third constructor arguments enable message tracing
        // and set the custom trace topic
        DefaultMQProducer producer =
                new DefaultMQProducer("trace-producer-group", true, "RMQ_SYS_TRACE_TOPIC");
        producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
        producer.start();
        return producer;
    }
}

4.2 Consumer Configuration

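// ConsumerTraceConfig.java
package com.teaching.config;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerTraceConfig {

    // shutdown() is called when the Spring context closes
    @Bean(destroyMethod = "shutdown")
    public DefaultMQPushConsumer traceConsumer() throws Exception {
        // The second and third constructor arguments enable message tracing
        // and set the custom trace topic
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("trace-consumer-group", true, "RMQ_SYS_TRACE_TOPIC");
        consumer.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
        consumer.subscribe("trace-topic", "*");
        // The cast selects the concurrent listener; a bare lambda is ambiguous
        // between the concurrent and orderly overloads
        consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
            // Consumption logic
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        return consumer;
    }
}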

4.3 Trace Query Service

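// TraceQueryService.java
package com.teaching.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDataEncoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Service
@Slf4j
public class TraceQueryService {

    private static final String TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";

    /**
     * Query message trace. Trace records are ordinary messages stored in the
     * trace topic and keyed by the original message ID, so they are looked up by key.
     */
    public void queryTrace(String msgId) {
        DefaultMQAdminExt admin = new DefaultMQAdminExt();
        admin.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
        try {
            admin.start();
            // Up to 64 trace records with a store time between epoch 0 and now
            QueryResult result = admin.queryMessage(TRACE_TOPIC, msgId, 64, 0L, System.currentTimeMillis());
            for (MessageExt traceMsg : result.getMessageList()) {
                String body = new String(traceMsg.getBody(), StandardCharsets.UTF_8);
                // One trace message can carry several encoded trace contexts
                for (TraceContext ctx : TraceDataEncoder.decoderFromTraceDataString(body)) {
                    for (TraceBean bean : ctx.getTraceBeans()) {
                        log.info("Trace: type={}, msgId={}, topic={}, tags={}, storeHost={}, storeTime={}, group={}, success={}, retryTimes={}",
                                ctx.getTraceType(), bean.getMsgId(), bean.getTopic(), bean.getTags(),
                                bean.getStoreHost(), bean.getStoreTime(),
                                ctx.getGroupName(), ctx.isSuccess(), bean.getRetryTimes());
                    }
                }
            }
        } catch (Exception e) {
            log.error("Failed to query trace", e);
        } finally {
            // Always release the admin client, even when the query fails
            admin.shutdown();
        }
    }
}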

5. Monitoring and Alert Configuration

5.1 Custom Metrics

// RocketMQMetrics.java
package com.teaching.metrics;

import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicLong;

// @Scheduled only fires when @EnableScheduling is present on a configuration class
@Slf4j
@Component
public class RocketMQMetrics implements DisposableBean {

    // Placeholder group name; replace with the consumer group you want to watch
    private static final String CONSUMER_GROUP = "consumer-group";
    private static final long BACKLOG_ALERT_THRESHOLD = 10000;

    private final DefaultMQPushConsumer consumer;
    private final DefaultMQAdminExt admin = new DefaultMQAdminExt();
    // Last backlog value collected by the scheduled job; the gauge reads this cached value
    private final AtomicLong backlog = new AtomicLong();

    public RocketMQMetrics(MeterRegistry meterRegistry, DefaultMQPushConsumer consumer)
            throws MQClientException {
        this.consumer = consumer;
        admin.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
        admin.start();
        // Register backlog metric
        meterRegistry.gauge("rocketmq.consumer.backlog", backlog);
        // Register consumer thread pool size metric
        meterRegistry.gauge("rocketmq.consumer.threads", this, RocketMQMetrics::getConsumerThreadCount);
    }

    @Scheduled(fixedDelay = 30000)
    public void collectMetrics() {
        try {
            // Total difference between broker offsets and committed consumer offsets
            long current = admin.examineConsumeStats(CONSUMER_GROUP).computeTotalDiff();
            backlog.set(current);
            log.info("Backlog: {}", current);
            if (current > BACKLOG_ALERT_THRESHOLD) {
                sendAlert("Message backlog alert", "Current backlog: " + current);
            }
        } catch (Exception e) {
            log.error("Metric collection failed", e);
        }
    }

    private int getConsumerThreadCount() {
        return consumer.getDefaultMQPushConsumerImpl()
                .getConsumeMessageService()
                .getCorePoolSize();
    }

    private void sendAlert(String title, String content) {
        // Send DingTalk/email alert
        log.warn("{}: {}", title, content);
    }

    @Override
    public void destroy() {
        // Release the admin client when the Spring context closes
        admin.shutdown();
    }
}
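
The backlog figure that drives the alert is, per queue, the distance between the broker's latest offset and the consumer group's committed offset, summed over all queues. The following is a minimal, library-free sketch of that arithmetic. `BacklogCalculator` and `QueueOffsets` are hypothetical names used for illustration and are not RocketMQ classes; the 10000 threshold is the one used in `collectMetrics()`.

```java
import java.util.List;

final class BacklogCalculator {

    /** Offsets of a single queue (illustrative type, not part of RocketMQ). */
    static final class QueueOffsets {
        final long brokerOffset;
        final long consumerOffset;

        QueueOffsets(long brokerOffset, long consumerOffset) {
            this.brokerOffset = brokerOffset;
            this.consumerOffset = consumerOffset;
        }
    }

    // Same threshold as the alert in collectMetrics()
    static final long ALERT_THRESHOLD = 10_000;

    /** Sums (brokerOffset - consumerOffset) over all queues, never counting a negative gap. */
    static long totalBacklog(List<QueueOffsets> queues) {
        long total = 0;
        for (QueueOffsets q : queues) {
            total += Math.max(0, q.brokerOffset - q.consumerOffset);
        }
        return total;
    }

    /** An alert fires only when the backlog is strictly above the threshold. */
    static boolean shouldAlert(long backlog) {
        return backlog > ALERT_THRESHOLD;
    }
}
```

For example, three queues with gaps of 7000, 0, and 4500 messages give a total backlog of 11500, which is above the threshold and would raise an alert.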

5.2 DingTalk Alert Service

// DingTalkAlertService.java
package com.teaching.alert;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.HashMap;
import java.util.Map;

@Service
public class DingTalkAlertService {

    @Value("${dingtalk.webhook:}")
    private String webhook;

    private final RestTemplate restTemplate = new RestTemplate();

    public void sendAlert(String title, String message, String level) {
        if (webhook.isEmpty()) return;
        Map<String, Object> request = new HashMap<>();
        request.put("msgtype", "markdown");
        Map<String, String> markdown = new HashMap<>();
        markdown.put("title", title);
        String color = switch (level) {
            case "ERROR" -> "🔴";
            case "WARN" -> "🟡";
            default -> "🔵";
        };
        markdown.put("text", String.format(
                "## %s %s\n\n**Message:** %s\n\n**Level:** %s\n\n**Time:** %s",
                color, title, message, level, new java.util.Date()));
        request.put("markdown", markdown);
        restTemplate.postForObject(webhook, request, String.class);
    }
}

6. Performance Tuning

6.1 Broker Tuning (production recommendation)

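# broker.conf (production settings)
osPageCacheBusyTimeOutMills = 1000
sendMessageThreadPoolNums = 16
pullMessageThreadPoolNums = 16
flushDiskType = ASYNC_FLUSH
# ASYNC_MASTER favors throughput; keep SYNC_MASTER (as in section 2.2) when an
# acknowledged message must already be on the slave
brokerRole = ASYNC_MASTER
mappedFileSizeCommitLog = 1073741824
# Size in bytes: 300000 entries x 20 bytes per entry
mappedFileSizeConsumeQueue = 6000000
enableCalcFilterBitMap = true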

6.2 JVM Tuning

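# JVM options (the GC logging flags below are JDK 8 syntax; on JDK 9+ replace
# the last three options with -Xlog:gc*:file=/opt/logs/rocketmq/gc.log)
JAVA_OPTS="-Xms4g -Xmx4g -Xmn2g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200 \
  -XX:+PrintGCDetails \
  -XX:+PrintGCDateStamps \
  -Xloggc:/opt/logs/rocketmq/gc.log"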

6.3 Producer Optimization

// ProducerOptimizeConfig.java
package com.teaching.config;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProducerOptimizeConfig {

    @Bean
    public RocketMQTemplate optimizedRocketMQTemplate() {
        // Timeout, retry, and tracing are producer settings, so configure the
        // producer first; tracing is enabled through the constructor
        DefaultMQProducer producer =
                new DefaultMQProducer("optimized-group", true, "RMQ_SYS_TRACE_TOPIC");
        producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
        // Fail fast and retry a bounded number of times when a send fails
        producer.setSendMsgTimeout(2000);
        producer.setRetryTimesWhenSendFailed(2);

        // The template starts the producer once its properties are set
        // and shuts it down when the bean is destroyed
        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(producer);
        return template;
    }
}

7. High‑Availability Verification

7.1 Start the Cluster

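docker-compose -f docker-compose-cluster.yml up -d
# Verify NameServer cluster (the NameServer speaks RocketMQ's remoting protocol,
# not HTTP, so check that the ports accept TCP connections)
nc -z localhost 9876
nc -z localhost 9877
# Verify broker master‑slave status (BID 0 is the master, BID 1 is the slave)
docker exec -it rmqbroker-a-master sh mqadmin clusterList -n namesrv1:9876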
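
The NameServer port is a plain TCP listener, so the same reachability check can be done from application code with a socket connect. The sketch below parses a semicolon-separated list in the same form as the `name-server` property and probes each entry. `NameServerProbe` and its method names are hypothetical, and a successful connect only shows that the port is open, not that the NameServer is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

final class NameServerProbe {

    /** Splits "host1:9876;host2:9877" into unresolved host/port pairs. */
    static List<InetSocketAddress> parse(String nameServerList) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : nameServerList.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("localhost:9876;localhost:9877")) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + isReachable(address, 2000));
        }
    }
}
```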

7.2 Failover Test

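# Stop the master broker
docker stop rmqbroker-a-master
# Verify the behavior: consumers keep reading already‑replicated messages from the slave.
# In classic master‑slave mode the slave is read‑only and is not promoted automatically,
# so producers need another master (a second broker group) to keep sending.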
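
To reason about what the producer can do while a master is down, the toy model below may help. It is a deliberately simplified sketch, not RocketMQ's actual selection code: each broker group is reduced to "master up or down", and a send makes one attempt plus a number of retries, preferring a group other than the one that just failed. `FailoverModel` and the second group `broker-b` are hypothetical and do not appear in the cluster defined above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FailoverModel {

    // brokerName -> whether that group's master currently accepts writes
    private final Map<String, Boolean> masterUp = new LinkedHashMap<>();
    private int next = 0; // round-robin cursor

    void setMaster(String brokerName, boolean up) {
        masterUp.put(brokerName, up);
    }

    /** Returns the broker group that accepted the message, or null if every attempt failed. */
    String send(int retryTimesWhenSendFailed) {
        List<String> names = new ArrayList<>(masterUp.keySet());
        if (names.isEmpty()) {
            return null;
        }
        String lastFailed = null;
        for (int attempt = 0; attempt <= retryTimesWhenSendFailed; attempt++) {
            String candidate = pick(names, lastFailed);
            if (masterUp.get(candidate)) {
                return candidate;
            }
            lastFailed = candidate;
        }
        return null;
    }

    /** Round-robin choice that skips the group that just failed when another one exists. */
    private String pick(List<String> names, String avoid) {
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(Math.floorMod(next++, names.size()));
            if (!name.equals(avoid)) {
                return name;
            }
        }
        // Only the failed group exists, so it is tried again
        return names.get(Math.floorMod(next++, names.size()));
    }
}
```

With only `broker-a` registered and its master stopped, every attempt fails regardless of the retry count. Adding a second group whose master is up lets the retry land there, which is why a single master‑slave pair protects stored data but not write availability.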

7.3 View Message Trace

Open the RocketMQ Dashboard at http://localhost:8088, navigate to "Message Trace", input a Message ID, and view the full lifecycle including send time, store time, consume time, and retry count.

8. Common Operational Commands

# View cluster status
mqadmin clusterList -n namesrv1:9876
# List topics
mqadmin topicList -n namesrv1:9876
# Consumer group progress (the Diff column is the backlog per queue)
mqadmin consumerProgress -n namesrv1:9876 -g consumer-group
# Same view, also showing the client IP that owns each queue
mqadmin consumerProgress -n namesrv1:9876 -g consumer-group -s true
# Reset consumption offset to a timestamp (epoch milliseconds)
mqadmin resetOffsetByTime -n namesrv1:9876 -g consumer-group -t topic-name -s 1735660800000
# Get broker configuration (-b takes the broker address, not the broker name)
mqadmin getBrokerConfig -n namesrv1:9876 -b 192.168.1.100:10911
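
The timestamp given to resetOffsetByTime above is in epoch milliseconds. A small helper like the following can derive it from a wall-clock time. `ResetTimestamp` is a hypothetical name, and the `Asia/Shanghai` zone is an assumption chosen because it reproduces the value used in the command.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

final class ResetTimestamp {

    /** Converts a local date-time in the given zone to epoch milliseconds. */
    static long toEpochMillis(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Midnight on 2025-01-01 in UTC+8
        long millis = toEpochMillis(LocalDateTime.of(2025, 1, 1, 0, 0), ZoneId.of("Asia/Shanghai"));
        System.out.println(millis); // prints 1735660800000
    }
}
```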

9. Common Issues and Solutions

Issue 1: Message trace not recorded

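Ensure the broker has traceTopicEnable=true, that producers and consumers are created with message tracing enabled (enable-msg-trace: true in application.yml), and that both sides use the same trace topic name.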

Issue 2: Master‑slave sync delay

Monitor the sync lag using mqadmin clusterList and adjust broker parameters if needed.

Issue 3: Message backlog alerts

Typical remedies:

Increase the number of consumer instances.

Raise the consumer thread pool size.

Optimize consumption logic.
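
Adding consumer instances only helps while there are queues left to hand out: in clustering mode each queue is consumed by one instance of the group at a time. The sketch below models an averaging allocation of queues to consumers to make that limit visible. It is an illustration modeled on the idea of the default averaging strategy, not the client's actual code, and `AverageAllocation` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class AverageAllocation {

    /**
     * Returns the queue indexes assigned to one consumer when queueCount queues
     * are split as evenly as possible across consumerCount consumers.
     */
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (queueCount < 0 || consumerCount <= 0
                || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid allocation arguments");
        }
        int base = queueCount / consumerCount;      // queues every consumer gets
        int remainder = queueCount % consumerCount; // first 'remainder' consumers get one extra
        int size = base + (consumerIndex < remainder ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, remainder);

        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            assigned.add(start + i);
        }
        return assigned;
    }
}
```

With 8 queues and 3 consumers the split is 3, 3, and 2 queues. With 4 queues and 6 consumers the last two consumers receive nothing, so scaling past the queue count requires raising the topic's queue count first.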

10. Series Summary

The five‑part "Message‑Driven" series covered RocketMQ basics, Spring Cloud Stream integration, transaction messages, ordered messages, and finally high‑availability deployment with tracing and monitoring. The author invites readers to suggest topics for the next series.
