How to Sync MySQL Data to Elasticsearch in Real-Time Using Binlog and Kafka

This article explains how a growing e‑commerce platform replaced a heavyweight MySQL intermediate table with a binlog‑driven pipeline that streams changes to Elasticsearch via Kafka, detailing the architecture, modules, customizations, monitoring, and performance results.

ITPUB

Background and Motivation

Rapid growth of e‑commerce business data made MySQL alone insufficient for multi‑dimensional product and order queries. Elasticsearch was added for fast search, but synchronizing MySQL data to Elasticsearch became a performance bottleneck.

Problems with the Original Intermediate‑Table Approach

The initial solution created a MySQL table that mirrored the fields required by Elasticsearch mappings. A cron job read rows whose update timestamp had changed and wrote them to Elasticsearch. This design caused:

Duplicated write logic between the source tables and the intermediate table.

High latency when new mapping fields were added, because the intermediate table needed schema changes.

Increased development effort as the schema evolved.

Binlog‑Driven Data Synchronization

MySQL binlog records every data change for replication. By consuming binlog events, the system can push changes to Elasticsearch in near real‑time, eliminating the need for an intermediate table.

Correctness Guarantees

Binlog events are published to a Kafka topic with access control. Consumers belong to a consumer group and read ordered partitions. The design ensures:

Order : Each partition preserves order. Binlog rows are hashed by primary key to the same partition, guaranteeing ordered updates per record.

Completeness : Offsets are committed only after the corresponding document is successfully written to Elasticsearch, preventing data loss during rebalance or crashes.
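
The two guarantees above can be sketched in a few lines of Go. This is a minimal illustration under stated assumptions, not the article's actual code: `Message`, `partitionFor`, `process`, and `errUnavailable` are hypothetical names, and FNV hashing stands in for whatever partitioner the producer really uses.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// errUnavailable is a hypothetical error used to simulate a failed write.
var errUnavailable = errors.New("elasticsearch unavailable")

// Message is a hypothetical, simplified view of one binlog event read from Kafka.
type Message struct {
	Partition int
	Offset    int64
	Key       string // primary key of the changed row
}

// partitionFor maps a primary key to a partition. The same key always lands
// on the same partition, so updates to one row keep their order.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

// process writes each message and commits its offset only after the write
// succeeded. It stops at the first failure, so the failed message stays
// uncommitted and is redelivered after a restart or rebalance.
func process(msgs []Message, write func(Message) error, commit func(Message)) error {
	for _, m := range msgs {
		if err := write(m); err != nil {
			return fmt.Errorf("offset %d not committed: %w", m.Offset, err)
		}
		commit(m)
	}
	return nil
}

func main() {
	msgs := []Message{{0, 10, "order:1"}, {0, 11, "order:2"}, {0, 12, "order:1"}}
	committed := int64(-1)
	write := func(m Message) error {
		if m.Offset == 11 {
			return errUnavailable // simulate a failed Elasticsearch write
		}
		return nil
	}
	err := process(msgs, write, func(m Message) { committed = m.Offset })
	fmt.Println("last committed offset:", committed, "error:", err)
	fmt.Println("partition for order:1:", partitionFor("order:1", 8))
}
```

Committing after the write gives at-least-once delivery: a message may be written twice after a crash, which is harmless when the write is an upsert keyed by document ID.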

Technical Architecture

The pipeline consists of five key modules:

Configuration parsing (TOML/JSON or configuration‑center JSON).

Rule engine that maps binlog fields to Elasticsearch index, document ID, and field types.

Kafka client handling authentication, offset management, and partition consumption.

Binlog parsing (Canal already provides JSON‑formatted events).

Elasticsearch writer that batches updates via the _bulk API.
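
As an illustration of the binlog parsing step, the sketch below decodes one Canal JSON event. The field names (`database`, `table`, `type`, `es`, `pkNames`, `data`) follow Canal's flat-message layout as commonly documented; treat them, the `CanalEvent` type, and the sample payload as assumptions to check against the Canal version in use.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// sample is a made-up event shaped like a Canal flat message.
const sample = `{"database":"shop","table":"orders","type":"UPDATE","es":1700000000000,"pkNames":["id"],"data":[{"id":"1001","status":"paid"}]}`

// CanalEvent holds the subset of fields this sketch needs.
type CanalEvent struct {
	Database string                   `json:"database"`
	Table    string                   `json:"table"`
	Type     string                   `json:"type"` // INSERT, UPDATE or DELETE
	ES       int64                    `json:"es"`   // statement execution time, epoch ms
	PKNames  []string                 `json:"pkNames"`
	Data     []map[string]interface{} `json:"data"` // one map per changed row
}

// parseEvent decodes a raw Kafka message value and rejects events that
// do not name a database and table.
func parseEvent(raw []byte) (CanalEvent, error) {
	var ev CanalEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		return ev, fmt.Errorf("decode canal event: %w", err)
	}
	if ev.Database == "" || ev.Table == "" {
		return ev, errors.New("canal event has no database or table")
	}
	return ev, nil
}

func main() {
	ev, err := parseEvent([]byte(sample))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ev.Database, ev.Table, ev.Type, ev.Data[0]["id"])
}
```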

Module Details

Configuration Parsing : Reads Kafka cluster settings, Elasticsearch endpoints, and field‑to‑mapping rules.

Rule Engine : Determines target index, document‑ID field, and field mappings; supports SQL‑like WHERE clauses to filter unwanted rows.

Kafka Module : Implements a generic Golang Kafka consumer with SASL support and precise offset control.

Binlog Parsing Module : Consumes JSON binlog events from Canal, extracting database, table, and data fields.

Elasticsearch Module : Converts the key‑value map into _bulk payloads, batches them every 200 ms or when a size threshold is reached, and uses doc_as_upsert for upserts.
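
The batching rule in the Elasticsearch module (flush every 200 ms or at a size threshold) might be implemented along these lines. `runBatcher` and `flushInterval` are hypothetical names, and the `flush` callback stands in for the real _bulk request.

```go
package main

import (
	"fmt"
	"time"
)

// flushInterval mirrors the 200 ms interval mentioned in the text.
const flushInterval = 200 * time.Millisecond

// runBatcher collects items from in and calls flush when maxSize items are
// buffered or when interval elapses, whichever comes first. When in is
// closed it flushes what is left and returns.
func runBatcher(in <-chan string, maxSize int, interval time.Duration, flush func([]string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]string, 0, maxSize)
	emit := func() {
		if len(batch) == 0 {
			return // nothing buffered, skip the empty request
		}
		flush(batch)
		batch = make([]string, 0, maxSize)
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				emit()
				return
			}
			batch = append(batch, item)
			if len(batch) >= maxSize {
				emit()
			}
		case <-ticker.C:
			emit()
		}
	}
}

func main() {
	in := make(chan string, 5)
	for _, doc := range []string{"a", "b", "c", "d", "e"} {
		in <- doc
	}
	close(in)
	runBatcher(in, 2, flushInterval, func(b []string) {
		fmt.Println("flush", len(b), "items:", b)
	})
}
```

The timer bounds how long a lone update waits during quiet periods, while the size threshold keeps individual bulk requests from growing without limit under load.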

Customizations

Upsert : Include "doc_as_upsert": true in the body of each update action in the bulk request, so that Elasticsearch creates the document when it does not exist yet.

Filter : Add a SQL‑like filter, e.g. SELECT * FROM sometable WHERE type IN (1,2), to sync only selected rows.
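
Both customizations can be sketched briefly. `upsertLines` builds the two newline-delimited JSON lines that the _bulk API expects for one update action, and `matchesIn` is a hand-written stand-in for a filter such as WHERE type IN (1,2). Both function names are hypothetical, and the sketch does not parse SQL the way the rule engine described above does.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// bulkMeta is the metadata part of a bulk update action.
type bulkMeta struct {
	Index string `json:"_index"`
	ID    string `json:"_id"`
}

// upsertLines returns the action line and the document line for one
// update with doc_as_upsert, each terminated by a newline.
func upsertLines(index, id string, doc map[string]interface{}) ([]byte, error) {
	meta, err := json.Marshal(map[string]bulkMeta{"update": {Index: index, ID: id}})
	if err != nil {
		return nil, err
	}
	body, err := json.Marshal(struct {
		Doc         map[string]interface{} `json:"doc"`
		DocAsUpsert bool                   `json:"doc_as_upsert"`
	}{Doc: doc, DocAsUpsert: true})
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	buf.Write(meta)
	buf.WriteByte('\n')
	buf.Write(body)
	buf.WriteByte('\n')
	return buf.Bytes(), nil
}

// matchesIn reports whether row[field] equals one of the allowed values.
// Values are compared as strings, so "1" and 1 both match "1".
func matchesIn(row map[string]interface{}, field string, allowed ...string) bool {
	v, ok := row[field]
	if !ok {
		return false
	}
	s := fmt.Sprint(v)
	for _, a := range allowed {
		if s == a {
			return true
		}
	}
	return false
}

func main() {
	row := map[string]interface{}{"id": "1001", "type": "1", "status": "paid"}
	if !matchesIn(row, "type", "1", "2") {
		return // row filtered out, nothing to sync
	}
	lines, err := upsertLines("orders", "1001", map[string]interface{}{"status": "paid"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(string(lines))
}
```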

Full‑Load and Incremental Sync

Initial full‑load imports historical MySQL data into Elasticsearch. For incremental sync, the consumer first commits each partition's current offset, skipping whatever is already queued, so it starts on new binlog events immediately instead of waiting for the backlog to drain.

Microservice Deployment and Configuration Center

The pipeline runs as a microservice, enabling rapid scaling of consumer instances when binlog traffic spikes. Configuration is managed centrally (replacing static TOML files), simplifying per‑business and per‑environment settings.

Logging and Monitoring

All binlog messages, bulk request payloads, and Elasticsearch responses are logged to the ELK stack. Two key metrics are monitored:

Sync latency (time from binlog generation to Elasticsearch write), typically around 1 second.

Heartbeat checks: a dedicated MySQL table is updated every minute, and a missing update triggers an alert. Because the heartbeat row travels through the whole pipeline, a single check covers Kafka, the microservice, and Elasticsearch.
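
The two metrics reduce to simple timestamp arithmetic. The sketch below assumes that event time is available in epoch milliseconds (as in Canal's `es` field) and that the check can read the time of the last heartbeat it saw. The function names and the two-minute threshold are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// latencyMillis returns how long an event took from binlog generation to
// the Elasticsearch write, both given as epoch milliseconds.
func latencyMillis(binlogMillis, writtenMillis int64) int64 {
	return writtenMillis - binlogMillis
}

// heartbeatStale reports whether the last heartbeat is older than
// maxAgeMillis, which should raise an alert.
func heartbeatStale(lastBeatMillis, nowMillis, maxAgeMillis int64) bool {
	return nowMillis-lastBeatMillis > maxAgeMillis
}

func main() {
	now := time.Now().UnixMilli()
	// Hypothetical threshold: two missed one-minute heartbeats.
	maxAge := (2 * time.Minute).Milliseconds()

	fmt.Println("latency ms:", latencyMillis(now-950, now))
	fmt.Println("stale:", heartbeatStale(now-3*60*1000, now, maxAge))
}
```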

Data Type Mapping (MySQL → Elasticsearch)

Integer types (tinyint, smallint, mediumint, int, bigint, year) → int64

Floating‑point types (float, double) → float32

String types (char, varchar, text) → string

Date/Time types (datetime, timestamp) → string formatted as Y-m-d H:i:s

Date only (date) → string formatted as Y-m-d
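
A conversion function following this mapping might look like the sketch below. It assumes column values arrive as strings together with a MySQL type name such as `int(11)`; the `convert` function is hypothetical and is not taken from go-mysql-elasticsearch.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// convert turns a raw column value into the Go type from the mapping above.
// mysqlType may carry a length or modifier, for example "int(11) unsigned".
func convert(mysqlType, raw string) (interface{}, error) {
	base := strings.ToLower(strings.TrimSpace(mysqlType))
	if i := strings.IndexAny(base, "( "); i >= 0 {
		base = base[:i] // "int(11) unsigned" -> "int"
	}
	switch base {
	case "tinyint", "smallint", "mediumint", "int", "bigint", "year":
		n, err := strconv.ParseInt(raw, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("column type %s: %w", base, err)
		}
		return n, nil
	case "float", "double":
		f, err := strconv.ParseFloat(raw, 32)
		if err != nil {
			return nil, fmt.Errorf("column type %s: %w", base, err)
		}
		return float32(f), nil
	default:
		// char, varchar, text, datetime, timestamp and date stay strings;
		// date and time values are assumed to be formatted already.
		return raw, nil
	}
}

func main() {
	for _, c := range [][2]string{
		{"int(11)", "42"},
		{"double", "1.5"},
		{"varchar(64)", "paid"},
		{"datetime", "2020-01-02 03:04:05"},
	} {
		v, err := convert(c[0], c[1])
		fmt.Printf("%-12s %T %v %v\n", c[0], v, v, err)
	}
}
```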

Reference Implementation

The solution is built on the open‑source project go-mysql-elasticsearch. The repository can be cloned from:

https://github.com/siddontang/go-mysql-elasticsearch

Architecture diagram
Written by

ITPUB