Sync MySQL Binlog to Elasticsearch with Canal and RocketMQ – Step‑by‑Step Guide

This tutorial walks through configuring Canal in cluster mode, enabling MySQL binlog, creating an Elasticsearch index, setting up RocketMQ topics, adjusting Canal properties and instance files, launching services, and implementing a consumer to reliably synchronize MySQL changes into Elasticsearch.

Su San Talks Tech

1. Cluster Mode Overview

Canal runs one server per JVM; a server can host multiple instances, each representing one synchronization task. An instance consists of four modules: eventParser (parses the MySQL binlog), eventSink (filters, transforms, and distributes data), eventStore (buffers data), and metaManager (manages subscription and consumption-position metadata).

Canal cluster architecture
Canal instance diagram

In production, Canal achieves high availability via ZooKeeper. Client modes are divided into TCP mode and MQ mode. This tutorial focuses on MQ mode because it decouples Canal from downstream consumers by publishing binlog changes to Kafka or RocketMQ.

Sequential Consumption: For a given topic, messages are sharded by a key; within each partition, they are consumed in strict FIFO order, guaranteeing order per partition while allowing parallelism across partitions.
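As a concrete illustration of this key-based sharding (a simplified sketch, not Canal's internal routing code; in Canal the actual mapping is configured declaratively via `canal.mq.partitionHash`):

```java
import java.util.List;

// Sketch: messages with the same routing key (e.g. the primary key of the
// changed row) always map to the same partition, so changes to one row are
// delivered in order while different rows can be consumed in parallel.
public class PartitionRouter {
    private final int partitions;

    public PartitionRouter(int partitions) {
        this.partitions = partitions;
    }

    // Stable hash-mod routing; floorMod avoids negative results.
    public int route(String key) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    public static void main(String[] args) {
        PartitionRouter router = new PartitionRouter(4);
        // The same primary key always lands in the same partition.
        System.out.println(router.route("1001") == router.route("1001")); // true
        for (String id : List.of("1001", "1002", "1003")) {
            System.out.println("id=" + id + " -> partition " + router.route(id));
        }
    }
}
```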

2. MySQL Configuration

Enable binlog and set binlog-format=ROW in my.cnf:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # row‑level logging
server_id=1 # unique server ID, must differ from Canal slaveId

Grant the Canal user replication privileges (skip for Alibaba Cloud RDS which enables binlog by default):

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

Create the source table t_product:

CREATE TABLE `t_product` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  `status` TINYINT(4) NOT NULL,
  `create_time` datetime NOT NULL,
  `update_time` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

3. Elasticsearch Index Setup

Using Kibana (or the REST API), create an index t_product with appropriate mappings:

PUT /t_product
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "name": {"type": "text"},
      "price": {"type": "double"},
      "status": {"type": "integer"},
      "createTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
      "updateTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
    }
  }
}
Kibana index creation
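Note that the mapping accepts dates only in the listed patterns, so the consumer must serialize `createTime`/`updateTime` accordingly before indexing. A minimal sketch (the class name `EsDateFormat` is ours, not part of any library):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// The index maps createTime/updateTime with
// "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"; this helper produces the
// first of those patterns from a Java datetime.
public class EsDateFormat {
    private static final DateTimeFormatter ES_DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static String format(LocalDateTime ts) {
        return ts.format(ES_DATETIME);
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.of(2023, 5, 1, 12, 30, 0)));
        // 2023-05-01 12:30:00
    }
}
```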

4. RocketMQ Topic Configuration

Create a topic named product-syn-topic. Canal will publish binlog change events to this topic.

RocketMQ topic creation
RocketMQ topic details

5. Canal Configuration

Select Canal version 1.1.6 and edit conf/canal.properties:

# Cluster mode ZK address
canal.zkServers = localhost:2181
# Server mode (rocketMQ, kafka, tcp, etc.)
canal.serverMode = rocketMQ
# Instance list
canal.destinations = product-syn
# Global Spring config for instances
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
# Batch size and timeout (default values shown)
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true

Create an instance directory conf/product-syn with instance.properties and set database connection, filter, and MQ details:

# Database connection (replace with your own)
canal.instance.master.address=192.168.1.20:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Table filter
canal.instance.filter.regex=mytest.t_product
# RocketMQ settings
canal.mq.topic=product-syn-topic
canal.mq.partition=0
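With `canal.mq.partition=0` every message goes to a single queue, which guarantees global order but serializes consumption. If you want per-row order with parallelism instead, Canal can hash rows across several queues (an optional fragment, assuming the topic was created with at least three queues; when `partitionHash` is set, the fixed `canal.mq.partition` is not used):

```properties
# Spread rows across multiple queues while keeping per-row order.
# partitionsNum must not exceed the queue count of product-syn-topic.
canal.mq.partitionsNum=3
# Hash by primary key so all changes to one row go to the same queue.
canal.mq.partitionHash=mytest.t_product:id
```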

6. Starting Services

Launch two Canal servers so the cluster has an active and a standby node, and verify their status via the ZooKeeper UI.

Canal ZK UI

Insert or update a row in t_product. The change appears as a message in RocketMQ.

RocketMQ message view

7. Consumer Implementation

The consumer service performs two main tasks:

Consume messages in order.

Parse the data node, extract the latest row values, and invoke the product index service to update the Elasticsearch document according to the operation type (INSERT, UPDATE, or DELETE).

Product index service diagram
Consumer listener flow
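The routing step can be sketched as follows. Because `canal.mq.flatMessage=true`, each message is JSON carrying a `type` field and a `data` array of changed rows; JSON parsing is omitted here, and the class and method names are illustrative, not Canal or RocketMQ APIs:

```java
import java.util.List;
import java.util.Map;

// Sketch of the routing step only: decide the Elasticsearch action for each
// row based on the binlog operation type. Real code would parse the flat
// message JSON and call an ES client instead of building a string.
public class FlatMessageHandler {

    public String handle(String type, List<Map<String, String>> rows) {
        StringBuilder actions = new StringBuilder();
        for (Map<String, String> row : rows) {
            String id = row.get("id"); // document id = primary key
            switch (type) {
                case "INSERT":
                case "UPDATE":
                    // Upserting keeps replays idempotent: re-indexing is safe.
                    actions.append("index ").append(id).append(';');
                    break;
                case "DELETE":
                    actions.append("delete ").append(id).append(';');
                    break;
                default:
                    // DDL and other event types are ignored here.
                    break;
            }
        }
        return actions.toString();
    }

    public static void main(String[] args) {
        FlatMessageHandler h = new FlatMessageHandler();
        System.out.println(h.handle("UPDATE", List.of(Map.of("id", "1001"))));
        // index 1001;
    }
}
```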

8. Conclusion

Canal provides a robust way to capture MySQL incremental changes and, when combined with RocketMQ and Elasticsearch, enables real‑time data synchronization suitable for search, analytics, or downstream services. The tutorial demonstrates a complete end‑to‑end setup, from MySQL binlog activation to consumer logic that keeps the Elasticsearch index in sync.


Written by Su San Talks Tech. Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.