Implementing Data Change Capture in SpringBoot Using Canal and RabbitMQ
This guide shows how to decouple data change logging from business logic in a SpringBoot application: Canal monitors the MySQL binlog, RabbitMQ carries the change events, and a consumer persists the new and old record states. It covers the Docker‑compose setup, the Canal configuration files, and the Java client code.
The requirement is to record data changes in a SpringBoot project without coupling to business code, storing the new row for inserts and both old and new rows for updates. Canal is used to listen to MySQL binlog events, and RabbitMQ queues the events for asynchronous processing.
Environment setup uses Docker‑compose to launch MySQL, RabbitMQ, and Canal containers:
version: "3"
services:
  mysql:
    network_mode: mynetwork
    container_name: mymysql
    ports:
      - 3306:3306
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
    command:
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_unicode_ci
      --log-bin=/var/lib/mysql/mysql-bin
      --server-id=1
      --binlog-format=ROW
      --expire_logs_days=7
      --max_binlog_size=500M
    image: mysql:5.7.20
  rabbitmq:
    container_name: myrabbit
    ports:
      - 15672:15672
      - 5672:5672
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
    network_mode: mynetwork
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    image: rabbitmq:3.8-management
  canal-server:
    container_name: canal-server
    restart: always
    ports:
      - 11110:11110
      - 11111:11111
      - 11112:11112
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    network_mode: mynetwork
    depends_on:
      - mysql
      - rabbitmq
    image: canal/canal-server:v1.1.5

The canal.properties file defines the server mode (initially tcp), Zookeeper settings, and MQ parameters. The instance.properties file configures the MySQL connection, user credentials, and topic name for RabbitMQ.
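As a rough sketch of the two files in their initial tcp setup: the key names below are the ones found in Canal's default configuration files, while the values are assumptions. The host mymysql is taken from the compose file above; the canal/canal MySQL account is hypothetical and would have to be created with the privileges noted in the comment.

```properties
# canal.properties (excerpt)
canal.serverMode = tcp

# instance.properties (excerpt); values are assumptions, adjust to your setup
canal.instance.master.address=mymysql:3306
# Hypothetical MySQL account; it needs SELECT, REPLICATION SLAVE and REPLICATION CLIENT privileges
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Capture every table in every schema
canal.instance.filter.regex=.*\\..*
```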
SpringBoot integration adds Maven dependencies for the Canal client and AMQP:
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>${canal.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${amqp.version}</version>
</dependency>

A CanalClient component creates a CanalConnector, subscribes to all tables, fetches batches, and prints change entries. It distinguishes INSERT, UPDATE, and DELETE events and logs the before/after column values.
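import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

@Component
public class CanalClient {
    private static final int BATCH_SIZE = 1000;

    public void run() {
        // Arguments: server address, destination, username, password.
        // The destination must match an instance name configured on the Canal server.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
        try {
            connector.connect();
            // Subscribe to every table in every schema
            connector.subscribe(".*\\..*");
            // Roll back to the last acknowledged position before fetching
            connector.rollback();
            while (true) {
                // Fetch a batch without acknowledging it yet
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing new: wait before polling again
                    Thread.sleep(2000);
                } else {
                    printEntry(message.getEntries());
                }
                // Confirm the batch so it is not delivered again
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
    // printEntry and printColumn methods omitted for brevity
}

The main application class implements CommandLineRunner to start the client on boot.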
Switching to RabbitMQ means setting canal.serverMode = rabbitMQ and canal.mq.topic=canal-routing-key. The RabbitMQ connection details (host, virtual host, exchange, username, password) are added to canal.properties.
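A sketch of what those settings might look like follows. The rabbitmq.* key names are the ones found in Canal's default canal.properties; the values are assumptions drawn from the compose file (container name myrabbit, user admin, password 123456), and the virtual host / is assumed to be the RabbitMQ default.

```properties
# canal.properties (excerpt); values are assumptions based on the compose file
canal.serverMode = rabbitMQ
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456

# instance.properties (excerpt); Canal uses the topic as the RabbitMQ routing key
canal.mq.topic=canal-routing-key
```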
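On the SpringBoot side, the RabbitMQ setup consists of a durable queue canal-queue, a direct exchange canal-exchange, and a binding between them with the routing key. The RabbitConfig class first enables JSON message conversion for the template and the listener container factory: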
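import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    // Template that serializes outgoing messages as JSON
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    // Listener factory that deserializes incoming JSON messages
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}

A provider configuration class declares the queue, exchange, and binding: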
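import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CanalProvider {
    // Durable queue
    @Bean
    public Queue canalQueue() { return new Queue(RabbitConstant.CanalQueue, true); }

    // Durable, non-auto-delete direct exchange
    @Bean
    public DirectExchange canalExchange() { return new DirectExchange(RabbitConstant.CanalExchange, true, false); }

    @Bean
    public Binding bindingCanal() { return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting); }
}

The consumer listens on canal-queue and extracts fields such as isDdl, id, ts, database, table, type, data, and old. It ignores DDL events, and it also ignores the internal sys_backup table, because writes to that table would themselves produce change events and cause a loop.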
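import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalConsumer {
    private final SysBackupService sysBackupService;

    public CanalConsumer(SysBackupService sysBackupService) { this.sysBackupService = sysBackupService; }

    @RabbitHandler
    public void process(Map<String, Object> msg) {
        // Skip DDL statements; a missing flag is treated as "not DDL"
        boolean isDdl = Boolean.TRUE.equals(msg.get("isDdl"));
        if (isDdl) return;
        String table = (String) msg.get("table");
        // Skip the log table itself to avoid an endless loop
        if ("sys_backup".equalsIgnoreCase(table)) return;
        // further processing …
    }
}

After the containers are rebuilt, any INSERT, UPDATE, or DELETE on MySQL triggers Canal, which publishes a JSON message to RabbitMQ. The SpringBoot consumer receives it and can then handle or persist the change log asynchronously.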
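The "further processing" step in the consumer is left open above. As a minimal sketch of one way to fill it in, the helper below turns a single message map into change entries, keeping the new row for inserts and both old and new rows for updates. ChangeLogSketch and Change are hypothetical names, not part of the article's project. The sketch assumes that data holds one map per affected row, that old holds only the previous values of the changed columns (so the full old row is rebuilt by overlaying them on the new row), and that data carries the removed row for DELETE events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not from the article): maps one Canal message to change entries. */
final class ChangeLogSketch {

    /** One logged change; before is null for INSERT, after is null for DELETE. */
    static final class Change {
        final String table;
        final String type;
        final Map<String, Object> before;
        final Map<String, Object> after;

        Change(String table, String type, Map<String, Object> before, Map<String, Object> after) {
            this.table = table;
            this.type = type;
            this.before = before;
            this.after = after;
        }
    }

    @SuppressWarnings("unchecked")
    static List<Change> toChanges(Map<String, Object> msg) {
        List<Change> changes = new ArrayList<>();
        String table = (String) msg.get("table");
        String type = (String) msg.get("type");
        List<Map<String, Object>> data = (List<Map<String, Object>>) msg.get("data");
        List<Map<String, Object>> old = (List<Map<String, Object>>) msg.get("old");

        // Same guards as the consumer: skip DDL and the log table itself.
        if (Boolean.TRUE.equals(msg.get("isDdl")) || "sys_backup".equalsIgnoreCase(table)) {
            return changes;
        }
        if (type == null || data == null) {
            return changes;
        }
        for (int i = 0; i < data.size(); i++) {
            Map<String, Object> row = data.get(i);
            switch (type) {
                case "INSERT":
                    changes.add(new Change(table, type, null, row));
                    break;
                case "UPDATE":
                    // Assumption: "old" lists only changed columns, so overlay them on the new row.
                    Map<String, Object> before = new LinkedHashMap<>(row);
                    if (old != null && i < old.size() && old.get(i) != null) {
                        before.putAll(old.get(i));
                    }
                    changes.add(new Change(table, type, before, row));
                    break;
                case "DELETE":
                    // Assumption: for DELETE, "data" carries the removed row.
                    changes.add(new Change(table, type, row, null));
                    break;
                default:
                    break; // other event types are ignored in this sketch
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Object> newRow = new LinkedHashMap<>();
        newRow.put("id", "1");
        newRow.put("name", "bob");
        Map<String, Object> changed = new LinkedHashMap<>();
        changed.put("name", "alice");

        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("isDdl", false);
        msg.put("table", "user");
        msg.put("type", "UPDATE");
        msg.put("data", Collections.singletonList(newRow));
        msg.put("old", Collections.singletonList(changed));

        for (Change c : toChanges(msg)) {
            System.out.println(c.type + " " + c.table + " before=" + c.before + " after=" + c.after);
        }
    }
}
```

In the consumer, each returned Change could then be handed to SysBackupService for persistence. Keeping the mapping free of Spring and RabbitMQ types makes it straightforward to unit test.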