How to Sync MySQL Binlog to Elasticsearch Using Canal and RocketMQ

Learn step‑by‑step how to configure Alibaba’s open‑source Canal to capture MySQL binlog changes, route them through RocketMQ, and index the data into Elasticsearch, covering cluster mode, MySQL and Elasticsearch setup, Canal properties, and consumer implementation details.

dbaplus Community

Overview

This guide demonstrates how to use Alibaba’s open‑source Canal to capture incremental changes from a MySQL binlog, publish them to RocketMQ, and index the data into Elasticsearch. It covers cluster deployment, MySQL, Elasticsearch, RocketMQ, Canal configuration, and a Java consumer implementation.

1. Cluster Mode

Canal runs as a cluster of JVM processes coordinated by Zookeeper. Each server is one Canal process and can host multiple instances, each of which defines one synchronization task. An instance is made up of four modules:

eventParser: connects to MySQL via the slave (replication) protocol and parses binlog events.

eventSink: filters, transforms, and routes parsed events.

eventStore: stores parsed events until they are consumed.

metaManager: manages subscription and consumption metadata.

In production, Canal’s high availability relies on Zookeeper. Canal offers two client modes, TCP and MQ. MQ mode is preferred because Canal publishes events to a message queue such as RocketMQ, which decouples it from downstream consumers.

2. MySQL Configuration

Enable binary logging and set the binlog format to ROW. Example my.cnf:

[mysqld]
log-bin=mysql-bin          # enable binlog
binlog-format=ROW           # row‑level format
server_id=1                 # unique ID, must differ from Canal’s slaveId

Create a replication user for Canal:

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

Create the source table that will be synchronized:

CREATE TABLE `t_product` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  `status` TINYINT(4) NOT NULL,
  `create_time` DATETIME NOT NULL,
  `update_time` DATETIME NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

3. Elasticsearch Configuration

Create an index named t_product with appropriate mappings (via Kibana or the REST API):

PUT /t_product
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "name": {"type": "text"},
      "price": {"type": "double"},
      "status": {"type": "integer"},
      "createTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
      "updateTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
    }
  }
}
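
The mapping above uses camelCase field names (createTime, updateTime), while the t_product table uses snake_case columns (create_time, update_time), so some step between Canal and Elasticsearch has to rename fields and convert types. The Java sketch below shows one way that step could look. ProductDocumentMapper and its methods are hypothetical names, and the sketch assumes each row arrives as a map of column names to string values.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Canal or Elasticsearch): converts one row,
// given as column name -> string value, into the field layout of the t_product index.
class ProductDocumentMapper {

    // Converts a snake_case column name such as "create_time" to "createTime".
    static String toCamelCase(String column) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : column.toCharArray()) {
            if (c == '_') {
                upperNext = true;
            } else if (upperNext) {
                sb.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Renames every column and converts price/status to the numeric types of the mapping.
    static Map<String, Object> toDocument(Map<String, String> row) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : row.entrySet()) {
            String field = toCamelCase(e.getKey());
            String value = e.getValue();
            if (value == null) {
                doc.put(field, null);
            } else if (field.equals("price")) {
                doc.put(field, new BigDecimal(value).doubleValue()); // "double" in the mapping
            } else if (field.equals("status")) {
                doc.put(field, Integer.parseInt(value));             // "integer" in the mapping
            } else {
                doc.put(field, value); // id, name, and date strings pass through unchanged
            }
        }
        return doc;
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("name", "pen");
        row.put("price", "19.90");
        row.put("status", "1");
        row.put("create_time", "2024-01-01 10:00:00");
        row.put("update_time", "2024-01-02 11:30:00");
        System.out.println(toDocument(row));
    }
}
```

The date values are left as strings because the "yyyy-MM-dd HH:mm:ss" format is already accepted by the mapping.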

4. RocketMQ Configuration

Create a topic that Canal will use to publish change events, for example:

product-syn-topic

Ensure the topic is created in RocketMQ before starting Canal.

5. Canal Configuration

Use Canal 1.1.6 and edit the configuration files under its conf directory.

5.1 canal.properties

# Zookeeper address for cluster mode
canal.zkServers=localhost:2181
# Server mode (tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ)
canal.serverMode=rocketMQ
# Instance list (comma‑separated)
canal.destinations=product-syn
# Global Spring configuration for production clustering
canal.instance.global.spring.xml=classpath:spring/default-instance.xml
# MQ delivery settings: batch size, fetch timeout, and flat (JSON) message format
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.flatMessage=true
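
With canal.mq.flatMessage=true, each message body is a JSON document rather than a protobuf payload. As a rough guide to its shape, the Java sketch below models the fields a consumer typically reads. FlatMessageSketch is a hand-written, simplified stand-in, not Canal's own class, and the sample values are invented. The claim that "old" carries only the changed columns is an assumption worth checking against a real message from your topic.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of a Canal flat message. A JSON library would
// normally populate these fields from the message body.
class FlatMessageSketch {
    String database;                 // source schema, e.g. "mytest"
    String table;                    // source table, e.g. "t_product"
    String type;                     // INSERT, UPDATE, DELETE, or a DDL type
    boolean isDdl;                   // true for schema changes
    List<String> pkNames;            // primary key column names
    List<Map<String, String>> data;  // one map per affected row
    List<Map<String, String>> old;   // for UPDATE: previous values (assumed: changed columns only)

    // True when the message belongs to the given schema and table.
    boolean isFor(String db, String tbl) {
        return db.equals(database) && tbl.equals(table);
    }

    // Column names that changed in the row at rowIndex, or an empty set if unknown.
    Set<String> changedColumns(int rowIndex) {
        if (old == null || rowIndex >= old.size()) {
            return Collections.emptySet();
        }
        return old.get(rowIndex).keySet();
    }

    // Builds an invented UPDATE message in which only the price changed.
    static FlatMessageSketch sampleUpdate() {
        FlatMessageSketch m = new FlatMessageSketch();
        m.database = "mytest";
        m.table = "t_product";
        m.type = "UPDATE";
        m.isDdl = false;
        m.pkNames = List.of("id");
        Map<String, String> after = new LinkedHashMap<>();
        after.put("id", "1");
        after.put("name", "pen");
        after.put("price", "12.50");
        m.data = List.of(after);
        m.old = List.of(Map.of("price", "9.90"));
        return m;
    }

    public static void main(String[] args) {
        FlatMessageSketch m = sampleUpdate();
        System.out.println(m.type + " on " + m.database + "." + m.table
                + ", changed columns: " + m.changedColumns(0));
    }
}
```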

5.2 instance.properties (under conf/product-syn)

# MySQL master address
canal.instance.master.address=192.168.1.20:3306
# Canal user credentials
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Table filter (a regex, so the dot is escaped): only sync mytest.t_product
canal.instance.filter.regex=mytest\\.t_product
# RocketMQ settings
canal.mq.topic=product-syn-topic
canal.mq.partition=0

5.3 Starting the services

Start two Canal server processes, each with bin/startup.sh, and verify that both have registered in Zookeeper. Updating a row in t_product should then produce a message that is visible in the RocketMQ console.

6. Consumer Implementation

The consumer consists of two components:

Product index service : provides methods to create, update, or delete documents in Elasticsearch.

Message listener : subscribes to the RocketMQ topic, ensures FIFO order per partition, extracts the JSON payload from the data node, and invokes the appropriate index‑service method based on the binlog operation type (INSERT, UPDATE, DELETE).
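
The two components can be sketched in Java as follows. This is a minimal illustration, not the code from the referenced repository: ProductIndexService, InMemoryProductIndexService, and BinlogEventDispatcher are hypothetical names, the index service is an in-memory stand-in so the example runs without Elasticsearch or RocketMQ, and it assumes the listener has already parsed the message into an operation type and a list of rows from the data node.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical index service; a real implementation would call an Elasticsearch client.
interface ProductIndexService {
    void index(String id, Map<String, String> doc);
    void update(String id, Map<String, String> doc);
    void delete(String id);
}

// In-memory stand-in used only to make the sketch runnable.
class InMemoryProductIndexService implements ProductIndexService {
    final Map<String, Map<String, String>> docs = new HashMap<>();

    public void index(String id, Map<String, String> doc) {
        docs.put(id, new HashMap<>(doc));
    }

    public void update(String id, Map<String, String> doc) {
        // Assumption: treat an update as an upsert that merges fields into the document.
        docs.computeIfAbsent(id, k -> new HashMap<>()).putAll(doc);
    }

    public void delete(String id) {
        docs.remove(id);
    }
}

// The part of the listener that routes each row to the index service.
class BinlogEventDispatcher {
    private final ProductIndexService service;

    BinlogEventDispatcher(ProductIndexService service) {
        this.service = service;
    }

    // type: the binlog operation type; rows: the parsed contents of the data node.
    void dispatch(String type, List<Map<String, String>> rows) {
        if (type == null || rows == null) {
            return;
        }
        for (Map<String, String> row : rows) {
            String id = row.get("id"); // primary key of t_product
            switch (type) {
                case "INSERT":
                    service.index(id, row);
                    break;
                case "UPDATE":
                    service.update(id, row);
                    break;
                case "DELETE":
                    service.delete(id);
                    break;
                default:
                    break; // other event types (for example DDL) are ignored here
            }
        }
    }

    public static void main(String[] args) {
        InMemoryProductIndexService service = new InMemoryProductIndexService();
        BinlogEventDispatcher dispatcher = new BinlogEventDispatcher(service);
        dispatcher.dispatch("INSERT", List.of(Map.of("id", "1", "name", "pen")));
        dispatcher.dispatch("UPDATE", List.of(Map.of("id", "1", "name", "pencil")));
        System.out.println(service.docs);
        dispatcher.dispatch("DELETE", List.of(Map.of("id", "1")));
        System.out.println(service.docs);
    }
}
```

Keeping the dispatcher behind an interface means the routing logic can be tested without a running cluster, and the in-memory service can later be swapped for one backed by a real client.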

Key points:

Maintain strict order within a partition; RocketMQ guarantees FIFO for messages sharing the same sharding key.

Map Canal’s eventType to Elasticsearch actions: INSERT → index, UPDATE → update, DELETE → delete.
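
On the ordering point: with canal.mq.partition=0 as configured earlier, every event goes to a single queue, so all events are consumed in order. If events were instead spread across several queues, the usual approach is to derive the queue from a sharding key, such as the row's primary key, so that all changes to one row stay in one queue. The Java sketch below illustrates the idea with plain hashing. ShardingKeyQueueSelector is a hypothetical name, and this is not RocketMQ's or Canal's own selection code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of sharding-key routing: the same key always maps
// to the same queue index, so events for one row keep their relative order.
class ShardingKeyQueueSelector {

    // Returns a queue index in [0, queueCount) derived from the sharding key.
    static int selectQueue(String shardingKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4;

        // One in-memory list per queue stands in for the broker's queues.
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }

        // Events as {primary key, operation}, in binlog order.
        String[][] events = {
            {"1", "INSERT"}, {"2", "INSERT"}, {"1", "UPDATE"}, {"2", "DELETE"}, {"1", "DELETE"}
        };
        for (String[] event : events) {
            int q = selectQueue(event[0], queueCount);
            queues.get(q).add(event[0] + ":" + event[1]);
        }

        for (int i = 0; i < queueCount; i++) {
            System.out.println("queue " + i + " -> " + queues.get(i));
        }
    }
}
```

Events for different keys may interleave across queues, but that does not matter for this use case because each Elasticsearch document depends only on the changes to its own row.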

Reference Code

All example code is available at the following repository:

https://github.com/makemyownlife/rocketmq4-learning
