Sync MySQL Binlog to Elasticsearch with Canal and RocketMQ – Step‑by‑Step Guide
This tutorial walks through configuring Canal in cluster mode, enabling MySQL binlog, creating an Elasticsearch index, setting up RocketMQ topics, adjusting Canal properties and instance files, launching services, and implementing a consumer to reliably synchronize MySQL changes into Elasticsearch.
1. Cluster Mode Overview
Canal runs one server per JVM; a server can host multiple instances, each representing a synchronization task. The server consists of four modules: eventParser (parses the MySQL binlog), eventSink (filters, processes, and distributes data), eventStore (stores data), and metaManager (manages subscription and consumption metadata).
In production, Canal achieves high availability via ZooKeeper. Client modes are divided into TCP mode and MQ mode. This tutorial focuses on MQ mode because it decouples Canal from downstream consumers by publishing binlog changes to Kafka or RocketMQ.
Sequential Consumption: For a given topic, messages are sharded by a key; within each partition, they are consumed in strict FIFO order, guaranteeing order per partition while allowing parallelism across partitions.
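The key-to-partition mapping behind this guarantee can be illustrated with a small sketch (pure Python; the `partition_for` helper and the sample events are illustrative, not part of Canal or RocketMQ). Events that share a sharding key, such as the primary key of `t_product`, always land in the same partition, so a per-partition FIFO consumer replays them in the order they occurred:

```python
# Sketch: how a sharding key maps change events to partitions.
# Events for the same key always land in the same partition, so a
# per-partition FIFO consumer sees them in the order they occurred.
from collections import defaultdict
import zlib

NUM_PARTITIONS = 4

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Stable hash of the sharding key (e.g. the row's primary key)."""
    return zlib.crc32(key.encode()) % num_partitions

events = [
    ("id=1", "INSERT"), ("id=2", "INSERT"),
    ("id=1", "UPDATE"), ("id=1", "DELETE"),
]

queues = defaultdict(list)
for key, op in events:
    queues[partition_for(key)].append((key, op))

# All three events for id=1 sit in one queue, in FIFO order,
# even though other keys may be consumed in parallel elsewhere.
p = partition_for("id=1")
print(queues[p])
```

This is why ordering holds per partition but not globally: two rows with different primary keys may hash to different partitions and be processed concurrently.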
2. MySQL Configuration
Enable binlog and set binlog-format=ROW in my.cnf:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # row‑level logging
server_id=1 # unique server ID, must differ from Canal's slaveId
Grant the Canal user replication privileges (skip for Alibaba Cloud RDS, which enables binlog by default):
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Create the source table t_product:
CREATE TABLE `t_product` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
`status` TINYINT(4) NOT NULL,
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
3. Elasticsearch Index Setup
Using Kibana (or the REST API), create an index t_product with appropriate mappings:
PUT /t_product
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {"type": "keyword"},
"name": {"type": "text"},
"price": {"type": "double"},
"status": {"type": "integer"},
"createTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
"updateTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
}
}
}
4. RocketMQ Topic Configuration
Create a topic named product-syn-topic. Canal will publish binlog change events to this topic.
5. Canal Configuration
Select Canal version 1.1.6 and edit conf/canal.properties:
# Cluster mode ZK address
canal.zkServers = localhost:2181
# Server mode (rocketMQ, kafka, tcp, etc.)
canal.serverMode = rocketMQ
# Instance list
canal.destinations = product-syn
# Global Spring config for instances
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# Batch size and timeout (default values shown)
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
Create an instance directory conf/product-syn with instance.properties and set the database connection, table filter, and MQ details:
# Database connection (replace with your own)
canal.instance.master.address=192.168.1.20:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# Table filter
canal.instance.filter.regex=mytest.t_product
# RocketMQ settings
canal.mq.topic=product-syn-topic
canal.mq.partition=0
6. Starting Services
Launch two Canal servers (in cluster mode only one is active per destination; the other stands by for failover) and verify their status via a ZooKeeper UI.
Insert or update a row in t_product. The change appears as a message in RocketMQ.
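With canal.mq.flatMessage = true, an INSERT on t_product arrives in the topic as a JSON payload roughly like the one below. The field values are illustrative, and the exact set of fields may vary slightly between Canal versions:

```json
{
  "database": "mytest",
  "table": "t_product",
  "type": "INSERT",
  "isDdl": false,
  "pkNames": ["id"],
  "ts": 1700000000000,
  "data": [
    {
      "id": "1",
      "name": "iphone",
      "price": "5999.00",
      "status": "1",
      "create_time": "2023-11-15 10:00:00",
      "update_time": "2023-11-15 10:00:00"
    }
  ],
  "old": null
}
```

For an UPDATE, "data" holds the new row image and "old" holds the changed columns' previous values.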
7. Consumer Implementation
The consumer service performs two main tasks:
Consume messages in order.
Parse the data node, extract the latest row data, and, based on the operation type (INSERT, UPDATE, or DELETE), invoke the product index service to modify the Elasticsearch document.
8. Conclusion
Canal provides a robust way to capture MySQL incremental changes and, when combined with RocketMQ and Elasticsearch, enables real‑time data synchronization suitable for search, analytics, or downstream services. The tutorial demonstrates a complete end‑to‑end setup, from MySQL binlog activation to consumer logic that keeps the Elasticsearch index in sync.
Su San Talks Tech
Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.