How to Sync MySQL Data to Elasticsearch in Real-Time Using Binlog and Kafka
This article explains how a growing e‑commerce platform replaced a heavyweight MySQL intermediate table with a binlog‑driven pipeline that streams changes to Elasticsearch via Kafka, detailing the architecture, modules, customizations, monitoring, and performance results.
Background and Motivation
Rapid growth of e‑commerce business data made MySQL alone insufficient for multi‑dimensional product and order queries. Elasticsearch was added for fast search, but synchronizing MySQL data to Elasticsearch became a performance bottleneck.
Problems with the Original Intermediate‑Table Approach
The initial solution created a MySQL table that mirrored the fields required by Elasticsearch mappings. A cron job read rows whose update timestamp had changed and wrote them to Elasticsearch. This design caused:
Duplicated write logic between the source tables and the intermediate table.
High latency when new mapping fields were added, because the intermediate table needed schema changes.
Increased development effort as the schema evolved.
Binlog‑Driven Data Synchronization
MySQL binlog records every data change for replication. By consuming binlog events, the system can push changes to Elasticsearch in near real‑time, eliminating the need for an intermediate table.
Correctness Guarantees
Binlog events are published to a Kafka topic with access control. Consumers belong to a consumer group and read ordered partitions. The design ensures:
Order: Kafka preserves order within a partition, and binlog rows are hashed by primary key so that every change to a given row lands in the same partition. Updates to each record are therefore applied in order.
Completeness: Offsets are committed only after the corresponding document has been written to Elasticsearch successfully, so no change is lost when a consumer crashes or the group rebalances.
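The two guarantees can be sketched in Go, the language of the article's Kafka module. This is a minimal illustration under stated assumptions, not the pipeline's actual code: partitionFor, Message, consumePartition and errSinkDown are hypothetical names, FNV-1a is an arbitrary choice of hash, and the Elasticsearch write is abstracted as a callback.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// errSinkDown is a hypothetical error used to simulate an Elasticsearch outage.
var errSinkDown = errors.New("elasticsearch unavailable")

// partitionFor maps a primary key to a partition (numPartitions must be > 0).
// Any deterministic hash works as long as every producer uses the same one.
func partitionFor(pk string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(pk)) // writes to a hash.Hash never fail
	return int(h.Sum32() % uint32(numPartitions))
}

// Message is a simplified Kafka record carrying one binlog row change.
type Message struct {
	Offset int64
	PK     string
	Body   string
}

// consumePartition writes messages in partition order and returns the
// offset to commit. The offset only advances past a message after its write
// succeeds; on failure the loop stops so that message is redelivered later.
func consumePartition(msgs []Message, committed int64, write func(Message) error) (int64, error) {
	for _, m := range msgs {
		if m.Offset < committed {
			continue // already written before a crash or rebalance
		}
		if err := write(m); err != nil {
			return committed, err
		}
		committed = m.Offset + 1 // Kafka commits the next offset to read
	}
	return committed, nil
}

func main() {
	// The same key always maps to the same partition.
	fmt.Println(partitionFor("order-42", 8) == partitionFor("order-42", 8)) // true

	msgs := []Message{{0, "1", "a"}, {1, "2", "b"}, {2, "1", "c"}}
	next, err := consumePartition(msgs, 0, func(m Message) error {
		if m.Offset == 1 {
			return errSinkDown // simulate a failed bulk write
		}
		return nil
	})
	fmt.Println(next, err) // 1 elasticsearch unavailable
}
```

Because a failed write leaves the committed offset in place, the failed message and everything after it are read again on restart. Replaying a row's changes in order against the same document ID ends in the same final state, so the at-least-once redelivery is harmless.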
Technical Architecture
The pipeline consists of five key modules:
Configuration parsing (TOML/JSON or configuration‑center JSON).
Rule engine that maps binlog fields to Elasticsearch index, document ID, and field types.
Kafka client handling authentication, offset management, and partition consumption.
Binlog parsing (Canal already provides JSON‑formatted events).
Elasticsearch writer that batches updates via the _bulk API.
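To illustrate how the binlog-parsing and rule-engine modules fit together, here is a minimal Go sketch. It assumes Canal's flat-message JSON (fields such as database, table, type, isDdl and data); FlatMessage, Rule, Action and toActions are hypothetical names for illustration, not the API of the article's service or of go-mysql-elasticsearch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FlatMessage holds the subset of a Canal flat message used in this sketch.
type FlatMessage struct {
	Database string           `json:"database"`
	Table    string           `json:"table"`
	Type     string           `json:"type"` // INSERT, UPDATE, DELETE, ...
	IsDdl    bool             `json:"isDdl"`
	Data     []map[string]any `json:"data"`
}

// Rule is a hypothetical rule-engine entry: which table feeds which index,
// and which column supplies the Elasticsearch document ID.
type Rule struct {
	Database, Table string
	Index           string
	IDColumn        string
}

// Action is one document operation handed to the Elasticsearch writer.
type Action struct {
	Op    string // "upsert" or "delete"
	Index string
	ID    string
	Doc   map[string]any
}

// toActions parses one binlog event and applies the first matching rule.
// DDL events and tables without a rule produce no actions.
func toActions(raw []byte, rules []Rule) ([]Action, error) {
	var msg FlatMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, err
	}
	if msg.IsDdl {
		return nil, nil
	}
	for _, r := range rules {
		if r.Database != msg.Database || r.Table != msg.Table {
			continue
		}
		op := "upsert"
		if msg.Type == "DELETE" {
			op = "delete"
		}
		actions := make([]Action, 0, len(msg.Data))
		for _, row := range msg.Data {
			id := fmt.Sprint(row[r.IDColumn])
			actions = append(actions, Action{Op: op, Index: r.Index, ID: id, Doc: row})
		}
		return actions, nil
	}
	return nil, nil
}

func main() {
	raw := []byte(`{"database":"shop","table":"orders","type":"UPDATE","isDdl":false,
		"data":[{"id":"42","status":"2"}]}`)
	rules := []Rule{{Database: "shop", Table: "orders", Index: "orders", IDColumn: "id"}}
	acts, err := toActions(raw, rules)
	if err != nil {
		panic(err)
	}
	a := acts[0]
	fmt.Printf("%s %s/%s status=%v\n", a.Op, a.Index, a.ID, a.Doc["status"]) // upsert orders/42 status=2
}
```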
Module Details
Configuration Parsing: Reads Kafka cluster settings, Elasticsearch endpoints, and field‑to‑mapping rules.
Rule Engine: Determines the target index, the document‑ID field, and the field mappings; supports SQL‑like WHERE clauses to filter out unwanted rows.
Kafka Module: Implements a generic Go Kafka consumer with SASL authentication and precise offset control.
Binlog Parsing Module: Consumes Canal's JSON binlog events and extracts the database, table, and row data.
Elasticsearch Module: Converts each key‑value map into _bulk payloads, flushes a batch every 200 ms or as soon as a size threshold is reached, whichever comes first, and uses doc_as_upsert for upserts.
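The Elasticsearch module's flush rule (every 200 ms, or earlier once a size threshold is hit) is a time-or-size batcher. A minimal Go sketch, assuming bulk lines arrive on a channel; runBatcher and its parameters are hypothetical, and the flush callback stands in for the actual _bulk HTTP request.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher buffers bulk lines from in and calls flush when maxItems lines
// are buffered or when interval elapses, whichever comes first. When in is
// closed it flushes whatever remains and returns.
func runBatcher(in <-chan string, interval time.Duration, maxItems int, flush func([]string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var buf []string
	emit := func() {
		if len(buf) > 0 {
			flush(buf)
			buf = nil // start a fresh slice; flush may keep the old one
		}
	}
	for {
		select {
		case line, ok := <-in:
			if !ok {
				emit() // channel closed: flush the tail and stop
				return
			}
			buf = append(buf, line)
			if len(buf) >= maxItems {
				emit() // size threshold reached
			}
		case <-ticker.C:
			emit() // time threshold reached
		}
	}
}

func main() {
	in := make(chan string)
	done := make(chan struct{})
	go func() {
		runBatcher(in, 200*time.Millisecond, 3, func(b []string) {
			fmt.Println("flush", len(b))
		})
		close(done)
	}()
	for i := 0; i < 4; i++ {
		in <- fmt.Sprintf("doc-%d", i)
	}
	close(in)
	<-done
}
```

To keep the completeness guarantee, a real writer would commit the Kafka offsets of a batch only after its flush succeeds.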
Customizations
Upsert: Include {"doc_as_upsert": true} in each bulk update request so that Elasticsearch creates the document if it does not exist yet.
Filter: Add a SQL‑like filter, e.g. SELECT * FROM sometable WHERE type IN (1,2), to sync only the matching rows.
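Both customizations are small. The Go sketch below renders one _bulk update action with doc_as_upsert (an action line followed by a body line, each newline-terminated, as the _bulk API expects) and turns the example filter type IN (1,2) into a predicate. bulkUpsert and inFilter are hypothetical helpers; values are compared as strings on the assumption that column values arrive as strings in Canal's JSON, and a real rule engine would parse the WHERE clause instead of taking the values as arguments.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// bulkUpsert renders one _bulk "update" action: an action line followed by
// a body line. doc_as_upsert makes Elasticsearch create the document when
// no document with this ID exists yet.
func bulkUpsert(index, id string, doc map[string]any) (string, error) {
	action := map[string]any{"update": map[string]string{"_index": index, "_id": id}}
	body := map[string]any{"doc": doc, "doc_as_upsert": true}
	var sb strings.Builder
	for _, part := range []any{action, body} {
		b, err := json.Marshal(part)
		if err != nil {
			return "", err
		}
		sb.Write(b)
		sb.WriteByte('\n') // _bulk bodies are newline-delimited JSON
	}
	return sb.String(), nil
}

// inFilter is a hand-compiled form of "WHERE <col> IN (...)": it keeps a row
// only when the column is present and its value is one of the allowed values.
func inFilter(col string, allowed ...string) func(map[string]any) bool {
	set := make(map[string]bool, len(allowed))
	for _, a := range allowed {
		set[a] = true
	}
	return func(row map[string]any) bool {
		v, ok := row[col]
		return ok && set[fmt.Sprint(v)]
	}
}

func main() {
	keep := inFilter("type", "1", "2") // WHERE type IN (1,2)
	rows := []map[string]any{{"id": "1", "type": "1"}, {"id": "2", "type": "3"}}
	for _, row := range rows {
		if !keep(row) {
			continue // type 3 is filtered out
		}
		line, err := bulkUpsert("products", fmt.Sprint(row["id"]), row)
		if err != nil {
			panic(err)
		}
		fmt.Print(line)
	}
	// {"update":{"_id":"1","_index":"products"}}
	// {"doc":{"id":"1","type":"1"},"doc_as_upsert":true}
}
```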
Full‑Load and Incremental Sync
An initial full load imports the historical MySQL data into Elasticsearch. For the incremental sync that follows, the consumer commits the current (latest) offset of every partition before it starts, so it begins consuming new binlog events immediately instead of first working through the backlog already queued in Kafka.
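Committing the latest offsets before the incremental consumer starts can be modelled with an in-memory stand-in for the partitions. This is a hypothetical sketch: with a real client the high-water marks would come from the broker (for example Sarama's Client.GetOffset with sarama.OffsetNewest), and commitLatest would commit them for the consumer group rather than mutate a map.

```go
package main

import "fmt"

// partitionState is a hypothetical in-memory stand-in for one Kafka partition.
type partitionState struct {
	HighWater int64 // offset the next produced record will receive
	Committed int64 // next offset the consumer group will read
}

// commitLatest moves every partition's committed offset to its high-water
// mark, so a consumer started afterwards reads only new binlog events.
func commitLatest(parts map[int32]*partitionState) {
	for _, p := range parts {
		p.Committed = p.HighWater
	}
}

// backlog reports how many records the consumer would still have to read.
func backlog(parts map[int32]*partitionState) int64 {
	var n int64
	for _, p := range parts {
		n += p.HighWater - p.Committed
	}
	return n
}

func main() {
	parts := map[int32]*partitionState{
		0: {HighWater: 1500, Committed: 200},
		1: {HighWater: 980, Committed: 0},
	}
	fmt.Println("backlog before:", backlog(parts)) // backlog before: 2280
	commitLatest(parts)
	fmt.Println("backlog after:", backlog(parts)) // backlog after: 0
}
```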
Microservice Deployment and Configuration Center
The pipeline runs as a microservice, enabling rapid scaling of consumer instances when binlog traffic spikes. Configuration is managed centrally (replacing static TOML files), simplifying per‑business and per‑environment settings.
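With a configuration center, each business and environment can fetch its own JSON document instead of shipping a TOML file. The shape below is an assumption for illustration, since the article does not give the schema: Config, parseConfig and every JSON key are hypothetical, covering the Kafka, Elasticsearch and rule settings that the configuration-parsing module reads.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Config is a hypothetical per-business, per-environment sync configuration.
type Config struct {
	Kafka struct {
		Brokers []string `json:"brokers"`
		Topic   string   `json:"topic"`
		Group   string   `json:"group"`
	} `json:"kafka"`
	Elasticsearch struct {
		Hosts []string `json:"hosts"`
	} `json:"elasticsearch"`
	Rules []struct {
		Table   string `json:"table"`
		Index   string `json:"index"`
		IDField string `json:"id_field"`
		Filter  string `json:"filter"` // SQL-like WHERE condition, may be empty
	} `json:"rules"`
}

// parseConfig decodes the JSON served by the configuration center and
// rejects documents missing the endpoints the pipeline cannot run without.
func parseConfig(raw []byte) (*Config, error) {
	var c Config
	if err := json.Unmarshal(raw, &c); err != nil {
		return nil, err
	}
	if len(c.Kafka.Brokers) == 0 || len(c.Elasticsearch.Hosts) == 0 {
		return nil, errors.New("config: kafka brokers and elasticsearch hosts are required")
	}
	return &c, nil
}

func main() {
	raw := []byte(`{
  "kafka": {"brokers": ["kafka-1:9092"], "topic": "binlog.shop", "group": "es-sync"},
  "elasticsearch": {"hosts": ["http://es-1:9200"]},
  "rules": [{"table": "orders", "index": "orders", "id_field": "id", "filter": "type IN (1,2)"}]
}`)
	cfg, err := parseConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Kafka.Topic, cfg.Rules[0].Index, cfg.Rules[0].Filter)
}
```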
Logging and Monitoring
All binlog messages, bulk request payloads, and Elasticsearch responses are logged to the ELK stack. Two key metrics are monitored:
Sync latency (time from binlog generation to Elasticsearch write), typically around 1 second.
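Heartbeat checks: a dedicated MySQL table is updated every minute, and its changes flow through the pipeline like any other row. If an update does not arrive in Elasticsearch, an alert fires, so a single check covers Kafka, the microservice, and Elasticsearch.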
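Both metrics reduce to simple time arithmetic. A hedged Go sketch, assuming each event carries its binlog timestamp as Unix milliseconds and that the heartbeat document in Elasticsearch records when the heartbeat row was last updated; syncLatency, heartbeatStale and the three-minute threshold in the example are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// syncLatency is the time from binlog generation to the successful
// Elasticsearch write; both timestamps are Unix milliseconds.
func syncLatency(binlogMillis, writtenMillis int64) time.Duration {
	return time.Duration(writtenMillis-binlogMillis) * time.Millisecond
}

// heartbeatStale reports whether the heartbeat document in Elasticsearch is
// older than maxAge. The heartbeat row travels MySQL → binlog → Kafka →
// consumer → Elasticsearch, so a stale value means some hop has stalled.
func heartbeatStale(lastSeenMillis, nowMillis int64, maxAge time.Duration) bool {
	return time.Duration(nowMillis-lastSeenMillis)*time.Millisecond > maxAge
}

func main() {
	now := time.Now().UnixMilli()
	fmt.Println(syncLatency(now-900, now))                          // 900ms
	fmt.Println(heartbeatStale(now-5*60*1000, now, 3*time.Minute)) // true
}
```

With the heartbeat row written once a minute, a threshold of a few minutes tolerates a single delayed update while still catching a stalled pipeline.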
Data Type Mapping (MySQL → Elasticsearch)
Integer types (tinyint, smallint, mediumint, int, bigint, year) → int64
Floating‑point types (float, double) → float32
String types (char, varchar, text) → string
Date/Time types (datetime, timestamp) → string formatted as Y-m-d H:i:s
Date only (date) → string formatted as Y-m-d
Reference Implementation
The solution is built on the open‑source project go-mysql-elasticsearch. The repository can be cloned from:
https://github.com/siddontang/go-mysql-elasticsearch
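Since go-mysql-elasticsearch is written in Go, here is a Go sketch of the type-mapping table above, applied to column values that arrive as strings, as in Canal's JSON events. convert is a hypothetical helper, not a function from that repository. The Y-m-d H:i:s and Y-m-d formats correspond to Go's reference layouts 2006-01-02 15:04:05 and 2006-01-02, and passing types outside the table through unchanged is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// convert maps one MySQL column value (delivered as a string) to the Go
// value written to Elasticsearch. mysqlType may include a display width or
// modifiers, e.g. "bigint(20) unsigned"; only the base name is used.
func convert(mysqlType, v string) (any, error) {
	base := strings.ToLower(strings.TrimSpace(mysqlType))
	if i := strings.IndexAny(base, "( "); i >= 0 {
		base = base[:i]
	}
	switch base {
	case "tinyint", "smallint", "mediumint", "int", "bigint", "year":
		return strconv.ParseInt(v, 10, 64) // int64; unsigned values above MaxInt64 fail
	case "float", "double":
		f, err := strconv.ParseFloat(v, 32) // rounded to float32 precision
		return float32(f), err
	case "char", "varchar", "text":
		return v, nil
	case "datetime", "timestamp":
		// Fractional seconds are accepted when parsing and dropped on output.
		// Zero dates such as "0000-00-00 00:00:00" fail to parse here.
		t, err := time.Parse("2006-01-02 15:04:05", v)
		if err != nil {
			return nil, err
		}
		return t.Format("2006-01-02 15:04:05"), nil // Y-m-d H:i:s
	case "date":
		t, err := time.Parse("2006-01-02", v)
		if err != nil {
			return nil, err
		}
		return t.Format("2006-01-02"), nil // Y-m-d
	default:
		return v, nil // not covered by the table: pass through (assumption)
	}
}

func main() {
	for _, c := range [][2]string{
		{"bigint(20) unsigned", "42"},
		{"double", "3.5"},
		{"datetime", "2024-01-02 03:04:05.123"},
	} {
		v, err := convert(c[0], c[1])
		fmt.Printf("%T %v %v\n", v, v, err)
	}
}
```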
ITPUB
Official ITPUB account sharing technical insights, community news, and exciting events.
