Master RocketMQ with Spring Boot: From Simple to Transactional Messaging

This guide walks through integrating RocketMQ 4.8.0 with Spring Boot 2.3.9. It covers dependencies, configuration, and code examples for normal, ordered, cluster/broadcast, and transactional messages, and explains the test results for each messaging mode.

Spring Full-Stack Practical Cases

Environment

Spring Boot 2.3.9.RELEASE with RocketMQ 4.8.0.

Dependencies

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

Configuration

server:
  port: 8080
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq

Normal Message

Sending

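@Resource
private RocketMQTemplate rocketMQTemplate;

public void send(String message) {
  // The destination format is "topic:tag". convertAndSend wraps the raw payload itself,
  // so pass the String directly instead of a pre-built Message.
  rocketMQTemplate.convertAndSend("test-topic:tag2", message);
}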
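
The destination string above packs the topic and the tag into one value separated by a colon. As a minimal sketch of how such a string breaks apart, the hypothetical helper below (DestinationParts is an illustration, not a class from the starter) splits on the first colon only and treats a missing tag as empty.

```java
/** Hypothetical helper (not part of rocketmq-spring): splits "topic:tags" into its two parts. */
final class DestinationParts {
    final String topic;
    final String tags;

    private DestinationParts(String topic, String tags) {
        this.topic = topic;
        this.tags = tags;
    }

    /** Splits on the first ':' only; a destination without ':' gets an empty tag. */
    static DestinationParts parse(String destination) {
        String[] parts = destination.split(":", 2);
        String tags = parts.length > 1 ? parts[1] : "";
        return new DestinationParts(parts[0], tags);
    }

    public static void main(String[] args) {
        DestinationParts d = parse("test-topic:tag2");
        System.out.println("topic=" + d.topic + ", tags=" + d.tags);
    }
}
```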

Receiving

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
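
The selectorExpression "tag1 || tag2" means the listener accepts a message whose tag equals either alternative, which is why the message sent with tag2 is delivered. The sketch below models that rule in plain Java; TagSelector is a hypothetical name, and the real filtering is performed by RocketMQ itself.

```java
import java.util.Arrays;

/** Hypothetical illustration of tag filtering; RocketMQ performs the real filtering. */
final class TagSelector {
    /** Returns true when the tag equals one of the "||"-separated alternatives, or the expression is "*". */
    static boolean matches(String selectorExpression, String messageTag) {
        if (selectorExpression == null || selectorExpression.trim().equals("*")) {
            return true; // "*" subscribes to every tag
        }
        return Arrays.stream(selectorExpression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag -> tag.equals(messageTag));
    }

    public static void main(String[] args) {
        System.out.println(matches("tag1 || tag2", "tag2")); // accepted
        System.out.println(matches("tag1 || tag2", "tag3")); // filtered out
    }
}
```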

Ordered Message

Sending

@Resource
private RocketMQTemplate rocketMQTemplate;

public void sendOrder(String topic, String message, String tags, int id) {
    // The hash key ("order-" + id) selects the queue: messages with equal keys land in the same queue
    rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),
            "order-" + id, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // Log the queue id to verify that one order id always maps to one queue
                    System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message
                            + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
                }
                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
}

Consumer

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println(Thread.currentThread().getName() + " Received Order message: " + message);
    }
}

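ConsumeMode.ORDERLY makes the consumer process each message queue with one thread at a time, so messages that share a hash key (and therefore a queue) are consumed in the order they arrived. Different queues can still be consumed in parallel.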
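
Ordering relies on every message with the same hash key ("order-" + id in the sender above) being routed to the same queue. The following is a simplified model of hash-based queue selection, not RocketMQ's actual selector; the class name and the queue count of 4 are assumptions made for illustration.

```java
/** Simplified model of hash-based queue selection; class name and queue count are assumptions. */
final class OrderlyQueueSketch {
    /** Maps a hash key to a queue index; equal keys always yield the same index. */
    static int selectQueue(String hashKey, int queueCount) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        for (int id : new int[] {1, 2, 1, 3, 2}) {
            String key = "order-" + id;
            System.out.println(key + " -> queue " + selectQueue(key, queueCount));
        }
    }
}
```

Running it shows that repeated keys such as order-1 always print the same queue index, which is the property the orderly consumer depends on.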

Cluster and Broadcast Modes

Sending

The producer code is the same for both modes; the mode is selected on the consumer side through messageModel.

@Resource
private RocketMQTemplate rocketMQTemplate;

public void send(String topic, String message, String tags) {
    rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
}

Cluster consumer

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1 received: " + message);
    }
}

Broadcast consumer

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1 received: " + message);
    }
}

In testing, cluster mode load-balances the messages across the instances of the consumer group, so each message is handled by exactly one instance. In broadcast mode, every instance receives every message.
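
The difference between the two message models can be pictured with a toy simulation. This sketch is not RocketMQ code: the class and method names are made up, and simple round-robin stands in for the broker's real queue allocation among instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the two message models; round-robin stands in for RocketMQ's queue allocation. */
final class MessageModelSketch {
    /** Clustering: each message goes to exactly one instance of the group. */
    static List<List<String>> clustering(List<String> messages, int instances) {
        List<List<String>> received = emptyInboxes(instances);
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % instances).add(messages.get(i));
        }
        return received;
    }

    /** Broadcasting: every instance receives every message. */
    static List<List<String>> broadcasting(List<String> messages, int instances) {
        List<List<String>> received = emptyInboxes(instances);
        for (List<String> inbox : received) {
            inbox.addAll(messages);
        }
        return received;
    }

    private static List<List<String>> emptyInboxes(int instances) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("clustering:   " + clustering(messages, 2));
        System.out.println("broadcasting: " + broadcasting(messages, 2));
    }
}
```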

Transactional Message

RocketMQ defines three transaction statuses:

CommitTransaction: commits the transaction; consumers can then consume the message.

RollbackTransaction: rolls the transaction back; the message is deleted and can never be consumed.

Unknown: an intermediate state; the server must check back with the producer to learn the final status.

The two‑phase process consists of a normal send/commit phase and a compensation flow.

Normal transaction send and commit

Producer sends a half‑message (cannot be consumed).

Server acknowledges the half‑message.

Local transaction executes.

Based on the local result, the producer commits or rolls back.

Compensation flow

If the server receives no commit or rollback for a half-message (for example, because the producer crashed or returned Unknown), it sends a check request to the producer.

The producer checks the local transaction status.

According to the check result, it commits or rolls back.
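
The two flows above can be condensed into a small state machine. The sketch below is a toy model under stated assumptions, not RocketMQ's implementation: all names are illustrative, and the rule that a message still unresolved after maxChecks attempts is discarded is a simplification chosen for this example.

```java
/** Toy model of the half-message lifecycle; names are illustrative, not RocketMQ APIs. */
final class TxFlowSketch {
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }
    enum Outcome { DELIVERED, DISCARDED }

    /** Stand-in for the producer side: runs the local transaction and answers status checks. */
    interface Producer {
        LocalState executeLocalTransaction();
        LocalState checkLocalTransaction();
    }

    /**
     * Resolves a half-message: the first answer is used if it is final; otherwise the
     * producer is asked again, up to maxChecks times (assumed limit for this sketch).
     */
    static Outcome resolve(Producer producer, int maxChecks) {
        LocalState state = producer.executeLocalTransaction();
        for (int i = 0; state == LocalState.UNKNOWN && i < maxChecks; i++) {
            state = producer.checkLocalTransaction(); // compensation flow
        }
        // Only a commit makes the message visible to consumers
        return state == LocalState.COMMIT ? Outcome.DELIVERED : Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        Producer lateCommit = new Producer() {
            public LocalState executeLocalTransaction() { return LocalState.UNKNOWN; }
            public LocalState checkLocalTransaction() { return LocalState.COMMIT; }
        };
        System.out.println(resolve(lateCommit, 3));
    }
}
```

The main method models a producer whose first answer is Unknown and whose status check later reports a commit, so the message ends up delivered through the compensation flow.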

Sending code

@Resource
private RocketMQTemplate rocketMQTemplate;

public void sendTx(String topic, Long id, String tags) {
    // The "BID" header is the business id the listener later uses to look up the transaction log.
    // The last argument is handed to executeLocalTransaction as its arg parameter.
    rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(
            new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
            .setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
            .build(),
            UUID.randomUUID().toString().replaceAll("-", ""));
}

Producer transaction listener

@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
    // Reuse one mapper instead of creating a new one per message
    private static final JsonMapper JSON = new JsonMapper();

    @Resource
    private BusinessService bs;

    // Called after the server acknowledges the half-message; runs the local transaction
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String id = (String) msg.getHeaders().get("BID");
            // The payload arrives serialized, so it is read back from bytes
            Users users = JSON.readValue((byte[]) msg.getPayload(), Users.class);
            System.out.println("Message: " + users + "\tData: " + arg + "\tTx ID: " + id);
            bs.save(users, new UsersLog(users.getId(), id));
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    // Called by the server's status check: the log row exists only if the local transaction committed
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String id = (String) msg.getHeaders().get("BID");
        System.out.println("Checking if ID " + id + " exists");
        UsersLog usersLog = bs.queryUsersLog(id);
        if (usersLog == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

Consumer

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {
    @Override
    public void onMessage(Users users) {
        System.out.println("TX received: " + users);
    }
}

Service layer

@Transactional
public boolean save(Users users, UsersLog usersLog) {
    usersRepository.save(users);
    usersLogRepository.save(usersLog);
    // Simulated failure: id 1 throws, so @Transactional rolls back both saves
    if (users.getId() == 1) {
        throw new RuntimeException("Data error");
    }
    return true;
}

public UsersLog queryUsersLog(String bid) {
    return usersLogRepository.findByBid(bid);
}

Controller

@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id") Long id) {
    ps.sendTx("tx-topic", id, "tag10");
    return "send transaction success";
}

In testing, the consumer receives the message once the local transaction commits. When the local transaction fails (for example, id 1 makes save throw), the half-message is rolled back and is never delivered to the consumer.
