
Data Synchronization from MySQL to Elasticsearch using DataX and Canal

The article explains how to improve query performance by flattening multi‑table MySQL data and synchronizing it to Elasticsearch—using DataX for one‑time bulk loading and Canal (with Canal‑Adapter) for real‑time binlog‑driven incremental updates—while detailing configuration steps, job examples, and common pitfalls.

DeWu Technology

Background: The business system requires multi‑table queries with fuzzy matching. Indexes cannot cover the conditions, resulting in low query performance.

Solution direction: Introduce a search engine (Elasticsearch) and sync data to it in real time to improve query speed.

Analysis: Syncing each table to its own ES index and combining results at query time would add complexity and cost performance, since ES has no native join. Flattening the data and syncing the whole multi‑table result set to ES yields the best query performance and the lowest modification cost.
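As a minimal illustration of the flattening approach (table and field names are taken from the job example later in the article; the join logic is a plain‑Python sketch, not DataX code):

```python
# Sketch: flatten user / user_role / role rows into one document per
# (user, role) pair, mirroring the LEFT JOINs used for the ES sync.
sys_user = [{"user_id": 1, "username": "jdoe", "real_name": "John Doe"}]
sys_user_role = [{"user_id": 1, "role_id": 10}]
sys_role = [{"id": 10, "name": "admin"}]

def flatten(users, user_roles, roles):
    roles_by_id = {r["id"]: r for r in roles}
    docs = []
    for u in users:
        # LEFT JOIN semantics: a user with no role still yields a document
        links = [ur for ur in user_roles if ur["user_id"] == u["user_id"]] or [None]
        for ur in links:
            r = roles_by_id.get(ur["role_id"]) if ur else None
            docs.append({
                "_id": u["user_id"],
                "role_id": ur["role_id"] if ur else None,
                "r_id": r["id"] if r else None,
                "username": u["username"],
                "real_name": u["real_name"],
                "role_name": r["name"] if r else None,
            })
    return docs

docs = flatten(sys_user, sys_user_role, sys_role)
print(docs)
```

Each resulting document is self‑contained, so ES can answer the fuzzy multi‑table query against a single index with no join at query time.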

Implementation plan: Use DataX for the one‑time full (offline) sync and Canal for real‑time incremental sync.

DataX overview: DataX is an offline data‑sync framework built on a Framework + plugin architecture. It abstracts data sources and sinks as Reader and Writer plugins, while the Framework handles buffering, flow control, concurrency, and data conversion.
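The Reader → channel → Writer flow can be sketched with a bounded queue standing in for DataX's channel; everything below is an illustrative stand‑in, not DataX's actual classes:

```python
import threading
from queue import Queue

# A bounded queue plays the role of the DataX "channel": the Framework
# buffers records and applies back-pressure between Reader and Writer.
channel = Queue(maxsize=2)

def reader(rows):
    """Reader plugin stand-in: pulls rows from the source into the channel."""
    for row in rows:
        channel.put(row)
    channel.put(None)  # end-of-stream marker

def writer(sink):
    """Writer plugin stand-in: drains the channel into the sink."""
    while (row := channel.get()) is not None:
        sink.append(row)

sink = []
t = threading.Thread(target=reader, args=([{"id": 1}, {"id": 2}],))
t.start()
writer(sink)
t.join()
print(sink)
```

Because the Framework owns the channel, any Reader can be paired with any Writer, which is what makes the MySQL‑to‑ES combination a configuration choice rather than custom code.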

Supported plugins include a MySQL Reader and an Elasticsearch Writer. In the example, the MySQL Reader connects via JDBC and issues the configured SELECT statement, and the Elasticsearch Writer writes the resulting records to an ES index through the REST API.

DataX job example:

{
  "job": {
    "setting": {"speed": {"channel":1}},
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "xxx",
          "password": "xxx",
          "connection": [{
            "querySql": ["select u.user_id as _id,ur.role_id as role_id,r.id as r_id,u.username as username,u.real_name as real_name,r.name as role_name from sys_user u left join sys_user_role ur on u.user_id = ur.user_id left join sys_role r on ur.role_id = r.id"],
            "jdbcUrl": ["jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8"]
          }]
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://es-v.elasticsearch.aliyuncs.com:9200",
          "index": "kefu_user",
          "type": "user_role",
          "column": [
            {"name": "_id", "type": "id"},
            {"name": "r_id", "type": "long"},
            {"name": "role_id", "type": "long"},
            {"name": "username", "type": "keyword"},
            {"name": "real_name", "type": "keyword"},
            {"name": "role_name", "type": "keyword"}
          ]
        }
      }
    }]
  }
}

To run the full offline sync, navigate to the {datax_home}/bin directory and execute python datax.py /tools/datax/job/datax-user-job.json.

Canal working principle: Canal mimics a MySQL slave, connects to the MySQL master, receives binlog events, parses the binary log stream, and forwards the changes. This enables real‑time incremental capture of data changes.
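Before Canal can attach, the master must be writing ROW‑format binlog, and the canal account needs SELECT, REPLICATION SLAVE, and REPLICATION CLIENT privileges, per Canal's quick‑start requirements. A typical my.cnf fragment (values here are illustrative):

```ini
# my.cnf on the MySQL master — prerequisites for Canal
[mysqld]
log-bin=mysql-bin    # enable binary logging
binlog-format=ROW    # Canal requires ROW-format events
server_id=1          # must be unique among master and (pseudo-)slaves
```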

Key MySQL binlog steps:

Master writes changes to the binary log.

Slave copies the binary log to a relay log.

Slave replays the relay log to apply changes.

Canal takes the slave's place: it sends a dump request to the master, receives the binlog stream, and parses it.
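Conceptually, each parsed row event then becomes an upsert or delete of the flattened document. A hedged sketch of that mapping (the event shape and helper below are illustrative, not Canal's actual API, and the returned dict is a simplified stand‑in for an ES bulk action):

```python
# Turn a parsed binlog row event into a simplified ES action for the
# flattened index. Event shape is illustrative, not Canal's wire format.
def event_to_action(event, index="kefu_user"):
    if event["type"] == "DELETE":
        # remove the document keyed by the old row's primary key
        return {"delete": {"_index": index, "_id": event["before"]["user_id"]}}
    row = event["after"]  # row image after the change
    # INSERT / UPDATE both become an upsert of the whole document
    return {
        "update": {"_index": index, "_id": row["user_id"]},
        "doc": {"username": row["username"], "real_name": row["real_name"]},
        "doc_as_upsert": True,
    }

evt = {"type": "UPDATE",
       "after": {"user_id": 1, "username": "jdoe", "real_name": "John"}}
action = event_to_action(evt)
print(action["update"]["_id"])  # -> 1
```

Treating INSERT and UPDATE uniformly as upserts keeps the sync idempotent: replaying the same event twice leaves the index in the same state.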

Canal configuration (example):

# position info
canal.instance.master.address=localhost:3306
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset=UTF-8

Incremental sync steps:

Navigate to {canal_deployer_home}/bin and start the server: sh startup.sh.

Monitor logs with tail -f {canal_deployer_home}/logs/canal/canal.log.

Canal‑Adapter configuration connects Canal to ES. Example YAML snippet:

server:
  port: 8081
canal.conf:
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://localhost:3306/test?useUnicode=true
      username: xxx
      password: xxx
  canalAdapters:
    - instance: example
      groups:
        - groupId: g1
          outerAdapters:
            - key: exampleKey
              name: es
              hosts: es-cn-v.elasticsearch.aliyuncs.com:9200
              properties:
                mode: rest
                security.auth: xxx:aaaaaa
                cluster.name: elasticsearch

ES mapping configuration (mytest_user.yml) defines the target index, type, id field, upsert behavior, and the SQL used for data extraction.

dataSourceKey: defaultDS
outerAdapterKey: exampleKey
destination: example
esMapping:
  _index: xx_user
  _type: user_role
  _id: _id
  upsert: true
  sql: "select u.user_id as _id,ur.role_id as role_id,r.id as r_id,u.username as username,u.real_name as real_name,r.name as role_name from sys_user u left join sys_user_role ur on u.user_id = ur.user_id left join sys_role r on ur.role_id = r.id"
  commitBatch: 3000
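commitBatch caps how many changes go into one ES bulk flush. The grouping itself is simple; a sketch (not Canal‑Adapter internals, and commit_batch is shrunk to 3 for readability where the config uses 3000):

```python
# Group pending changes into bulk-sized flushes, as commitBatch does.
def batches(changes, commit_batch):
    for i in range(0, len(changes), commit_batch):
        yield changes[i:i + commit_batch]

pending = [{"_id": n} for n in range(7)]
flushes = list(batches(pending, commit_batch=3))
print([len(b) for b in flushes])  # -> [3, 3, 1]
```

A larger commitBatch means fewer, bigger bulk requests to ES; tune it against latency requirements and ES bulk-queue capacity.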

Common issues:

Multi‑table join mappings are restricted in Canal‑Adapter, so changes made to secondary (joined) tables may not sync; verify that updates on every joined table actually propagate to ES.

Full‑sync performance can degrade if the SQL is not properly flattened.

Null date fields in MySQL become the current timestamp in ES; other null fields may be omitted.

Incorrect MySQL address format (e.g., using a full JDBC URL) can cause Canal startup failures.
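One way to guard against the null‑date pitfall is to drop NULL date fields before documents reach the writer, so ES omits them instead of substituting the current timestamp. A defensive sketch (the DATE_FIELDS names and the helper are hypothetical, not part of DataX or Canal):

```python
# Strip None-valued date fields so they are omitted in ES rather than
# coerced to the current timestamp. DATE_FIELDS is a hypothetical list.
DATE_FIELDS = {"created_at", "updated_at"}

def sanitize(doc):
    return {k: v for k, v in doc.items()
            if not (k in DATE_FIELDS and v is None)}

row = {"_id": 1, "username": "jdoe", "created_at": None}
clean = sanitize(row)
print(clean)  # -> {'_id': 1, 'username': 'jdoe'}
```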

Overall, the article provides a complete end‑to‑end solution for migrating and continuously syncing MySQL multi‑table data to Elasticsearch using DataX for bulk loading and Canal for CDC‑based incremental updates.
