Incremental Data Synchronization with Alibaba Canal and RabbitMQ

This article explains how to use Alibaba's open‑source Canal middleware to capture MySQL binlog changes and forward them via RabbitMQ for real‑time data synchronization, covering server installation, configuration, testing, and a Java client implementation.

Code Ape Tech Column
Code Ape Tech Column
Code Ape Tech Column
Incremental Data Synchronization with Alibaba Canal and RabbitMQ

What is Canal?

Canal (meaning "channel" or "pipeline") parses MySQL incremental binlog logs to provide change data capture (CDC) without modifying existing business code.

How Canal Works

Canal consists of a server that parses MySQL binlog and pushes incremental data to clients or message middleware, and a client that consumes the data for custom business processing. Supported message middleware includes Kafka, RocketMQ, and RabbitMQ.

Other Middleware Options

Besides Canal, other open‑source CDC middleware such as Bifrost exist, but Canal is recommended for its maturity.

Canal Server Installation

Download the Canal package (latest v1.1.5) from the official GitHub releases page, unzip it, and follow the steps below using the Canal+RabbitMQ setup.

1. Enable MySQL Binlog

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # unique server ID, must differ from Canal's slaveId

2. Configure MySQL Instance

Edit canal.properties to add a new instance name, then modify instance.properties (example instance) as follows:

# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# default database to monitor
canal.instance.defaultDatabaseName=test

# monitor all tables (regex)
canal.instance.filter.regex=.*\..*

3. Configure RabbitMQ

Set the transport mode to rabbitMQ and configure connection details in canal.properties:

# transport mode: tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ

rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.deliveryMode = 2

Define the routing key in instance.properties:

canal.mq.topic=canal.routing.key

4. Create RabbitMQ Exchange and Queue

In RabbitMQ, create an exchange named canal.exchange and a queue (e.g., canal.queue) bound with routing key canal.routing.key.

5. Start the Server

Run startup.bat (Windows) in the bin directory. Successful startup is indicated by log output.

6. Test the Server

Insert a row into the test.oauth_client_details table:

INSERT INTO `oauth_client_details` VALUES ('myjszl','res1','$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W','all','password,refresh_token,authorization_code,client_credentials,implicit','http://www.baidu.com',NULL,1000,1000,NULL,'false');

The inserted data appears as a JSON message in canal.queue, confirming end‑to‑end data flow.

Canal Client Setup

The client simply consumes messages from the queue and processes them.

1. Message Entity Class

@NoArgsConstructor
@Data
public class CanalMessage<T> {
    @JsonProperty("type")
    private String type;
    @JsonProperty("table")
    private String table;
    @JsonProperty("data")
    private List<T> data;
    @JsonProperty("database")
    private String database;
    @JsonProperty("es")
    private Long es;
    @JsonProperty("id")
    private Integer id;
    @JsonProperty("isDdl")
    private Boolean isDdl;
    @JsonProperty("old")
    private List<T> old;
    @JsonProperty("pkNames")
    private List<String> pkNames;
    @JsonProperty("sql")
    private String sql;
    @JsonProperty("ts")
    private Long ts;
}

2. MQ Listener

import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for Canal messages from RabbitMQ
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {

    @RabbitListener(bindings = {
        @QueueBinding(
            value = @Queue(value = "canal.queue", durable = "true"),
            exchange = @Exchange(value = "canal.exchange"),
            key = "canal.routing.key"
        )
    })
    public void handleDataChange(String message) {
        // Convert JSON string to CanalMessage object
        CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
        String tableName = canalMessage.getTable();
        log.info("Canal listener detected change on {}: {}", tableName, message);
        // TODO: implement business logic
    }
}

3. Client Test

Insert another row into the monitored table and observe the JSON payload received by the listener, confirming successful consumption.

Conclusion

Canal provides a non‑intrusive, open‑source solution for MySQL incremental data synchronization, and can be combined with various MQ systems such as RabbitMQ to build real‑time data pipelines. Choose the middleware that best fits your business scenario.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

BackendJavaBinlogRabbitMQCanaldata synchronization
Code Ape Tech Column
Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.