6 Proven Ways to Sync MySQL Data to Elasticsearch – Choose the Right Strategy
This article compares six mainstream MySQL‑to‑Elasticsearch synchronization methods—synchronous double‑write, asynchronous MQ, Logstash polling, Canal binlog listening, DataX batch sync, and Flink streaming—detailing scenarios, code samples, advantages, drawbacks, and practical selection guidance for developers.
Introduction
In distributed architectures, combining MySQL with Elasticsearch (ES) is a common solution for handling high‑concurrency queries and complex searches, but efficient data synchronization between the two remains a challenging design problem.
This article presents six mainstream solutions, complete with code examples and usage scenarios, to help developers avoid common pitfalls and make optimal technology choices.
Solution 1: Synchronous Double‑Write
Scenario: Real‑time data requirements with simple business logic, such as financial transaction record syncing.
Write to MySQL and ES simultaneously in the business code.
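// Assumes Spring (@Transactional), a MyBatis mapper (orderMapper), fastjson (JSON) and an ES RestHighLevelClient (client)
@Transactional(rollbackFor = Exception.class) // the ES call throws a checked IOException, which Spring does not roll back on by default
public void createOrder(Order order) throws IOException {
    // Write to MySQL
    orderMapper.insert(order);
    // Sync to ES inside the same local transaction
    IndexRequest request = new IndexRequest("orders")
            .id(String.valueOf(order.getId())) // IndexRequest.id() expects a String
            .source(JSON.toJSONString(order), XContentType.JSON);
    client.index(request, RequestOptions.DEFAULT);
}
Pain Points: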
Hard‑coded intrusion: Every write operation must add ES logic.
Performance bottleneck: Double‑write extends transaction time, reducing TPS by over 30%.
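Data consistency risk: The MySQL and ES writes are not atomic. For example, ES may accept the document while the MySQL commit later fails. A compensation mechanism, such as a local transaction table plus retry, is therefore required.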
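The local transaction table (outbox) compensation mentioned above can be sketched in plain Java. This is a minimal in-memory model: the maps and the list stand in for the orders table, the outbox table and the ES index, and all names are hypothetical.

The order and its outbox row are written together, standing in for one local MySQL transaction. A scheduled relay then indexes pending rows and leaves failed ones in place for the next run.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// In-memory sketch of the "local transaction table + retry" (outbox) pattern.
// The collections below are hypothetical stand-ins for the orders table, the outbox table and ES.
class OutboxSketch {
    final Map<String, String> mysqlOrders = new HashMap<>();
    final List<String> outbox = new ArrayList<>();          // IDs of orders still to be indexed
    final Map<String, String> esIndex = new HashMap<>();
    int esFailuresLeft = 0;                                  // simulates transient ES outages

    // In a real system these two writes share ONE local MySQL transaction.
    void createOrder(String id, String json) {
        mysqlOrders.put(id, json);
        outbox.add(id);
    }

    // Simulated ES call that fails while esFailuresLeft > 0.
    void indexToEs(String id, String json) {
        if (esFailuresLeft > 0) {
            esFailuresLeft--;
            throw new IllegalStateException("ES unavailable");
        }
        esIndex.put(id, json);
    }

    // One run of a scheduled relay: rows that fail stay in the outbox for the next run.
    int relayOnce() {
        int synced = 0;
        Iterator<String> it = outbox.iterator();
        while (it.hasNext()) {
            String id = it.next();
            try {
                indexToEs(id, mysqlOrders.get(id));  // index the current row state
                it.remove();                         // delete the outbox row only after success
                synced++;
            } catch (IllegalStateException e) {
                // keep the row; a production relay would also count attempts and back off
            }
        }
        return synced;
    }

    public static void main(String[] args) {
        OutboxSketch s = new OutboxSketch();
        s.esFailuresLeft = 1;
        s.createOrder("1001", "{\"id\":1001}");
        System.out.println(s.relayOnce() + " synced, pending=" + s.outbox);  // 0 synced, pending=[1001]
        System.out.println(s.relayOnce() + " synced, pending=" + s.outbox);  // 1 synced, pending=[]
    }
}
```

An outbox row is deleted only after ES accepts the document, so every committed order is eventually indexed at least once. For that reason, the ES write should be idempotent, for example by using the order ID as the document _id.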
Solution 2: Asynchronous MQ Double‑Write
Scenario: E‑commerce order status updates need to be searchable by customer‑service systems.
Decouple using a message queue.
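Architecture: the service writes to MySQL and publishes a message to the MQ; a consumer reads the message and writes to ES.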
Code Example:
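// Producer: the product ID is also the message key, so all updates to one product
// go to the same partition and are consumed in order
public void updateProduct(Product product) {
    productMapper.update(product);
    String productId = String.valueOf(product.getId());
    kafkaTemplate.send("product-update", productId, productId);
}
// Consumer: re-reads the latest row, so duplicate or delayed messages still index the current state
@KafkaListener(topics = "product-update")
public void syncToEs(String productId) {
    Product product = productMapper.selectById(productId);
    if (product != null) {        // the row may have been deleted since the message was sent
        esClient.index(product);  // esClient: an application-level ES wrapper
    }
}
Advantages: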
Throughput boost: The MQ absorbs traffic spikes and can sustain tens of thousands of QPS.
Fault isolation: ES downtime does not affect the main business flow.
Drawbacks:
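Message backlog: Sudden traffic may cause consumption lag; monitor consumer lag (log-end offset minus committed offset) per partition.
Ordering issues: Kafka only guarantees order within a partition, so use the record ID as the message key to keep all changes to the same record in order.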
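Why a message key preserves per-record order can be shown with a simplified partitioning model. This is a sketch, not Kafka's implementation: the partitioner here uses String.hashCode(), whereas Kafka's default partitioner hashes the serialized key with murmur2. The point is the same in both cases: one key always maps to one partition, and a partition is an ordered log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyed partitioning: every message with the same key lands in the same
// partition, and each partition is an append-only log consumed in order.
class KeyedPartitionSketch {
    static final class Event {
        final String productId;  // used as the message key
        final int seq;           // order in which the producer sent this product's updates
        Event(String productId, int seq) { this.productId = productId; this.seq = seq; }
        @Override public String toString() { return productId + "#" + seq; }
    }

    // Stand-in partitioner; Kafka's default partitioner hashes the serialized key with murmur2.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends each event to its partition's log in send order.
    static Map<Integer, List<Event>> route(List<Event> sent, int numPartitions) {
        Map<Integer, List<Event>> logs = new TreeMap<>();
        for (Event e : sent) {
            logs.computeIfAbsent(partitionFor(e.productId, numPartitions), p -> new ArrayList<>()).add(e);
        }
        return logs;
    }

    // Number of partitions that contain at least one event for the given key.
    static long partitionsHolding(Map<Integer, List<Event>> logs, String key) {
        return logs.values().stream()
                .filter(log -> log.stream().anyMatch(e -> e.productId.equals(key)))
                .count();
    }

    public static void main(String[] args) {
        List<Event> sent = List.of(new Event("p1", 1), new Event("p2", 1),
                new Event("p1", 2), new Event("p2", 2), new Event("p1", 3));
        Map<Integer, List<Event>> logs = route(sent, 3);
        System.out.println(logs);                           // p1#1, p1#2, p1#3 share one partition
        System.out.println(partitionsHolding(logs, "p1"));  // 1
    }
}
```

With producer retries enabled, keep enable.idempotence on (or max.in.flight.requests.per.connection=1). Otherwise a retried batch can land after a later one and break this per-partition order.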
Solution 3: Logstash Periodic Pull
Scenario: User-behavior log analysis where T+1 latency is acceptable.
This approach is low‑intrusion but introduces higher latency.
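Configuration Example (driver path, credentials and the id column are placeholders):
input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector-j.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/log_db"
    jdbc_user => "logstash"
    jdbc_password => "******"
    schedule => "*/5 * * * *"   # every 5 minutes
    statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value ORDER BY update_time"
    # track the largest update_time seen instead of the last run time
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
  }
}
output {
  elasticsearch {
    hosts => ["es-host:9200"]
    index => "user_logs"
    document_id => "%{id}"   # reuse the MySQL primary key so updated rows overwrite their document
  }
}
Applicability Analysis: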
Pros: Zero code changes, suitable for historical data migration.
Cons: Minute-level delay (cannot meet real-time search). In addition, without an index on update_time, every poll scans the whole table and puts heavy load on MySQL.
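The incremental pull driven by :sql_last_value can be simulated to expose a subtle gap. This is a minimal sketch under two assumptions: update_time is modeled as epoch milliseconds, and the tracked value starts at 0.

With a strict `>` comparison, a row committed late with the same timestamp as the stored value is silently skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal simulation of an incremental JDBC pull with a tracking column:
// each run selects rows WHERE update_time > lastValue, then stores the max update_time seen.
class IncrementalPullSketch {
    static final class Row {
        final long id;
        final long updateTime;  // epoch millis, stands in for user_log.update_time
        Row(long id, long updateTime) { this.id = id; this.updateTime = updateTime; }
    }

    long lastValue = 0;  // plays the role of :sql_last_value

    // One scheduled run: returns the rows to ship to ES and advances lastValue.
    List<Row> pull(List<Row> table) {
        List<Row> batch = new ArrayList<>();
        for (Row r : table) {
            if (r.updateTime > lastValue) batch.add(r);
        }
        for (Row r : batch) lastValue = Math.max(lastValue, r.updateTime);
        return batch;
    }

    public static void main(String[] args) {
        IncrementalPullSketch job = new IncrementalPullSketch();
        List<Row> table = new ArrayList<>(List.of(new Row(1, 1000), new Row(2, 2000)));
        System.out.println("run 1: " + job.pull(table).size() + " rows");  // run 1: 2 rows
        table.add(new Row(3, 2000));  // committed late with the same timestamp as lastValue
        table.add(new Row(4, 3000));
        System.out.println("run 2: " + job.pull(table).size() + " rows");  // run 2: 1 rows (row 3 missed)
    }
}
```

One way to close this gap is to compare with `>=` and rely on document_id => "%{id}", so that re-reading boundary rows simply overwrites the same documents.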
Solution 4: Canal Binlog Listening
Scenario: Real‑time search for social platform updates (e.g., Weibo hot topics).
Tech Stack: Canal + RocketMQ + ES
Provides millisecond‑level latency with low intrusion.
Architecture Flow: MySQL binlog → Canal (acting as a MySQL replica) → RocketMQ → consumer → ES.
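Key Configuration (RocketMQ connection settings omitted):
# conf/canal.properties
canal.serverMode = rocketMQ
# conf/example/instance.properties (per-instance settings; MySQL needs binlog_format=ROW)
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=canal.es.sync
Avoiding Pitfalls: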
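Schema drift: DDL changes (e.g., new or renamed columns) must be reflected in the ES mapping; track them via a Schema Registry.
Idempotent consumption: Use the MySQL primary key as the ES document _id so that redelivered messages overwrite the same document instead of creating duplicates.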
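Idempotent, order-safe consumption can be modeled with an in-memory stand-in for the index. Documents are keyed by the primary key (_id), and writes are accepted only when their version is newer, similar to Elasticsearch's external versioning.

The version source is an assumption. It could be any value that increases monotonically per row, such as update_time or a number derived from the binlog position. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for an ES index keyed by the MySQL primary key (_id) that, like
// external versioning, ignores writes whose version is not newer than the stored one.
class IdempotentSinkSketch {
    static final class Doc {
        final long version;
        final String source;
        Doc(long version, String source) { this.version = version; this.source = source; }
    }

    final Map<String, Doc> index = new HashMap<>();

    // Returns true if the event changed the index; duplicates and stale events are no-ops.
    boolean apply(String id, long version, String source) {
        Doc current = index.get(id);
        if (current != null && current.version >= version) return false;
        index.put(id, new Doc(version, source));
        return true;
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        System.out.println(sink.apply("42", 100, "{\"title\":\"v1\"}"));   // true
        System.out.println(sink.apply("42", 100, "{\"title\":\"v1\"}"));   // false: redelivered duplicate
        System.out.println(sink.apply("42", 90, "{\"title\":\"old\"}"));   // false: stale, arrived late
        System.out.println(sink.apply("42", 120, "{\"title\":\"v2\"}"));   // true
    }
}
```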
Solution 5: DataX Batch Sync
Scenario: Migrating historical order data from sharded MySQL to ES.
Ideal for large‑scale data migration.
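Configuration File (credentials, JDBC URL and channel count are placeholders; splitPk only takes effect in table mode, not with querySql; for sharded sources add one connection entry per shard, and the Elasticsearch writer also needs a column mapping matching the reader's columns):
{
  "job": {
    "setting": {
      "speed": { "channel": 4 }
    },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "root",
          "password": "******",
          "column": ["*"],
          "splitPk": "id",
          "connection": [{
            "table": ["orders"],
            "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/order_db"]
          }]
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://es-host:9200",
          "index": "orders",
          "batchSize": 1000
        }
      }
    }]
  }
}
Performance Tuning: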
Raise the channel count (job.setting.speed.channel) to match the number of source shards for higher concurrency.
Cap batch sizes (e.g., the writer's batchSize) to avoid OOM.
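Parallel reading with a split key can be illustrated by cutting a numeric key range into contiguous slices, one per channel. This is a simplified sketch: it assumes an integer id with known minimum and maximum, and DataX's actual splitting logic may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of splitting a numeric key range into contiguous, non-overlapping
// slices so that several channels can read disjoint parts of a table in parallel.
class SplitPkSketch {
    // Returns inclusive [lo, hi] pairs that together cover minId..maxId exactly once.
    static List<long[]> split(long minId, long maxId, int parts) {
        if (parts < 1 || maxId < minId) throw new IllegalArgumentException("bad range or parts");
        List<long[]> ranges = new ArrayList<>();
        long total = maxId - minId + 1;
        long size = (total + parts - 1) / parts;  // ceiling division
        for (long lo = minId; lo <= maxId; lo += size) {
            long hi = Math.min(lo + size - 1, maxId);
            ranges.add(new long[] {lo, hi});
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] r : split(1, 10, 3)) {
            // prints id ranges 1-4, 5-8 and 9-10
            System.out.println("SELECT ... FROM orders WHERE id >= " + r[0] + " AND id <= " + r[1]);
        }
    }
}
```

Equal-width slices only balance the load when ids are dense. Sparse or skewed ids can leave one channel doing most of the work.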
Solution 6: Flink Stream Processing
Scenario: Real‑time recommendation scoring when product prices change, requiring user‑profile joins.
Suitable for complex ETL scenarios.
Code Snippet:
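StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// CanalSource, parseToPriceEvent and PriceRecommendationProcess (a KeyedBroadcastProcessFunction)
// are application code; userProfileBroadcastStream comes from profileStream.broadcast(profileDescriptor)
env.addSource(new CanalSource())
        .map(record -> parseToPriceEvent(record))
        .keyBy(event -> event.getProductId())
        .connect(userProfileBroadcastStream)
        .process(new PriceRecommendationProcess())
        // esSinkBuilder: an ElasticsearchSink.Builder from the flink-connector-elasticsearch7 module
        .addSink(esSinkBuilder.build());
env.execute("mysql-price-to-es");
Advantages: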
State management: Keyed state keeps per-product context, and watermarks bound out-of-orderness so that late events are handled deterministically in event time.
Dimension table join: Real‑time profile association using Broadcast State.
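The broadcast-state join can be modeled in plain Java without Flink. In a KeyedBroadcastProcessFunction, onProfile would correspond to processBroadcastElement and onPriceChange to processElement; in Flink, processElement sees the broadcast state as read-only.

The profile attribute (price sensitivity), the scoring rule and the "productId:userId" document ID are hypothetical choices for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a broadcast-state join (no Flink dependency). Profile updates fill a
// map that every parallel instance would hold; each price event is enriched by a lookup.
class BroadcastJoinSketch {
    // Broadcast state: userId -> price sensitivity (hypothetical profile attribute).
    final Map<String, Double> profileState = new HashMap<>();

    // Counterpart of processBroadcastElement: update the shared dimension table.
    void onProfile(String userId, double priceSensitivity) {
        profileState.put(userId, priceSensitivity);
    }

    // Counterpart of processElement: one score per known user, keyed by a "productId:userId" doc ID.
    Map<String, Double> onPriceChange(String productId, double oldPrice, double newPrice) {
        Map<String, Double> scores = new TreeMap<>();
        if (oldPrice <= 0) return scores;                      // avoid dividing by zero
        double dropRatio = (oldPrice - newPrice) / oldPrice;   // positive when the price falls
        for (Map.Entry<String, Double> p : profileState.entrySet()) {
            scores.put(productId + ":" + p.getKey(), dropRatio * p.getValue());  // hypothetical rule
        }
        return scores;
    }

    public static void main(String[] args) {
        BroadcastJoinSketch job = new BroadcastJoinSketch();
        job.onProfile("u1", 1.0);
        job.onProfile("u2", 0.5);
        System.out.println(job.onPriceChange("p1", 100.0, 80.0));  // {p1:u1=0.2, p1:u2=0.1}
    }
}
```

Broadcasting suits a profile table that fits in memory on every instance. A much larger dimension table would usually call for a keyed lookup instead.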
Conclusion
Choosing the right synchronization method depends on project size, latency requirements, operational capabilities, and resource constraints. For teams lacking middleware expertise, Logstash or synchronous double‑write are safe bets; for sub‑second latency with moderate refactoring, asynchronous MQ with local transaction tables works well; for ultimate real‑time performance and ample resources, combining Canal with Flink offers the best solution.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Su San Talks Tech
Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
