How to Use RocketMQ for Traffic Shaping and Peak‑Smoothing in Spring Boot

This article explains how RocketMQ’s decoupling, asynchronous processing, and peak‑shaving features can be applied in a Spring Boot project to buffer heavy like‑click traffic, reduce database pressure, and tune consumer pull parameters for better throughput.

Programmer DD

RocketMQ Core Concepts

RocketMQ provides three main characteristics: decoupling, asynchronous processing, and peak shaving. The core components are:

Producer – sends messages

Broker – stores messages from producers

Consumer – pulls messages from the broker for processing

NameServer – routes producers and consumers to the appropriate broker

RocketMQ‑Spring‑Boot‑Starter Usage

When rapid integration is needed, rocketmq-spring-boot-starter can set up the environment, but it does not expose all RocketMQ configurations. For batch consumption you must define a custom DefaultMQPushConsumer bean. The main types are:

RocketMQListener – interface that consumer classes implement; defines onMessage(msg).

RocketMQPushConsumerLifecycleListener – allows custom configuration of the underlying consumer when the annotation parameters are insufficient.

@RocketMQMessageListener – marks a bean as a consumer, specifying topic and consumerGroup. Placeholder values can be injected from configuration files.

Business Case: Like‑Button Peak Shaving

In a scenario where users can click “like” repeatedly, inserting each click directly into the database would overwhelm it. By sending each click as a RocketMQ message, the system can buffer spikes (e.g., 5 000 msgs/s while the DB can handle 2 000) and consume at a controlled rate, thus smoothing the load.
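The buffering effect can be estimated with simple arithmetic. The sketch below is a hypothetical back-of-the-envelope model, not part of RocketMQ or the demo project: the class and method names are invented for illustration, and the 10-second spike and 500 msgs/s post-spike rate are assumed values. Only the 5 000 and 2 000 msgs/s figures come from the scenario above.

```java
// Hypothetical model of queue backlog during a traffic spike (illustrative only).
final class PeakShavingSketch {

    /** Messages left in the queue after a spike of the given length. */
    static long backlogAfterSpike(long produceRate, long consumeRate, long spikeSeconds) {
        long surplusPerSecond = Math.max(0, produceRate - consumeRate);
        return surplusPerSecond * spikeSeconds;
    }

    /** Seconds needed to drain a backlog once the spike ends, rounded up. */
    static long drainSeconds(long backlog, long consumeRate, long steadyProduceRate) {
        long spare = consumeRate - steadyProduceRate;
        if (spare <= 0) {
            throw new IllegalArgumentException("consumer has no spare capacity");
        }
        return (backlog + spare - 1) / spare;
    }

    public static void main(String[] args) {
        // 5 000 msgs/s produced, 2 000 msgs/s consumed, assumed 10 s spike.
        long backlog = backlogAfterSpike(5_000, 2_000, 10);
        System.out.println("backlog after spike: " + backlog);
        // Assumed 500 msgs/s of normal traffic after the spike.
        System.out.println("seconds to drain: " + drainSeconds(backlog, 2_000, 500));
    }
}
```

The point of the model is that the broker absorbs the surplus while the database only ever sees the consumer's rate; the cost is added latency until the backlog drains.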

Environment Configuration

Typical test setup: 1 NameServer + 2 Brokers + 1 Consumer.

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: praise-group
server:
  port: 10000
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: tiger
    url: jdbc:mysql://localhost:3306/wilson
  swagger:
    docket:
      base-package: io.rocket.consumer.controller

Data Model and API

@Data
public class PraiseRecord implements Serializable {
    private Long id;
    private Long uid;
    private Long liveId;
    private LocalDateTime createTime;
}

@RestController
@RequestMapping("/message")
public class MessageController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/praise")
    public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
        rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC,
                MessageBuilder.withPayload(vo).build());
        return ServerResponse.success();
    }
}

The sendOneWay() method is chosen to maximize throughput at the cost of possible message loss, which is acceptable for high‑frequency likes.

RocketMQ provides three sending modes: syncSend() (synchronous), asyncSend() (asynchronous with callback), and sendOneWay() (fire‑and‑forget). Throughput ranking: sendOneWay > asyncSend > syncSend.

PraiseListener – Consumer Implementation

@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC,
        consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>,
        RocketMQPushConsumerLifecycleListener {

    @Resource
    private PraiseRecordService praiseRecordService;

    @Override
    public void onMessage(PraiseRecordVO vo) {
        praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setPullInterval(2000); // ms
        consumer.setPullBatchSize(16);   // messages per queue per pull
    }
}

Key consumption parameters:

pullInterval – interval between successive pulls (ms).

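pullBatchSize – number of messages fetched from each queue per pull. For a single consumer instance that owns every queue of the topic, the effective total per pull round equals pullBatchSize × numberOfBrokers × writeQueueNums (assuming the read queue count matches writeQueueNums).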

consumeMessageBatchMaxSize – maximum number of messages handed to the listener in a single batch (default 1). The effective value is also capped by pullBatchSize, because a batch cannot contain more messages than one pull returned.

In the example, with 2 brokers each having 4 queues, pullBatchSize=16 yields a theoretical 128 messages per pull (16 × 2 × 4). After the initial single‑message pull, the consumer can process up to 128 messages every 2 seconds, which matches the observed test results.
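The arithmetic above can be written out as a small helper. This is a minimal sketch with hypothetical names, not RocketMQ API. It treats pullInterval as the full period between pulls and ignores network and processing time, so the per-second figure is an upper bound rather than a measured rate.

```java
// Hypothetical helper that reproduces the pull-capacity arithmetic (illustrative only).
final class PullThroughputSketch {

    /** Theoretical messages fetched in one pull round across all queues. */
    static int messagesPerPull(int pullBatchSize, int brokers, int queuesPerBroker) {
        return pullBatchSize * brokers * queuesPerBroker;
    }

    /** Upper bound on messages per second, treating pullInterval as the whole period. */
    static double messagesPerSecond(int messagesPerPull, long pullIntervalMillis) {
        return messagesPerPull * 1000.0 / pullIntervalMillis;
    }

    public static void main(String[] args) {
        int perPull = messagesPerPull(16, 2, 4);             // 16 x 2 x 4
        double perSecond = messagesPerSecond(perPull, 2000); // one pull every 2 s
        System.out.println(perPull + " messages per pull, at most " + perSecond + " per second");
    }
}
```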

Dynamic Adjustment of Consumption Efficiency

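The broker limits how many messages a single pull can return from one queue: maxTransferCountOnMessageInMemory defaults to 32, so raising pullBatchSize beyond 32 has no effect unless that broker setting is changed. Once this limit is reached, you can instead increase the number of queues on each broker (e.g., from 4 to 8) to raise the overall pull capacity without restarting the broker. After changing the queue count, verify that the new queues appear in the Dashboard’s Consumer Manager and that consumers are registered.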
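To see why adding queues helps once the per-queue limit is hit, the following sketch applies the limit to the pull-capacity formula. The class and method names are hypothetical. The value 32 is the default quoted above, and a pullBatchSize of 64 is an assumed setting chosen only to exceed the limit.

```java
// Hypothetical illustration of the broker-side per-queue pull limit (not RocketMQ API).
final class BrokerCapSketch {

    /** Default limit quoted in the text for maxTransferCountOnMessageInMemory. */
    static final int DEFAULT_BROKER_CAP = 32;

    /** Messages one pull can actually return from a single queue. */
    static int effectivePerQueue(int pullBatchSize, int brokerCap) {
        return Math.min(pullBatchSize, brokerCap);
    }

    /** Theoretical messages per pull round across every queue on every broker. */
    static int perPull(int pullBatchSize, int brokerCap, int brokers, int queuesPerBroker) {
        return effectivePerQueue(pullBatchSize, brokerCap) * brokers * queuesPerBroker;
    }

    public static void main(String[] args) {
        // Asking for 64 per queue still yields only 32 per queue.
        System.out.println("4 queues per broker: " + perPull(64, DEFAULT_BROKER_CAP, 2, 4));
        // Doubling the queue count doubles the capacity without touching the limit.
        System.out.println("8 queues per broker: " + perPull(64, DEFAULT_BROKER_CAP, 2, 8));
    }
}
```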

Enabling Batch Consumption

Since rocketmq-spring-boot-starter does not support batch consumption out of the box, you must create a custom DefaultMQPushConsumer and set consumeMessageBatchMaxSize. Example bean:

@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
    consumer.setNamesrvAddr(nameServer);
    consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
    consumer.setPullInterval(1000);
    consumer.setPullBatchSize(24);
    consumer.setConsumeMessageBatchMaxSize(12);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        List<UserInfo> userInfos = new ArrayList<>(msgs.size());
        for (MessageExt msg : msgs) {
            userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
        }
        // batch insert: userInfoMapper.insertBatch(userInfos);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    return consumer;
}

With the above configuration, the consumer will receive up to 12 messages per batch, provided pullBatchSize is at least 12.
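The relationship between the two settings can be modelled as chunking one pulled list into listener batches. The sketch below is a plain-Java illustration with hypothetical names; it approximates the behaviour described above and is not the client's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of splitting one pull result into consume batches (illustrative only).
final class BatchSplitSketch {

    /** Splits the messages returned by one pull into batches of at most batchMaxSize. */
    static <T> List<List<T>> split(List<T> pulled, int batchMaxSize) {
        if (batchMaxSize < 1) {
            throw new IllegalArgumentException("batchMaxSize must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < pulled.size(); from += batchMaxSize) {
            int to = Math.min(from + batchMaxSize, pulled.size());
            batches.add(new ArrayList<>(pulled.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        for (int i = 0; i < 24; i++) {
            pulled.add(i); // stands in for 24 messages from one pull (pullBatchSize = 24)
        }
        // With a batch maximum of 12, the listener would be invoked twice with 12 messages each.
        System.out.println(split(pulled, 12).size() + " batches");
    }
}
```

If a pull returns fewer messages than the batch maximum (for example 8 with a maximum of 12), the model yields a single batch of 8, which is why the batch size is bounded by pullBatchSize.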

Running the Demo

Start the NameServer and two Brokers:

mqnamesrv -n 127.0.0.1:9876
mqbroker -c /path/to/broker-a.properties
mqbroker -c /path/to/broker-b.properties

Launch the RocketMQ Dashboard (refer to the official GitHub repo). The demo source code is available at https://github.com/Wilson-He/spring-boot-series/tree/master/spring-rocketmq
