Implementing Distributed Transactions with RabbitMQ in a Spring Boot Application
This tutorial demonstrates how to set up a CentOS 7.5 environment, install RabbitMQ, and use Spring Boot to create a distributed transaction system that ensures data consistency across order and dispatch services by leveraging RabbitMQ's confirm and ACK mechanisms, complete with database schema, configuration, and code examples.
Experiment Environment
Lunix system: CentOS 7.5
Installed software: RabbitMQ
Development tool: IntelliJ IDEA
Experiment Purpose
Use RabbitMQ to achieve distributed transactions between multiple systems and guarantee data consistency.
Experiment Scheme
RabbitMQ acts as the message middleware. The order service and the dispatch service serve as the message producer and consumer respectively, exchanging messages through RabbitMQ.
The order service creates orders, persists them locally, records message status, and sends messages to RabbitMQ with the confirm mechanism to receive acknowledgments and update the local message status. A scheduled task retries failed messages to ensure at‑least‑once delivery.
The dispatch service consumes order messages, processes them, and acknowledges receipt. It uses a globally unique message ID to guarantee idempotency.
Experiment Steps
1. Message Queue Setup
1.1 RabbitMQ installation (omitted)
1.2 Create order exchange: orderExchange 1.3 Create order queue: orderQueue 1.4 Bind exchange and queue
2. Database Preparation
2.1 Order table
2.2 Message status table
2.3 Dispatch table
3. Order Service Implementation
3.2.1 Spring Boot integration with RabbitMQ and MySQL dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--jdbc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.17</version>
</dependency>3.2.1.2 Application configuration (application.yml)
server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
username: root
password: root123
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: MANUAL3.2.2 Order Service Code
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.beans.Transient;
@Slf4j
@Service
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void setup() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
return;
}
try {
String sql = "update tb_msgstatus set status = 1 where msgid = ?";
int count = jdbcTemplate.update(sql, correlationData.getId());
if (count != 1) {
log.warn("本地消息表状态修改失败");
}
} catch (Exception e) {
log.warn("本息消息表状态修改异常", e);
}
}
});
}
public void createOrder(JSONObject order) throws Exception {
saveOrder(order);
sendMsg(order);
}
private void sendMsg(JSONObject order) {
rabbitTemplate.convertAndSend("orderExchange", "", order.toJSONString(), new CorrelationData((String) order.get("orderid")));
}
@Transient
private void saveOrder(JSONObject order) throws Exception {
String sql = "insert into tb_order (orderid,userid,goodsid,ordertime) values (?,?,?,now())";
int count = jdbcTemplate.update(sql, order.get("orderid"), order.get("userid"), order.get("goodsid"));
if (count != 1) {
throw new Exception("订单创建失败");
}
saveLocalMsg(order);
}
private void saveLocalMsg(JSONObject order) throws Exception {
String sql = "insert into tb_msgstatus (msgid,msg,status,sendtime) values (?,?,0,now())";
int count = jdbcTemplate.update(sql, order.get("orderid"), order.toJSONString());
if (count != 1) {
throw new Exception("记录消息发送状态失败");
}
}
}3.3 Order Service Test
@Autowired
private OrderService orderService;
@Test
public void orderServiceTest() throws Exception {
JSONObject orderinfo = new JSONObject();
orderinfo.put("orderid", UUID.randomUUID().toString());
orderinfo.put("userid", UUID.randomUUID().toString());
orderinfo.put("goodsid", UUID.randomUUID().toString());
orderService.createOrder(orderinfo);
}4. Dispatch Service Implementation
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.beans.Transient;
import java.io.IOException;
@Slf4j
@Service
public class DispatchService {
@Autowired
private JdbcTemplate jdbcTemplate;
@RabbitListener(queues = "orderQueue")
public void messageConsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
JSONObject orderInfo = JSONObject.parseObject(message);
log.warn("收到MQ里面的消息:" + orderInfo.toJSONString());
Thread.sleep(1000L);
String orderid = orderInfo.getString("orderid");
dispatch(orderid);
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, false);
}
}
@Transient
private void dispatch(String orderid) throws Exception {
String sql = "insert into tb_dispatch (orderid,courier,status) values (?,?,?)";
int count = jdbcTemplate.update(sql, orderid, "东哥", "配送中");
if (count != 1) {
throw new Exception("调度数据插入失败,原因[数据库操作]");
}
}
}The application starts Spring Boot, automatically listens to the orderQueue, processes incoming order messages, acknowledges successful handling, and records dispatch information in the database. Test results show messages received in the queue, order data stored in tb_order, and status updates in tb_msgstatus.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Selected Java Interview Questions
A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
