How to Capture MySQL Changes in SpringBoot Using Canal and RabbitMQ

This guide shows how to decouple change‑recording logic in a SpringBoot application by using Alibaba Canal to listen to MySQL binlog events, forwarding them through RabbitMQ, and persisting both new and old row data for insert and update operations.

Java High-Performance Architecture
Java High-Performance Architecture
Java High-Performance Architecture
How to Capture MySQL Changes in SpringBoot Using Canal and RabbitMQ

Requirement

I need a way in SpringBoot to record data changes in a decoupled manner, storing the new row data and, for updates, also the old row data.

Solution Overview

Use Canal to listen to MySQL binlog changes and combine it with RabbitMQ to process and persist change records.

Steps

Start a MySQL instance with binlog enabled.

Start a Canal instance, create a MySQL account for it, and connect as a slave.

Run Canal in TCP mode with a Java client, then switch to RabbitMQ mode.

Deploy the services using docker‑compose.

Environment Setup (docker‑compose)

version: "3"
services:
  mysql:
    network_mode: mynetwork
    container_name: mymysql
    ports:
      - 3306:3306
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
    command: |
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_unicode_ci
      --log-bin=/var/lib/mysql/mysql-bin
      --server-id=1
      --binlog-format=ROW
      --expire_logs_days=7
      --max_binlog_size=500M
    image: mysql:5.7.20
  rabbitmq:
    container_name: myrabbit
    ports:
      - 15672:15672
      - 5672:5672
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
    network_mode: mynetwork
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    image: rabbitmq:3.8-management
  canal-server:
    container_name: canal-server
    restart: always
    ports:
      - 11110:11110
      - 11111:11111
      - 11112:11112
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    network_mode: mynetwork
    depends_on:
      - mysql
      - rabbitmq
    image: canal/canal-server:v1.1.5

Canal Configuration Files

canal.properties

#################################################
#########     common argument   #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal admin config
# canal.admin.manager = canal-admin:8089
# canal.admin.port = 11110
# canal.admin.user = admin
# canal.admin.passwd = ...
# admin auto register
# canal.admin.register.auto = true
# cluster name (empty for single node)
# canal.admin.register.cluster =
# Canal Server name
# canal.admin.register.name = canal-admin

canal.zkServers =
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# service mode: tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60

canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

canal.instance.get.ddl.isolation = false

canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 256

canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
canal.instance.tsdb.snapshot.interval = 24
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########     destinations    #############
#################################################
canal.destinations = canal-exchange
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

##################################################
#########     MQ Properties     #############
##################################################
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
#########       RabbitMQ       #############
##################################################
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode =

instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
#canal.instance.mysql.slaveId=123

# enable gtid
canal.instance.gtidon=false

# master connection
canal.instance.master.address=mymysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog (optional)
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# tsdb enable
canal.instance.tsdb.enable=true

# db credentials for Canal to read binlog
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false

# table filter regex
canal.instance.filter.regex=.*\..*
canal.instance.filter.black.regex=mysql\.slave_.*

# mq config
canal.mq.topic=canal-routing-key
canal.mq.partition=0

SpringBoot Integration with Canal (Client)

Add Maven dependencies:

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>${canal.version}</version>
</dependency>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>${canal.version}</version>
</dependency>

Component that connects to Canal and processes entries:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements CommandLineRunner {
    private static final int BATCH_SIZE = 1000;
    @Override
    public void run(String... args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
        try {
            connector.connect();
            connector.subscribe(".*\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    Thread.sleep(2000);
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Parser error", e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("=== binlog[%s:%s] table[%s.%s] event:%s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));
            if (rowChange.getIsDdl()) {
                System.out.println("DDL: " + rowChange.getSql());
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("--- before ---");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("--- after ---");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}

Application entry point:

@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
    @Autowired
    private CanalClient canalClient;
    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }
}

Integrating Canal with RabbitMQ

Change canal.serverMode to rabbitMQ and set the topic in instance.properties:

canal.serverMode = rabbitMQ
canal.mq.topic=canal-routing-key

RabbitMQ settings in canal.properties already define host, exchange, username and password.

SpringBoot RabbitMQ Configuration

Add dependency:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>${amqp.version}</version>
</dependency>

application.yml:

spring:
  rabbitmq:
    host: 192.168.0.108
    port: 5672
    username: admin
    password: 123456
    publisher-confirm-type: correlated
    publisher-returns: true

Rabbit configuration class:

@Configuration
public class RabbitConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}

Declare queue, exchange and binding:

@Configuration
public class CanalProvider {
    @Bean
    public Queue canalQueue() {
        return new Queue("canal-queue", true);
    }
    @Bean
    public DirectExchange canalExchange() {
        return new DirectExchange("canal-exchange", true, false);
    }
    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(canalQueue()).to(canalExchange()).with("canal-routing-key");
    }
}

Consumer that processes messages from the queue:

@Component
@RabbitListener(queues = "canal-queue")
public class CanalConsumer {
    private final SysBackupService sysBackupService;
    public CanalConsumer(SysBackupService sysBackupService) {
        this.sysBackupService = sysBackupService;
    }
    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("Received canal message: " + msg);
        boolean isDdl = (boolean) msg.get("isDdl");
        if (isDdl) return;
        String table = (String) msg.get("table");
        if ("sys_backup".equalsIgnoreCase(table)) return;
        String type = (String) msg.get("type");
        if (!"INSERT".equalsIgnoreCase(type) && !"UPDATE".equalsIgnoreCase(type) && !"DELETE".equalsIgnoreCase(type)) return;
        // further processing, e.g., persisting change record
    }
}

After starting the containers and the SpringBoot application, any INSERT, UPDATE or DELETE on the MySQL tables will be captured by Canal, sent to RabbitMQ, and finally processed by the CanalConsumer component.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

DockermysqlRabbitMQCanalSpringBootChange Data Capture
Java High-Performance Architecture
Written by

Java High-Performance Architecture

Sharing Java development articles and resources, including SSM architecture and the Spring ecosystem (Spring Boot, Spring Cloud, MyBatis, Dubbo, Docker), Zookeeper, Redis, architecture design, microservices, message queues, Git, etc.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.