How to Capture MySQL Changes in SpringBoot Using Canal and RabbitMQ
This guide shows how to decouple change‑recording logic in a SpringBoot application by using Alibaba Canal to listen to MySQL binlog events, forwarding them through RabbitMQ, and persisting both new and old row data for insert and update operations.
Requirement
I need a way in SpringBoot to record data changes in a decoupled manner, storing the new row data and, for updates, also the old row data.
Solution Overview
Use Canal to listen to MySQL binlog changes and combine it with RabbitMQ to process and persist change records.
Steps
Start a MySQL instance with binlog enabled.
Start a Canal instance, create a MySQL account for it, and connect as a slave.
Run Canal in TCP mode with a Java client, then switch to RabbitMQ mode.
Deploy the services using docker‑compose.
Environment Setup (docker‑compose)
version: "3"
services:
mysql:
network_mode: mynetwork
container_name: mymysql
ports:
- 3306:3306
restart: always
volumes:
- /etc/localtime:/etc/localtime
- /home/mycontainers/mymysql/data:/data
- /home/mycontainers/mymysql/mysql:/var/lib/mysql
- /home/mycontainers/mymysql/conf:/etc/mysql
environment:
- MYSQL_ROOT_PASSWORD=root
command: |
--character-set-server=utf8mb4
--collation-server=utf8mb4_unicode_ci
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M
image: mysql:5.7.20
rabbitmq:
container_name: myrabbit
ports:
- 15672:15672
- 5672:5672
restart: always
volumes:
- /etc/localtime:/etc/localtime
- /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
network_mode: mynetwork
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=123456
image: rabbitmq:3.8-management
canal-server:
container_name: canal-server
restart: always
ports:
- 11110:11110
- 11111:11111
- 11112:11112
volumes:
- /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
- /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
- /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
network_mode: mynetwork
depends_on:
- mysql
- rabbitmq
image: canal/canal-server:v1.1.5Canal Configuration Files
canal.properties
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal admin config
# canal.admin.manager = canal-admin:8089
# canal.admin.port = 11110
# canal.admin.user = admin
# canal.admin.passwd = ...
# admin auto register
# canal.admin.register.auto = true
# cluster name (empty for single node)
# canal.admin.register.cluster =
# Canal Server name
# canal.admin.register.name = canal-admin
canal.zkServers =
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# service mode: tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation = false
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 256
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
canal.instance.tsdb.snapshot.interval = 24
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = canal-exchange
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
##################################################
######### MQ Properties #############
##################################################
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode =instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
#canal.instance.mysql.slaveId=123
# enable gtid
canal.instance.gtidon=false
# master connection
canal.instance.master.address=mymysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog (optional)
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# tsdb enable
canal.instance.tsdb.enable=true
# db credentials for Canal to read binlog
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
# table filter regex
canal.instance.filter.regex=.*\..*
canal.instance.filter.black.regex=mysql\.slave_.*
# mq config
canal.mq.topic=canal-routing-key
canal.mq.partition=0SpringBoot Integration with Canal (Client)
Add Maven dependencies:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal.version}</version>
</dependency>Component that connects to Canal and processes entries:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements CommandLineRunner {
private static final int BATCH_SIZE = 1000;
@Override
public void run(String... args) {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
try {
connector.connect();
connector.subscribe(".*\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(2000);
} else {
printEntry(message.getEntries());
}
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private static void printEntry(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("Parser error", e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(String.format("=== binlog[%s:%s] table[%s.%s] event:%s",
entry.getHeader().getLogfileName(),
entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(),
eventType));
if (rowChange.getIsDdl()) {
System.out.println("DDL: " + rowChange.getSql());
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("--- before ---");
printColumn(rowData.getBeforeColumnsList());
System.out.println("--- after ---");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}Application entry point:
@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
@Autowired
private CanalClient canalClient;
@Override
public void run(String... args) throws Exception {
canalClient.run();
}
}Integrating Canal with RabbitMQ
Change canal.serverMode to rabbitMQ and set the topic in instance.properties:
canal.serverMode = rabbitMQ
canal.mq.topic=canal-routing-keyRabbitMQ settings in canal.properties already define host, exchange, username and password.
SpringBoot RabbitMQ Configuration
Add dependency:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${amqp.version}</version>
</dependency>application.yml:
spring:
rabbitmq:
host: 192.168.0.108
port: 5672
username: admin
password: 123456
publisher-confirm-type: correlated
publisher-returns: trueRabbit configuration class:
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}Declare queue, exchange and binding:
@Configuration
public class CanalProvider {
@Bean
public Queue canalQueue() {
return new Queue("canal-queue", true);
}
@Bean
public DirectExchange canalExchange() {
return new DirectExchange("canal-exchange", true, false);
}
@Bean
public Binding bindingCanal() {
return BindingBuilder.bind(canalQueue()).to(canalExchange()).with("canal-routing-key");
}
}Consumer that processes messages from the queue:
@Component
@RabbitListener(queues = "canal-queue")
public class CanalConsumer {
private final SysBackupService sysBackupService;
public CanalConsumer(SysBackupService sysBackupService) {
this.sysBackupService = sysBackupService;
}
@RabbitHandler
public void process(Map<String, Object> msg) {
System.out.println("Received canal message: " + msg);
boolean isDdl = (boolean) msg.get("isDdl");
if (isDdl) return;
String table = (String) msg.get("table");
if ("sys_backup".equalsIgnoreCase(table)) return;
String type = (String) msg.get("type");
if (!"INSERT".equalsIgnoreCase(type) && !"UPDATE".equalsIgnoreCase(type) && !"DELETE".equalsIgnoreCase(type)) return;
// further processing, e.g., persisting change record
}
}After starting the containers and the SpringBoot application, any INSERT, UPDATE or DELETE on the MySQL tables will be captured by Canal, sent to RabbitMQ, and finally processed by the CanalConsumer component.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Java High-Performance Architecture
Sharing Java development articles and resources, including SSM architecture and the Spring ecosystem (Spring Boot, Spring Cloud, MyBatis, Dubbo, Docker), Zookeeper, Redis, architecture design, microservices, message queues, Git, etc.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
