Implementing Distributed Transactions with RabbitMQ and Spring Boot

This article explains how to use RabbitMQ as a message middleware in a Spring Boot application to achieve distributed transactions between order and dispatch services, detailing environment setup, configuration, code implementation, testing, and verification of data consistency across systems.

Top Architect
Top Architect
Top Architect
Implementing Distributed Transactions with RabbitMQ and Spring Boot

The article demonstrates how to achieve distributed transactions across multiple systems using RabbitMQ as a message middleware, ensuring data consistency.

Experiment environment : CentOS 7.5, RabbitMQ, IDEA.

Purpose : Use RabbitMQ to implement distributed transaction between order and dispatch services.

Solution : Order service acts as a producer, dispatch service as a consumer; messages are sent with confirm and manual ack mechanisms.

Implementation steps include creating an exchange and queue, defining tables (order, message status, dispatch), configuring Spring Boot dependencies and application.yml, and writing Java code for order creation, message sending, local status recording, and consumer processing with manual ack and nack handling.

Key code snippets :

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rabbitmq -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- mysql -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <scope>runtime</scope>
</dependency>
<!-- lombok -->
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
</dependency>
<!-- jdbc -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- fastjson -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.17</version>
</dependency>
server:
  port: 8080

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
    username: root
    password: root123
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin123
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: MANUAL
public class OrderService {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void setup() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack) {
                    return;
                }
                try {
                    String sql = "update tb_msgstatus set status = 1 where msgid = ?";
                    int count = jdbcTemplate.update(sql, correlationData.getId());
                    if (count != 1) {
                        log.warn("本地消息表状态修改失败");
                    }
                } catch (Exception e) {
                    log.warn("本息消息表状态修改异常", e);
                }
            }
        });
    }

    public void createOrder(JSONObject order) throws Exception {
        saveOrder(order);
        sendMsg(order);
    }

    private void sendMsg(JSONObject order) {
        rabbitTemplate.convertAndSend("orderExchange", "", order.toJSONString(),
                new CorrelationData((String) order.get("orderid")));
    }

    @Transient
    private void saveOrder(JSONObject order) throws Exception {
        String sql = "insert into tb_order (orderid,userid,goodsid,ordertime) values (?,?,?,now())";
        int count = jdbcTemplate.update(sql, order.get("orderid"), order.get("userid"), order.get("goodsid"));
        if (count != 1) {
            throw new Exception("订单创建失败");
        }
        saveLocalMsg(order);
    }

    private void saveLocalMsg(JSONObject order) throws Exception {
        String sql = "insert into tb_msgstatus (msgid,msg,status,sendtime) values (?,?,0,now())";
        int count = jdbcTemplate.update(sql, order.get("orderid"), order.toJSONString());
        if (count != 1) {
            throw new Exception("记录消息发送状态失败");
        }
    }
}
public class DispatchService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @RabbitListener(queues = "orderQueue")
    public void messageCunsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            JSONObject orderInfo = JSONObject.parseObject(message);
            log.warn("收到MQ里面的消息:" + orderInfo.toJSONString());
            Thread.sleep(1000L);
            String orderid = orderInfo.getString("orderid");
            dispatch(orderid);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            channel.basicNack(tag, false, false);
        }
    }

    @Transient
    private void dispatch(String orderid) throws Exception {
        String sql = "insert into tb_dispatch (orderid,courier,status) values (?,?,?)";
        int count = jdbcTemplate.update(sql, orderid, "东哥", "配送中");
        if (count != 1) {
            throw new Exception("调度数据插入失败,原因[数据库操作]");
        }
    }
}

Testing shows how to generate a sample order, invoke orderService.createOrder(orderInfo), and verify that the message appears in orderQueue and that the order, message status, and dispatch records are correctly stored in the database.

Result screenshots confirm successful message consumption, order insertion, and dispatch record creation.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

javaSpring BootmysqlMessage QueueRabbitMQdistributed-transaction
Top Architect
Written by

Top Architect

Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.