Real-time Data Ingestion from MySQL to Apache Doris Using Flink CDC and Doris Flink Connector
This guide shows, with step-by-step examples, how to use Flink CDC together with the Doris Flink Connector to capture changes in a MySQL database and continuously write them into the corresponding tables of an Apache Doris data warehouse, covering CDC concepts, connector features, environment setup, SQL client usage, and data verification.
1. What is CDC
CDC (Change Data Capture) records incremental changes in a source database and synchronizes them to one or more sinks, optionally applying transformations such as GROUP BY or JOIN.
Typical scenarios include real‑time data replication for analytics, caching, or reporting across multiple downstream systems.
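As an illustration of CDC with a transformation applied in flight, a Flink SQL job can aggregate the change stream while syncing it. A hypothetical sketch (the table and column names here are invented for illustration; orders_cdc would be a mysql-cdc source table and order_stats_sink any Flink sink table):

```sql
-- Hypothetical sketch: continuously aggregate a CDC stream while syncing it.
INSERT INTO order_stats_sink
SELECT customer_id,
       COUNT(*)    AS order_cnt,
       SUM(amount) AS total_amount
FROM orders_cdc
GROUP BY customer_id;
```

Because the source is a changelog stream, the aggregate is updated incrementally as rows are inserted, updated, or deleted upstream.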
1.1 CDC Application Scenarios
Data synchronization for backup and disaster recovery.
Data distribution to multiple downstream systems.
Data collection for ETL into data warehouses or lakes.
Two main technical approaches exist:
Query‑based CDC – offline batch queries that cannot guarantee consistency or real‑time delivery.
Log‑based CDC – real‑time consumption of database logs (e.g., MySQL binlog) that ensures consistency and low latency.
2. Flink CDC
Flink added CDC support in version 1.11. Before that, a typical change-capture pipeline looked like:
Enable MySQL binlog.
Use Canal to parse the binlog and publish it to Kafka.
Have Flink consume the binlog stream from Kafka for processing.
Flink CDC can instead read the binlog directly from the database, removing Canal and Kafka from the chain.
2.1 Flink MySQL CDC 2.0 Features
Concurrent reading with horizontally scalable full‑load performance.
Lock‑free operation, avoiding impact on online workloads.
Checkpoint‑based resume for fault tolerance.
Benchmark: MySQL CDC 2.0 processes 65 million rows in 13 minutes versus 89 minutes for version 1.4, a 6.8× speedup.
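As a sketch of how the concurrent, lock-free snapshot is used in practice (option names follow the flink-cdc-connectors 2.x documentation; verify them against your version, and note the connection values below are placeholders):

```sql
CREATE TABLE mysql_source_parallel (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'password',
  'database-name' = 'demo',
  'table-name' = 'test_cdc',
  -- Enables the lock-free, chunk-based parallel snapshot (default true in 2.0+).
  'scan.incremental.snapshot.enabled' = 'true',
  -- A server-id range: one id per parallel reader.
  'server-id' = '5401-5404'
);
```

With a source parallelism greater than one, each reader claims its own server id from the range and snapshots a disjoint set of chunks.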
3. Doris Flink Connector
Apache Doris is a modern MPP analytical database offering sub‑second query latency and simple distributed architecture.
The Doris Flink Connector enables Flink to read and write Doris tables. It supports Flink 1.11‑1.13 and Scala 2.12.
Key sink parameters:
sink.batch.size: maximum number of rows buffered per write (default 100).
sink.batch.interval: flush interval in seconds for batched writes (default 1).
Enable HTTP v2 in fe.conf with enable_http_server_v2=true and ensure the user has admin rights.
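A minimal sketch of the relevant fe.conf entry (changing fe.conf typically requires restarting the FE process for the setting to take effect):

```
# fe.conf — enable the HTTP v2 server required by the Doris Flink Connector
enable_http_server_v2 = true
```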
4. Usage Example
4.1 Build the Doris Flink Connector
Download the pre-built JAR doris-flink-1.0-SNAPSHOT.jar, or compile it yourself inside the Docker image apache/incubator-doris:build-env-1.2 with sh build.sh. Place the resulting JAR on Flink's classpath (or in the jars/ directory when deploying on YARN).
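The build steps might look like the following, assuming Docker is available; the source path and the connector's location inside the repository are assumptions to adjust for your checkout:

```
# Hypothetical sketch: compile the connector inside the official build image
docker run -it -v /path/to/doris-src:/root/doris \
    apache/incubator-doris:build-env-1.2 /bin/bash

# Inside the container, from the connector directory:
cd /root/doris/extension/flink-doris-connector
sh build.sh

# Back on the host, copy the produced JAR onto Flink's classpath
cp output/doris-flink-1.0-SNAPSHOT.jar $FLINK_HOME/lib/
```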
4.2 Configure Flink
Install Flink 1.13.3 and place the MySQL CDC JAR flink-connector-mysql-cdc-2.0.2.jar on the classpath. Make sure the Flink and Flink CDC versions are compatible.
4.3 Install Apache Doris
Follow the linked installation guide to deploy Doris.
4.4 Set Up MySQL
Use Docker to run MySQL and enable binlog by adding the following to the MySQL configuration:
log_bin=mysql_bin
binlog-format=Row
server-id=1
Create a test table test_cdc with columns id and name.
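The test table could be created with, for example (the column types are assumptions chosen to match the Doris table defined in the next step):

```sql
CREATE TABLE test_cdc (
  id INT NOT NULL PRIMARY KEY,
  name VARCHAR(100)
);
```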
4.5 Create Doris Table
CREATE TABLE `doris_test` (
`id` int NULL COMMENT "",
`name` varchar(100) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);
4.6 Launch Flink SQL Client
./bin/sql-client.sh embedded
set execution.result-mode=tableau;
4.7 Create Flink CDC MySQL Mapping Table
CREATE TABLE test_flink_cdc (
id INT,
name STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'demo',
'table-name' = 'test_cdc'
);
4.8 Create Flink Doris Mapping Table
CREATE TABLE doris_test_sink (
id INT,
name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_test',
'sink.batch.size' = '2',
'sink.batch.interval' = '1',
'username' = 'root',
'password' = ''
);
4.9 Verify Tables
Run SELECT * FROM test_flink_cdc; and SELECT * FROM doris_test_sink; to confirm successful creation.
4.10 Insert Data into MySQL and Stream to Doris
INSERT INTO doris_test_sink SELECT id, name FROM test_flink_cdc;
Observe the Flink job in the Web UI and verify that the data appears in Doris.
4.11 Update MySQL Data
UPDATE test_cdc SET name='这个是验证修改的操作' WHERE id=123;
Because the Doris table uses the Unique-key model, the update propagates to Doris; the Aggregate and Duplicate models do not support updates.
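Since Doris's Unique key model keeps only the latest row per key, re-querying the sink table after the update should return the new value. For example, from any MySQL client connected to Doris:

```sql
-- The row for id=123 should now show the updated name.
SELECT id, name FROM doris_test WHERE id = 123;
```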
4.12 Delete Operation
The current Doris Flink Connector does not support delete operations; this capability is planned for future releases.
Big Data Technology Architecture
Exploring Open Source Big Data and AI Technologies