Integrating SpringBoot with Canal and RabbitMQ for Database Change Capture
This guide explains how to decouple business logic in a SpringBoot application by using Canal to listen to MySQL binlog changes, forwarding those events through RabbitMQ, and processing them with a Java client to record both new and old data for insert, update, and delete operations.
The requirement is to implement a change‑recording mechanism in SpringBoot that captures new data and, for updates, also records the old data, while keeping the business code loosely coupled.
After researching, Canal is chosen to listen to MySQL binlog events, and RabbitMQ is added to queue the change‑record saving logic.
Steps
Start MySQL with binlog enabled.
Start Canal, create a MySQL user for it, and connect as a slave.
Set Canal server mode to TCP and write a Java client to listen to binlog changes.
Switch Canal server mode to RabbitMQ, start RabbitMQ, and configure Canal to publish binlog events to the queue.
Environment Setup (Docker‑Compose)
version: "3"
services:
mysql:
network_mode: mynetwork
container_name: mymysql
ports:
- 3306:3306
restart: always
volumes:
- /etc/localtime:/etc/localtime
- /home/mycontainers/mymysql/data:/data
- /home/mycontainers/mymysql/mysql:/var/lib/mysql
- /home/mycontainers/mymysql/conf:/etc/mysql
environment:
- MYSQL_ROOT_PASSWORD=root
command:
--character-set-server=utf8mb4
--collation-server=utf8mb4_unicode_ci
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M
image: mysql:5.7.20
rabbitmq:
container_name: myrabbit
ports:
- 15672:15672
- 5672:5672
restart: always
volumes:
- /etc/localtime:/etc/localtime
- /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
network_mode: mynetwork
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=123456
image: rabbitmq:3.8-management
canal-server:
container_name: canal-server
restart: always
ports:
- 11110:11110
- 11111:11111
- 11112:11112
volumes:
- /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
- /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
- /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
network_mode: mynetwork
depends_on:
- mysql
- rabbitmq
image: canal/canal-server:v1.1.5Two configuration files must be prepared:
canal.properties
# common arguments
canal.ip =
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.serverMode = tcp
canal.zkServers =
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation = false
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 256
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
canal.instance.tsdb.snapshot.interval = 24
canal.instance.tsdb.snapshot.expire = 360
canal.destinations = canal-exchange
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# MQ properties
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
# RabbitMQ
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode =instance.properties
# MySQL connection
canal.instance.master.address=mymysql:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# GTID
canal.instance.gtidon=false
# Instance meta
canal.instance.filter.regex=.*\..*
canal.instance.filter.black.regex=mysql\.slave_.*
# MQ topic
canal.mq.topic=canal-routing-key
canal.mq.partition=0After mapping these files, ensure the MySQL container and Canal container share the same Docker network and that Canal connects to MySQL via the internal port 3306.
SpringBoot Integration with Canal
Add Maven dependencies:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal.version}</version>
</dependency>Implement a Canal client component:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient {
private static final int BATCH_SIZE = 1000;
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
try {
connector.connect();
connector.subscribe(".*.*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(2000);
} else {
printEntry(message.getEntries());
}
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private static void printEntry(List
entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry, e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("=== binlog[%s:%s], name[%s,%s], eventType: %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
if (rowChage.getIsDdl()) {
System.out.println("=== isDdl: true, sql:" + rowChage.getSql());
}
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("--- before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("--- after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List
columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}Wire the client into the SpringBoot application entry point:
@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
@Autowired
private CanalClient canalClient;
@Override
public void run(String... args) throws Exception {
canalClient.run();
}
}Switching Canal to RabbitMQ Mode
Change canal.serverMode = rabbitMQ in canal.properties and set canal.mq.topic=canal-routing-key in instance.properties . Then configure RabbitMQ connection:
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456Create the exchange and queue (or let SpringBoot auto‑create them) and bind with routing key canal-routing-key .
SpringBoot RabbitMQ Configuration
spring:
rabbitmq:
host: 192.168.0.108
port: 5672
username: admin
password: 123456
publisher-confirm-type: correlated
publisher-returns: trueDefine a RabbitMQ configuration class to provide a RabbitTemplate and a listener container factory:
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}Declare the queue, exchange, and binding:
@Configuration
public class CanalProvider {
@Bean
public Queue canalQueue() {
return new Queue(RabbitConstant.CanalQueue, true);
}
@Bean
public DirectExchange canalExchange() {
return new DirectExchange(RabbitConstant.CanalExchange, true, false);
}
@Bean
public Binding bindingCanal() {
return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting);
}
}Implement a consumer to process the Canal messages received from RabbitMQ:
@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalConsumer {
private final SysBackupService sysBackupService;
public CanalConsumer(SysBackupService sysBackupService) {
this.sysBackupService = sysBackupService;
}
@RabbitHandler
public void process(Map
msg) {
System.out.println("Received canal message: " + msg);
boolean isDdl = (boolean) msg.get("isDdl");
if (isDdl) return;
String table = (String) msg.get("table");
if ("sys_backup".equalsIgnoreCase(table)) return;
String type = (String) msg.get("type");
if (!"INSERT".equalsIgnoreCase(type) && !"UPDATE".equalsIgnoreCase(type) && !"DELETE".equalsIgnoreCase(type)) {
return;
}
// further processing such as persisting change records
}
}After restarting the containers, any insert, update, or delete on the MySQL tables will be captured by Canal, sent to RabbitMQ, and processed by the SpringBoot consumer, achieving a decoupled change‑recording solution.
Selected Java Interview Questions
A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.