How to Build a Low‑Intrusive Transactional Message System with Spring Boot and RabbitMQ

This article explains a lightweight, low‑intrusive transactional message solution for microservices using Spring Boot, RabbitMQ, and MySQL, covering design principles, table schemas, transaction synchronization, compensation mechanisms, code implementation, and deployment considerations to achieve eventual consistency without sacrificing performance.

Programmer DD
Programmer DD
Programmer DD
How to Build a Low‑Intrusive Transactional Message System with Spring Boot and RabbitMQ

Premise

Distributed transactions are a tricky problem in microservice architectures. The author adopts a compromise approach based on the local message table pattern originally proposed by eBay, using RabbitMQ and MySQL to implement a lightweight, low‑intrusive transactional message module.

Environment Dependencies

JDK 1.8+

spring-boot-starter-web 2.x.x

spring-boot-starter-jdbc 2.x.x

spring-boot-starter-amqp 2.x.x

HikariCP 3.x.x (included with spring-boot-starter-jdbc)

mysql-connector-java 5.1.48

redisson 3.12.1

Design Idea

Transactional messages are suitable for weak‑consistency (eventual consistency) scenarios, such as sending a marketing SMS after user registration or pushing an order for approval after it is saved. Strong‑consistency scenarios should not use transactional messages because strict synchronization adds extra overhead.

In short, once the upstream successfully pushes a correct message to RabbitMQ, its responsibility is considered fulfilled.

To keep the code non‑intrusive, the solution relies on Spring's programmatic or declarative transactions. Programmatic transactions use TransactionTemplate, while declarative transactions depend on the @Transactional annotation.

A custom transaction synchronizer ( TransactionSynchronization) is used to send the message after the transaction commits. The relevant callbacks are afterCommit() and afterCompletion(int), which are invoked after AbstractPlatformTransactionManager#doCommit().

@Transactional<br>public Dto businessMethod() {<br>    // business transaction code block ...<br>    // save transaction message<br>    saveTransactionMessageRecord();<br>    // register transaction synchronizer – push message to RabbitMQ in afterCommit()<br>    registerTransactionSynchronization();<br>    // more business logic ...<br>}

The save and registration steps can be placed anywhere inside the transactional method because their execution order does not affect the outcome.

Compensation

If the downstream service fails to consume a message, the upstream may need to retry pushing it. A special case occurs when the service restarts before TransactionSynchronization#afterCommit() executes. In such cases, the system marks the message as "processing" and retries using an exponential back‑off algorithm with a configurable maximum retry count.

while (retry) {<br>    try {<br>        rabbitTemplate.convertAndSend(...);<br>        markSuccess();<br>    } catch (Exception e) {<br>        markFail();<br>        // calculate next schedule time with exponential backoff<br>    }<br>}

Table Design

The module uses two MySQL tables to separate message metadata from large message content, reducing I/O during bulk queries.

CREATE TABLE `t_transactional_message` (<br>    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,<br>    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,<br>    edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,<br>    creator VARCHAR(20) NOT NULL DEFAULT 'admin',<br>    editor VARCHAR(20) NOT NULL DEFAULT 'admin',<br>    deleted TINYINT NOT NULL DEFAULT 0,<br>    current_retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '当前重试次数',<br>    max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT '最大重试次数',<br>    queue_name VARCHAR(255) NOT NULL COMMENT '队列名',<br>    exchange_name VARCHAR(255) NOT NULL COMMENT '交换器名',<br>    exchange_type VARCHAR(8) NOT NULL COMMENT '交换类型',<br>    routing_key VARCHAR(255) COMMENT '路由键',<br>    business_module VARCHAR(32) NOT NULL COMMENT '业务模块',<br>    business_key VARCHAR(255) NOT NULL COMMENT '业务键',<br>    next_schedule_time DATETIME NOT NULL COMMENT '下一次调度时间',<br>    message_status TINYINT NOT NULL DEFAULT 0 COMMENT '消息状态',<br>    init_backoff BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,单位为秒',<br>    backoff_factor TINYINT NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指数)',<br>    INDEX idx_queue_name (queue_name),<br>    INDEX idx_create_time (create_time),<br>    INDEX idx_next_schedule_time (next_schedule_time),<br>    INDEX idx_business_key (business_key)<br>) COMMENT '事务消息表';<br><br>CREATE TABLE `t_transactional_message_content` (<br>    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,<br>    message_id BIGINT UNSIGNED NOT NULL COMMENT '事务消息记录ID',<br>    content TEXT COMMENT '消息内容'<br>) COMMENT '事务消息内容表';

The fields business_module and business_key identify the business context (e.g., order number).

Code Design

Model classes:

@Data<br>public class TransactionalMessage {<br>    private Long id;<br>    private LocalDateTime createTime;<br>    private LocalDateTime editTime;<br>    private String creator;<br>    private String editor;<br>    private Integer deleted;<br>    private Integer currentRetryTimes;<br>    private Integer maxRetryTimes;<br>    private String queueName;<br>    private String exchangeName;<br>    private String exchangeType;<br>    private String routingKey;<br>    private String businessModule;<br>    private String businessKey;<br>    private LocalDateTime nextScheduleTime;<br>    private Integer messageStatus;<br>    private Long initBackoff;<br>    private Integer backoffFactor;<br>}<br><br>@Data<br>public class TransactionalMessageContent {<br>    private Long id;<br>    private Long messageId;<br>    private String content;<br>}

DAO interfaces (implementation omitted) and service interfaces are defined to abstract persistence and messaging operations.

public interface TransactionalMessageDao {<br>    void insertSelective(TransactionalMessage record);<br>    void updateStatusSelective(TransactionalMessage record);<br>    List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime min, LocalDateTime max, int limit);<br>}<br><br>public interface TransactionalMessageContentDao {<br>    void insert(TransactionalMessageContent record);<br>    List<TransactionalMessageContent> queryByMessageIds(String messageIds);<br>}<br><br>public interface TransactionalMessageService {<br>    void sendTransactionalMessage(Destination destination, TxMessage message);<br>}

Implementation of the service that registers a transaction synchronizer and persists the message:

@Slf4j<br>@Service<br>@RequiredArgsConstructor<br>public class RabbitTransactionalMessageService implements TransactionalMessageService {<br>    private final AmqpAdmin amqpAdmin;<br>    private final TransactionalMessageManagementService managementService;<br>    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();<br><br>    @Override<br>    public void sendTransactionalMessage(Destination destination, TxMessage message) {<br>        String queueName = destination.queueName();<br>        String exchangeName = destination.exchangeName();<br>        String routingKey = destination.routingKey();<br>        ExchangeType exchangeType = destination.exchangeType();<br>        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {<br>            Queue queue = new Queue(queueName);<br>            amqpAdmin.declareQueue(queue);<br>            Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());<br>            amqpAdmin.declareExchange(exchange);<br>            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();<br>            amqpAdmin.declareBinding(binding);<br>            return true;<br>        });<br>        TransactionalMessage record = new TransactionalMessage();<br>        record.setQueueName(queueName);<br>        record.setExchangeName(exchangeName);<br>        record.setExchangeType(exchangeType.getType());<br>        record.setRoutingKey(routingKey);<br>        record.setBusinessModule(message.businessModule());<br>        record.setBusinessKey(message.businessKey());<br>        String content = message.content();<br>        managementService.saveTransactionalMessageRecord(record, content);<br>        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {<br>            @Override<br>            public void afterCommit() {<br>                managementService.sendMessageSync(record, content);<br>            }<br>        });<br>    }<br>}

