Mastering RocketMQ with Spring Boot: From Simple to Transactional Messaging

This guide demonstrates how to integrate RocketMQ 4.8.0 into a Spring Boot 2.3.9 application, covering dependency setup, configuration, sending and receiving plain, ordered, clustered, broadcast, and transactional messages, with complete code examples and execution results.

Spring Full-Stack Practical Cases

Dependencies

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

Configuration

server:
  port: 8080
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq

Plain Message

Sending a simple message:

@Resource
private RocketMQTemplate rocketMQTemplate;

public void send(String message) {
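    // Pass the payload directly: convertAndSend builds the Message itself,
    // so wrapping it with MessageBuilder first would serialize the wrapper.
    rocketMQTemplate.convertAndSend("test-topic:tag2", message);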
}

Receiving the message:

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
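
The send call addresses the message as "topic:tags", and the listener accepts it only if its selectorExpression lists the tag. As a rough illustration of that convention, here is a minimal stdlib-only sketch. TagFilterSketch and its methods are hypothetical names invented for this example; they are not part of rocketmq-spring, and real tag filtering happens in the broker and client rather than in application code.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not a rocketmq-spring API.
class TagFilterSketch {

    // Splits "test-topic:tag2" into {"test-topic", "tag2"}; tags are "" if absent.
    static String[] parseDestination(String destination) {
        String[] parts = destination.split(":", 2);
        return new String[] { parts[0], parts.length > 1 ? parts[1] : "" };
    }

    // Returns true when the message tag is accepted by an "a || b" expression.
    static boolean matches(String selectorExpression, String messageTag) {
        if (selectorExpression == null || selectorExpression.trim().equals("*")) {
            return true; // "*" subscribes to every tag
        }
        Set<String> accepted = Arrays.stream(selectorExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return accepted.contains(messageTag);
    }

    public static void main(String[] args) {
        String[] dest = parseDestination("test-topic:tag2");
        System.out.println("topic=" + dest[0] + ", tag=" + dest[1]);
        System.out.println("tag2 accepted: " + matches("tag1 || tag2", dest[1]));
        System.out.println("tag3 accepted: " + matches("tag1 || tag2", "tag3"));
    }
}
```

With the listener above, a message sent to "test-topic:tag2" is accepted, while one tagged tag3 would never reach onMessage.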

Ordered Message

Sending an ordered message (messages that share the same hash key, here "order-" + id, are routed to the same queue):

public void sendOrder(String topic, String message, String tags, int id) {
    rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),
        "order-" + id, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message + "\tqueueId: " + sendResult.getMessageQueue().getQueueId());
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
}

Receiving ordered messages (with ConsumeMode.ORDERLY, each queue is consumed by only one thread at a time, so messages within a queue are processed in order):

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println(Thread.currentThread().getName() + " received Order message: " + message);
    }
}
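
Ordering depends on every message with the same hash key landing in the same queue. The sketch below shows the general idea of hash-based queue selection using only the JDK. HashQueueSelectorSketch, selectQueue, and the queue count of 4 are assumptions made for this example; the selector inside RocketMQ may compute the index differently.

```java
// Hypothetical illustration of hash-based queue selection; not RocketMQ code.
class HashQueueSelectorSketch {

    // Maps a hash key to a queue index in [0, queueCount).
    // The same key always yields the same index.
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        String[] steps = { "created", "paid", "shipped" };
        for (int id = 1; id <= 3; id++) {
            String key = "order-" + id;
            for (String step : steps) {
                // All steps of one order print the same queue index.
                System.out.println(key + " " + step + " -> queue " + selectQueue(key, queueCount));
            }
        }
    }
}
```

Because all steps of one order map to one queue, and an ORDERLY consumer processes each queue with one thread at a time, the steps are consumed in the order they were sent. Different orders may still interleave across queues.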

Cluster and Broadcast Modes

Sending (the producer code is the same for both modes; the mode is chosen on the consumer side):

public void send(String topic, String message, String tags) {
    rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build());
}

Clustered consumer (MessageModel.CLUSTERING, the default: each message is delivered to only one instance in the consumer group):

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1 received message: " + message);
    }
}

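Broadcast consumer (the same listener with messageModel switched to MessageModel.BROADCASTING: every instance in the consumer group receives every message):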

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("ConsumerBroadListener1 received message: " + message);
    }
}

[Figure: Cluster mode result]
[Figure: Broadcast mode result]
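
To make the difference between the two message models concrete, the following stdlib-only sketch simulates which application instance would receive which message. DeliveryModeSketch, the instance names, and the round-robin distribution are assumptions for illustration; a real broker balances load per queue, not per message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the two message models; not RocketMQ code.
class DeliveryModeSketch {
    enum Mode { CLUSTERING, BROADCASTING }

    // Returns, for each instance of one consumer group, the messages it receives.
    static Map<String, List<String>> deliver(Mode mode, List<String> instances, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String instance : instances) {
            received.put(instance, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String message = messages.get(i);
            if (mode == Mode.BROADCASTING) {
                // Every instance gets its own copy of the message.
                for (String instance : instances) {
                    received.get(instance).add(message);
                }
            } else {
                // Exactly one instance gets the message (round-robin stand-in).
                String target = instances.get(i % instances.size());
                received.get(target).add(message);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("instance-A", "instance-B");
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("CLUSTERING:   " + deliver(Mode.CLUSTERING, instances, messages));
        System.out.println("BROADCASTING: " + deliver(Mode.BROADCASTING, instances, messages));
    }
}
```

In the simulation, clustering splits the four messages between the two instances, while broadcasting hands all four to each of them.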

Transactional Messages

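RocketMQ defines three transaction states: CommitTransaction (the message becomes visible to consumers), RollbackTransaction (the message is discarded), and Unknown (the broker has to check back with the producer later). In rocketmq-spring they correspond to RocketMQLocalTransactionState.COMMIT, ROLLBACK, and UNKNOWN.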

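Transactional messaging is a two-phase process: the producer sends a half message that consumers cannot see yet, executes the local transaction, and then commits or rolls back the message. If the broker does not receive the final state, it invokes the producer's check callback to ask for it.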
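
The flow described above can be modeled as a small state machine. This is a simplified, stdlib-only sketch: TransactionFlowSketch, isDelivered, and the maxChecks limit are hypothetical names and parameters chosen for this example, and the real broker applies its own check interval and retry limit.

```java
import java.util.function.Supplier;

// Hypothetical model of the half-message flow; not RocketMQ code.
class TransactionFlowSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Returns true if the half message ends up visible to consumers.
    static boolean isDelivered(Supplier<State> localTransaction, Supplier<State> check, int maxChecks) {
        // Phase 1: the half message is stored but hidden from consumers.
        // Phase 2: the producer runs its local transaction and reports a state.
        State state = localTransaction.get();

        // While the state is unknown, the broker calls back the check method.
        int checks = 0;
        while (state == State.UNKNOWN && checks < maxChecks) {
            state = check.get();
            checks++;
        }

        // Only a committed message is delivered; ROLLBACK or a state that is
        // still UNKNOWN after maxChecks leaves it undelivered in this model.
        return state == State.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println("commit:            " + isDelivered(() -> State.COMMIT, () -> State.UNKNOWN, 3));
        System.out.println("rollback:          " + isDelivered(() -> State.ROLLBACK, () -> State.COMMIT, 3));
        System.out.println("unknown, check ok: " + isDelivered(() -> State.UNKNOWN, () -> State.COMMIT, 3));
        System.out.println("unknown forever:   " + isDelivered(() -> State.UNKNOWN, () -> State.UNKNOWN, 3));
    }
}
```

The third case is the one the check callback exists for: the producer could not report a final state, so the broker asks again and delivers the message once the check confirms the local transaction succeeded.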

Sending a transactional message:

public void sendTx(String topic, Long id, String tags) {
    rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags,
        MessageBuilder.withPayload(new Users(id, UUID.randomUUID().toString().replaceAll("-", "")))
            .setHeader("BID", UUID.randomUUID().toString().replaceAll("-", ""))
            .build(),
        UUID.randomUUID().toString().replaceAll("-", ""));
}

Producer transaction listener:

@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
    @Resource
    private BusinessService bs;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String id = (String) msg.getHeaders().get("BID");
            Users users = new JsonMapper().readValue((byte[]) msg.getPayload(), Users.class);
            System.out.println("Message content: " + users + "\tArgument: " + arg + "\tUnique ID of this transaction: " + id);
            bs.save(users, new UsersLog(users.getId(), id));
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String id = (String) msg.getHeaders().get("BID");
        System.out.println("Checking whether a record with ID " + id + " exists");
        UsersLog usersLog = bs.queryUsersLog(id);
        if (usersLog == null) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }
}

Consumer for transactional messages:

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {
    @Override
    public void onMessage(Users users) {
        System.out.println("TX received message: " + users);
    }
}

Service with @Transactional:

@Transactional
public boolean save(Users users, UsersLog usersLog) {
    usersRepository.save(users);
    usersLogRepository.save(usersLog);
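    // Simulate a failure for id 1 so that both saves are rolled back
    // and the transaction listener returns ROLLBACK.
    if (users.getId() == 1) {
        throw new RuntimeException("Data error");
    }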
    return true;
}

public UsersLog queryUsersLog(String bid) {
    return usersLogRepository.findByBid(bid);
}

Controller endpoint to trigger the transaction:

@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id") Long id) {
    ps.sendTx("tx-topic", id, "tag10");
    return "send transaction success";
}

[Figure: Transaction test output]
[Figure: Final transaction result]