How to Use RocketMQ for Traffic Shaping and Peak‑Smoothing in Spring Boot
This article explains how RocketMQ’s decoupling, asynchronous and peak‑shaving features can be applied in a Spring Boot project to buffer massive like‑click traffic, reduce database pressure, and dynamically tune consumer pull parameters for optimal throughput.
RocketMQ Core Concepts
RocketMQ provides three main characteristics: decoupling, asynchronous processing, and peak shaving. The core components are:
Producer – sends messages
Broker – stores messages from producers
Consumer – pulls messages from the broker for processing
NameServer – routes producers and consumers to the appropriate broker
RocketMQ‑Spring‑Boot‑Starter Usage
When rapid integration is needed, rocketmq-spring-boot-starter can set up the environment, but it does not expose all RocketMQ configurations. For batch consumption you must define a custom DefaultMQPushConsumer bean. RocketMQListener – interface that consumer classes implement; defines
onMessage(msg) RocketMQPushConsumerLifecycleListener– allows custom configuration of the underlying consumer when the annotation parameters are insufficient. @RocketMQMessageListener – marks a bean as a consumer, specifying topic and consumerGroup. Placeholder values can be injected from configuration files.
Business Case: Like‑Button Peak Shaving
In a scenario where users can click “like” repeatedly, inserting each click directly into the database would overwhelm it. By sending each click as a RocketMQ message, the system can buffer spikes (e.g., 5 000 msgs/s while the DB can handle 2 000) and consume at a controlled rate, thus smoothing the load.
Environment Configuration
Typical test setup: 1 NameServer + 2 Brokers + 1 Consumer.
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency> rocketmq:
name-server: 127.0.0.1:9876
producer:
group: praise-group
server:
port: 10000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: tiger
url: jdbc:mysql://localhost:3306/wilson
swagger:
docket:
base-package: io.rocket.consumer.controllerData Model and API
@Data
public class PraiseRecord implements Serializable {
private Long id;
private Long uid;
private Long liveId;
private LocalDateTime createTime;
} @RestController
@RequestMapping("/message")
public class MessageController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/praise")
public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC,
MessageBuilder.withPayload(vo).build());
return ServerResponse.success();
}
}The sendOneWay() method is chosen to maximize throughput at the cost of possible message loss, which is acceptable for high‑frequency likes.
RocketMQ provides three sending modes: syncSend() (synchronous), asyncSend() (asynchronous with callback), and sendOneWay() (fire‑and‑forget). Throughput ranking: sendOneWay > asyncSend > syncSend.
PraiseListener – Consumer Implementation
@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC,
consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>,
RocketMQPushConsumerLifecycleListener {
@Resource
private PraiseRecordService praiseRecordService;
@Override
public void onMessage(PraiseRecordVO vo) {
praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setPullInterval(2000); // ms
consumer.setPullBatchSize(16); // messages per queue per pull
}
}Key consumption parameters:
pullInterval – interval between successive pulls (ms).
pullBatchSize – number of messages fetched from each queue per pull. The effective total per pull equals pullBatchSize × numberOfBrokers × writeQueueNums.
consumeMessageBatchMaxSize – maximum number of messages processed in a single batch (default 1). This setting is also limited by pullBatchSize.
In the example, with 2 brokers each having 4 queues, pullBatchSize=16 yields a theoretical 128 messages per pull (16 × 2 × 4). After the initial single‑message pull, the consumer can process up to 128 messages every 2 seconds, which matches the observed test results.
Dynamic Adjustment of Consumption Efficiency
If the broker’s default maxTransferCountOnMessageInMemory (32) is reached, you can increase the number of queues on each broker (e.g., from 4 to 8) to raise the overall pull capacity without restarting the broker. After changing the queue count, verify that the new queues appear in the Dashboard’s Consumer Manager and that consumers are registered.
Enabling Batch Consumption
Since rocketmq-spring-boot-starter does not support batch consumption out of the box, you must create a custom DefaultMQPushConsumer and set consumeMessageBatchMaxSize. Example bean:
@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
consumer.setPullInterval(1000);
consumer.setPullBatchSize(24);
consumer.setConsumeMessageBatchMaxSize(12);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
List<UserInfo> userInfos = new ArrayList<>(msgs.size());
for (MessageExt msg : msgs) {
userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
}
// batch insert: userInfoMapper.insertBatch(userInfos);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
return consumer;
}With the above configuration, the consumer will receive up to 12 messages per batch, provided pullBatchSize is at least 12.
Running the Demo
Start the NameServer and two Brokers:
mqnamesrv -n 127.0.0.1:9876
mqbroker -c /path/to/broker-a.properties
mqbroker -c /path/to/broker-b.propertiesLaunch the RocketMQ Dashboard (refer to the official GitHub repo) and access the source code at https://github.com/Wilson-He/spring-boot-series/tree/master/spring-rocketmq .
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
