Incremental Data Synchronization with Alibaba Canal and RabbitMQ
This article explains how to use Alibaba's open‑source Canal middleware to capture MySQL binlog changes and forward them via RabbitMQ for real‑time data synchronization, covering server installation, configuration, testing, and a Java client implementation.
What is Canal?
Canal (meaning "channel" or "pipeline") parses MySQL's incremental binlog to provide change data capture (CDC) without modifying existing business code.
How Canal Works
Canal consists of a server that parses MySQL binlog and pushes incremental data to clients or message middleware, and a client that consumes the data for custom business processing. Supported message middleware includes Kafka, RocketMQ, and RabbitMQ.
Other Middleware Options
Other open‑source CDC middleware exists, such as Bifrost, but Canal is recommended for its maturity.
Canal Server Installation
Download the Canal package (v1.1.5 at the time of writing) from the official GitHub releases page and unzip it. The steps below set up Canal with RabbitMQ.
1. Enable MySQL Binlog
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # unique server ID, must differ from Canal's slaveId
2. Configure MySQL Instance
Edit canal.properties to add a new instance name, then modify instance.properties (example instance) as follows:
# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# default database to monitor
canal.instance.defaultDatabaseName=test
# monitor all tables (regex)
canal.instance.filter.regex=.*\\..*
3. Configure RabbitMQ
Set the transport mode to rabbitMQ and configure connection details in canal.properties:
# transport mode: tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.deliveryMode = 2
Define the routing key in instance.properties:
canal.mq.topic=canal.routing.key
4. Create RabbitMQ Exchange and Queue
In RabbitMQ, create an exchange named canal.exchange and a queue (e.g., canal.queue) bound with routing key canal.routing.key.
5. Start the Server
Run startup.bat (Windows) or startup.sh (Linux/macOS) from the bin directory, then check logs/canal/canal.log to confirm that the server started without errors.
6. Test the Server
Insert a row into the test.oauth_client_details table:
INSERT INTO `oauth_client_details` VALUES ('myjszl','res1','$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W','all','password,refresh_token,authorization_code,client_credentials,implicit','http://www.baidu.com',NULL,1000,1000,NULL,'false');
The inserted data appears as a JSON message in canal.queue, confirming end‑to‑end data flow.
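Whether a table such as test.oauth_client_details is captured at all depends on the filter from step 2, which as a regular expression reads `.*\..*` (any schema, a literal dot, any table). The sketch below approximates that check with java.util.regex. It assumes Canal compares the full schema.table name against each comma‑separated pattern; `TableFilterSketch` and `matches` are hypothetical names for illustration, not Canal's own filter classes, and Canal's regex engine may differ from Java's in edge cases.

```java
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; this is not Canal's filter implementation. */
final class TableFilterSketch {

    /**
     * Returns true if the "schema.table" name fully matches at least one of the
     * comma-separated patterns in filterRegex (assumed matching rule).
     */
    static boolean matches(String filterRegex, String schemaDotTable) {
        for (String part : filterRegex.split(",")) {
            String pattern = part.trim();
            if (pattern.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            // matches() requires the whole name to match, not just a substring
            if (Pattern.compile(pattern).matcher(schemaDotTable).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

For example, `TableFilterSketch.matches("test\\..*", "test.oauth_client_details")` accepts the table used in this test, while the same pattern rejects a table in another schema. Narrowing the filter this way keeps unrelated tables out of the queue.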
Canal Client Setup
The client simply consumes messages from the queue and processes them.
1. Message Entity Class
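import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;

/**
 * One change event as delivered by Canal; T is the row type.
 */
@NoArgsConstructor
@Data
public class CanalMessage<T> {
    // Event type, e.g. INSERT, UPDATE, DELETE
    @JsonProperty("type")
    private String type;
    @JsonProperty("table")
    private String table;
    // Row data after the change
    @JsonProperty("data")
    private List<T> data;
    @JsonProperty("database")
    private String database;
    // Time the change was executed in MySQL (epoch milliseconds)
    @JsonProperty("es")
    private Long es;
    @JsonProperty("id")
    private Integer id;
    @JsonProperty("isDdl")
    private Boolean isDdl;
    // Previous values, populated for UPDATE events
    @JsonProperty("old")
    private List<T> old;
    @JsonProperty("pkNames")
    private List<String> pkNames;
    @JsonProperty("sql")
    private String sql;
    // Time Canal processed the change (epoch milliseconds)
    @JsonProperty("ts")
    private Long ts;
}
2. MQ Listener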
import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for Canal messages from RabbitMQ
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.routing.key"
            )
    })
    public void handleDataChange(String message) {
        // Convert JSON string to CanalMessage object
        CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
        String tableName = canalMessage.getTable();
        log.info("Canal listener detected change on {}: {}", tableName, message);
        // TODO: implement business logic
    }
}
3. Client Test
Insert another row into the monitored table and observe the JSON payload received by the listener, confirming successful consumption.
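The TODO in the listener is where change handling would go. As a minimal sketch, a handler could branch on the `type` field of the message. It assumes that each row arrives as a map of column name to string value and that, for UPDATE events, `old` holds the previous values of only the columns that changed. `CanalChangeDescriber` and `describe` are hypothetical names, not part of Canal or Spring AMQP, and the string output stands in for real business logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Canal or the original listener. */
final class CanalChangeDescriber {

    /**
     * Produces one human-readable line per inserted or deleted row,
     * and one line per changed column for updated rows.
     */
    static List<String> describe(String type, String table,
                                 List<Map<String, String>> data,
                                 List<Map<String, String>> old) {
        List<String> lines = new ArrayList<>();
        if (type == null || data == null) {
            return lines; // nothing row-level to report (for example a DDL event)
        }
        for (int i = 0; i < data.size(); i++) {
            Map<String, String> row = data.get(i);
            switch (type) {
                case "INSERT":
                    lines.add("insert into " + table + ": " + row);
                    break;
                case "DELETE":
                    lines.add("delete from " + table + ": " + row);
                    break;
                case "UPDATE":
                    // Assumption: old.get(i) lists only the columns that changed in row i
                    Map<String, String> before = (old != null && i < old.size())
                            ? old.get(i)
                            : Collections.<String, String>emptyMap();
                    for (Map.Entry<String, String> e : before.entrySet()) {
                        lines.add("update " + table + "." + e.getKey() + ": "
                                + e.getValue() + " -> " + row.get(e.getKey()));
                    }
                    break;
                default:
                    break; // other event types are ignored in this sketch
            }
        }
        return lines;
    }
}
```

In the listener, the parsed message's `getType()`, `getTable()`, `getData()`, and `getOld()` values would be passed in, provided the rows are deserialized into maps. Keeping the branching in a plain class like this makes it testable without a running broker.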
Conclusion
Canal provides a non‑intrusive, open‑source solution for MySQL incremental data synchronization, and can be combined with various MQ systems such as RabbitMQ to build real‑time data pipelines. Choose the middleware that best fits your business scenario.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
