Mastering SpringBoot Transactional Messaging for Distributed Consistency

This article explains how SpringBoot handles transactional messaging to achieve distributed data consistency, covering the concept of transaction messages, CAP theory, final consistency, and three practical implementations using a local message table, Kafka transactions, and RocketMQ, plus idempotency and compensation strategies.

Java Tech Workshop
Java Tech Workshop
Java Tech Workshop
Mastering SpringBoot Transactional Messaging for Distributed Consistency

1. What Is a Transaction Message?

1.1 Concept

Transaction message ensures atomicity between message sending and the local transaction: either both succeed or both fail.

Both the local transaction and the message are sent successfully.

Both fail, avoiding a half‑successful state.

Real‑world analogy: buying groceries – you must take the item from the shelf (local transaction) and pay (message). If payment fails, the item is returned, preserving atomicity.

1.2 Why Transaction Messages?

Problem: Update database first, message sending fails → data inconsistency. Solution: Use transaction messages.

Problem: Send message first, database update fails → duplicate consumption. Solution: Use transaction messages.

Problem: Network partition causes message loss → business interruption. Solution: Use transaction messages with retry.

2. CAP Theory in Distributed Transactions

2.1 Overview

In distributed systems you cannot simultaneously guarantee consistency, availability, and partition tolerance.

Consistency : all nodes see the same data.

Availability : the system remains reachable.

Partition tolerance : the system continues operating despite network partitions.

2.2 Final Consistency

Final consistency is the most common consistency model in distributed systems. Data may be temporarily inconsistent but will eventually converge to a consistent state, typically achieved through asynchronous message passing.

Allows temporary divergence.

Eventually reaches a consistent state.

Implemented via asynchronous messaging.

3. SpringBoot Transaction Message Implementation Options

3.1 Option 1: Local Message Table Pattern

Core idea: Create a local message table to record pending messages and retry sending them with a scheduled task.

Implementation steps:

Create the message table.

