
Using Canal to Sync MySQL Incremental Data to Elasticsearch via RocketMQ

This tutorial walks through configuring Canal in cluster mode to capture MySQL row‑based binlog events, publish them to a RocketMQ topic, and consume the ordered messages to update an Elasticsearch index, covering MySQL, RocketMQ, Elasticsearch mappings, and the required Canal properties.

Sohu Tech Products

Canal is a well‑known open‑source project from Alibaba that parses MySQL binlog for incremental data subscription and consumption.

This tutorial demonstrates step‑by‑step how to use Canal to synchronize MySQL incremental data to Elasticsearch.

1. Cluster Mode

In Canal's architecture, each server corresponds to one Canal JVM process. A server hosts one or more instances, and each instance represents one configured synchronization task.

An instance consists of the following modules:

eventParser: connects to the MySQL master using the slave protocol and parses the binlog.

eventSink: links the parser and the store, handling data filtering, transformation, and distribution.

eventStore: stores the parsed events.

metaManager: manages incremental subscription and consumption metadata.

In production, Canal’s high availability relies on Zookeeper. Clients can connect in one of two modes: TCP mode or MQ mode. This tutorial focuses on MQ mode because it decouples Canal from downstream consumers: Canal publishes change events to Kafka or RocketMQ, and consumers process them in order.

Ordered Consumption: For a given topic, messages are partitioned by a sharding key. Within the same partition, messages are strictly FIFO. Order is guaranteed only inside a partition, not across partitions.
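The partitioning rule above can be illustrated with a small sketch. This is not RocketMQ's own queue selector; it only assumes a stable hash of the sharding key modulo the partition count, and the names pick_partition and route are hypothetical. It shows that events sharing a key always land in the same partition and keep their relative order there.

```python
import zlib
from collections import defaultdict


def pick_partition(sharding_key: str, partition_count: int) -> int:
    """Map a sharding key to a partition index with a stable hash (assumed scheme)."""
    return zlib.crc32(sharding_key.encode("utf-8")) % partition_count


def route(events, partition_count):
    """Append each (key, payload) event to the partition chosen by its key."""
    partitions = defaultdict(list)
    for key, payload in events:
        partitions[pick_partition(key, partition_count)].append((key, payload))
    return partitions


# Interleaved changes for two products, in the order they were produced.
events = [
    ("product-1", "INSERT"),
    ("product-2", "INSERT"),
    ("product-1", "UPDATE"),
    ("product-2", "DELETE"),
    ("product-1", "DELETE"),
]
partitions = route(events, partition_count=4)

# All events for one key sit in one partition, in production order.
p1 = partitions[pick_partition("product-1", 4)]
product_1_history = [payload for key, payload in p1 if key == "product-1"]
print(product_1_history)
```

Two different keys may or may not share a partition; the ordering guarantee only concerns events that end up in the same one.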

2. MySQL Configuration

1) Enable binlog and set binlog-format=ROW in my.cnf:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1 # must be unique and must not conflict with Canal’s slaveId
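Before starting Canal, it can help to confirm that these three options are actually present. The following is a rough sketch that parses a simple my.cnf with Python's standard configparser; the function name check_binlog_settings is hypothetical, and the check assumes a plain file without MySQL directives such as !includedir, which configparser cannot read.

```python
import configparser

MY_CNF = """
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW format
server_id=1
"""


def check_binlog_settings(text: str) -> list:
    """Return a list of problems found in the [mysqld] section (empty if none)."""
    parser = configparser.ConfigParser(
        allow_no_value=True,             # MySQL allows bare flags without a value
        inline_comment_prefixes=("#",),  # strip trailing "# ..." comments
        interpolation=None,              # treat "%" literally
        strict=False,
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL accepts both dashes and underscores in option names.
    options = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}
    problems = []
    if "log_bin" not in options:
        problems.append("log-bin is not set")
    if (options.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format is not ROW")
    if not (options.get("server_id") or "").isdigit():
        problems.append("server_id is missing or not numeric")
    return problems


print(check_binlog_settings(MY_CNF))
```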

Note: Alibaba Cloud RDS for MySQL enables binlog by default and grants the necessary dump privileges.

2) Create a MySQL account for Canal and grant it the replication (slave) privileges:

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3) Create the product table t_product:

CREATE TABLE `t_product` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  `status` TINYINT(4) NOT NULL,
  `create_time` datetime NOT NULL,
  `update_time` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

3. Elasticsearch Configuration

Use Kibana to create an index t_product with the following mapping:

PUT /t_product
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "name": {"type": "text"},
      "price": {"type": "double"},
      "status": {"type": "integer"},
      "createTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
      "updateTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
    }
  }
}
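Note that the index uses camelCase field names (createTime, updateTime) while the table uses snake_case columns, so the consumer has to translate between the two. A minimal sketch of that translation follows, assuming that Canal's flat JSON messages deliver every column value as a string; the function name row_to_document is hypothetical and is not taken from the tutorial's repository.

```python
def row_to_document(row: dict) -> dict:
    """Convert one t_product row (string values) into a document for the t_product index."""
    return {
        "id": row["id"],                      # keyword field, kept as a string
        "name": row["name"],
        "price": float(row["price"]),         # DECIMAL(10,2) -> double
        "status": int(row["status"]),         # TINYINT -> integer
        # MySQL DATETIME strings already match the "yyyy-MM-dd HH:mm:ss" date format.
        "createTime": row["create_time"],
        "updateTime": row["update_time"],
    }


row = {
    "id": "1",
    "name": "demo product",
    "price": "19.90",
    "status": "1",
    "create_time": "2023-05-01 10:00:00",
    "update_time": "2023-05-01 10:30:00",
}
doc = row_to_document(row)
print(doc)
```

Converting price to a float mirrors the double mapping above; exact decimal precision is not preserved in the index either way.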

4. RocketMQ Configuration

Create a topic named product-syn-topic. Canal will publish binlog change events to this topic.

5. Canal Configuration

This tutorial uses Canal 1.1.6. Edit the following files under its conf directory.

1) canal.properties:

# Cluster mode ZK address
canal.zkServers = localhost:2181
# Server mode (tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ)
canal.serverMode = rocketMQ
# Instance list
canal.destinations = product-syn
# Config root dir
canal.conf.dir = ../conf
# Global Spring config for production
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

# Batch size (default 50K, keep below 1M for Kafka)
canal.mq.canalBatchSize = 50
# Get data timeout (ms)
canal.mq.canalGetTimeout = 100
# Flat JSON message
canal.mq.flatMessage = true

2) instance.properties (under conf/product-syn):

# Database connection
canal.instance.master.address=192.168.1.20:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# Table filter
canal.instance.filter.regex=mytest.t_product
# MQ settings
canal.mq.topic=product-syn-topic
canal.mq.partition=0

3) Start Services

Launch two Canal servers and verify their status with a Zookeeper GUI. Then insert a record into t_product; the resulting change message should appear in the RocketMQ console.
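With canal.mq.flatMessage = true, each message body is a JSON object. The sketch below parses an illustrative payload for an insert into t_product; the field names (data, database, table, type, isDdl, pkNames, old, es, ts) follow Canal's flat message format, while the concrete values and the helper name summarize are made up for this example.

```python
import json

# Illustrative message body; the values are invented, not captured from a real run.
SAMPLE = """
{
  "data": [{"id": "1", "name": "demo product", "price": "19.90", "status": "1",
            "create_time": "2023-05-01 10:00:00", "update_time": "2023-05-01 10:00:00"}],
  "database": "mytest",
  "table": "t_product",
  "type": "INSERT",
  "isDdl": false,
  "pkNames": ["id"],
  "old": null,
  "es": 1682906400000,
  "ts": 1682906400123
}
"""


def summarize(raw: str) -> dict:
    """Extract the parts of a flat message a consumer usually looks at first."""
    msg = json.loads(raw)
    rows = msg.get("data") or []  # "data" can be null, for example for DDL events
    return {
        "table": f'{msg["database"]}.{msg["table"]}',
        "type": msg["type"],
        "is_ddl": msg["isDdl"],
        "primary_keys": [tuple(row[pk] for pk in msg["pkNames"]) for row in rows],
    }


summary = summarize(SAMPLE)
print(summary)
```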

6. Consumer

1) Product Index Service – handles CRUD operations on the Elasticsearch index.

2) Consumer Listener – processes messages in order, extracts the data node, converts it to JSON, and invokes the appropriate index operation based on the event type (INSERT, UPDATE, DELETE).
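The two components above can be sketched as follows. This is a simplified illustration rather than the code from the tutorial's repository: InMemoryProductIndex is a hypothetical stand-in for the Product Index Service that keeps documents in a dict instead of calling Elasticsearch, and handle_message plays the role of the listener, assuming flat JSON messages that arrive in order.

```python
import json


class InMemoryProductIndex:
    """Hypothetical stand-in for the Product Index Service (no Elasticsearch calls)."""

    def __init__(self):
        self.docs = {}

    def upsert(self, doc_id, doc):
        self.docs[doc_id] = doc

    def delete(self, doc_id):
        self.docs.pop(doc_id, None)


def handle_message(raw: str, index: InMemoryProductIndex) -> None:
    """Apply one flat message to the index according to its event type."""
    msg = json.loads(raw)
    if msg.get("isDdl") or msg.get("table") != "t_product":
        return  # ignore DDL statements and other tables
    event_type = msg.get("type")
    for row in msg.get("data") or []:
        doc_id = row["id"]
        if event_type in ("INSERT", "UPDATE"):
            index.upsert(doc_id, row)  # "data" holds the full row after the change
        elif event_type == "DELETE":
            index.delete(doc_id)


def flat(event_type: str, row: dict) -> str:
    """Build a minimal flat message for the demo below."""
    return json.dumps(
        {"type": event_type, "table": "t_product", "isDdl": False, "data": [row]}
    )


index = InMemoryProductIndex()
handle_message(flat("INSERT", {"id": "1", "name": "demo", "status": "0"}), index)
handle_message(flat("UPDATE", {"id": "1", "name": "demo", "status": "1"}), index)
handle_message(flat("INSERT", {"id": "2", "name": "other", "status": "1"}), index)
handle_message(flat("DELETE", {"id": "2", "name": "other", "status": "1"}), index)
print(index.docs)
```

Treating INSERT and UPDATE as the same upsert keeps the handler idempotent, so a redelivered message does not corrupt the index as long as per-key order is preserved.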

7. Conclusion

Canal is a powerful open‑source data transmission service (DTS) used by many companies. Studying its source code provides insights into network programming, multithreading, high‑performance queues (Disruptor), and abstract workflow models.

The complete code for this tutorial is available at https://github.com/makemyownlife/rocketmq4-learning.