Management service handling persistence, sending, success/failure marking, exponential back‑off calculation, and compensation processing:

@Slf4j<br>@RequiredArgsConstructor<br>@Service<br>public class TransactionalMessageManagementService {<br>    private final TransactionalMessageDao messageDao;<br>    private final TransactionalMessageContentDao contentDao;<br>    private final RabbitTemplate rabbitTemplate;<br>    private static final LocalDateTime END = LocalDateTime.of(2999,1,1,0,0,0);<br>    private static final long DEFAULT_INIT_BACKOFF = 10L;<br>    private static final int DEFAULT_BACKOFF_FACTOR = 2;<br>    private static final int DEFAULT_MAX_RETRY_TIMES = 5;<br>    private static final int LIMIT = 100;<br><br>    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {<br>        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());<br>        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF, DEFAULT_BACKOFF_FACTOR, 0));<br>        record.setCurrentRetryTimes(0);<br>        record.setInitBackoff(DEFAULT_INIT_BACKOFF);<br>        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);<br>        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);<br>        messageDao.insertSelective(record);<br>        TransactionalMessageContent mc = new TransactionalMessageContent();<br>        mc.setContent(content);<br>        mc.setMessageId(record.getId());<br>        contentDao.insert(mc);<br>    }<br><br>    public void sendMessageSync(TransactionalMessage record, String content) {<br>        try {<br>            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);<br>            if (log.isDebugEnabled()) {<br>                log.debug("Message sent successfully, queue:{}, content:{}", record.getQueueName(), content);<br>            }<br>            markSuccess(record);<br>        } catch (Exception e) {<br>            markFail(record, e);<br>        }<br>    }<br><br>    private void markSuccess(TransactionalMessage record) {<br>        record.setNextScheduleTime(END);<br>        record.setCurrentRetryTimes(Math.min(record.getCurrentRetryTimes() + 1, record.getMaxRetryTimes()));<br>        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());<br>        record.setEditTime(LocalDateTime.now());<br>        messageDao.updateStatusSelective(record);<br>    }<br><br>    private void markFail(TransactionalMessage record, Exception e) {<br>        log.error("Failed to send message to queue:{}", record.getQueueName(), e);<br>        record.setCurrentRetryTimes(Math.min(record.getCurrentRetryTimes() + 1, record.getMaxRetryTimes()));<br>        LocalDateTime next = calculateNextScheduleTime(record.getNextScheduleTime(), record.getInitBackoff(), record.getBackoffFactor(), record.getCurrentRetryTimes());<br>        record.setNextScheduleTime(next);<br>        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());<br>        record.setEditTime(LocalDateTime.now());<br>        messageDao.updateStatusSelective(record);<br>    }<br><br>    private LocalDateTime calculateNextScheduleTime(LocalDateTime base, long initBackoff, long backoffFactor, long round) {<br>        double delta = initBackoff * Math.pow(backoffFactor, round);<br>        return base.plusSeconds((long) delta);<br>    }<br><br>    public void processPendingCompensationRecords() {<br>        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);<br>        LocalDateTime min = max.plusHours(-1);<br>        Map<Long, TransactionalMessage> map = messageDao.queryPendingCompensationRecords(min, max, LIMIT)<br>                .stream().collect(Collectors.toMap(TransactionalMessage::getId, m -> m));<br>        if (!map.isEmpty()) {<br>            StringJoiner joiner = new StringJoiner(",", "(", ")");<br>            map.keySet().forEach(id -> joiner.add(id.toString()));<br>            contentDao.queryByMessageIds(joiner.toString())<br>                    .forEach(item -> sendMessageSync(map.get(item.getMessageId()), item.getContent()));<br>        }<br>    }<br>}<br>

