Implementing Distributed Transactions with RabbitMQ in a Spring Boot Application

This tutorial shows how to use RabbitMQ with Spring Boot to keep data consistent between an order service and a dispatch service. It relies on RabbitMQ's publisher confirm and consumer ACK mechanisms, and covers the environment (CentOS 7.5), the database tables, the configuration, and the code.

Selected Java Interview Questions

Experiment Environment

Linux system: CentOS 7.5

Installed software: RabbitMQ

Development tool: IntelliJ IDEA

Experiment Purpose

Use RabbitMQ to achieve distributed transactions between multiple systems and guarantee data consistency.

Experiment Scheme

RabbitMQ acts as the message middleware. The order service and the dispatch service serve as the message producer and consumer respectively, exchanging messages through RabbitMQ.

The order service creates an order, persists it locally together with a message-status record, and then publishes the message to RabbitMQ. With publisher confirms enabled, the broker's acknowledgment triggers an update of the local message status. A scheduled task re-sends messages that were never confirmed, which gives at-least-once delivery.
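The order-side flow (save as pending, mark as confirmed, re-send what stays pending) can be sketched in plain Java without Spring or a real broker. This is only an illustration under stated assumptions: `LocalMessageTable`, `RetryJob`, and the timeout parameter are hypothetical names that do not appear in the original tutorial, the in-memory map stands in for tb_msgstatus, and the sketch is not hardened for concurrent use.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for tb_msgstatus (hypothetical helper, for illustration only). */
class LocalMessageTable {
    static final int PENDING = 0;   // saved locally, no broker confirm yet
    static final int CONFIRMED = 1; // broker confirmed the publish

    static final class Row {
        final String msgId;
        final String payload;
        int status = PENDING;
        Instant sendTime;

        Row(String msgId, String payload, Instant sendTime) {
            this.msgId = msgId;
            this.payload = payload;
            this.sendTime = sendTime;
        }
    }

    private final Map<String, Row> rows = new LinkedHashMap<>();

    /** Record the message as pending; the real service does this alongside the order insert. */
    synchronized void savePending(String msgId, String payload, Instant now) {
        rows.put(msgId, new Row(msgId, payload, now));
    }

    /** Called when the broker confirms; returns false if the message ID is unknown. */
    synchronized boolean markConfirmed(String msgId) {
        Row row = rows.get(msgId);
        if (row == null) {
            return false;
        }
        row.status = CONFIRMED;
        return true;
    }

    /** Pending rows whose last send attempt is at least the given timeout old. */
    synchronized List<Row> findStale(Instant now, Duration timeout) {
        List<Row> stale = new ArrayList<>();
        for (Row row : rows.values()) {
            if (row.status == PENDING && !row.sendTime.plus(timeout).isAfter(now)) {
                stale.add(row);
            }
        }
        return stale;
    }
}

/** Re-sends stale messages; a scheduler would call runOnce periodically. */
class RetryJob {
    private final LocalMessageTable table;
    private final Consumer<LocalMessageTable.Row> publisher; // stand-in for the RabbitMQ send
    private final Duration timeout;

    RetryJob(LocalMessageTable table, Consumer<LocalMessageTable.Row> publisher, Duration timeout) {
        this.table = table;
        this.publisher = publisher;
        this.timeout = timeout;
    }

    /** Returns how many messages were re-sent in this pass. */
    int runOnce(Instant now) {
        List<LocalMessageTable.Row> stale = table.findStale(now, timeout);
        for (LocalMessageTable.Row row : stale) {
            publisher.accept(row); // may produce a duplicate delivery: at-least-once
            row.sendTime = now;    // restart the timeout for this message
        }
        return stale.size();
    }
}
```

Because a re-sent message may already have reached the consumer, this design only works if the consumer tolerates duplicates. In a Spring application, `runOnce` could be driven by a scheduled method that reads pending rows from the database instead of a map.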

The dispatch service consumes order messages, processes them, and acknowledges receipt. It uses a globally unique message ID to guarantee idempotency.
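Idempotent consumption keyed on the message ID can be illustrated with a minimal plain-Java sketch. `IdempotentHandler` is a hypothetical name, and the in-memory set is an assumption standing in for whatever durable check the real service uses (for example a unique key on the order ID column).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Runs the business logic at most once per message ID (hypothetical helper, for illustration only). */
class IdempotentHandler {
    // Stand-in for a durable uniqueness check such as a unique database key
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> businessLogic;

    IdempotentHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /**
     * Returns true if the message was processed now, false if it was a duplicate.
     * In both cases the caller can acknowledge the delivery.
     */
    boolean handle(String msgId) {
        if (!processedIds.add(msgId)) {
            return false; // already seen: skip the business logic
        }
        try {
            businessLogic.accept(msgId);
            return true;
        } catch (RuntimeException e) {
            processedIds.remove(msgId); // let a redelivery try again
            throw e;
        }
    }
}
```

A duplicate is treated as success rather than as an error, so a re-sent order message is acknowledged without creating a second dispatch record.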

Experiment Steps

1. Message Queue Setup

1.1 RabbitMQ installation (omitted)

1.2 Create order exchange: orderExchange

1.3 Create order queue: orderQueue

1.4 Bind exchange and queue
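The exchange, queue, and binding can be created in the RabbitMQ management console, or declared from the application. The following is a sketch of the second option using Spring AMQP. The tutorial does not state the exchange type; a fanout exchange is assumed here because the order service publishes with an empty routing key. The class name `OrderQueueConfig` and the durability settings are also assumptions.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OrderQueueConfig {

    // Assumed fanout type: every bound queue receives each message, routing key is ignored
    @Bean
    public FanoutExchange orderExchange() {
        return new FanoutExchange("orderExchange", true, false); // durable, not auto-deleted
    }

    @Bean
    public Queue orderQueue() {
        return new Queue("orderQueue", true); // durable queue
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, FanoutExchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange);
    }
}
```

With Spring Boot's RabbitMQ auto-configuration, beans like these are declared on the broker when the application first opens a connection.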

2. Database Preparation

2.1 Order table

2.2 Message status table

2.3 Dispatch table
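The table definitions are not included in the text. As a rough sketch, the column names below are taken from the SQL statements used later in the tutorial, while every column type, length, and key is an assumption. `SchemaSketch` is a hypothetical holder class.

```java
import java.util.Arrays;
import java.util.List;

/** DDL sketch: column names come from the tutorial's SQL, types and keys are assumptions. */
class SchemaSketch {
    static final String TB_ORDER =
        "CREATE TABLE IF NOT EXISTS tb_order ("
        + " orderid VARCHAR(64) PRIMARY KEY,"
        + " userid VARCHAR(64) NOT NULL,"
        + " goodsid VARCHAR(64) NOT NULL,"
        + " ordertime DATETIME NOT NULL)";

    static final String TB_MSGSTATUS =
        "CREATE TABLE IF NOT EXISTS tb_msgstatus ("
        + " msgid VARCHAR(64) PRIMARY KEY,"
        + " msg TEXT NOT NULL,"
        + " status INT NOT NULL," // 0 = pending, 1 = confirmed by the broker
        + " sendtime DATETIME NOT NULL)";

    static final String TB_DISPATCH =
        "CREATE TABLE IF NOT EXISTS tb_dispatch ("
        + " orderid VARCHAR(64) PRIMARY KEY," // assumed unique: rejects a second dispatch per order
        + " courier VARCHAR(64) NOT NULL,"
        + " status VARCHAR(32) NOT NULL)";

    static List<String> all() {
        return Arrays.asList(TB_ORDER, TB_MSGSTATUS, TB_DISPATCH);
    }

    public static void main(String[] args) {
        // Print the statements so they can be pasted into a MySQL client
        for (String ddl : all()) {
            System.out.println(ddl + ";");
        }
    }
}
```

Each statement could also be passed to `jdbcTemplate.execute(...)` at startup. If tb_dispatch really has a unique key on orderid, a duplicate order message fails on insert instead of creating a second dispatch row.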

3. Order Service Implementation

3.2.1 Spring Boot integration with RabbitMQ and MySQL dependencies

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--mysql-->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <scope>runtime</scope>
</dependency>
<!--lombok-->
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
</dependency>
<!--jdbc-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--fastjson-->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.17</version>
</dependency>

3.2.1.2 Application configuration (application.yml)

server:
  port: 8080

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
    username: root
    password: root123
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin123
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: MANUAL

3.2.2 Order Service Code

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.PostConstruct;

@Slf4j
@Service
public class OrderService {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // Programmatic transactions: @Transactional on a private, self-invoked method
    // would not be applied by Spring's proxy
    @Autowired
    private TransactionTemplate transactionTemplate;

    // Register the publisher-confirm callback once the bean is initialized
    @PostConstruct
    public void setup() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack) {
                    // Status stays 0, so the message remains eligible for a retry
                    log.warn("Message was not confirmed by the broker, cause: {}", cause);
                    return;
                }
                if (correlationData == null) {
                    return; // message was sent without a correlation ID
                }
                try {
                    String sql = "update tb_msgstatus set status = 1 where msgid = ?";
                    int count = jdbcTemplate.update(sql, correlationData.getId());
                    if (count != 1) {
                        log.warn("Failed to update status in the local message table");
                    }
                } catch (Exception e) {
                    log.warn("Exception while updating status in the local message table", e);
                }
            }
        });
    }

    public void createOrder(JSONObject order) throws Exception {
        // Commit the order and its message record atomically, then publish
        transactionTemplate.executeWithoutResult(status -> saveOrder(order));
        sendMsg(order);
    }

    private void sendMsg(JSONObject order) {
        // The order ID doubles as the message ID carried in CorrelationData
        rabbitTemplate.convertAndSend("orderExchange", "", order.toJSONString(),
                new CorrelationData(order.getString("orderid")));
    }

    // Runs inside the transaction opened in createOrder; an unchecked exception rolls it back
    private void saveOrder(JSONObject order) {
        String sql = "insert into tb_order (orderid,userid,goodsid,ordertime) values (?,?,?,now())";
        int count = jdbcTemplate.update(sql, order.get("orderid"), order.get("userid"), order.get("goodsid"));
        if (count != 1) {
            throw new IllegalStateException("Failed to create order");
        }
        saveLocalMsg(order);
    }

    private void saveLocalMsg(JSONObject order) {
        // status 0 = pending; set to 1 by the confirm callback
        String sql = "insert into tb_msgstatus (msgid,msg,status,sendtime) values (?,?,0,now())";
        int count = jdbcTemplate.update(sql, order.get("orderid"), order.toJSONString());
        if (count != 1) {
            throw new IllegalStateException("Failed to record message send status");
        }
    }
}

3.3 Order Service Test

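import com.alibaba.fastjson.JSONObject;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;

// Assumes spring-boot-starter-test (JUnit 5) is on the test classpath
@SpringBootTest
public class OrderServiceTest {
    @Autowired
    private OrderService orderService;

    @Test
    public void orderServiceTest() throws Exception {
        JSONObject orderinfo = new JSONObject();
        orderinfo.put("orderid", UUID.randomUUID().toString());
        orderinfo.put("userid", UUID.randomUUID().toString());
        orderinfo.put("goodsid", UUID.randomUUID().toString());
        orderService.createOrder(orderinfo);
    }
}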

4. Dispatch Service Implementation

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Slf4j
@Service
public class DispatchService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @RabbitListener(queues = "orderQueue")
    public void messageConsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            JSONObject orderInfo = JSONObject.parseObject(message);
            log.info("Received message from MQ: {}", orderInfo.toJSONString());
            Thread.sleep(1000L); // simulate processing time
            String orderid = orderInfo.getString("orderid");
            dispatch(orderid);
            // Manual ack for this single delivery (multiple = false)
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("Failed to process order message, rejecting without requeue", e);
            // requeue = false: the message is dropped, or dead-lettered if the queue has a dead-letter exchange
            channel.basicNack(tag, false, false);
        }
    }

    // A single INSERT is atomic on its own, so no explicit transaction is needed
    private void dispatch(String orderid) throws Exception {
        String sql = "insert into tb_dispatch (orderid,courier,status) values (?,?,?)";
        // "东哥" is a sample courier name
        int count = jdbcTemplate.update(sql, orderid, "东哥", "In delivery");
        if (count != 1) {
            throw new Exception("Failed to insert dispatch record, cause [database operation]");
        }
    }
}

When the Spring Boot application starts, the dispatch service automatically listens on orderQueue, processes each incoming order message, acknowledges it after successful handling, and records the dispatch in tb_dispatch. In the test run, messages arrived in the queue, order data was stored in tb_order, and the message status was updated in tb_msgstatus.
