Data Synchronization from MySQL to Elasticsearch using DataX and Canal
This article explains how to improve query performance by flattening multi‑table MySQL data and synchronizing it to Elasticsearch: DataX handles the one‑time bulk load, and Canal (with Canal‑Adapter) handles real‑time, binlog‑driven incremental updates. It covers the configuration steps, job examples, and common pitfalls.
Background : The business system requires multi‑table queries with fuzzy matching. Indexes cannot cover the conditions, resulting in low query performance.
Solution direction : Introduce a search engine (Elasticsearch) and sync data in real time to improve query speed.
Analysis : Syncing each table separately to ES and joining there incurs performance loss and higher complexity. Flattening the data and syncing the whole multi‑table result set to ES yields the best query performance and lower system modification cost.
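To make the flattening idea concrete, here is a minimal sketch that emulates the two LEFT JOINs of the article's SQL in plain Python. The sample rows are hypothetical; only the table and column names come from the article.

```python
# Hypothetical sample rows; table and column names follow the article's SQL.
sys_user = [
    {"user_id": 1, "username": "alice", "real_name": "Alice Li"},
    {"user_id": 2, "username": "bob", "real_name": "Bob Wu"},
]
sys_user_role = [{"user_id": 1, "role_id": 10}]
sys_role = [{"id": 10, "name": "admin"}]


def flatten(users, user_roles, roles):
    """Emulate: sys_user LEFT JOIN sys_user_role LEFT JOIN sys_role."""
    roles_by_id = {r["id"]: r for r in roles}
    rows = []
    for u in users:
        links = [ur for ur in user_roles if ur["user_id"] == u["user_id"]]
        # LEFT JOIN keeps users without any role (role columns become None).
        for ur in links or [None]:
            role = roles_by_id.get(ur["role_id"]) if ur else None
            rows.append({
                "_id": u["user_id"],
                "role_id": ur["role_id"] if ur else None,
                "r_id": role["id"] if role else None,
                "username": u["username"],
                "real_name": u["real_name"],
                "role_name": role["name"] if role else None,
            })
    return rows


docs = flatten(sys_user, sys_user_role, sys_role)
```

Each element of docs is one self-contained document that can be searched without a join. One thing the sketch makes visible: a user with several roles yields several rows sharing the same _id, and documents indexed under the same _id replace one another in Elasticsearch, so such a case would need a unique id or aggregated role fields.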
Implementation plan : Use DataX for full‑offline sync and Canal for incremental sync.
DataX overview : DataX is an offline data‑sync framework built on a Framework + plugin architecture. It abstracts data sources and sinks as Reader and Writer plugins, while the Framework handles buffering, flow control, concurrency and data conversion.
Supported plugins include MySQL Reader and Elasticsearch Writer. In the example, the MySQL Reader connects to MySQL over JDBC and runs the SELECT statement supplied in querySql, and the Elasticsearch Writer writes the resulting records to an ES index through the REST API.
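The Reader / Framework / Writer split can be pictured with a small producer-consumer sketch. This is not DataX code: the function names, the bounded queue standing in for the channel, and the batch size are all assumptions chosen for illustration.

```python
import queue
import threading

SENTINEL = object()  # marks the end of the stream


def reader(rows, channel):
    """Plays the Reader plugin: push source records into the channel."""
    for row in rows:
        channel.put(row)  # blocks when the buffer is full (back-pressure)
    channel.put(SENTINEL)


def writer(channel, sink, batch_size):
    """Plays the Writer plugin: drain the channel and flush in batches."""
    batch = []
    while True:
        item = channel.get()
        if item is SENTINEL:
            break
        batch.append(item)
        if len(batch) >= batch_size:
            sink.append(list(batch))
            batch.clear()
    if batch:
        sink.append(list(batch))  # flush the final partial batch


def run_job(rows, capacity=4, batch_size=2):
    """Plays the framework: owns the bounded buffer and the two threads."""
    channel = queue.Queue(maxsize=capacity)
    sink = []
    t_reader = threading.Thread(target=reader, args=(rows, channel))
    t_writer = threading.Thread(target=writer, args=(channel, sink, batch_size))
    t_reader.start()
    t_writer.start()
    t_reader.join()
    t_writer.join()
    return sink


batches = run_job([{"_id": i} for i in range(5)])
```

The reader and writer never call each other; they only share the channel, which is what lets a framework of this shape swap plugins and control throughput independently of either side.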
DataX job example :
{
  "job": {
    "setting": {"speed": {"channel": 1}},
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "xxx",
          "password": "xxx",
          "connection": [{
            "querySql": ["select u.user_id as _id,ur.role_id as role_id,r.id as r_id,u.username as username,u.real_name as real_name,r.name as role_name from sys_user u left join sys_user_role ur on u.user_id = ur.user_id left join sys_role r on ur.role_id = r.id"],
            "jdbcUrl": ["jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8"]
          }]
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://es-v.elasticsearch.aliyuncs.com:9200",
          "index": "kefu_user",
          "type": "user_role",
          "column": [
            {"name": "_id", "type": "id"},
            {"name": "role_id", "type": "long"},
            {"name": "r_id", "type": "long"},
            {"name": "username", "type": "keyword"},
            {"name": "real_name", "type": "keyword"},
            {"name": "role_name", "type": "keyword"}
          ]
        }
      }
    }]
  }
}
Full‑offline sync is executed with python datax.py /tools/datax/job/datax-user-job.json after navigating to the {datax_home}/bin directory.
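Before running a job like this, it can help to check that the writer's column list lines up with the SELECT list. The sketch below assumes the writer matches columns to the reader's fields by position (DataX records are ordered lists of values), and it only understands simple SELECT lists in which every field has an AS alias. The helper names are hypothetical.

```python
import re


def select_aliases(query_sql):
    """Return the AS aliases of a SELECT list, in order (simple SQL only)."""
    select_list = re.split(r"\bfrom\b", query_sql, maxsplit=1, flags=re.IGNORECASE)[0]
    return re.findall(r"\bas\s+(\w+)", select_list, flags=re.IGNORECASE)


def check_columns(query_sql, writer_columns):
    """List positions where the writer column name differs from the SELECT alias."""
    aliases = select_aliases(query_sql)
    names = [c["name"] for c in writer_columns]
    problems = []
    if len(aliases) != len(names):
        problems.append(f"count mismatch: {len(aliases)} selected, {len(names)} declared")
    for pos, (alias, name) in enumerate(zip(aliases, names)):
        if alias != name:
            problems.append(f"position {pos}: SELECT gives {alias!r}, writer expects {name!r}")
    return problems


sql = "select u.user_id as _id, ur.role_id as role_id, r.id as r_id from sys_user u"
aligned = [{"name": "_id"}, {"name": "role_id"}, {"name": "r_id"}]
swapped = [{"name": "_id"}, {"name": "r_id"}, {"name": "role_id"}]

aligned_problems = check_columns(sql, aligned)
swapped_problems = check_columns(sql, swapped)
```

An empty list means the two sides agree; with the swapped list, two positions are reported. For SQL with subqueries or unaliased columns a real SQL parser would be needed.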
Canal working principle : Canal mimics a MySQL slave, connects to the MySQL master, receives binlog events, parses the binary log stream, and forwards the changes. This enables real‑time incremental capture of data changes.
Key MySQL binlog steps:
Master writes changes to the binary log.
Slave copies the binary log to a relay log.
Slave replays the relay log to apply changes.
Canal replaces the slave, sends a dump request to the master, receives the binlog, and parses it.
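The effect of consuming parsed binlog changes can be sketched without a running MySQL or Canal. The event dictionaries below are a hypothetical, heavily simplified shape (real Canal entries carry binlog position, schema, and column metadata), and a plain dict stands in for the ES index.

```python
# Hypothetical, simplified row-change events in binlog order.
events = [
    {"type": "INSERT", "table": "sys_user", "after": {"user_id": 1, "username": "alice"}},
    {"type": "UPDATE", "table": "sys_user", "after": {"user_id": 1, "username": "alice2"}},
    {"type": "DELETE", "table": "sys_user", "before": {"user_id": 1, "username": "alice2"}},
]


def apply_event(index, event, id_column="user_id"):
    """Apply one row change to an in-memory stand-in for an ES index."""
    if event["type"] in ("INSERT", "UPDATE"):
        row = event["after"]
        # Upsert: merge the new values into the existing document, if any.
        index.setdefault(row[id_column], {}).update(row)
    elif event["type"] == "DELETE":
        index.pop(event["before"][id_column], None)
    else:
        raise ValueError(f"unsupported event type: {event['type']}")


def replay(events):
    """Consume events in order and return a snapshot after each one."""
    index, history = {}, []
    for event in events:
        apply_event(index, event)
        history.append({k: dict(v) for k, v in index.items()})
    return history


states = replay(events)
```

Because the events are applied in the order the master logged them, the target converges to the same state as the source table, which is the property incremental sync relies on.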
Canal configuration (example):
# position info
canal.instance.master.address=localhost:3306
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
canal.instance.connectionCharset=UTF-8
Incremental sync steps:
Navigate to {canal_deployer_home}/bin and start the server: sh startup.sh
Monitor logs with tail -f {canal_deployer_home}/logs/canal/canal.log
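A small pre-flight check on the instance properties can catch a malformed master address before the server is started. This is a sketch with hypothetical helper names; it only verifies that canal.instance.master.address has the host:port shape used in the configuration above and does not contact MySQL.

```python
import re

HOST_PORT = re.compile(r"^[A-Za-z0-9._-]+:(\d{1,5})$")


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_master_address(props):
    """Return an error message, or None if the address looks like host:port."""
    address = props.get("canal.instance.master.address", "")
    if address.startswith("jdbc:"):
        return "master address must be host:port, not a JDBC URL"
    match = HOST_PORT.match(address)
    if not match or not 0 < int(match.group(1)) <= 65535:
        return f"master address {address!r} is not in host:port form"
    return None


good = parse_properties("# position info\ncanal.instance.master.address=localhost:3306\n")
bad = parse_properties("canal.instance.master.address=jdbc:mysql://localhost:3306/test\n")

good_result = check_master_address(good)
bad_result = check_master_address(bad)
```

Here good_result is None, while bad_result carries a message explaining that a JDBC URL was supplied where host:port is expected.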
Canal‑Adapter configuration connects Canal to ES. Example YAML snippet:
server:
  port: 8081
canal.conf:
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://localhost:3306/test?useUnicode=true
      username: xxx
      password: xxx
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - key: exampleKey
        name: es
        hosts: es-cn-v.elasticsearch.aliyuncs.com:9200
        properties:
          mode: rest
          security.auth: xxx:aaaaaa
          cluster.name: elasticsearch
ES mapping configuration (mytest_user.yml) defines the target index, type, id field, upsert behavior, and the SQL used for data extraction.
dataSourceKey: defaultDS
outerAdapterKey: exampleKey
destination: example
esMapping:
  _index: xx_user
  _type: user_role
  _id: _id
  upsert: true
  sql: "select u.user_id as _id,ur.role_id as role_id,r.id as r_id,u.username as username,u.real_name as real_name,r.name as role_name from sys_user u left join sys_user_role ur on u.user_id = ur.user_id left join sys_role r on ur.role_id = r.id"
  commitBatch: 3000
Common issues :
Multi‑table joins may not sync correctly for certain tables.
Full‑sync performance can degrade if the SQL is not properly flattened.
Null date fields in MySQL become the current timestamp in ES; other null fields may be omitted.
Incorrect MySQL address format can cause Canal startup failures: canal.instance.master.address expects host:port (e.g., localhost:3306), not a full JDBC URL.
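To illustrate what the esMapping settings shown earlier (_id, upsert, commitBatch) amount to on the Elasticsearch side, the following sketch builds request bodies for the _bulk REST API from flattened rows. It is not how Canal‑Adapter is implemented; the function names are hypothetical, the _type field is left out, and null values are dropped explicitly so the omission described above is a deliberate choice and not a surprise.

```python
import json


def bulk_upsert_body(rows, index, id_field="_id"):
    """Build an NDJSON body for the Elasticsearch _bulk API from flattened rows."""
    lines = []
    for row in rows:
        # Skip the id column and any null values when building the document.
        doc = {k: v for k, v in row.items() if k != id_field and v is not None}
        lines.append(json.dumps({"update": {"_index": index, "_id": str(row[id_field])}}))
        # doc_as_upsert creates the document when it does not exist yet.
        lines.append(json.dumps({"doc": doc, "doc_as_upsert": True}))
    return "\n".join(lines) + "\n"  # _bulk requires a trailing newline


def in_batches(rows, size):
    """Yield consecutive slices of at most `size` rows (cf. commitBatch)."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


rows = [
    {"_id": 1, "username": "alice", "role_name": "admin"},
    {"_id": 2, "username": "bob", "role_name": None},
]
bodies = [bulk_upsert_body(batch, "xx_user") for batch in in_batches(rows, 1)]
```

Each body could then be sent with a POST to the cluster's /_bulk endpoint using the Content-Type application/x-ndjson. Note that dropping nulls in a partial update leaves any previously stored value of that field untouched.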
Overall, the article provides a complete end‑to‑end solution for migrating and continuously syncing MySQL multi‑table data to Elasticsearch using DataX for bulk loading and Canal for CDC‑based incremental updates.
DeWu Technology