How to Build a Low‑Intrusion Transactional Message System with Spring Boot and RabbitMQ

This article details a lightweight transactional message solution for microservices that stores pending messages in a local MySQL table, defers RabbitMQ publishing until after transaction commit, and includes compensation logic with exponential back‑off to ensure reliable asynchronous communication.

Java Backend Technology
Java Backend Technology
Java Backend Technology
How to Build a Low‑Intrusion Transactional Message System with Spring Boot and RabbitMQ

Prerequisite

Distributed transactions are a tricky problem in microservice practice; this solution adopts a compromise using a local message table inspired by eBay, built on RabbitMQ and MySQL, to provide a lightweight, low‑intrusion transactional message module.

Design Idea

Transaction messages are suitable for eventual consistency scenarios such as user registration notifications or order approval pushes. Strong consistency scenarios should generally avoid using transaction messages.

In short, the upstream service is considered done after successfully pushing a correct message to RabbitMQ.

To keep code non‑intrusive, the module relies on Spring's programmatic or declarative transactions (TransactionTemplate or @Transactional) and stores each pending message in a local table.

Logical Unit of Transaction Execution

The local business logic and the message record are saved within the same transaction. The actual RabbitMQ send is deferred to the afterCommit() callback so that both the transaction commit and the message send succeed together.

Compensation

If the consumer encounters an exception or the afterCommit() callback fails, the module records the failure and retries using an exponential back‑off algorithm, limiting the maximum number of retries.

Implementation

Core dependencies:

<properties>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <redisson.version>3.12.1</redisson.version>
    <mysql.connector.version>5.1.48</mysql.connector.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
</dependencies>

Database schema (MySQL) with two tables to separate metadata from payload:

CREATE TABLE `t_transactional_message` (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator VARCHAR(20) NOT NULL DEFAULT 'admin',
    editor VARCHAR(20) NOT NULL DEFAULT 'admin',
    deleted TINYINT NOT NULL DEFAULT 0,
    current_retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '当前重试次数',
    max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT '最大重试次数',
    queue_name VARCHAR(255) NOT NULL COMMENT '队列名',
    exchange_name VARCHAR(255) NOT NULL COMMENT '交换器名',
    exchange_type VARCHAR(8) NOT NULL COMMENT '交换类型',
    routing_key VARCHAR(255) COMMENT '路由键',
    business_module VARCHAR(32) NOT NULL COMMENT '业务模块',
    business_key VARCHAR(255) NOT NULL COMMENT '业务键',
    next_schedule_time DATETIME NOT NULL COMMENT '下一次调度时间',
    message_status TINYINT NOT NULL DEFAULT 0 COMMENT '消息状态',
    init_backoff BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,单位为秒',
    backoff_factor TINYINT NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指数)',
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT '事务消息表';

CREATE TABLE `t_transactional_message_content` (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT '事务消息记录ID',
    content TEXT COMMENT '消息内容'
) COMMENT '事务消息内容表';

Model classes using Lombok:

@Data
public class TransactionalMessage {
    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {
    private Long id;
    private Long messageId;
    private String content;
}

DAO interfaces provide insert, update, and query operations:

public interface TransactionalMessageDao {
    void insertSelective(TransactionalMessage record);
    void updateStatusSelective(TransactionalMessage record);
    List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime, LocalDateTime maxScheduleTime, int limit);
}

public interface TransactionalMessageContentDao {
    void insert(TransactionalMessageContent record);
    List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}

Service interface for sending messages:

public interface TransactionalMessageService {
    void sendTransactionalMessage(Destination destination, TxMessage message);
}

Implementation registers a Spring TransactionSynchronization that sends the RabbitMQ message after the transaction commits:

@Service
public class RabbitTransactionalMessageService implements TransactionalMessageService {
    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;
    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

    @Override
    public void sendTransactionalMessage(Destination destination, TxMessage message) {
        String queueName = destination.queueName();
        String exchangeName = destination.exchangeName();
        String routingKey = destination.routingKey();
        ExchangeType exchangeType = destination.exchangeType();
        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
            Queue queue = new Queue(queueName);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(queueName);
        record.setExchangeName(exchangeName);
        record.setExchangeType(exchangeType.getType());
        record.setRoutingKey(routingKey);
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
        managementService.saveTransactionalMessageRecord(record, content);
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                managementService.sendMessageSync(record, content);
            }
        });
    }
}

Management service handles persistence, success/failure marking, exponential back‑off calculation, and a compensation task that retries pending messages:

