How to Combine SpringBoot, Canal, and RabbitMQ for Real‑Time MySQL Change Capture
This guide sets up a Docker‑Compose environment, configures Canal to capture MySQL binlog changes, connects a SpringBoot client to Canal, and then forwards change events to RabbitMQ. Code and configuration are included for each step.
Requirement
I need a way to record data changes in a SpringBoot application without coupling that logic to the business code. Each record should contain the new data and, for updates, the old data as well.
Canal, which listens to the MySQL binlog, meets this need. To persist change records promptly, I combine Canal with RabbitMQ for asynchronous processing.
Steps
Start MySQL with binlog enabled.
Start Canal, create a MySQL account for it, and connect as a slave.
Set Canal service mode to TCP and write a Java client to listen to binlog changes.
Switch Canal service mode to RabbitMQ, start RabbitMQ, and configure Canal to send binlog events via the message queue.
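The steps mention a MySQL account for Canal but do not show how to create it. The following is a minimal sketch, assuming the `canal`/`canal` credentials that instance.properties uses later in this guide, the `root`/`root` administrator from the compose file, and the privileges commonly granted to a replication client (SELECT, REPLICATION SLAVE, REPLICATION CLIENT). `CanalAccountSql` and its methods are hypothetical helpers, and running `main` assumes a MySQL JDBC driver on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds and runs the statements that create Canal's MySQL account.
final class CanalAccountSql {

    // Returns the statements in execution order. The values are inlined into the SQL,
    // so only pass trusted input (here: the credentials from instance.properties).
    static List<String> statements(String user, String password) {
        String account = "'" + user + "'@'%'";
        return Arrays.asList(
                "CREATE USER " + account + " IDENTIFIED BY '" + password + "'",
                "GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + account,
                "FLUSH PRIVILEGES");
    }

    // Executes the statements over JDBC as an administrative user.
    static void apply(String jdbcUrl, String adminUser, String adminPassword) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, adminUser, adminPassword);
             Statement stmt = conn.createStatement()) {
            for (String sql : statements("canal", "canal")) {
                stmt.execute(sql);
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // Assumed URL: MySQL published on localhost:3306, as in the compose file.
        apply("jdbc:mysql://localhost:3306/mysql?useSSL=false", "root", "root");
    }
}
```

The same three statements can be pasted into any MySQL client instead; the Java wrapper only keeps the example in the language used by the rest of this guide.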
Environment Setup (Docker‑Compose)
version: "3"
services:
  mysql:
    network_mode: mynetwork
    container_name: mymysql
    ports:
      - "3306:3306"
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
    # Enable the binlog in ROW format so Canal can read row-level changes
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
      - --log-bin=/var/lib/mysql/mysql-bin
      - --server-id=1
      - --binlog-format=ROW
      - --expire_logs_days=7
      - --max_binlog_size=500M
    image: mysql:5.7.20
  rabbitmq:
    container_name: myrabbit
    ports:
      - "15672:15672"
      - "5672:5672"
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
    network_mode: mynetwork
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    image: rabbitmq:3.8-management
  canal-server:
    container_name: canal-server
    restart: always
    ports:
      - "11110:11110"
      - "11111:11111"
      - "11112:11112"
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    network_mode: mynetwork
    depends_on:
      - mysql
      - rabbitmq
    image: canal/canal-server:v1.1.5
Canal Configuration Files
canal.properties
# common arguments
canal.ip =
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# server mode (tcp/kafka/rabbitMQ/pulsarMQ)
canal.serverMode = tcp
canal.destinations = canal-exchange
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false
# MQ properties (RabbitMQ example)
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
instance.properties
# MySQL master address
canal.instance.master.address=mymysql:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Enable TSDB (optional)
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
# RabbitMQ settings
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode =
# Topic for Canal messages
canal.mq.topic=canal-routing-key
After editing the two files, make sure instance.properties is mounted into the example instance directory (conf/example/instance.properties, as in the compose file) and that the MySQL and Canal containers share the same Docker network, so Canal can reach MySQL on its internal port 3306.
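If Canal cannot connect, a quick TCP check from a container on the same Docker network can confirm whether the host names resolve and the ports accept connections. This is a minimal JDK-only sketch; `PortCheck` is a hypothetical helper, and the host names `mymysql` and `myrabbit` are the container names from the compose file, which only resolve inside that network.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    // Unresolvable hosts, refused connections, and timeouts all return false.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("mymysql:3306 reachable: " + isReachable("mymysql", 3306, 2000));
        System.out.println("myrabbit:5672 reachable: " + isReachable("myrabbit", 5672, 2000));
    }
}
```

A successful connection only shows that the port is open; it does not verify the Canal account or its privileges.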
Integrate SpringBoot with Canal (Client)
Add Maven dependencies:
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>${canal.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>${canal.version}</version>
</dependency>
Create a Spring component that connects to Canal and processes binlog entries:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements CommandLineRunner {

    private static final int BATCH_SIZE = 1000;

    @Override
    public void run(String... args) throws Exception {
        // Connect to the Canal server: address, destination, username, password
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111),
                "canal-exchange", "canal", "canal");
        try {
            connector.connect();
            // Subscribe to every table in every schema; the dot separating
            // schema and table must be escaped in the regular expression
            connector.subscribe(".*\\..*");
            // Return to the last acknowledged position
            connector.rollback();
            while (true) {
                // Fetch up to BATCH_SIZE entries without acknowledging them
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // Nothing new; wait before polling again
                    Thread.sleep(2000);
                } else {
                    printEntry(message.getEntries());
                }
                // Confirm the batch so Canal can advance its position
                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Transaction boundaries carry no row data
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Parse error", e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("binlog[%s:%s] , db[%s], table[%s] , event:%s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));
            if (rowChange.getIsDdl()) {
                System.out.println("DDL sql: " + rowChange.getSql());
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // Deleted rows only have a before-image
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // Inserted rows only have an after-image
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // Updates carry both the old and the new values
                    System.out.println("--- before ---");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("--- after ---");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
Start the client from the SpringBoot application entry point:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BaseApplication {

    public static void main(String[] args) {
        // CanalClient is a CommandLineRunner bean, so Spring Boot calls its run()
        // method once the context has started; no explicit call is needed here.
        SpringApplication.run(BaseApplication.class, args);
    }
}
When the application runs, any INSERT/UPDATE/DELETE on MySQL will be captured and printed.
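For UPDATE events, each column of the after-image carries an updated flag (the value printed via getUpdated() in the client), which makes it possible to keep only the columns that actually changed, together with their old values. The following is a minimal sketch of that idea. It uses a hypothetical JDK-only stand-in, `Col`, in place of CanalEntry.Column so that it runs without the Canal client on the classpath; `UpdateDiff` and `changedValues` are likewise illustrative names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UpdateDiff {

    // Hypothetical stand-in for CanalEntry.Column with the three getters the client uses.
    static final class Col {
        private final String name;
        private final String value;
        private final boolean updated;

        Col(String name, String value, boolean updated) {
            this.name = name;
            this.value = value;
            this.updated = updated;
        }

        String getName() { return name; }
        String getValue() { return value; }
        boolean getUpdated() { return updated; }
    }

    // Maps each changed column name to a two-element array {oldValue, newValue}.
    static Map<String, String[]> changedValues(List<Col> before, List<Col> after) {
        // Index the before-image by column name
        Map<String, String> oldByName = new LinkedHashMap<>();
        for (Col c : before) {
            oldByName.put(c.getName(), c.getValue());
        }
        // Keep only the after-image columns flagged as updated
        Map<String, String[]> changes = new LinkedHashMap<>();
        for (Col c : after) {
            if (c.getUpdated()) {
                changes.put(c.getName(), new String[] {oldByName.get(c.getName()), c.getValue()});
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        List<Col> before = Arrays.asList(new Col("id", "1", false), new Col("name", "old", false));
        List<Col> after = Arrays.asList(new Col("id", "1", false), new Col("name", "new", true));
        for (Map.Entry<String, String[]> e : changedValues(before, after).entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue()[0] + " -> " + e.getValue()[1]);
        }
    }
}
```

With the sample rows in `main`, only the `name` column is reported, as `name: old -> new`.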
Canal + RabbitMQ Integration
Change canal.serverMode to rabbitMQ in canal.properties and set the topic in instance.properties:
canal.serverMode = rabbitMQ
canal.mq.topic=canal-routing-key
Configure the RabbitMQ connection with the same values as before:
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456
After restarting the containers, create the exchange and queue in RabbitMQ (or let SpringBoot create them automatically).
SpringBoot RabbitMQ Configuration
Add the AMQP starter dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${amqp.version}</version>
</dependency>
application.yml snippet:
spring:
  rabbitmq:
    host: 192.168.0.108
    port: 5672
    username: admin
    password: 123456
    publisher-confirm-type: correlated
    publisher-returns: true
RabbitMQ configuration beans:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory cf) {
        RabbitTemplate tmpl = new RabbitTemplate();
        tmpl.setConnectionFactory(cf);
        tmpl.setMessageConverter(new Jackson2JsonMessageConverter());
        return tmpl;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory cf) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf);
        // Convert the JSON body published by Canal into Java objects
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    // Durable queue that receives Canal messages
    @Bean
    public Queue canalQueue() {
        return new Queue("canal-queue", true);
    }

    // Durable, non-auto-delete exchange; the name matches rabbitmq.exchange
    @Bean
    public DirectExchange canalExchange() {
        return new DirectExchange("canal-exchange", true, false);
    }

    // The routing key matches canal.mq.topic
    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(canalQueue()).to(canalExchange()).with("canal-routing-key");
    }
}
Canal itself acts as the message producer and publishes to the exchange, queue, and binding declared above. The consumer:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = "canal-queue")
public class CanalConsumer {

    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("Received canal message: " + msg);
        // Skip DDL events; a missing flag is treated as "not DDL"
        boolean isDdl = Boolean.TRUE.equals(msg.get("isDdl"));
        if (isDdl) {
            return;
        }
        // handle INSERT/UPDATE/DELETE as needed
    }
}
Now, modifying data in MySQL triggers Canal to publish a message to RabbitMQ, and the SpringBoot consumer receives and processes it.
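The consumer leaves the row handling open. With canal.mq.flatMessage = true, Canal publishes each change as a JSON object, and the original requirement (new data, plus old data for updates) can be met by reading it. The following is a minimal sketch that assumes the flat-message fields `type`, `database`, `table`, `data` (one map per row) and `old` (previous values of the changed columns, present for updates); these names should be verified against a real payload from your Canal version. `FlatMessageHandler` and `describe` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FlatMessageHandler {

    // Builds one summary line per changed row; DDL messages yield no lines.
    @SuppressWarnings("unchecked")
    static List<String> describe(Map<String, Object> msg) {
        if (Boolean.TRUE.equals(msg.get("isDdl"))) {
            return Collections.emptyList();
        }
        String type = String.valueOf(msg.get("type"));
        String target = msg.get("database") + "." + msg.get("table");
        // Assumed layout: "data" and "old" are index-aligned lists of column maps
        List<Map<String, Object>> rows = (List<Map<String, Object>>) msg.get("data");
        List<Map<String, Object>> oldRows = (List<Map<String, Object>>) msg.get("old");
        List<String> lines = new ArrayList<>();
        if (rows == null) {
            return lines;
        }
        for (int i = 0; i < rows.size(); i++) {
            StringBuilder line = new StringBuilder(type).append(' ').append(target);
            // For deletes, the row map holds the values of the removed row
            line.append("DELETE".equals(type) ? " deleted=" : " new=").append(rows.get(i));
            if ("UPDATE".equals(type) && oldRows != null && i < oldRows.size()) {
                line.append(" old=").append(oldRows.get(i));
            }
            lines.add(line.toString());
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("name", "new");
        Map<String, Object> old = new LinkedHashMap<>();
        old.put("name", "old");

        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("isDdl", false);
        msg.put("type", "UPDATE");
        msg.put("database", "mydb");
        msg.put("table", "user");
        msg.put("data", Arrays.asList(row));
        msg.put("old", Arrays.asList(old));

        for (String line : describe(msg)) {
            System.out.println(line);
        }
    }
}
```

In a real consumer, the lines built here would be replaced by whatever persistence the change log needs, for example one database row per change.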
