Implementing Distributed Transactions with RabbitMQ and Spring Boot
This article explains how to use RabbitMQ as a message middleware in a Spring Boot application to achieve distributed transactions between order and dispatch services, detailing environment setup, configuration, code implementation, testing, and verification of data consistency across systems.
The article demonstrates how to achieve distributed transactions across multiple systems using RabbitMQ as a message middleware, ensuring data consistency.
Experiment environment : CentOS 7.5, RabbitMQ, IDEA.
Purpose : Use RabbitMQ to implement distributed transaction between order and dispatch services.
Solution : Order service acts as a producer, dispatch service as a consumer; messages are sent with confirm and manual ack mechanisms.
Implementation steps include creating an exchange and queue, defining tables (order, message status, dispatch), configuring Spring Boot dependencies and application.yml, and writing Java code for order creation, message sending, local status recording, and consumer processing with manual ack and nack handling.
Key code snippets :
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- jdbc -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.17</version>
</dependency> server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
username: root
password: root123
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: MANUAL public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void setup() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
return;
}
try {
String sql = "update tb_msgstatus set status = 1 where msgid = ?";
int count = jdbcTemplate.update(sql, correlationData.getId());
if (count != 1) {
log.warn("本地消息表状态修改失败");
}
} catch (Exception e) {
log.warn("本息消息表状态修改异常", e);
}
}
});
}
public void createOrder(JSONObject order) throws Exception {
saveOrder(order);
sendMsg(order);
}
private void sendMsg(JSONObject order) {
rabbitTemplate.convertAndSend("orderExchange", "", order.toJSONString(),
new CorrelationData((String) order.get("orderid")));
}
@Transient
private void saveOrder(JSONObject order) throws Exception {
String sql = "insert into tb_order (orderid,userid,goodsid,ordertime) values (?,?,?,now())";
int count = jdbcTemplate.update(sql, order.get("orderid"), order.get("userid"), order.get("goodsid"));
if (count != 1) {
throw new Exception("订单创建失败");
}
saveLocalMsg(order);
}
private void saveLocalMsg(JSONObject order) throws Exception {
String sql = "insert into tb_msgstatus (msgid,msg,status,sendtime) values (?,?,0,now())";
int count = jdbcTemplate.update(sql, order.get("orderid"), order.toJSONString());
if (count != 1) {
throw new Exception("记录消息发送状态失败");
}
}
} public class DispatchService {
@Autowired
private JdbcTemplate jdbcTemplate;
@RabbitListener(queues = "orderQueue")
public void messageCunsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
JSONObject orderInfo = JSONObject.parseObject(message);
log.warn("收到MQ里面的消息:" + orderInfo.toJSONString());
Thread.sleep(1000L);
String orderid = orderInfo.getString("orderid");
dispatch(orderid);
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, false);
}
}
@Transient
private void dispatch(String orderid) throws Exception {
String sql = "insert into tb_dispatch (orderid,courier,status) values (?,?,?)";
int count = jdbcTemplate.update(sql, orderid, "东哥", "配送中");
if (count != 1) {
throw new Exception("调度数据插入失败,原因[数据库操作]");
}
}
}Testing shows how to generate a sample order, invoke orderService.createOrder(orderInfo), and verify that the message appears in orderQueue and that the order, message status, and dispatch records are correctly stored in the database.
Result screenshots confirm successful message consumption, order insertion, and dispatch record creation.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
