Decouple Data Change Logging in SpringBoot with Canal and RabbitMQ

This guide shows how to use Canal to capture MySQL binlog changes, forward them through RabbitMQ, and process them in a SpringBoot application, providing a clean, decoupled way to record both new and old data for insert, update, and delete operations.

Java Architect Handbook
Java Architect Handbook
Java Architect Handbook
Decouple Data Change Logging in SpringBoot with Canal and RabbitMQ

Requirement

The goal is to record data changes in a SpringBoot service without tightly coupling business code to the logging logic. New data must be captured, and for updates the previous values should also be stored.

Solution Overview

Canal monitors MySQL binlog events, and RabbitMQ is used as a buffer to decouple the change capture from downstream processing. A SpringBoot client reads binlog events from Canal, publishes them to a RabbitMQ exchange, and a consumer processes the messages.

Step‑by‑Step Procedure

Start a MySQL container with binlog enabled.

Start a Canal server container and configure it to connect to MySQL as a slave.

Set Canal’s service mode to tcp for initial testing, then switch to rabbitMQ for production.

Configure RabbitMQ (exchange, queue, routing key) to receive Canal messages.

Environment Setup (Docker‑Compose)

version: "3
timeout: 30s
services:
  mysql:
    container_name: mymysql
    image: mysql:5.7.20
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=root
    command: --character-set-server=utf8mb4 \
             --collation-server=utf8mb4_unicode_ci \
             --log-bin=mysql-bin \
             --server-id=1 \
             --binlog-format=ROW \
             --expire_logs_days=7 \
             --max_binlog_size=500M
    volumes:
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    network_mode: mynetwork

  rabbitmq:
    container_name: myrabbit
    image: rabbitmq:3.8-management
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    network_mode: mynetwork

  canal-server:
    container_name: canal-server
    image: canal/canal-server:v1.1.5
    ports:
      - "11110:11110"
      - "11111:11111"
      - "11112:11112"
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    depends_on:
      - mysql
      - rabbitmq
    network_mode: mynetwork

Canal Configuration Files

canal.properties

canal.ip =
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.serverMode = tcp
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
canal.instance.detecting.enable = false
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.parser.parallel = true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize = 256
canal.instance.tsdb.enable = true
canal.destinations = canal-exchange
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

instance.properties

canal.instance.master.address=mymysql:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\..*
canal.instance.filter.black.regex=mysql\.slave_.*
canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
canal.mq.topic=canal-routing-key
rabbitmq.host=myrabbit
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=admin
rabbitmq.password=123456

SpringBoot Integration with Canal

Maven Dependencies

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>${canal.version}</version>
</dependency>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>${canal.version}</version>
</dependency>

Canal Client Component

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements CommandLineRunner {
    private static final int BATCH_SIZE = 1000;

    @Override
    public void run(String... args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111),
                "canal-exchange",
                "canal",
                "canal");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    Thread.sleep(2000);
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Parse error", e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format(
                    "=== binlog[%s:%s] , db[%s] , table[%s] , event:%s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("--- before ---");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("--- after ---");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "   update=" + column.getUpdated());
        }
    }
}

Application Entry Point

@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
    @Autowired
    private CanalClient canalClient;

    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }
}

Canal → RabbitMQ Integration

Change canal.serverMode to rabbitMQ in canal.properties and set the topic in instance.properties :

canal.serverMode = rabbitMQ
canal.mq.topic = canal-routing-key

Define RabbitMQ connection details (host, exchange, queue, routing key) either in application.yml or via a Spring configuration class.

Spring AMQP Dependency

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>${amqp.version}</version>
</dependency>

application.yml

spring:
  rabbitmq:
    host: 192.168.0.108
    port: 5672
    username: admin
    password: 123456
    publisher-confirm-type: correlated
    publisher-returns: true

RabbitMQ Configuration Beans

@Configuration
public class RabbitConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}

Canal Message Producer (Spring Bean)

@Configuration
public class CanalProvider {
    public static final String CanalQueue = "canal-queue";
    public static final String CanalExchange = "canal-exchange";
    public static final String CanalRouting = "canal-routing-key";

    @Bean
    public Queue canalQueue() {
        return new Queue(CanalQueue, true);
    }

    @Bean
    public DirectExchange canalExchange() {
        return new DirectExchange(CanalExchange, true, false);
    }

    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(CanalRouting);
    }
}

Canal Message Consumer

@Component
@RabbitListener(queues = CanalProvider.CanalQueue)
public class CanalConsumer {
    private final SysBackupService sysBackupService;

    public CanalConsumer(SysBackupService sysBackupService) {
        this.sysBackupService = sysBackupService;
    }

    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("Received canal message: " + msg);
        boolean isDdl = (boolean) msg.get("isDdl");
        if (isDdl) return;
        String table = (String) msg.get("table");
        if ("sys_backup".equalsIgnoreCase(table)) return;
        String type = (String) msg.get("type");
        if (!"INSERT".equalsIgnoreCase(type) &&
            !"UPDATE".equalsIgnoreCase(type) &&
            !"DELETE".equalsIgnoreCase(type)) {
            return;
        }
        // Business handling logic goes here (e.g., persisting change log)
    }
}

Testing the Pipeline

Insert, update, or delete a row in the MySQL database. Canal captures the binlog event, publishes a JSON message to the canal-exchange exchange, which routes it to canal-queue. The CanalConsumer receives the message and can persist the change record or trigger further processing.

Switching between tcp and rabbitMQ modes allows you to test the direct client first and later move to the asynchronous, decoupled architecture.

DockerMySQLRabbitMQCanalSpringBootDataChangeLogging
Java Architect Handbook
Written by

Java Architect Handbook

Focused on Java interview questions and practical article sharing, covering algorithms, databases, Spring Boot, microservices, high concurrency, JVM, Docker containers, and ELK-related knowledge. Looking forward to progressing together with you.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.