Backend Development 22 min read

Integrating SpringBoot with Canal and RabbitMQ for Database Change Capture

This guide explains how to decouple business logic in a SpringBoot application by using Canal to listen to MySQL binlog changes, forwarding those events through RabbitMQ, and processing them with a Java client to record both new and old data for insert, update, and delete operations.

Selected Java Interview Questions
Selected Java Interview Questions
Selected Java Interview Questions
Integrating SpringBoot with Canal and RabbitMQ for Database Change Capture

The requirement is to implement a change‑recording mechanism in SpringBoot that captures new data and, for updates, also records the old data, while keeping the business code loosely coupled.

After researching, Canal is chosen to listen to MySQL binlog events, and RabbitMQ is added to queue the change‑record saving logic.

Steps

Start MySQL with binlog enabled.

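Create a MySQL user for Canal with replication privileges (for example: CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';), then start Canal so that it connects to MySQL as a replication slave.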

Set Canal server mode to TCP and write a Java client to listen to binlog changes.

Switch Canal server mode to RabbitMQ, start RabbitMQ, and configure Canal to publish binlog events to the queue.

Environment Setup (Docker‑Compose)

# Local stack for the guide: MySQL (binlog enabled), RabbitMQ (with management UI)
# and the Canal server. All three services join the network "mynetwork", which is
# not declared in this file, so it has to exist before `docker-compose up`
# (e.g. `docker network create mynetwork`).
version: "3"
services:
  mysql:
    network_mode: mynetwork
    container_name: mymysql
    ports:
      - 3306:3306
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
    # Binlog is switched on in ROW format so that Canal can read row-level changes.
    command:
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_unicode_ci
      --log-bin=/var/lib/mysql/mysql-bin
      --server-id=1
      --binlog-format=ROW
      --expire_logs_days=7
      --max_binlog_size=500M
    image: mysql:5.7.20
  rabbitmq:
    container_name: myrabbit
    ports:
      - 15672:15672
      - 5672:5672
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
    network_mode: mynetwork
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    image: rabbitmq:3.8-management
  canal-server:
    container_name: canal-server
    restart: always
    ports:
      - 11110:11110
      - 11111:11111
      - 11112:11112
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      # The instance directory name must equal the destination name. canal.properties
      # sets canal.destinations = canal-exchange and the Java client connects to
      # "canal-exchange", so the file is mounted under conf/canal-exchange/ (not conf/example/).
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/canal-exchange/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    network_mode: mynetwork
    depends_on:
      - mysql
      - rabbitmq
    image: canal/canal-server:v1.1.5

Two configuration files must be prepared:

canal.properties

# common arguments
canal.ip = 
canal.register.ip = 
# TCP port of the Canal server; the Java client in this guide connects to 11111.
canal.port = 11111
canal.metrics.pull.port = 11112
# tcp: a client pulls events directly from the server (first part of this guide).
# Change to rabbitMQ to have Canal publish events to the broker configured below.
canal.serverMode = tcp
canal.zkServers = 
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation = false
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 256
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
canal.instance.tsdb.snapshot.interval = 24
canal.instance.tsdb.snapshot.expire = 360
# Destination (instance) name; the Java client in this guide connects with the same name.
canal.destinations = canal-exchange
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
# NOTE(review): canal.admin.manager is not defined anywhere in this file - confirm it is not needed in spring mode.
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# MQ properties
# The SpringBoot consumer in this guide reads each message as a key/value map
# (isDdl, table, type); presumably that relies on flatMessage = true - confirm against the Canal docs.
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
# RabbitMQ
# Host is the RabbitMQ container name from docker-compose; user and password match
# RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS of that container.
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode =

instance.properties

# MySQL connection
# Address uses the MySQL container name from docker-compose and the in-network port.
canal.instance.master.address=mymysql:3306
# Account Canal uses to read the binlog; it must already exist in MySQL.
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# GTID
canal.instance.gtidon=false
# Instance meta
# Filters are regular expressions over "schema.table". The backslash is doubled because
# the properties format treats a single backslash as an escape character and would drop it,
# turning the literal dot into "any character".
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
# MQ topic
# Used as the RabbitMQ routing key when Canal runs in rabbitMQ mode; the queue binding
# on the consumer side must use the same value.
canal.mq.topic=canal-routing-key
canal.mq.partition=0

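After mapping these files, make sure the MySQL and Canal containers share the same Docker network, and that Canal reaches MySQL through the container name and internal port (mymysql:3306, as set in canal.instance.master.address) rather than through a host-mapped port.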

SpringBoot Integration with Canal

Add Maven dependencies:

<!-- Canal TCP client. ${canal.version} is not defined in this snippet: declare it in the
     POM's <properties> section. NOTE(review): presumably it should match the server image
     used in docker-compose (v1.1.5) - confirm. -->
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>${canal.version}</version>
</dependency>
<!-- Protocol classes (CanalEntry, Message) imported by the client code below. -->
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>${canal.version}</version>
</dependency>

Implement a Canal client component:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * Minimal Canal TCP client that polls the Canal server for binlog entries and prints them.
 *
 * <p>It connects to {@code localhost:11111}, destination {@code canal-exchange}, with the
 * credentials {@code canal/canal}, subscribes to every schema and table, and then loops:
 * fetch a batch without acknowledging it, print it, acknowledge it.
 */
@Component
public class CanalClient {
    /** Maximum number of entries requested from the server per fetch. */
    private static final int BATCH_SIZE = 1000;

    /** Pause between polls when the server returned nothing, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 2000L;

    /**
     * Runs the polling loop on the calling thread.
     *
     * <p>The method only returns when the thread is interrupted or an exception ends the
     * loop; the connector is disconnected in either case. A batch that was fetched but not
     * acknowledged (because printing it failed) stays unacknowledged on the server.
     */
    public void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
        try {
            connector.connect();
            // The filter is a regex over "schema.table"; the separating dot must be escaped,
            // otherwise it is just another wildcard.
            connector.subscribe(".*\\..*");
            // Discard any batch left unacknowledged by a previous session.
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    // Nothing new: back off instead of spinning.
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } else {
                    printEntry(entries);
                }
                connector.ack(batchId);
            }
        } catch (InterruptedException e) {
            // Restore the flag so the caller can see that the thread was asked to stop.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints every row change contained in the given entries.
     *
     * @param entrys entries of one fetched batch; transaction begin/end markers are skipped
     * @throws RuntimeException if an entry's stored value cannot be parsed as a row change
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error, data:" + entry, e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("=== binlog[%s:%s], name[%s,%s], eventType: %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            if (rowChange.getIsDdl()) {
                System.out.println("=== isDdl: true, sql:" + rowChange.getSql());
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A deleted row only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An inserted row only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("--- before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("--- after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints name, value and updated flag of each column, one column per line.
     *
     * @param columns columns of a single row image
     */
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

Wire the client into the SpringBoot application entry point:

@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
    @Autowired
    private CanalClient canalClient;
    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }
}

Switching Canal to RabbitMQ Mode

Set canal.serverMode = rabbitMQ in canal.properties and canal.mq.topic=canal-routing-key in instance.properties; the topic value is the routing key Canal publishes with. Then configure the RabbitMQ connection in canal.properties:

# RabbitMQ connection used by the Canal server (part of canal.properties).
# Host is the RabbitMQ container name from docker-compose; user and password match
# RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS of that container.
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
# Exchange Canal publishes to; the SpringBoot side must bind its queue to this exchange.
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456

Create the exchange (canal-exchange) and a queue, or let SpringBoot auto‑create them, and bind the queue to the exchange with the routing key canal-routing-key.

SpringBoot RabbitMQ Configuration

# RabbitMQ connection of the SpringBoot application (application.yml).
spring:
  rabbitmq:
    # Address of the machine running the RabbitMQ container; replace with your own host.
    host: 192.168.0.108
    # AMQP port published by the rabbitmq service in docker-compose.
    port: 5672
    # Same credentials as RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS.
    username: admin
    password: 123456
    # NOTE(review): the two settings below concern publishing; the code shown in this guide
    # only consumes messages - confirm whether they are needed.
    publisher-confirm-type: correlated
    publisher-returns: true

Define a RabbitMQ configuration class to provide a RabbitTemplate and a listener container factory:

/**
 * Spring AMQP wiring: a {@code RabbitTemplate} and a listener container factory, each
 * using its own Jackson JSON message converter on the supplied connection factory.
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the template used for sending messages, with JSON conversion.
     *
     * @param connectionFactory connection factory supplied by the container
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate jsonTemplate = new RabbitTemplate();
        jsonTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        jsonTemplate.setConnectionFactory(connectionFactory);
        return jsonTemplate;
    }

    /**
     * Creates the factory behind {@code @RabbitListener} containers, with JSON conversion
     * of incoming messages.
     *
     * @param connectionFactory connection factory supplied by the container
     * @return the configured listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory listenerFactory = new SimpleRabbitListenerContainerFactory();
        listenerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        listenerFactory.setConnectionFactory(connectionFactory);
        return listenerFactory;
    }
}

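Declare the queue, exchange, and binding. RabbitConstant is a project constants class that is not shown here; its exchange and routing-key values must match the canal-exchange and canal-routing-key configured for Canal: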

/**
 * Declares the queue, the direct exchange and the binding between them for Canal
 * messages. Names and routing key come from {@code RabbitConstant}.
 */
@Configuration
public class CanalProvider {

    /**
     * @return a durable queue named {@code RabbitConstant.CanalQueue}
     */
    @Bean
    public Queue canalQueue() {
        final boolean durable = true;
        return new Queue(RabbitConstant.CanalQueue, durable);
    }

    /**
     * @return a durable, non-auto-delete direct exchange named {@code RabbitConstant.CanalExchange}
     */
    @Bean
    public DirectExchange canalExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(RabbitConstant.CanalExchange, durable, autoDelete);
    }

    /**
     * @return the binding of the Canal queue to the Canal exchange under
     *         {@code RabbitConstant.CanalRouting}
     */
    @Bean
    public Binding bindingCanal() {
        final Queue boundQueue = canalQueue();
        final DirectExchange targetExchange = canalExchange();
        return BindingBuilder
            .bind(boundQueue)
            .to(targetExchange)
            .with(RabbitConstant.CanalRouting);
    }
}

Implement a consumer to process the Canal messages received from RabbitMQ:

/**
 * Consumes Canal change messages from the queue {@code RabbitConstant.CanalQueue} and
 * filters them down to row-level INSERT / UPDATE / DELETE events.
 */
@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalConsumer {
    private final SysBackupService sysBackupService;

    /**
     * @param sysBackupService service made available for persisting change records
     */
    public CanalConsumer(SysBackupService sysBackupService) {
        this.sysBackupService = sysBackupService;
    }

    /**
     * Handles one Canal message.
     *
     * <p>The message is ignored when it is flagged as DDL, when it concerns the
     * {@code sys_backup} table, or when its type is not INSERT, UPDATE or DELETE.
     *
     * @param msg message body as a key/value map; the keys read here are
     *            {@code isDdl}, {@code table} and {@code type}
     */
    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("Received canal message: " + msg);
        // Null-safe: a missing or null "isDdl" is treated as "not DDL" instead of
        // failing with a NullPointerException on unboxing.
        if (Boolean.TRUE.equals(msg.get("isDdl"))) {
            return;
        }
        String table = (String) msg.get("table");
        // Presumably sys_backup is where change records are stored, so its own changes
        // are skipped to avoid recording them again - verify against SysBackupService.
        if ("sys_backup".equalsIgnoreCase(table)) {
            return;
        }
        String type = (String) msg.get("type");
        if (!"INSERT".equalsIgnoreCase(type) && !"UPDATE".equalsIgnoreCase(type) && !"DELETE".equalsIgnoreCase(type)) {
            return;
        }
        // further processing such as persisting change records
    }
}

After restarting the containers, any insert, update, or delete on the MySQL tables will be captured by Canal, sent to RabbitMQ, and processed by the SpringBoot consumer, achieving a decoupled change‑recording solution.

Tags: Java · docker · MySQL · RabbitMQ · Canal · SpringBoot · CDC
Selected Java Interview Questions
Written by

Selected Java Interview Questions

A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.