How to Combine SpringBoot, Canal, and RabbitMQ for Real‑Time MySQL Change Capture

This guide walks through setting up a Docker‑Compose environment, configuring Canal to capture MySQL binlog changes, integrating it with a SpringBoot client, and forwarding change events to RabbitMQ, providing complete code snippets and step‑by‑step instructions for real‑time data change tracking.

Architect
Architect
Architect
How to Combine SpringBoot, Canal, and RabbitMQ for Real‑Time MySQL Change Capture

Requirement

I need a way in SpringBoot to decouple business code and record data changes. The record should contain the new data, and for updates also the old data.

After research, using Canal to listen to MySQL binlog can meet this need. To persist change records immediately, I combine Canal with RabbitMQ for asynchronous processing.

Steps

Start MySQL with binlog enabled.

Start Canal, create a MySQL account for it, and connect as a slave.

Set Canal service mode to TCP and write a Java client to listen to binlog changes.

Switch Canal service mode to RabbitMQ, start RabbitMQ, and configure Canal to send binlog events via the message queue.

Environment Setup (Docker‑Compose)

version: "3
timeout: 30s
services:
  mysql:
    network_mode: mynetwork
    container_name: mymysql
    ports:
      - "3306:3306"
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
    command: --character-set-server=utf8mb4 \
             --collation-server=utf8mb4_unicode_ci \
             --log-bin=/var/lib/mysql/mysql-bin \
             --server-id=1 \
             --binlog-format=ROW \
             --expire_logs_days=7 \
             --max_binlog_size=500M
    image: mysql:5.7.20

  rabbitmq:
    container_name: myrabbit
    ports:
      - "15672:15672"
      - "5672:5672"
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
    network_mode: mynetwork
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    image: rabbitmq:3.8-management

  canal-server:
    container_name: canal-server
    restart: always
    ports:
      - "11110:11110"
      - "11111:11111"
      - "11112:11112"
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    network_mode: mynetwork
    depends_on:
      - mysql
      - rabbitmq
    image: canal/canal-server:v1.1.5

Canal Configuration Files

canal.properties

# common arguments
canal.ip = 
canal.register.ip = 
canal.port = 11111
canal.metrics.pull.port = 11112
# server mode (tcp/kafka/rabbitMQ/pulsarMQ)
canal.serverMode = tcp
canal.destinations = canal-exchange
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false
# MQ properties (RabbitMQ example)
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

instance.properties

# MySQL master address
canal.instance.master.address=mymysql:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Enable TSDB (optional)
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
# RabbitMQ settings
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode = 
# Topic for Canal messages
canal.mq.topic=canal-routing-key

After editing the two files, ensure the instance path is /example/instance.properties and that the MySQL container and Canal container share the same Docker network so Canal can reach MySQL on its internal port 3306.

Integrate SpringBoot with Canal (Client)

Add Maven dependencies:

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>${canal.version}</version>
</dependency>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>${canal.version}</version>
</dependency>

Create a Spring component that connects to Canal and processes binlog entries:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements CommandLineRunner {
    private static final int BATCH_SIZE = 1000;

    @Override
    public void run(String... args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111),
                "canal-exchange", "canal", "canal");
        try {
            connector.connect();
            connector.subscribe(".*.*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    Thread.sleep(2000);
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Parse error", e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("binlog[%s:%s] , db[%s], table[%s] , event:%s",
                    entry.getHeader().getLogfileName(),
                    entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(),
                    entry.getHeader().getTableName(),
                    eventType));
            if (rowChange.getIsDdl()) {
                System.out.println("DDL sql: " + rowChange.getSql());
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("--- before ---");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("--- after ---");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "   update=" + column.getUpdated());
        }
    }
}

Start the client from the SpringBoot application entry point:

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.beans.factory.annotation.Autowired;

@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
    @Autowired
    private CanalClient canalClient;

    @Override
    public void run(String... args) throws Exception {
        canalClient.run();
    }
}

When the application runs, any INSERT/UPDATE/DELETE on MySQL will be captured and printed.

Canal + RabbitMQ Integration

Change canal.serverMode to rabbitMQ in canal.properties and set the topic in instance.properties:

canal.serverMode = rabbitMQ
canal.mq.topic=canal-routing-key

Configure RabbitMQ connection in the same file:

rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456

After restarting the containers, create the exchange and queue in RabbitMQ (or let SpringBoot create them automatically).

SpringBoot RabbitMQ Configuration

Add the AMQP starter dependency:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
  <version>${amqp.version}</version>
</dependency>

application.yml snippet:

spring:
  rabbitmq:
    host: 192.168.0.108
    port: 5672
    username: admin
    password: 123456
    publisher-confirm-type: correlated
    publisher-returns: true

RabbitMQ configuration beans:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory cf) {
        RabbitTemplate tmpl = new RabbitTemplate();
        tmpl.setConnectionFactory(cf);
        tmpl.setMessageConverter(new Jackson2JsonMessageConverter());
        return tmpl;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory cf) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    @Bean
    public Queue canalQueue() {
        return new Queue("canal-queue", true);
    }

    @Bean
    public DirectExchange canalExchange() {
        return new DirectExchange("canal-exchange", true, false);
    }

    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(canalQueue()).to(canalExchange()).with("canal-routing-key");
    }
}

Canal message producer (creates exchange, queue, binding as above) and consumer:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
@RabbitListener(queues = "canal-queue")
public class CanalConsumer {
    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("Received canal message: " + msg);
        boolean isDdl = (boolean) msg.get("isDdl");
        if (isDdl) return;
        // handle INSERT/UPDATE/DELETE as needed
    }
}

Now, modifying data in MySQL triggers Canal to publish a message to RabbitMQ, and the SpringBoot consumer receives and processes it.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

DockermysqlRabbitMQCanalSpringBootChange Data Capture
Architect
Written by

Architect

Professional architect sharing high‑quality architecture insights. Topics include high‑availability, high‑performance, high‑stability architectures, big data, machine learning, Java, system and distributed architecture, AI, and practical large‑scale architecture case studies. Open to ideas‑driven architects who enjoy sharing and learning.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.