Backend Development 18 min read

Implementing Data Change Capture in SpringBoot Using Canal and RabbitMQ

This guide demonstrates how to decouple data change logging from business logic in a SpringBoot application by leveraging MySQL binlog monitoring with Canal, forwarding change events through RabbitMQ, and persisting both new and old record states using Docker‑compose, configuration files, and Java client code.

Selected Java Interview Questions
Selected Java Interview Questions
Selected Java Interview Questions
Implementing Data Change Capture in SpringBoot Using Canal and RabbitMQ

The requirement is to record data changes in a SpringBoot project without coupling to business code, storing the new row for inserts and both old and new rows for updates. Canal is used to listen to MySQL binlog events, and RabbitMQ queues the events for asynchronous processing.

Environment setup uses Docker‑compose to launch MySQL, RabbitMQ, and Canal containers:

version: "3"
services:
  mysql:
    network_mode: mynetwork
    container_name: mymysql
    ports:
      - 3306:3306
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
    command:
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_unicode_ci
      --log-bin=/var/lib/mysql/mysql-bin
      --server-id=1
      --binlog-format=ROW
      --expire_logs_days=7
      --max_binlog_size=500M
    image: mysql:5.7.20
  rabbitmq:
    container_name: myrabbit
    ports:
      - 15672:15672
      - 5672:5672
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
    network_mode: mynetwork
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    image: rabbitmq:3.8-management
  canal-server:
    container_name: canal-server
    restart: always
    ports:
      - 11110:11110
      - 11111:11111
      - 11112:11112
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    network_mode: mynetwork
    depends_on:
      - mysql
      - rabbitmq
    image: canal/canal-server:v1.1.5

The canal.properties file defines server mode (initially tcp ), Zookeeper settings, and MQ parameters. The instance.properties file configures the MySQL connection, user credentials, and topic name for RabbitMQ.

SpringBoot integration adds Maven dependencies for Canal client and AMQP:

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>${canal.version}</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>${amqp.version}</version>
</dependency>

A CanalClient component creates a CanalConnector , subscribes to all tables, fetches batches, and prints change entries. It distinguishes INSERT, UPDATE, DELETE and logs before/after column values.

@Component
public class CanalClient {
    private static final int BATCH_SIZE = 1000;
    public void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    Thread.sleep(2000);
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
    // printEntry and printColumn methods omitted for brevity
}

The main application class implements CommandLineRunner to start the client on boot.

Switching to RabbitMQ changes canal.serverMode = rabbitMQ and sets canal.mq.topic=canal-routing-key . RabbitMQ connection details are added to canal.properties (host, virtual host, exchange, username, password).

SpringBoot RabbitMQ configuration creates a durable queue canal-queue , a direct exchange canal-exchange , and binds them with the routing key.

@Configuration
public class RabbitConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}

A provider component declares the queue, exchange, and binding:

@Configuration
public class CanalProvider {
    @Bean
    public Queue canalQueue() { return new Queue(RabbitConstant.CanalQueue, true); }
    @Bean
    public DirectExchange canalExchange() { return new DirectExchange(RabbitConstant.CanalExchange, true, false); }
    @Bean
    public Binding bindingCanal() { return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting); }
}

The consumer listens on canal-queue , extracts fields such as isDdl , id , ts , database , table , type , data , and old , and ignores DDL events and the internal sys_backup table to avoid loops.

@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalConsumer {
    private final SysBackupService sysBackupService;
    public CanalConsumer(SysBackupService sysBackupService) { this.sysBackupService = sysBackupService; }
    @RabbitHandler
    public void process(Map
msg) {
        boolean isDdl = (boolean) msg.get("isDdl");
        if (isDdl) return;
        String table = (String) msg.get("table");
        if ("sys_backup".equalsIgnoreCase(table)) return;
        // further processing …
    }
}

After rebuilding the containers, any INSERT/UPDATE/DELETE on MySQL triggers Canal, which publishes a JSON message to RabbitMQ; the SpringBoot consumer receives it, enabling asynchronous handling or persistence of change logs.

DockerMySQLRabbitMQCanalSpringBootCDCDataSync
Selected Java Interview Questions
Written by

Selected Java Interview Questions

A professional Java tech channel sharing common knowledge to help developers fill gaps. Follow us!

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.