CREATE TABLE `message_record` (
    `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
    `business_key` VARCHAR(64) NOT NULL COMMENT '业务ID',
    `topic` VARCHAR(128) NOT NULL COMMENT '消息主题',
    `message` TEXT NOT NULL COMMENT '消息内容',
    `status` TINYINT NOT NULL DEFAULT 0 COMMENT '状态:0-待发送 1-已发送 2-失败',
    `retry_count` INT DEFAULT 0 COMMENT '重试次数',
    `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
    `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY `uk_business_key` (`business_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Create the message entity.

@Data
@Entity
@Table(name = "message_record")
public class MessageRecord {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    @Column(name = "business_key", nullable = false, unique = true)
    private String businessKey;
    @Column(nullable = false)
    private String topic;
    @Column(columnDefinition = "TEXT", nullable = false)
    private String message;
    @Column(nullable = false)
    private Integer status = 0; // 0: pending
    @Column(name = "retry_count")
    private Integer retryCount = 0;
    @Column(name = "create_time")
    private LocalDateTime createTime;
    @Column(name = "update_time")
    private LocalDateTime updateTime;
}

Send the message within the local transaction.

@Service
public class OrderService {
    private final OrderRepository orderRepository;
    private final MessageRecordRepository messageRecordRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Transactional
    public void createOrder(Order order) {
        orderRepository.save(order);
        MessageRecord record = new MessageRecord();
        record.setBusinessKey(order.getId());
        record.setTopic("order-topic");
        record.setMessage(JSON.toJSONString(order));
        record.setStatus(0);
        messageRecordRepository.save(record);
        try {
            kafkaTemplate.send(record.getTopic(), record.getMessage()).get();
            record.setStatus(1);
            messageRecordRepository.save(record);
        } catch (Exception e) {
            // keep status 0 for retry
        }
    }
}

Scheduled retry task.

@Component
public class MessageRetryScheduler {
    private final MessageRecordRepository messageRecordRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final int MAX_RETRY_COUNT = 3;

    @Scheduled(fixedRate = 60000)
    public void retryFailedMessages() {
        List<MessageRecord> records = messageRecordRepository.findByStatusAndRetryCountLessThan(0, MAX_RETRY_COUNT);
        for (MessageRecord record : records) {
            try {
                kafkaTemplate.send(record.getTopic(), record.getMessage()).get();
                record.setStatus(1);
                log.info("Message retry succeeded: {}", record.getBusinessKey());
            } catch (Exception e) {
                record.setRetryCount(record.getRetryCount() + 1);
                if (record.getRetryCount() >= MAX_RETRY_COUNT) {
                    record.setStatus(2);
                    log.error("Message retry {} times failed: {}", MAX_RETRY_COUNT, record.getBusinessKey());
                }
            }
            messageRecordRepository.save(record);
        }
    }
}

3.2 Option 2: Kafka Transactional Messages

Core idea: Leverage Kafka's native transaction support so that message sending and the local DB operation share the same transaction.

Steps:

Configure Kafka transaction in application.yml.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      transaction-id-prefix: tx-
    consumer:
      group-id: order-consumer-group
      enable-auto-commit: false

Define a Kafka transaction manager.

@Configuration
public class KafkaTransactionConfig {
    @Bean
    public KafkaTransactionManager<String, OrderMessage> kafkaTransactionManager(
            ProducerFactory<String, OrderMessage> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}

Use @Transactional with the Kafka transaction manager.

@Service
public class OrderTransactionalService {
    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderMessage> kafkaTemplate;
    private static final String ORDER_TOPIC = "order-topic";

    @Transactional(transactionManager = "kafkaTransactionManager")
    public void createOrderTransactional(Order order) {
        orderRepository.save(order);
        OrderMessage message = OrderMessage.builder()
                .orderId(order.getId())
                .userId(order.getUserId())
                .amount(order.getAmount())
                .createTime(LocalDateTime.now())
                .build();
        kafkaTemplate.send(ORDER_TOPIC, message);
        log.info("Order created and message sent: {}", order.getId());
    }
}

3.3 Option 3: RocketMQ Transactional Messages

Core idea: RocketMQ provides a “half‑message” mechanism to achieve distributed transaction consistency.

Execution flow:

Send a half message.

Execute the local transaction.

Commit or roll back the message based on the transaction result.

If the broker does not receive a commit/rollback within a timeout, it actively checks the transaction status.

Key code snippets:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
rocketmq:
  name-server: localhost:9876
  producer:
    group: order-producer-group
@Component
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    private final OrderRepository orderRepository;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            Order order = JSON.parseObject(new String(message.getBody()), Order.class);
            orderRepository.save(order);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        try {
            Order order = JSON.parseObject(new String(message.getBody()), Order.class);
            return orderRepository.existsById(order.getId())
                    ? RocketMQLocalTransactionState.COMMIT
                    : RocketMQLocalTransactionState.ROLLBACK;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}
@Service
public class RocketMQTransactionService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final String ORDER_TOPIC = "order-topic";

    public void sendTransactionMessage(Order order) {
        String body = JSON.toJSONString(order);
        rocketMQTemplate.sendMessageInTransaction(
                ORDER_TOPIC,
                MessageBuilder.withPayload(body).build(),
                order);
        log.info("Transactional message sent: {}", order.getId());
    }
}

4. Strategies for Final Consistency

4.1 Message Idempotency

Idempotency means that executing the same operation multiple times yields the same result. An example implementation for stock deduction checks whether the order has already been processed before updating inventory and records a processing entry.

@Service
public class InventoryService {
    private final InventoryRepository inventoryRepository;

    @Transactional
    public void deductStock(String orderId, String productId, int quantity) {
        if (inventoryRepository.existsByOrderId(orderId)) {
            log.info("Order {} already processed, skipping", orderId);
            return;
        }
        Inventory inventory = inventoryRepository.findByProductId(productId)
                .orElseThrow(() -> new RuntimeException("Inventory not found"));
        if (inventory.getQuantity() < quantity) {
            throw new RuntimeException("Insufficient stock");
        }
        inventory.setQuantity(inventory.getQuantity() - quantity);
        inventoryRepository.save(inventory);
        InventoryRecord record = new InventoryRecord();
        record.setOrderId(orderId);
        record.setProductId(productId);
        record.setQuantity(quantity);
        inventoryRepository.save(record);
        log.info("Stock deducted: {} - {}", productId, quantity);
    }
}

4.2 Compensation Mechanism

Compensation periodically scans business tables for unfinished operations and either retries the message or executes corrective logic.

@Component
public class CompensationScheduler {
    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, OrderMessage> kafkaTemplate;

    @Scheduled(fixedRate = 300000)
    public void compensateUnfinishedOrders() {
        LocalDateTime threshold = LocalDateTime.now().minusMinutes(10);
        List<Order> orders = orderRepository.findByStatusAndCreateTimeBefore(OrderStatus.CREATED, threshold);
        for (Order order : orders) {
            try {
                OrderMessage message = OrderMessage.builder()
                        .orderId(order.getId())
                        .userId(order.getUserId())
                        .amount(order.getAmount())
                        .createTime(order.getCreateTime())
                        .build();
                kafkaTemplate.send("order-topic", message).get();
                log.info("Compensation message sent: {}", order.getId());
            } catch (Exception e) {
                log.error("Compensation message failed: {}", order.getId());
            }
        }
    }
}

5. End‑to‑End Order Creation Case Study

Flow diagram (textual representation):

用户下单 → 创建订单事务 → 发送消息 → 库存服务消费 → 更新订单状态
    ↓            ↓            ↓            ↓            ↓
  成功        成功/失败    成功/失败    成功/失败    成功/失败
    ↓            ↓            ↓            ↓            ↓
  返回成功   回滚/重试   重试/死信   重试/死信   完成

Key code snippets include the order controller, transactional order service, inventory consumer, and the complete order creation flow.

@RestController
@RequestMapping("/api/orders")
public class OrderController {
    private final OrderTransactionalService transactionalService;

    @PostMapping
    public ResponseEntity<String> createOrder(@RequestBody OrderDTO dto) {
        Order order = Order.builder()
                .id(UUID.randomUUID().toString())
                .userId(dto.getUserId())
                .productId(dto.getProductId())
                .amount(dto.getAmount())
                .quantity(dto.getQuantity())
                .status(OrderStatus.CREATED)
                .createTime(LocalDateTime.now())
                .build();
        transactionalService.createOrderTransactional(order);
        return ResponseEntity.ok("订单创建成功");
    }
}
@Component
public class OrderConsumer {
    private final InventoryService inventoryService;
    private final OrderService orderService;

    @KafkaListener(topics = "order-topic", groupId = "inventory-group")
    public void consumeOrderMessage(@Payload OrderMessage message, Acknowledgment ack) {
        try {
            inventoryService.deductStock(message.getOrderId(), "product-id", 1);
            orderService.updateStatus(message.getOrderId(), OrderStatus.PAID);
            ack.acknowledge();
            log.info("订单处理完成: {}", message.getOrderId());
        } catch (Exception e) {
            log.error("订单处理失败: {}", message.getOrderId());
            // message will be redelivered
        }
    }
}

The article concludes with a checklist of what was covered: transaction message concepts, CAP theory and final consistency, three implementation schemes, idempotency, compensation, and a complete end‑to‑end order creation example.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

CAP theoremdistributed-consistencyKafkarocketmqidempotencySpringBootTransactional Messagingfinal-consistency
Java Tech Workshop
Written by

Java Tech Workshop

Focused on Java backend technologies, sharing fundamentals, multithreading, JVM, the Spring ecosystem, microservices, distributed systems, high concurrency, source‑code analysis, and practical experience. Continuously delivers high‑quality original content, interview guides, and learning roadmaps to help Java developers progress from beginner to advanced, enhancing technical skills and core competitiveness.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.