Scheduling Configuration

A scheduled job runs every 10 seconds, acquiring a Redisson distributed lock to ensure only one instance processes compensation records.

@Slf4j<br>@RequiredArgsConstructor<br>@Configuration<br>@EnableScheduling<br>public class ScheduleJobAutoConfiguration {<br>    private final TransactionalMessageManagementService managementService;<br>    private final RedissonClient redisson = Redisson.create();<br><br>    @Scheduled(fixedDelay = 10000)<br>    public void transactionalMessageCompensationTask() throws Exception {<br>        RLock lock = redisson.getLock("transactionalMessageCompensationTask");<br>        boolean acquired = lock.tryLock(5, 300, TimeUnit.SECONDS);<br>        if (acquired) {<br>            try {<br>                long start = System.currentTimeMillis();<br>                log.info("Starting transactional message compensation task...");<br>                managementService.processPendingCompensationRecords();<br>                long elapsed = System.currentTimeMillis() - start;<br>                if (elapsed < 5000) {<br>                    Thread.sleep(5000 - elapsed);<br>                }<br>                log.info("Compensation task completed, elapsed:{} ms", elapsed);<br>            } finally {<br>                lock.unlock();<br>            }<br>        }<br>    }<br>}

Test Classes

Two test components demonstrate saving an order within a transaction and sending a transactional message.

@RequiredArgsConstructor<br>@Component<br>public class MockBusinessRunner implements CommandLineRunner {<br>    private final MockBusinessService mockBusinessService;<br>    @Override<br>    public void run(String... args) throws Exception {<br>        mockBusinessService.saveOrder();<br>    }<br>}<br><br>@Slf4j<br>@RequiredArgsConstructor<br>@Service<br>public class MockBusinessService {<br>    private final JdbcTemplate jdbcTemplate;<br>    private final TransactionalMessageService transactionalMessageService;<br>    private final ObjectMapper objectMapper;<br><br>    @Transactional(rollbackFor = Exception.class)<br>    public void saveOrder() throws Exception {<br>        String orderId = UUID.randomUUID().toString();<br>        BigDecimal amount = BigDecimal.valueOf(100L);<br>        Map<String, Object> msg = new HashMap<>();<br>        msg.put("orderId", orderId);<br>        msg.put("amount", amount);<br>        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", ps -> {<br>            ps.setString(1, orderId);<br>            ps.setBigDecimal(2, amount);<br>        });<br>        String content = objectMapper.writeValueAsString(msg);<br>        transactionalMessageService.sendTransactionalMessage(<br>            DefaultDestination.builder()<br>                .exchangeName("tm.test.exchange")<br>                .queueName("tm.test.queue")<br>                .routingKey("tm.test.key")<br>                .exchangeType(ExchangeType.DIRECT)<br>                .build(),<br>            DefaultTxMessage.builder()<br>                .businessKey(orderId)<br>                .businessModule("SAVE_ORDER")<br>                .content(content)<br>                .build()<br>        );<br>        log.info("Order saved successfully: {}", orderId);<br>    }<br>}

Conclusion

The transactional message module provides a robust way to achieve eventual consistency in microservices by decoupling business operations from message delivery, using asynchronous processing to improve throughput, and offering compensation mechanisms for failure scenarios. It can be extended with monitoring (Micrometer, Prometheus, Grafana) and a management UI.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Distributed SystemsSpring BootmysqlRabbitMQTransactional Messaging
Programmer DD
Written by

Programmer DD

A tinkering programmer and author of "Spring Cloud Microservices in Action"

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.