How to Sync MySQL Binlog to Elasticsearch Using Canal and RocketMQ
Learn step‑by‑step how to configure Alibaba’s open‑source Canal to capture MySQL binlog changes, route them through RocketMQ, and index the data into Elasticsearch, covering cluster mode, MySQL and Elasticsearch setup, Canal properties, and consumer implementation details.
Overview
This guide demonstrates how to use Alibaba’s open‑source Canal to capture incremental changes from a MySQL binlog, publish them to RocketMQ, and index the data into Elasticsearch. It covers cluster deployment, MySQL, Elasticsearch, RocketMQ, Canal configuration, and a Java consumer implementation.
1. Cluster Mode
Canal runs as a cluster of JVM processes coordinated by Zookeeper. Each server represents a Canal process and can host multiple instance objects, each defining a synchronization task.
eventParser : connects to MySQL via the slave protocol and parses binlog events.
eventSink : filters, transforms, and routes parsed events.
eventStore : persists intermediate data.
metaManager : manages subscription and consumption metadata.
In production, Canal relies on Zookeeper for high availability. Canal supports two client modes, TCP and MQ. MQ mode is preferred because it decouples Canal from downstream consumers by sending events to a message queue such as RocketMQ.
2. MySQL Configuration
Enable binary logging and set the binlog format to ROW. Example my.cnf:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # row‑level format
server_id=1 # unique ID, must differ from Canal’s slaveId
Create a replication user for Canal:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Create the source table that will be synchronized:
CREATE TABLE `t_product` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
`status` TINYINT(4) NOT NULL,
`create_time` DATETIME NOT NULL,
`update_time` DATETIME NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
3. Elasticsearch Configuration
Create an index named t_product with appropriate mappings (via Kibana or the REST API):
PUT /t_product
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {"type": "keyword"},
"name": {"type": "text"},
"price": {"type": "double"},
"status": {"type": "integer"},
"createTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
"updateTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
}
}
}
4. RocketMQ Configuration
Create a topic that Canal will use to publish change events, for example:
product-syn-topic
Ensure the topic is created in RocketMQ before starting Canal.
5. Canal Configuration
This guide uses Canal 1.1.6. Edit the files under its conf directory as described in the following subsections.
5.1 canal.properties
# Zookeeper address for cluster mode
canal.zkServers=localhost:2181
# Server mode: tcp, kafka, rocketMQ, rabbitMQ, or pulsarMQ
canal.serverMode=rocketMQ
# Instance list (comma‑separated)
canal.destinations=product-syn
# Global Spring configuration for production clustering
canal.instance.global.spring.xml=classpath:spring/default-instance.xml
# Batch size and timeout settings
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.flatMessage=true
5.2 instance.properties (under conf/product-syn)
# MySQL master address
canal.instance.master.address=192.168.1.20:3306
# Canal user credentials
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Table filter – only sync this table
canal.instance.filter.regex=mytest.t_product
# RocketMQ settings
canal.mq.topic=product-syn-topic
canal.mq.partition=0
5.3 Starting the services
Launch two Canal server instances (e.g., bin/startup.sh). Verify their registration in Zookeeper. Updating a row in t_product should generate a message visible in the RocketMQ console.
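To make it easier to recognize that message in the console, here is a rough sketch of the shape a flat message takes when canal.mq.flatMessage=true. The class name FlatMessageSketch and its helper methods are hypothetical and model only a subset of fields (database, table, type, isDdl, data, old). The sample values are invented for illustration, and the sketch assumes column values arrive as strings and that old carries only the previous values of changed columns.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of a Canal flat message (not the Canal class itself).
class FlatMessageSketch {
    final String database;                 // source schema, e.g. "mytest"
    final String table;                    // source table, e.g. "t_product"
    final String type;                     // operation type: INSERT, UPDATE, DELETE, ...
    final boolean isDdl;                   // true for schema changes
    final List<Map<String, String>> data;  // row images after the change
    final List<Map<String, String>> old;   // previous values of changed columns (UPDATE only)

    FlatMessageSketch(String database, String table, String type, boolean isDdl,
                      List<Map<String, String>> data, List<Map<String, String>> old) {
        this.database = database;
        this.table = table;
        this.type = type;
        this.isDdl = isDdl;
        this.data = data;
        this.old = old;
    }

    // True when the message is a row change for the given schema and table.
    boolean isFor(String db, String tbl) {
        return !isDdl && db.equals(database) && tbl.equals(table);
    }

    // Names of the columns that changed in the given row of an UPDATE.
    Set<String> changedColumns(int rowIndex) {
        if (old == null || rowIndex >= old.size()) {
            return new TreeSet<>();
        }
        return new TreeSet<>(old.get(rowIndex).keySet());
    }

    // Invented sample: the price of product 1 changed from 9.90 to 12.50.
    static FlatMessageSketch sampleUpdate() {
        return new FlatMessageSketch("mytest", "t_product", "UPDATE", false,
                List.of(Map.of("id", "1", "name", "pen", "price", "12.50", "status", "1")),
                List.of(Map.of("price", "9.90")));
    }

    public static void main(String[] args) {
        FlatMessageSketch msg = sampleUpdate();
        System.out.println(msg.type + " on " + msg.database + "." + msg.table);
        System.out.println("changed columns: " + msg.changedColumns(0));
    }
}
```

If the console shows no message after an update, the table filter and the topic name in instance.properties are reasonable first things to check.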
6. Consumer Implementation
The consumer consists of two components:
Product index service : provides methods to create, update, or delete documents in Elasticsearch.
Message listener : subscribes to the RocketMQ topic, ensures FIFO order per partition, extracts the JSON payload from the data node, and invokes the appropriate index‑service method based on the binlog operation type (INSERT, UPDATE, DELETE).
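Between the listener and the index service, each row from the data node has to be turned into a document that fits the t_product mapping. The following is a minimal sketch of that step, assuming the row arrives as a map of column name to string value and that the consumer renames create_time and update_time to the camelCase fields used in the mapping. ProductDocumentMapper and its methods are hypothetical names, not code from the referenced repository.

```java
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: converts one binlog row into a document for the t_product index.
class ProductDocumentMapper {
    // Same pattern as the first date format declared in the index mapping.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static Map<String, Object> toDocument(Map<String, String> row) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", row.get("id"));                                      // keyword
        doc.put("name", row.get("name"));                                  // text
        doc.put("price", new BigDecimal(row.get("price")).doubleValue());  // double
        doc.put("status", Integer.parseInt(row.get("status")));            // integer
        doc.put("createTime", normalize(row.get("create_time")));          // date
        doc.put("updateTime", normalize(row.get("update_time")));          // date
        return doc;
    }

    // Parses and re-formats the timestamp so malformed values fail fast
    // in the consumer instead of being rejected later by Elasticsearch.
    static String normalize(String value) {
        return LocalDateTime.parse(value, FORMAT).format(FORMAT);
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("name", "pen");
        row.put("price", "12.50");
        row.put("status", "1");
        row.put("create_time", "2024-01-02 03:04:05");
        row.put("update_time", "2024-01-03 10:00:00");
        System.out.println(toDocument(row));
    }
}
```

Keeping the conversion in one place means the listener only decides which action to call, while the field names and types stay aligned with the mapping.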
Key points:
Maintain strict order within a partition; RocketMQ guarantees FIFO for messages sharing the same sharding key.
Map Canal’s event type (the type field of a flat message) to Elasticsearch actions: INSERT → index, UPDATE → update, DELETE → delete.
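The mapping in the key points can be sketched as a small dispatcher. This is only an illustration under stated assumptions: InMemoryProductIndex is a hypothetical stand-in for the product index service (a real one would call Elasticsearch), BinlogEventDispatcher is a hypothetical name, and rows are assumed to be maps of column name to string value. Because the sketch applies each event as it arrives, it gives correct results only when events are delivered in order, which is why per-partition FIFO consumption matters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the product index service; stores documents in memory.
class InMemoryProductIndex {
    final Map<String, Map<String, String>> docs = new LinkedHashMap<>();

    void index(String id, Map<String, String> doc) {
        docs.put(id, new LinkedHashMap<>(doc));
    }

    void update(String id, Map<String, String> changes) {
        docs.computeIfAbsent(id, k -> new LinkedHashMap<>()).putAll(changes);
    }

    void delete(String id) {
        docs.remove(id);
    }
}

// Hypothetical dispatcher: routes a row change to the matching index operation.
class BinlogEventDispatcher {
    // INSERT -> index, UPDATE -> update, DELETE -> delete; anything else is skipped.
    static String actionFor(String type) {
        if (type == null) {
            return "skip";
        }
        switch (type) {
            case "INSERT": return "index";
            case "UPDATE": return "update";
            case "DELETE": return "delete";
            default: return "skip"; // e.g. DDL events are ignored in this sketch
        }
    }

    static void dispatch(InMemoryProductIndex index, String type, Map<String, String> row) {
        String id = row.get("id");
        switch (actionFor(type)) {
            case "index": index.index(id, row); break;
            case "update": index.update(id, row); break;
            case "delete": index.delete(id); break;
            default: break;
        }
    }

    public static void main(String[] args) {
        InMemoryProductIndex index = new InMemoryProductIndex();
        dispatch(index, "INSERT", Map.of("id", "1", "name", "pen", "price", "9.90"));
        dispatch(index, "UPDATE", Map.of("id", "1", "price", "12.50"));
        System.out.println(index.docs.get("1"));
        dispatch(index, "DELETE", Map.of("id", "1"));
        System.out.println(index.docs.isEmpty());
    }
}
```

Replaying the same three events in a different order (for example DELETE before UPDATE) would leave a stale document behind, which illustrates why the listener should consume each partition sequentially.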
Reference Code
All example code is available at the following repository:
https://github.com/makemyownlife/rocketmq4-learning
dbaplus Community
