Master RocketMQ with Spring Boot: Complete Guide to Messaging APIs

This tutorial walks through integrating RocketMQ into a Spring Boot application, covering the Maven dependency, producer and consumer configuration, and examples of the synchronous, asynchronous, one-way, delayed, ordered, request-response, batch, and pull messaging APIs, along with best-practice recommendations. Transactional messages are deferred to a later article.

Lin is Dream

During my years of development I have used many message queues, such as Kafka, RocketMQ, Redis (via its list type), and Spring events (the event mechanism built into the Spring Framework). The one I use most is RocketMQ, open-sourced by Alibaba, which is simple to use, feature-rich, and well suited to large-scale e-commerce and financial scenarios. This article explains how to use RocketMQ's Java client in a Spring Boot project and summarizes the APIs for sending and receiving messages in detail; beginners may want to bookmark it.

Tools

SpringBoot v2.5.14, RocketMQ v4.9.7, JDK 1.8

Dependency Configuration

rocketmq-spring-boot-starter
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.1</version>
</dependency>

Producer Process (Java Service) Configuration

# rocketmq-spring-boot-starter binds properties under the top-level "rocketmq" prefix
rocketmq:
  name-server: 192.168.110.1:9876;192.168.110.2:9876;192.168.110.3:9876
  producer:
    group: my-producer-group

Consumer Process (Java Service) Configuration

# rocketmq-spring-boot-starter binds properties under the top-level "rocketmq" prefix
rocketmq:
  name-server: 192.168.110.1:9876;192.168.110.2:9876;192.168.110.3:9876
  consumer:
    group: my-consumer-group

If the producer and consumer run in the same process, a single configuration block containing both the producer and consumer sections is enough. With that, RocketMQ is integrated into the Spring Boot project.

Start Using

1. Send Messages

Sending messages is done directly with the RocketMQTemplate class, which provides many sending interfaces. Below is a simple example for reference. In production you would usually wrap these interfaces for easier use; I will share a more refined wrapper in upcoming articles.

@Service
public class RocketMqProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String topic, String tag, String message, String key) {
        Message<String> sendMessage = MessageBuilder.withPayload(message)
                .setHeader(RocketMQHeaders.KEYS, key)
                .build();
        SendResult sendResult = rocketMQTemplate.syncSend(buildDestination(topic, tag), sendMessage);
        System.out.println("Send result: " + sendResult);
    }

    private String buildDestination(String topic, String tag) {
        return topic + ":" + tag;
    }
}

The SendResult object contains fields such as sendStatus (e.g., SEND_OK), msgId, offsetMsgId, and the MessageQueue information (topic, brokerName, queueId). These fields can be used to trace the message.
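The destination string passed to syncSend follows the starter's topic:tag convention, and a bare topic name is also accepted when no tag is needed. As a minimal sketch, the helper below builds that string while tolerating a missing tag; the class name Destinations and its validation rules are illustrative assumptions, not part of the starter.

```java
/** Builds destination strings of the form "topic" or "topic:tag" (illustrative helper). */
final class Destinations {
    private Destinations() {}

    static String of(String topic, String tag) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic must not be blank");
        }
        // Without a tag, the destination is just the topic name.
        if (tag == null || tag.trim().isEmpty()) {
            return topic.trim();
        }
        return topic.trim() + ":" + tag.trim();
    }
}
```

Calling Destinations.of("order-topic", "pay") yields "order-topic:pay", while a null or blank tag yields "order-topic" instead of a string ending in ":null".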

2. Consume Messages

@Service
// MqConstants.TAG_PAY is an application-defined tag constant (its definition is not shown)
@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-consumer-group",
        selectorExpression = MqConstants.TAG_PAY,
        consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received order message: " + message);
        // Process order message
    }
}

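Once the consumer process starts, it consumes messages in near real time. A latency of around 5 ms is a reasonable expectation on a healthy cluster, though the actual figure depends on broker load and network conditions.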

Message Sending APIs

1. Synchronous Message

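Synchronous sending offers the highest reliability of the three send modes: the call blocks until the broker acknowledges the message. Whether that acknowledgment means the message has reached disk depends on the broker's flush and replication settings, which is why the returned SendStatus should be checked. It is suitable for scenarios such as confirming an order payment, where strong consistency is required.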

SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
    System.out.println("Message sent successfully: " + sendResult);
} else {
    System.out.println("Message sending failed: " + sendResult);
}

2. Asynchronous Message

Asynchronous sending does not block the calling thread. The result is delivered via a callback, offering relatively high reliability. It is useful for post‑processing tasks such as creating a logistics order after payment.

rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("Async message sent successfully: " + sendResult);
    }
    @Override
    public void onException(Throwable e) {
        System.out.println("Async message sending failed: " + e.getMessage());
    }
});
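Callback-style results can be awkward to compose with other asynchronous work. One option is to bridge the callback to a CompletableFuture, sketched below. ResultCallback and CallbackSender are hypothetical stand-ins defined here so the example is self-contained; ResultCallback mirrors the two methods of SendCallback shown above, and neither type is part of RocketMQ.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in with the same two methods as SendCallback. */
interface ResultCallback<T> {
    void onSuccess(T result);
    void onException(Throwable error);
}

/** Hypothetical stand-in for any callback-based asynchronous send operation. */
interface CallbackSender<T> {
    void send(String payload, ResultCallback<T> callback);
}

final class CallbackFutures {
    private CallbackFutures() {}

    /** Starts the send and returns a future completed by whichever callback method fires. */
    static <T> CompletableFuture<T> sendAsFuture(CallbackSender<T> sender, String payload) {
        CompletableFuture<T> future = new CompletableFuture<>();
        sender.send(payload, new ResultCallback<T>() {
            @Override
            public void onSuccess(T result) {
                future.complete(result);
            }

            @Override
            public void onException(Throwable error) {
                future.completeExceptionally(error);
            }
        });
        return future;
    }
}
```

In a real project the same idea would wrap rocketMQTemplate.asyncSend by completing the future from inside a SendCallback, so callers can chain thenApply or exceptionally instead of nesting callbacks.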

3. One‑Way Message

The client does not care about the send result; there is no return value or callback. Reliability is the lowest, suitable for logging or telemetry.

rocketMQTemplate.sendOneWay(destination, message);

4. Delayed Message

Both synchronous and asynchronous sending support delayed messages. After the message is sent, it is stored on the broker and delivered to consumers once the configured delay (for example, 30 minutes) has elapsed. RocketMQ 4.x supports only fixed delay levels, and with the default levels the maximum delay is two hours.

// 16 represents a 30‑minute delay (RocketMQ fixed delay level)
rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload(message).build(), 1000, 16);
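The delay level is an index into a table configured on the broker, so a bare 16 in the code is easy to misread. The sketch below maps a level to its duration, assuming the broker keeps the RocketMQ 4.x default messageDelayLevel setting; a broker configured with a different table would give different results. The class name DelayLevels is an illustrative assumption.

```java
import java.time.Duration;

/** Maps RocketMQ 4.x delay levels to durations, assuming the default broker table. */
final class DelayLevels {
    // Default messageDelayLevel in RocketMQ 4.x; a customized broker changes this mapping.
    private static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private DelayLevels() {}

    /** Returns the delay for a 1-based level, for example level 16 gives 30 minutes. */
    static Duration toDuration(int level) {
        String[] parts = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("delay level must be between 1 and " + parts.length);
        }
        String token = parts[level - 1];
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("unknown unit in " + token);
        }
    }
}
```

Under that assumption, DelayLevels.toDuration(16) is 30 minutes and DelayLevels.toDuration(18), the highest level, is 2 hours.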

5. Ordered Message

Ordered messages ensure that messages with the same hash key are routed to the same queue, preserving FIFO order. This is useful for processing a single order's lifecycle (create → pay → ship → receive).

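// orderId is the hash key: messages sharing it are routed to the same queue
rocketMQTemplate.syncSendOrderly(topic, MessageBuilder.withPayload(status).build(), orderId);
// The async variant also requires a SendCallback (sendCallback is any SendCallback instance)
rocketMQTemplate.asyncSendOrderly(topic, MessageBuilder.withPayload(status).build(), orderId, sendCallback);
rocketMQTemplate.sendOneWayOrderly(topic, MessageBuilder.withPayload(status).build(), orderId);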
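To see why a shared hash key preserves order, it helps to look at how a key can be mapped to a queue. The following is a simplified sketch of hash-based selection, not the client's actual selector; the class name HashQueueSelector and the use of Math.floorMod are assumptions made for illustration.

```java
/** Simplified illustration of hash-based queue selection for ordered messages. */
final class HashQueueSelector {
    private HashQueueSelector() {}

    /** Picks a queue index from the hash key; the same key always yields the same index. */
    static int selectIndex(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }
}
```

Because the index depends on the queue count, the mapping only stays stable while the number of queues for the topic is unchanged; messages with different keys may still share a queue, which is harmless for ordering.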

6. Transactional Message

Transactional messages are not covered in this article and will be discussed later.

7. Request‑Response Message

This pattern allows a producer to send a message and wait for a consumer to process it and return a response, similar to RPC but still broker‑mediated.

String status = rocketMQTemplate.sendAndReceive(
        "order-status-topic",
        orderId,
        String.class,
        3000);
System.out.println("Order status: " + status);

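The producer call above names only the response type (String.class). On the consumer side, the listener implements RocketMQReplyListener<T, R>, whose first generic type (for example Long) is the request payload type and whose second (String) is the response type.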

8. Batch Message

Batch sending reduces the number of network calls by sending a list of messages at once. It is suitable for scenarios where multiple systems need to receive the same batch of events, such as order creation triggering inventory, payment, and shipping services.

List<Message<String>> messageList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    String orderId = "ORDER" + i;
    Message<String> message = MessageBuilder.withPayload("Order ID: " + orderId)
            .setHeader(MessageConst.PROPERTY_KEYS, orderId)
            .build();
    messageList.add(message);
}
SendResult result = rocketMQTemplate.syncSend("order-topic", messageList);
System.out.println("SendResult: " + result);
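A single batch cannot grow without bound, since the broker limits message size. A common precaution is to split a large list into smaller batches and send each one separately. The sketch below partitions by message count, which is a simplification: a size-based split would be more precise, and the limit of 4 used in the note is an arbitrary assumption. The class name BatchPartitioner is illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits a list into consecutive sub-lists of at most maxPerBatch elements. */
final class BatchPartitioner {
    private BatchPartitioner() {}

    static <T> List<List<T>> partition(List<T> items, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, items.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

With ten messages and a limit of 4, this produces batches of 4, 4, and 2, each of which could be passed to syncSend in turn.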

The RocketMQListener interface delivers one message per onMessage call, so the Spring starter does not expose batch consumption directly. A pull consumer can be used to fetch messages in batches instead.

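// Note: DefaultMQPullConsumer is deprecated in recent 4.x clients (DefaultLitePullConsumer replaces it)
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("batch-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
for (MessageQueue mq : mqs) {
    // Pulls up to 10 messages starting at offset 0; a real consumer would store
    // pullResult.getNextBeginOffset() per queue and resume from it on the next pull
    PullResult pullResult = consumer.pull(mq, "*", 0, 10);
    switch (pullResult.getPullStatus()) {
        case FOUND:
            for (MessageExt msg : pullResult.getMsgFoundList()) {
                // Decode with an explicit charset (java.nio.charset.StandardCharsets)
                System.out.printf("Received: %s%n", new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            break;
        case NO_NEW_MSG:
            System.out.println("No new message.");
            break;
        // other cases omitted for brevity
    }
}
consumer.shutdown();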

Best Practice

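For most cases, a workable combination is to produce messages in batches, consume them one by one through a listener, and fall back to manual pulling only when batch consumption is genuinely needed.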

Receiving API Details

Message consumption is implemented with the @RocketMQMessageListener annotation. Below is a complete example and an explanation of each parameter.

@Service // the listener must be a Spring bean for the annotation to take effect
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group",
    selectorExpression = "order_create",
    consumeMode = ConsumeMode.CONCURRENTLY,
    messageModel = MessageModel.CLUSTERING,
    consumeThreadMax = 20,
    maxReconsumeTimes = 3,
    consumeTimeout = 15,
    replyTimeout = 3000)
public class OrderConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

topic

Required. The topic that the consumer listens to. Usually one business scenario defines one topic, e.g., order-topic, notify-topic.

consumerGroup

Required. The group to which the consumer belongs. Instances in the same group share the load; in clustering mode only one instance consumes a given message. All consumers in a group must subscribe to the same topic.

selectorExpression

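Optional (defaults to *, which matches every tag). The tag expression used for message filtering. A topic can carry multiple tags, and a consumer can subscribe to one tag or to several joined with ||. All instances in a consumer group must use the same topic and tag expression; consumers that need different tags should use different consumer groups, otherwise messages can be silently skipped.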
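The semantics of a tag expression can be modeled in a few lines. The sketch below is only a local illustration of how * and || behave; in RocketMQ the filtering is performed by the broker and client library, not by application code, and the class name TagFilter is an assumption.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Local model of tag-expression matching: "*" matches all, "a || b" matches either tag. */
final class TagFilter {
    private TagFilter() {}

    static boolean matches(String selectorExpression, String messageTag) {
        // A missing, blank, or "*" expression subscribes to every tag.
        if (selectorExpression == null
                || selectorExpression.trim().isEmpty()
                || "*".equals(selectorExpression.trim())) {
            return true;
        }
        Set<String> tags = Arrays.stream(selectorExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return messageTag != null && tags.contains(messageTag);
    }
}
```

For example, the expression "order_create || order_pay" accepts a message tagged order_pay, while the expression "order_create" alone does not.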

consumeThreadMax

Recommended. Defines the maximum number of concurrent consumer threads in the listener's thread pool (default 64). Size it to the workload and to what downstream systems can absorb; the example above uses 20.

consumeMode

Optional. Defines the consumption mode. ConsumeMode.CONCURRENTLY (default) allows messages from the same queue to be processed by different threads. ConsumeMode.ORDERLY guarantees that messages from the same queue are processed in order; it must be used together with ordered sending APIs.

messageModel

Optional. Defines the message model. MessageModel.CLUSTERING (default) provides load‑balanced consumption. MessageModel.BROADCASTING delivers the message to all instances, useful for configuration updates or promotions.

consumeTimeout

Optional. Consumer timeout in minutes (default 15). Increase for long‑running tasks.

maxReconsumeTimes

Recommended. Maximum number of retry attempts after a consumption failure. The default of -1 means 16 retries in concurrent mode and Integer.MAX_VALUE (effectively unlimited) in orderly mode. After the limit is exceeded, the message is moved to the dead-letter queue.
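The meaning of the -1 default depends on the consume mode: the client treats it as 16 retries in concurrent mode and as Integer.MAX_VALUE in orderly mode. The sketch below encodes that rule together with the naming convention for the dead-letter topic, which is %DLQ% followed by the consumer group. The class and method names are illustrative assumptions.

```java
/** Illustrates how the -1 default for maxReconsumeTimes is interpreted per consume mode. */
final class RetryPolicy {
    private RetryPolicy() {}

    /** Resolves the retry limit that actually applies for a configured value. */
    static int effectiveMaxReconsumeTimes(int configured, boolean orderly) {
        if (configured != -1) {
            return configured;
        }
        // -1 means 16 retries in concurrent mode, Integer.MAX_VALUE in orderly mode.
        return orderly ? Integer.MAX_VALUE : 16;
    }

    /** Name of the dead-letter topic that receives messages once retries are exhausted. */
    static String deadLetterTopic(String consumerGroup) {
        return "%DLQ%" + consumerGroup;
    }
}
```

With maxReconsumeTimes = 3 as in the listener above, a message that keeps failing would end up in %DLQ%order-consumer-group after its third retry.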

replyTimeout

Optional. Timeout in milliseconds for request‑response mode (default 3000 ms). Set to 1–3 seconds for low‑latency scenarios.

ACK Mechanism

By default, the consumer automatically acknowledges a message once onMessage returns without throwing. If an exception is thrown or processing exceeds consumeTimeout, the message is retried according to the retry policy.

That concludes the entire content of this article.
