Design and Implementation of a Transactional Message Module for Distributed Systems Using Spring and RabbitMQ

This article details a lightweight, low‑intrusion transactional message solution for microservices, covering design principles, database schema, Spring‑based implementation with RabbitMQ integration, compensation mechanisms, scheduling, and testing, illustrating how to achieve reliable asynchronous messaging while maintaining eventual consistency.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Design and Implementation of a Transactional Message Module for Distributed Systems Using Spring and RabbitMQ

The article introduces a transactional message approach for microservice architectures, emphasizing a compromise solution that avoids strong consistency by using a local message table pattern originally proposed by eBay, combined with RabbitMQ and MySQL.

Design Principles focus on eventual consistency scenarios such as user registration notifications and order approval messages, while discouraging use in strong consistency cases. The solution binds business logic and message record persistence within the same transaction, ensuring that message records are saved before the transaction commits.

Message sending is deferred to the afterCommit() callback of Spring's TransactionSynchronization, guaranteeing that the message is only dispatched after the transaction succeeds.

Database Schema consists of two tables: t_transactional_message for metadata and t_transactional_message_content for the actual payload, reducing I/O pressure on the database during bulk queries.

CREATE TABLE `t_transactional_message` (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator VARCHAR(20) NOT NULL DEFAULT 'admin',
    editor VARCHAR(20) NOT NULL DEFAULT 'admin',
    deleted TINYINT NOT NULL DEFAULT 0,
    current_retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '当前重试次数',
    max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT '最大重试次数',
    queue_name VARCHAR(255) NOT NULL COMMENT '队列名',
    exchange_name VARCHAR(255) NOT NULL COMMENT '交换器名',
    exchange_type VARCHAR(8) NOT NULL COMMENT '交换类型',
    routing_key VARCHAR(255) COMMENT '路由键',
    business_module VARCHAR(32) NOT NULL COMMENT '业务模块',
    business_key VARCHAR(255) NOT NULL COMMENT '业务键',
    next_schedule_time DATETIME NOT NULL COMMENT '下一次调度时间',
    message_status TINYINT NOT NULL DEFAULT 0 COMMENT '消息状态',
    init_backoff BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,单位为秒',
    backoff_factor TINYINT NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指数)',
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT '事务消息表';

CREATE TABLE `t_transactional_message_content` (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT '事务消息记录ID',
    content TEXT COMMENT '消息内容'
) COMMENT '事务消息内容表';

Entity Classes are simple POJOs annotated with Lombok's @Data:

@Data
public class TransactionalMessage {
    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {
    private Long id;
    private Long messageId;
    private String content;
}

The TransactionalMessageService interface defines a single method to send a transactional message. Its implementation RabbitTransactionalMessageService registers a Spring transaction synchronizer to push the message after commit and ensures idempotent queue declaration using a concurrent map.

@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {
    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;
    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

    @Override
    public void sendTransactionalMessage(Destination destination, TxMessage message) {
        // pre‑declare queue/exchange atomically
        QUEUE_ALREADY_DECLARE.computeIfAbsent(destination.queueName(), k -> {
            Queue queue = new Queue(k);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(destination.exchangeName(), destination.exchangeType().getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(destination.routingKey()).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        // persist message record and content
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(destination.queueName());
        record.setExchangeName(destination.exchangeName());
        record.setExchangeType(destination.exchangeType().getType());
        record.setRoutingKey(destination.routingKey());
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
        managementService.saveTransactionalMessageRecord(record, content);
        // register synchronizer to send after commit
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                managementService.sendMessageSync(record, content);
            }
        });
    }
}

The management service handles persistence, status updates, retry logic with exponential back‑off, and a compensation task that periodically re‑sends failed messages. The compensation task is scheduled with Spring @Scheduled and guarded by a Redisson distributed lock to avoid concurrent executions.

@Configuration
@EnableScheduling
@RequiredArgsConstructor
public class ScheduleJobAutoConfiguration {
    private final TransactionalMessageManagementService managementService;
    private final RedissonClient redisson = Redisson.create();

    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        if (lock.tryLock(5, 300, TimeUnit.SECONDS)) {
            try {
                long start = System.currentTimeMillis();
                log.info("开始执行事务消息推送补偿定时任务...");
                managementService.processPendingCompensationRecords();
                long delta = System.currentTimeMillis() - start;
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("执行事务消息推送补偿定时任务完毕,耗时:{} ms...", delta);
            } finally {
                lock.unlock();
            }
        }
    }
}

A mock business service demonstrates usage: within a @Transactional method an order is inserted via JdbcTemplate, a JSON payload is built, and TransactionalMessageService.sendTransactionalMessage is called with a DefaultDestination and DefaultTxMessage. The log shows successful order persistence and asynchronous message delivery.

In summary, the article presents a complete, low‑impact transactional messaging framework for microservices, covering design rationale, database layout, Spring‑based code, compensation handling, and scheduling, enabling reliable asynchronous communication while preserving eventual consistency.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

MicroservicesspringAsynchronousRabbitMQCompensation
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.