@Service
public class TransactionalMessageManagementService {
    private final TransactionalMessageDao messageDao;
    private final TransactionalMessageContentDao contentDao;
    private final RabbitTemplate rabbitTemplate;
    private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
    private static final long DEFAULT_INIT_BACKOFF = 10L;
    private static final int DEFAULT_BACKOFF_FACTOR = 2;
    private static final int DEFAULT_MAX_RETRY_TIMES = 5;
    private static final int LIMIT = 100;

    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF, DEFAULT_BACKOFF_FACTOR, 0));
        record.setCurrentRetryTimes(0);
        record.setInitBackoff(DEFAULT_INIT_BACKOFF);
        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
        messageDao.insertSelective(record);
        TransactionalMessageContent messageContent = new TransactionalMessageContent();
        messageContent.setContent(content);
        messageContent.setMessageId(record.getId());
        contentDao.insert(messageContent);
    }

    public void sendMessageSync(TransactionalMessage record, String content) {
        try {
            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
            if (log.isDebugEnabled()) {
                log.debug("发送消息成功,目标队列:{},消息内容:{}", record.getQueueName(), content);
            }
            markSuccess(record);
        } catch (Exception e) {
            markFail(record, e);
        }
    }

    private void markSuccess(TransactionalMessage record) {
        record.setNextScheduleTime(END);
        record.setCurrentRetryTimes(Math.min(record.getMaxRetryTimes(), record.getCurrentRetryTimes() + 1));
        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private void markFail(TransactionalMessage record, Exception e) {
        log.error("发送消息失败,目标队列:{}", record.getQueueName(), e);
        record.setCurrentRetryTimes(Math.min(record.getMaxRetryTimes(), record.getCurrentRetryTimes() + 1));
        LocalDateTime nextScheduleTime = calculateNextScheduleTime(
                record.getNextScheduleTime(),
                record.getInitBackoff(),
                record.getBackoffFactor(),
                record.getCurrentRetryTimes()
        );
        record.setNextScheduleTime(nextScheduleTime);
        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private LocalDateTime calculateNextScheduleTime(LocalDateTime base, long initBackoff, long backoffFactor, long round) {
        double delta = initBackoff * Math.pow(backoffFactor, round);
        return base.plusSeconds((long) delta);
    }

    public void processPendingCompensationRecords() {
        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
        LocalDateTime min = max.plusHours(-1);
        Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
                .stream()
                .collect(Collectors.toMap(TransactionalMessage::getId, x -> x));
        if (!collect.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "(", ")");
            collect.keySet().forEach(id -> joiner.add(id.toString()));
            contentDao.queryByMessageIds(joiner.toString())
                    .forEach(item -> {
                        TransactionalMessage msg = collect.get(item.getMessageId());
                        sendMessageSync(msg, item.getContent());
                    });
        }
    }
}

A scheduled job with a Redisson distributed lock runs the compensation task every 10 seconds:

@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {
    private final TransactionalMessageManagementService managementService;
    private final RedissonClient redisson = Redisson.create();

    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
        if (tryLock) {
            try {
                long start = System.currentTimeMillis();
                log.info("开始执行事务消息推送补偿定时任务...");
                managementService.processPendingCompensationRecords();
                long end = System.currentTimeMillis();
                long delta = end - start;
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("执行事务消息推送补偿定时任务完毕,耗时:{} ms...", end - start);
            } finally {
                lock.unlock();
            }
        }
    }
}

Demo includes a mock business service that saves an order in MySQL and sends a transactional message:

@Service
public class MockBusinessService {
    private final JdbcTemplate jdbcTemplate;
    private final TransactionalMessageService transactionalMessageService;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        BigDecimal amount = BigDecimal.valueOf(100L);
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", ps -> {
            ps.setString(1, orderId);
            ps.setBigDecimal(2, amount);
        });
        String content = objectMapper.writeValueAsString(message);
        transactionalMessageService.sendTransactionalMessage(
                DefaultDestination.builder()
                        .exchangeName("tm.test.exchange")
                        .queueName("tm.test.queue")
                        .routingKey("tm.test.key")
                        .exchangeType(ExchangeType.DIRECT)
                        .build(),
                DefaultTxMessage.builder()
                        .businessKey(orderId)
                        .businessModule("SAVE_ORDER")
                        .content(content)
                        .build()
        );
        log.info("保存订单:{}成功...", orderId);
    }
}

Conclusion

The transactional message module makes asynchronous message publishing reliable, improves system throughput, and can be extended with monitoring tools such as Micrometer, Prometheus, and Grafana for real‑time metrics.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Distributed SystemsSpring BootmysqlRabbitMQCompensationTransactional Messaging
Java Backend Technology
Written by

Java Backend Technology

Focus on Java-related technologies: SSM, Spring ecosystem, microservices, MySQL, MyCat, clustering, distributed systems, middleware, Linux, networking, multithreading. Occasionally cover DevOps tools like Jenkins, Nexus, Docker, and ELK. Also share technical insights from time to time, committed to Java full-stack development!

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.