How to Sync MySQL Binlog to Elasticsearch Using Canal and RocketMQ

This step‑by‑step tutorial shows how to configure Alibaba's Canal to capture MySQL binlog changes, route them through RocketMQ, and index the data into Elasticsearch, covering cluster mode, MySQL and Elasticsearch setup, RocketMQ topic creation, Canal properties, and consumer implementation.

Su San Talks Tech
Su San Talks Tech
Su San Talks Tech
How to Sync MySQL Binlog to Elasticsearch Using Canal and RocketMQ

1 Cluster Mode

In cluster mode each Canal server runs as a JVM instance; a server may host multiple instances, each representing a configured task. An instance consists of four modules: eventParser (simulates a MySQL slave and parses the binlog), eventSink (filters, processes, and distributes data), eventStore (stores data), and metaManager (manages subscription and consumption metadata).

Canal achieves high availability via ZooKeeper and supports two client modes: TCP and MQ. The MQ mode is preferred because it decouples Canal from downstream consumers by sending change events to Kafka or RocketMQ.

Ordered Consumption: Messages within the same partition are consumed strictly FIFO, guaranteeing order per partition while allowing parallelism across partitions.

2 MySQL Configuration

Enable binlog in MySQL and set binlog-format=ROW. Example my.cnf snippet:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # unique server ID, must not clash with Canal slaveId

Grant the Canal user replication privileges:

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

Create the source table t_product:

CREATE TABLE `t_product` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) COLLATE utf8mb4_bin NOT NULL,
  `price` DECIMAL(10,2) NOT NULL,
  `status` TINYINT(4) NOT NULL,
  `create_time` datetime NOT NULL,
  `update_time` datetime NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

3 Elasticsearch Configuration

Create an index for the product data via Kibana:

PUT /t_product
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "name": {"type": "text"},
      "price": {"type": "double"},
      "status": {"type": "integer"},
      "createTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
      "updateTime": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
    }
  }
}

4 RocketMQ Configuration

Create a topic named product-syn-topic. Canal will publish binlog change events to this topic.

5 Canal Configuration

Select Canal version 1.1.6 and edit the conf directory.

1. canal.properties

# Cluster mode ZooKeeper address
canal.zkServers = localhost:2181
# Server mode (mq, tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ)
canal.serverMode = rocketMQ
# Instance list
canal.destinations = product-syn
# Global Spring config for production
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

# Default values (shown for reference)
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true

2. Instance configuration (instance.properties)

# Database connection (modify as needed)
canal.instance.master.address=192.168.1.20:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal

# Table filter
canal.instance.filter.regex=mytest.t_product

# MQ settings
canal.mq.topic=product-syn-topic
canal.mq.partition=0

3. Start Services

Launch two Canal server instances and verify their status via ZooKeeper UI.

6 Consumer Implementation

The consumer consists of two parts:

Product index operation service.

Consumption listener that processes ordered messages, extracts the data node, converts it to JSON, and invokes the appropriate index operation (INSERT, UPDATE, DELETE) based on the event type.

7 Conclusion

Canal is a powerful open‑source data transmission service widely used to build real‑time pipelines. By following this guide you can learn network programming, multithreading, high‑performance queues (Disruptor), and abstract workflow modeling.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

ElasticsearchmysqlRocketMQCanaldata synchronization
Su San Talks Tech
Written by

Su San Talks Tech

Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.