
Real-time Data Ingestion from MySQL to Apache Doris Using Flink CDC and Doris Flink Connector

This article demonstrates, with step‑by‑step examples, how to capture MySQL changes via Flink CDC and stream them in real time into Apache Doris using the Doris Flink Connector, covering CDC concepts, connector features, environment setup, SQL client usage, and data verification.


This guide explains how to use Flink CDC together with the Doris Flink Connector to listen to changes in a MySQL database and continuously write them into the corresponding tables of an Apache Doris data warehouse.

1. What is CDC

CDC (Change Data Capture) records incremental changes in a source database and synchronizes them to one or more sinks, optionally applying transformations such as GROUP BY or JOIN.

Typical scenarios include real‑time data replication for analytics, caching, or reporting across multiple downstream systems.

1.1 CDC Application Scenarios

Data synchronization for backup and disaster recovery.

Data distribution to multiple downstream systems.

Data collection for ETL into data warehouses or lakes.

Two main technical approaches exist:

Query‑based CDC – offline batch queries that cannot guarantee consistency or real‑time delivery.

Log‑based CDC – real‑time consumption of database logs (e.g., MySQL binlog) that ensures consistency and low latency.
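The difference between the two approaches can be sketched in a few lines of Python. This is a toy in-memory model, not tied to any real database: the "table" is a dict and the "binlog" is a plain list.

```python
# Toy model: query-based polling vs. log-based capture.
table = {}   # source "table": id -> (name, version)
log = []     # append-only change log, as a binlog would record

def apply_change(op, row_id, name, version):
    """Apply a change to the table and record it in the log."""
    if op == "DELETE":
        table.pop(row_id, None)
    else:  # INSERT or UPDATE
        table[row_id] = (name, version)
    log.append((op, row_id, name, version))

# Three changes to the same row happen between two polls.
apply_change("INSERT", 1, "a", 1)
apply_change("UPDATE", 1, "b", 2)
apply_change("DELETE", 1, None, 3)

# Query-based CDC: poll the table for rows modified since the last poll.
last_seen = 0
polled = [(rid, v) for rid, v in table.items() if v[1] > last_seen]
print(polled)    # [] -- the row came and went between polls, so every change is lost

# Log-based CDC: consume every recorded event, so nothing is missed.
print(len(log))  # 3
```

This is why query-based CDC cannot guarantee consistency: any sequence of changes that begins and ends between two polls is invisible, while a log consumer replays each event exactly as it happened.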

2. Flink CDC

Flink added CDC support in version 1.11. Before Flink CDC, a typical pipeline consisted of:

Enable MySQL binlog.

Use Canal to sync binlog to Kafka.

Flink reads the binlog stream from Kafka for processing.

Flink CDC reads the binlog directly from the database, removing the Canal and Kafka hops and simplifying the chain.

2.1 Flink MySQL CDC 2.0 Features

Concurrent reading with horizontally scalable full‑load performance.

Lock‑free operation, avoiding impact on online workloads.

Checkpoint‑based resume for fault tolerance.

Benchmark: MySQL CDC 2.0 processes 65 million rows in 13 minutes versus 89 minutes for version 1.4, a 6.8× speedup.

3. Doris Flink Connector

Apache Doris is a modern MPP analytical database offering sub‑second query latency and simple distributed architecture.

The Doris Flink Connector enables Flink to read and write Doris tables. It supports Flink 1.11‑1.13 and Scala 2.12.

Key sink parameters:

sink.batch.size: maximum number of rows per write (default 100).

sink.batch.interval: flush interval in seconds for batch writes (default 1).

Enable HTTP v2 in fe.conf with enable_http_server_v2=true and ensure the user has admin rights.
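Together, the two sink parameters mean a batch is flushed when either it reaches sink.batch.size rows or sink.batch.interval has elapsed. The following is an illustrative sketch of that size-or-interval flush logic, not the connector's actual implementation; class and parameter names are invented for the example.

```python
import time

class BatchingSink:
    """Illustrative buffer that flushes when either threshold is hit."""
    def __init__(self, batch_size=100, batch_interval_s=1.0, flush_fn=print):
        self.batch_size = batch_size
        self.batch_interval_s = batch_interval_s
        self.flush_fn = flush_fn
        self.buffer = []
        self.last_flush = time.monotonic()

    def write(self, row):
        self.buffer.append(row)
        now = time.monotonic()
        if (len(self.buffer) >= self.batch_size
                or now - self.last_flush >= self.batch_interval_s):
            self.flush()

    def flush(self):
        if self.buffer:
            self.flush_fn(list(self.buffer))  # one write request per batch
            self.buffer.clear()
        self.last_flush = time.monotonic()

batches = []
sink = BatchingSink(batch_size=2, batch_interval_s=60.0, flush_fn=batches.append)
for row in [(1, "a"), (2, "b"), (3, "c")]:
    sink.write(row)
print(batches)   # [[(1, 'a'), (2, 'b')]] -- the third row waits for the next trigger
```

Tuning the trade-off works the same way in the real connector: a larger batch size improves throughput, while a shorter interval bounds end-to-end latency for trickle traffic.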

4. Usage Example

4.1 Build the Doris Flink Connector

Download the pre‑built JAR doris-flink-1.0-SNAPSHOT.jar or compile it in the Docker image apache/incubator-doris:build-env-1.2 using sh build.sh. Place the resulting JAR in Flink’s ClassPath (or jars/ for YARN).

4.2 Configure Flink

Install Flink 1.13.3 and the MySQL CDC JAR flink-connector-mysql-cdc-2.0.2.jar. Ensure version compatibility between Flink and Flink CDC.

4.3 Install Apache Doris

Follow the linked installation guide to deploy Doris.

4.4 Set Up MySQL

Run MySQL in Docker and enable binlog by adding the following to my.cnf:

log_bin=mysql_bin
binlog-format=Row
server-id=1
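One way to start such an instance is with Docker, assuming the settings above are saved to a local my.cnf; the container name, image tag, and root password below are placeholders to adjust for your environment.

```shell
# Start MySQL 5.7 with the binlog settings mounted into the config directory.
docker run -d --name mysql-cdc-demo \
  -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=password \
  -v "$PWD/my.cnf":/etc/mysql/conf.d/binlog.cnf \
  mysql:5.7
```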

Create a test table test_cdc with columns id and name.
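For example, the database and table can be created as follows; the database name demo matches the database-name used later in the Flink CDC DDL, and the column types are assumptions chosen to line up with the Doris table below.

```sql
-- Run inside the MySQL instance
CREATE DATABASE IF NOT EXISTS demo;
USE demo;
CREATE TABLE test_cdc (
  id INT NOT NULL,
  name VARCHAR(100),
  PRIMARY KEY (id)
);
```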

4.5 Create Doris Table

CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3",
  "in_memory" = "false",
  "storage_format" = "V2"
);

4.6 Launch Flink SQL Client

./bin/sql-client.sh embedded
set execution.result-mode=tableau;

4.7 Create Flink CDC MySQL Mapping Table

CREATE TABLE test_flink_cdc (
  id INT,
  name STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'password',
  'database-name' = 'demo',
  'table-name' = 'test_cdc'
);

4.8 Create Flink Doris Mapping Table

CREATE TABLE doris_test_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval' = '1',
  'username' = 'root',
  'password' = ''
);

4.9 Verify Tables

Run SELECT * FROM test_flink_cdc; and SELECT * FROM doris_test_sink; to confirm successful creation.

4.10 Insert Data into MySQL and Stream to Doris

INSERT INTO doris_test_sink SELECT id, name FROM test_flink_cdc;

Observe the Flink job in the Web UI and verify data appears in Doris.
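To generate some change events, insert a few rows on the MySQL side and then query Doris once the sink's batch interval has elapsed; the values below are arbitrary examples.

```sql
-- On MySQL (database demo)
INSERT INTO test_cdc (id, name) VALUES (123, 'flink'), (456, 'doris');

-- On Doris, after the batch is flushed
SELECT * FROM db_audit.doris_test ORDER BY id;
```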

4.11 Update MySQL Data

UPDATE test_cdc SET name='verify the update operation' WHERE id=123;

Because the Doris table uses the Unique‑key data model, the update propagates to Doris; the Aggregate and Duplicate models do not support this kind of in‑place update.

4.12 Delete Operation

The current Doris Flink Connector does not support delete operations; support is planned for a future release.

Written by Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies