Backend Development 15 min read

Using RocketMQ for Traffic Shaping in Spring Boot: Configuration, Code Samples, and Performance Tuning

This article explains how to use RocketMQ for traffic‑shaping in a Spring Boot application, covering its core components, consumer pull configuration, integration via rocketmq‑spring‑boot‑starter, a practical like‑praise use case, a Maven setup, YAML configuration, and advanced topics such as dynamic scaling and batch consumption.

Top Architect
Top Architect
Top Architect
Using RocketMQ for Traffic Shaping in Spring Boot: Configuration, Code Samples, and Performance Tuning

RocketMQ’s main characteristics are decoupling, asynchronous processing, and traffic‑shaping (peak‑cutting). The article records personal experience integrating RocketMQ in a Spring Boot project to reduce database pressure by buffering high‑frequency "praise" (like) events.

The core components of RocketMQ are:

Producer – sends messages.

Broker – stores messages from producers.

Consumer – pulls messages from brokers for processing.

NameServer – routes producers and consumers to the appropriate broker.

Key consumption points:

Consumers pull messages from brokers rather than being pushed.

Pulling is performed per queue; parameters such as pullBatchSize affect how many messages are fetched from each queue.

rocketmq‑spring‑boot‑starter Usage Overview

To quickly integrate RocketMQ, the rocketmq‑spring‑boot‑starter can be used, though it does not expose all configuration options (e.g., batch consumption requires a custom DefaultMQPushConsumer bean).

Important classes provided by the starter:

RocketMQListener – implement onMessage(msg) to consume messages.

RocketMQPushConsumerLifecycleListener – allows customizing the underlying DefaultMQPushConsumer when annotation configuration is insufficient.

@RocketMQMessageListener – annotates a bean that implements RocketMQListener , specifying consumerGroup and topic . Placeholder values can be injected from configuration files.

Business Case: Praise (Like) Throttling

A simple like‑praise service records each user’s praise without limits. Directly inserting each request into the database would overwhelm it under high concurrency. By sending praise events to RocketMQ, the system can limit the consumption rate (e.g., pull 1600 messages per second when the database can only handle 2000), allowing the broker to buffer excess traffic.

Environment Configuration

Article example environment: 1 NameServer + 2 Brokers + 1 Consumer

Adding Maven Dependency

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

application.yml Configuration

rocketmq:
name-server: 127.0.0.1:9876
producer:
group: praise-group
server:
port: 10000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: tiger
url: jdbc:mysql://localhost:3306/wilson
swagger:
docket:
base-package: io.rocket.consumer.controller

Praise Record Entity

@Data
public class PraiseRecord implements Serializable {
private Long id;
private Long uid;
private Long liveId;
private LocalDateTime createTime;
}

MessageController (Test API)

RestController
@RequestMapping("/message")
public class MessageController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/praise")
public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
return ServerResponse.success();
}
}

The service uses sendOneWay() for higher throughput at the cost of possible message loss, which is acceptable for the praise scenario.

RocketMQ provides three sending modes: syncSend() , asyncSend() , and sendOneWay() . sendOneWay() is the fastest but may lose messages.

PraiseListener: Consumer Implementation

@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener
, RocketMQPushConsumerLifecycleListener {
@Resource
private PraiseRecordService praiseRecordService;
@Override
public void onMessage(PraiseRecordVO vo) {
praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// Pull interval in ms
consumer.setPullInterval(2000);
// Number of messages per pull per queue
consumer.setPullBatchSize(16);
}
}

The consumer’s pull interval is set to 2 seconds and each pull fetches 16 messages per queue. With two brokers each having four queues, the theoretical maximum per pull is 16 × 2 × 4 = 128 messages.

Performance tests showed the database receiving roughly 128 new praise records every 2 seconds, confirming the throttling effect.

Dynamic Adjustment of Consumption Rate

If the broker’s maxTransferCountOnMessageInMemory is set to 32, the total consumption capacity equals 32 × brokerCount × queuesPerBroker. To increase capacity without restarting brokers, one can increase the number of queues per broker (e.g., from 4 to 8), which raises the theoretical throughput.

Batch Consumption

The starter does not support batch consumption out‑of‑the‑box. To enable it, a custom DefaultMQPushConsumer bean must be defined and its consumeMessageBatchMaxSize configured. Example bean:

@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
consumer.setPullInterval(1000);
consumer.setPullBatchSize(24);
consumer.setConsumeMessageBatchMaxSize(12);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
List
userInfos = new ArrayList<>(msgs.size());
// convert msgs to UserInfo objects and process batch insert
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
return consumer;
}

When pullBatchSize is smaller than consumeMessageBatchMaxSize , the actual batch size is limited by the pull size.

Additional Information

The article provides commands to start NameServer and Brokers, a link to the source repository, and notes that the RocketMQ Dashboard can be used to monitor consumer registration and queue distribution.

JavaPerformancemicroservicesSpring BootMessage Queuerocketmqtraffic shaping
Top Architect
Written by

Top Architect

Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.