Synchronizing MySQL Incremental Data to Elasticsearch Using Canal and RocketMQ
This tutorial demonstrates how to use Alibaba's Canal in cluster mode with Zookeeper to capture MySQL row‑based binlog events, publish them via RocketMQ, and consume them with a custom listener that transforms inserts, updates, and deletes into synchronized documents in an Elasticsearch index.
Canal is a well‑known open‑source project from Alibaba that parses MySQL binlog incrementally and provides data subscription and consumption capabilities.
This tutorial walks readers through using Canal to synchronize MySQL incremental data to Elasticsearch.
1. Cluster Mode
In the diagram, each server represents one running Canal process in its own JVM. A server can host one or more instances. Each instance is one configured sync task (a destination).
An instance consists of the following modules:
eventParser: connects to the MySQL master using the replication (slave) protocol and parses the binlog.
eventSink: links the parser and the store, filtering, transforming, and distributing the data.
eventStore: buffers the parsed events (by default in an in-memory ring buffer).
metaManager: manages incremental subscription and consumption metadata, such as the last position a client has acknowledged.
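The following toy Python model shows how the four modules divide the work inside one instance. `ToyInstance` and `ChangeEvent` are hypothetical names. Real Canal is written in Java with different interfaces, and positions here are plain integers rather than binlog file/offset pairs. The get-then-ack flow is only similar in spirit to Canal's client protocol.

```python
import re
from collections import deque
from dataclasses import dataclass


@dataclass
class ChangeEvent:
    position: int    # stand-in for a binlog (file, offset) pair
    schema: str
    table: str
    event_type: str  # "INSERT", "UPDATE" or "DELETE"


class ToyInstance:
    """Hypothetical model of one Canal instance; not Canal's real API."""

    def __init__(self, binlog, filter_regex, capacity=1024):
        self.binlog = binlog                     # what the MySQL master would stream
        self.pattern = re.compile(filter_regex)  # eventSink filter on "schema.table"
        self.store = deque()                     # eventStore: bounded in-memory buffer
        self.capacity = capacity
        self.read_pos = 0                        # eventParser: last position read
        self.acked_pos = 0                       # metaManager: last position acknowledged

    def parse(self):
        """eventParser reads new events; eventSink filters them into eventStore."""
        for event in self.binlog:
            if event.position <= self.read_pos:
                continue                         # already read
            if len(self.store) >= self.capacity:
                break                            # store full: stop reading for now
            self.read_pos = event.position
            if self.pattern.fullmatch(f"{event.schema}.{event.table}"):
                self.store.append(event)

    def get(self, batch_size):
        """Return the oldest buffered events without removing them."""
        return list(self.store)[:batch_size]

    def ack(self, position):
        """Drop events up to `position` and record it as consumed."""
        while self.store and self.store[0].position <= position:
            self.store.popleft()
        self.acked_pos = position

    def restart(self):
        """After a crash or failover, resume reading from the last acknowledged position."""
        self.store.clear()
        self.read_pos = self.acked_pos
```

`restart()` rewinds to the acknowledged position, not to the last position read. As a result, events that were handed out but never acknowledged are delivered again. This illustrates why downstream consumers should tolerate duplicates.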
In production, Canal’s high availability relies on Zookeeper: each instance runs on only one server at a time, and a standby server takes over if the active one fails. Clients consume in one of two modes, TCP mode or MQ mode. MQ mode is the more common choice because it decouples Canal from downstream consumers by sending change events to a message queue such as Kafka or RocketMQ.
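Ordered Consumption: For a given topic, messages are partitioned by a sharding key, and messages within the same partition are strictly FIFO. Order is guaranteed only inside a partition. All changes to the same row must therefore land in the same partition, or the index can end up in the wrong final state.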
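Below is a minimal sketch of key-based routing. Hashing the primary key keeps every change to one row in one FIFO queue, while different rows may spread across queues. `partition_for` and `route` are hypothetical helpers, and CRC-32 is an assumed hash; Canal's own hash function differs. Canal documents `canal.mq.partitionsNum` and `canal.mq.partitionHash` (for example `mytest.t_product:id`) for hashing by primary key, but verify the syntax against your version. The `canal.mq.partition=0` setting used later in this tutorial is the one-partition case: every event goes to queue 0, so order is total but consumption cannot be parallelized.

```python
import zlib
from collections import defaultdict


def partition_for(key, partitions):
    # crc32 is stable across processes, unlike Python's salted built-in hash() for str.
    return zlib.crc32(key.encode("utf-8")) % partitions


def route(events, partitions):
    """Group (primary_key, change) pairs into per-partition FIFO queues."""
    queues = defaultdict(list)
    for key, change in events:
        queues[partition_for(key, partitions)].append((key, change))
    return queues
```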
2. MySQL Configuration
1) Enable binlog and set binlog-format=ROW in my.cnf:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # must be unique and must not conflict with Canal’s slaveId
Note: For Alibaba Cloud RDS for MySQL, binlog is already enabled and the default account has binlog dump privileges, so this step can be skipped.
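Before pointing Canal at the server, you may want to sanity-check the option file. The sketch below parses the `[mysqld]` section with Python's `configparser` and reports anything that contradicts the settings above. `check_mysqld_options` and its `canal_slave_id` parameter are hypothetical. The sketch inspects only the file. MySQL 8.0 enables binary logging by default even without `log-bin`, so treat that finding as a warning there. The authoritative check is the running server, for example `SHOW VARIABLES LIKE 'binlog_format';`.

```python
import configparser


def check_mysqld_options(text, canal_slave_id=None):
    """Return a list of problems with the [mysqld] settings this tutorial relies on."""
    parser = configparser.ConfigParser(
        allow_no_value=True,             # bare flags such as "skip-name-resolve" have no value
        inline_comment_prefixes=("#",),  # strip trailing "# ..." comments
        interpolation=None,              # MySQL values may legitimately contain '%'
        strict=False,                    # MySQL tolerates repeated options
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent; normalize to '_'.
    opts = {k.replace("-", "_"): v for k, v in parser["mysqld"].items()}
    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format must be ROW")
    server_id = opts.get("server_id")
    if not server_id or not server_id.isdigit() or int(server_id) == 0:
        problems.append("server_id must be a non-zero integer")
    elif canal_slave_id is not None and int(server_id) == canal_slave_id:
        problems.append("server_id collides with Canal's slaveId")
    return problems
```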
2) Grant Canal the necessary replication privileges:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3) Create the source table t_product:
CREATE TABLE `t_product` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
`status` TINYINT(4) NOT NULL,
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
3. Elasticsearch Configuration
Use Kibana to create an index named t_product with the following mapping:
PUT /t_product
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {"type": "keyword"},
"name": {"type": "text"},
"price": {"type": "double"},
"status": {"type": "integer"},
"createTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
"updateTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
}
}
}
4. RocketMQ Configuration
Create a topic named product-syn-topic. Canal will publish binlog change events to this topic.
5. Canal Configuration
Select Canal version 1.1.6 and edit the configuration files under the conf directory.
1) canal.properties:
# Cluster mode ZK address
canal.zkServers = localhost:2181
# Server mode (tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ)
canal.serverMode = rocketMQ
# Instance list
canal.destinations = product-syn
# Config root directory
canal.conf.dir = ../conf
# Global Spring XML for production/cluster deployment
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# Default values (shown for reference)
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
2) instance.properties (under conf/product-syn):
# Database connection (modify as needed)
canal.instance.master.address=192.168.1.20:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# Table filter (regex matched against schema.table; escape the dot as \\.)
canal.instance.filter.regex=mytest\\.t_product
# MQ settings
canal.mq.topic=product-syn-topic
# Send every event to partition (queue) 0, which preserves total order
canal.mq.partition=0
3) Service Startup
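Start two Canal servers against the same Zookeeper and check their status in a Zookeeper GUI client. Both servers register for the product-syn instance, but only one runs it at a time while the other stands by. After you insert a row into t_product, a new message appears in the RocketMQ console.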
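With `canal.mq.flatMessage = true`, that message body is a JSON document. The sketch below parses an abridged, made-up example for one INSERT. All values are invented, and fields Canal also sends (such as `sql`, `sqlType`, `mysqlType`, `id`) are omitted. `summarize` is a hypothetical helper. Column values arrive as strings. For DELETE, `data` holds the deleted rows; for UPDATE, `old` holds the previous values of the changed columns.

```python
import json

# Abridged, illustrative flat message; every value here is made up.
SAMPLE = """{
  "data": [{"id": "1", "name": "Pen", "price": "9.90", "status": "1",
            "create_time": "2023-01-01 10:00:00",
            "update_time": "2023-01-01 10:00:00"}],
  "database": "mytest",
  "table": "t_product",
  "pkNames": ["id"],
  "isDdl": false,
  "type": "INSERT",
  "es": 1672538400000,
  "ts": 1672538400123,
  "old": null
}"""


def summarize(raw):
    """Return (type, schema.table, primary key) for each row in a flat message."""
    msg = json.loads(raw)
    pk_names = msg.get("pkNames") or []
    if msg.get("isDdl") or not pk_names:
        return []  # DDL and key-less tables give nothing to key a document by
    pk = pk_names[0]  # t_product has a single-column primary key
    table = f'{msg["database"]}.{msg["table"]}'
    return [(msg["type"], table, row[pk]) for row in msg.get("data") or []]
```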
6. Consumer Implementation
The consumer consists of two parts:
Product index operation service (writes to Elasticsearch).
Consumption listener that processes ordered messages, extracts the data node, converts it to JSON, and invokes the appropriate operation (INSERT, UPDATE, DELETE) on the Elasticsearch index.
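The two parts can be sketched in Python as follows, assuming the flat-message JSON format. `InMemoryProductIndex`, `to_document`, and `handle_message` are hypothetical names, and the dictionary stands in for Elasticsearch. In a Java consumer built on the RocketMQ client, this logic would sit inside an orderly listener (`MessageListenerOrderly`) so that messages from one queue are processed in sequence. The mapping function converts snake_case string columns to the camelCase, typed fields of the t_product mapping.

```python
import json


class InMemoryProductIndex:
    """Hypothetical stand-in for the product index service; a real one would call Elasticsearch."""

    def __init__(self):
        self.docs = {}

    def upsert(self, doc_id, doc):
        self.docs[doc_id] = doc

    def delete(self, doc_id):
        self.docs.pop(doc_id, None)  # deleting a missing doc is a no-op


def to_document(row):
    """Map a t_product row (string values, snake_case) onto the t_product mapping (camelCase)."""
    return {
        "id": row["id"],
        "name": row["name"],
        "price": float(row["price"]),
        "status": int(row["status"]),
        "createTime": row["create_time"],  # "yyyy-MM-dd HH:mm:ss" matches the mapping format
        "updateTime": row["update_time"],
    }


def handle_message(raw, index):
    """Apply one flat message to the index; return True so the caller can acknowledge it."""
    msg = json.loads(raw)
    if msg.get("isDdl") or msg.get("table") != "t_product":
        return True  # nothing to index
    for row in msg.get("data") or []:
        if msg["type"] in ("INSERT", "UPDATE"):
            # Write the full after-image so UPDATE needs no partial-update logic.
            index.upsert(row["id"], to_document(row))
        elif msg["type"] == "DELETE":
            index.delete(row["id"])
    return True
```

Writing the full row by id for both INSERT and UPDATE keeps the handler idempotent, so a redelivered message leaves the same document. A real listener should report success only after Elasticsearch accepts the write, so that a failure leads to redelivery rather than a lost update.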
7. Conclusion
Canal is a versatile open‑source project widely used to build Data Transmission Services (DTS). Reading its source code can help you learn about network programming, multithreading, high‑performance queues (Disruptor), and abstract workflow models.
The complete code referenced in this article is available at https://github.com/makemyownlife/rocketmq4-learning.
Sohu Tech Products
A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.
