Integrating Apache Hudi with Flink CDC for Real‑Time Data Lake Solutions
This article explains how to integrate Apache Hudi with Flink CDC to build a near‑real‑time data lake, covering Hudi’s storage model, streaming primitives, version compatibility, Maven setup, SQL table definitions, data flow from MySQL through Kafka, and practical troubleshooting tips.
Introduction
Apache Hudi (pronounced "Hoodie") provides the following streaming primitives on DFS datasets:
Insert/Update – how to change the dataset
Incremental Pull – how to retrieve changed data
Hudi maintains a timeline of all operations performed on a dataset, which provides instantaneous views of the data. Datasets are laid out in a directory structure similar to Hive tables: the dataset is split into partitions, each identified by a unique path. Each partition contains multiple files; every file has a unique file ID and is tagged with the commit that produced it. An update writes a new version of the affected file, so several files can share the same file ID while carrying different commit timestamps.
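To make the file‑ID/commit relationship concrete, here is a minimal Java sketch of a single partition. The class and method names (FileGroupVersions, write, latestVersions) are hypothetical and the file naming is simplified. It models only two ideas: an update adds a new version under an existing file ID, and a query sees the newest version of each file ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of one Hudi-style partition: files are identified by a file ID,
// and every write produces a new version tagged with the commit timestamp.
class FileGroupVersions {
    // One version of a file: same fileId, different commit timestamp.
    static final class FileVersion {
        final String fileId;
        final String commitTime; // fixed-width timestamp, so text order == time order
        FileVersion(String fileId, String commitTime) {
            this.fileId = fileId;
            this.commitTime = commitTime;
        }
        // Simplified naming for illustration, not Hudi's real file-name format.
        @Override public String toString() { return fileId + "_" + commitTime; }
    }

    // fileId -> versions ordered by commit time
    private final Map<String, TreeMap<String, FileVersion>> partition = new TreeMap<>();

    void write(String fileId, String commitTime) {
        partition.computeIfAbsent(fileId, k -> new TreeMap<>())
                 .put(commitTime, new FileVersion(fileId, commitTime));
    }

    // A query sees only the newest version of each file ID.
    List<FileVersion> latestVersions() {
        List<FileVersion> out = new ArrayList<>();
        for (TreeMap<String, FileVersion> versions : partition.values()) {
            out.add(versions.lastEntry().getValue());
        }
        return out;
    }

    int versionCount(String fileId) {
        TreeMap<String, FileVersion> v = partition.get(fileId);
        return v == null ? 0 : v.size();
    }

    public static void main(String[] args) {
        FileGroupVersions p = new FileGroupVersions();
        p.write("f1", "20211020100000"); // initial insert
        p.write("f2", "20211020100000");
        p.write("f1", "20211020101500"); // update: new version of f1 under a newer commit
        System.out.println(p.latestVersions()); // [f1_20211020101500, f2_20211020100000]
        System.out.println(p.versionCount("f1")); // 2
    }
}
```

Older versions stay on disk until a clean operation removes them, which is what lets readers keep using a consistent snapshot while a new commit is being written.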
Storage Types – How Data Is Stored
Copy‑on‑Write – purely columnar; each update creates a new version of the affected files.
Merge‑on‑Read – near‑real‑time; updates are merged with the base data at read time.
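A toy Java comparison of the two write paths: it counts how many records each storage type writes to apply one single‑record update to a 1,000‑record file. Modeling a file as a key/value map and a log as a list of changes is an illustrative assumption; this is not Hudi's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy comparison (not Hudi code) of the write cost of the two storage types.
class StorageTypesSketch {
    // Copy-on-Write: every update writes a complete new version of the columnar file.
    static final class CopyOnWrite {
        final List<Map<String, String>> versions = new ArrayList<>();
        long recordsWritten;

        CopyOnWrite(Map<String, String> initial) {
            versions.add(new LinkedHashMap<>(initial));
            recordsWritten = initial.size();
        }

        void update(String key, String value) {
            Map<String, String> next = new LinkedHashMap<>(versions.get(versions.size() - 1));
            next.put(key, value);          // apply the change to a full copy
            versions.add(next);            // the older version stays until cleaned
            recordsWritten += next.size(); // the whole file is rewritten
        }
    }

    // Merge-on-Read: updates are appended to a row-based log next to the base file.
    static final class MergeOnRead {
        final Map<String, String> base;
        final List<String[]> log = new ArrayList<>();
        long recordsWritten;

        MergeOnRead(Map<String, String> initial) {
            base = new LinkedHashMap<>(initial);
            recordsWritten = initial.size();
        }

        void update(String key, String value) {
            log.add(new String[] {key, value}); // merged with the base only at read time
            recordsWritten += 1;                // only the changed record is written
        }
    }

    public static void main(String[] args) {
        Map<String, String> initial = new LinkedHashMap<>();
        for (int i = 0; i < 1000; i++) initial.put("k" + i, "v0");
        CopyOnWrite cow = new CopyOnWrite(initial);
        MergeOnRead mor = new MergeOnRead(initial);
        cow.update("k7", "v1");
        mor.update("k7", "v1");
        System.out.println(cow.recordsWritten); // 2000
        System.out.println(mor.recordsWritten); // 1001
    }
}
```

The counts reflect the usual trade‑off: Copy‑on‑Write pays on every write but keeps reads simple, while Merge‑on‑Read keeps writes cheap and defers the merge to query time or compaction.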
Views – How Data Is Read
Read‑Optimized View – the input format selects only compacted columnar files; delivers Parquet‑level query performance, with ~30 minutes latency for 500 GB; targets existing Hive tables.
Near‑Real‑Time View – reads a mix of columnar and row‑based data; ~1–5 minutes latency; provides near‑real‑time tables.
Incremental View – exposes the changes made to the dataset; enables incremental pull.
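The difference between the read‑optimized and near‑real‑time views on a Merge‑on‑Read table can be sketched as follows. The map standing in for Parquet base files, the list standing in for the Avro log, and the method names are illustrative assumptions, not Hudi's reader API; compact() mimics compaction folding the log into the base.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy Merge-on-Read file slice (hypothetical) contrasting the two read views.
class ViewsSketch {
    final Map<String, String> columnarBase = new LinkedHashMap<>(); // stands in for Parquet
    final List<String[]> rowLog = new ArrayList<>();                 // stands in for the Avro log

    void upsert(String key, String value) { rowLog.add(new String[] {key, value}); }

    // Read-optimized view: only the compacted columnar data; fast but possibly stale.
    Map<String, String> readOptimizedView() { return new LinkedHashMap<>(columnarBase); }

    // Near-real-time view: columnar base merged with pending row-based updates.
    Map<String, String> nearRealTimeView() {
        Map<String, String> merged = new LinkedHashMap<>(columnarBase);
        for (String[] e : rowLog) merged.put(e[0], e[1]); // later entries win
        return merged;
    }

    // Compaction folds the row log into the columnar base, so both views agree again.
    void compact() {
        for (String[] e : rowLog) columnarBase.put(e[0], e[1]);
        rowLog.clear();
    }

    public static void main(String[] args) {
        ViewsSketch slice = new ViewsSketch();
        slice.columnarBase.put("u1", "alice");
        slice.upsert("u1", "alice-v2");
        slice.upsert("u2", "bob");
        System.out.println(slice.readOptimizedView()); // {u1=alice}
        System.out.println(slice.nearRealTimeView());  // {u1=alice-v2, u2=bob}
        slice.compact();
        System.out.println(slice.readOptimizedView()); // {u1=alice-v2, u2=bob}
    }
}
```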
Hudi Storage Layer Composition
The storage layer consists of three parts:
Metadata – a timeline that records every action performed on the dataset. The main action types are:
  Commit – an atomic batch write, identified by a monotonically increasing timestamp.
  Clean – removes old file versions that queries no longer need.
  Compaction – merges row‑based files into columnar files.
Index – quickly maps record keys to the files that contain them; the default is a Bloom filter, with Apache HBase as an alternative.
Data – stored in two formats: read‑optimized columnar files (Parquet by default) and write‑optimized row‑based files (Avro by default).
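A minimal Java sketch of how a Bloom‑filter index narrows down which file holds a record key, so an incoming record can be routed as an update (key found) or an insert (key not found). The filter size, hash count, and double‑hashing scheme are arbitrary illustrative choices, not Hudi's. Because Bloom filters can return false positives but never false negatives, candidate files are confirmed against their actual keys.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified Bloom-filter index: one filter per data file (illustrative only).
class BloomIndexSketch {
    static final class BloomFilter {
        private final BitSet bits;
        private final int size;
        private final int hashes;

        BloomFilter(int size, int hashes) {
            this.bits = new BitSet(size);
            this.size = size;
            this.hashes = hashes;
        }

        // Double hashing: index_i = h1 + i * h2 (mod size).
        private int index(String key, int i) {
            int h1 = key.hashCode();
            int h2 = (h1 >>> 16) | 1; // odd second hash
            return Math.floorMod(h1 + i * h2, size);
        }

        void add(String key) {
            for (int i = 0; i < hashes; i++) bits.set(index(key, i));
        }

        boolean mightContain(String key) {
            for (int i = 0; i < hashes; i++) {
                if (!bits.get(index(key, i))) return false; // definitely absent
            }
            return true; // possibly present (false positives allowed)
        }
    }

    final Map<String, BloomFilter> filters = new LinkedHashMap<>();
    final Map<String, Set<String>> fileKeys = new LinkedHashMap<>(); // actual keys per file

    void addRecord(String fileId, String key) {
        filters.computeIfAbsent(fileId, f -> new BloomFilter(1024, 3)).add(key);
        fileKeys.computeIfAbsent(fileId, f -> new HashSet<>()).add(key);
    }

    // Returns the file holding the key, or null if the record is new (an insert).
    String locate(String key) {
        for (Map.Entry<String, BloomFilter> e : filters.entrySet()) {
            // Cheap filter check first; only candidate files have their keys checked.
            if (e.getValue().mightContain(key) && fileKeys.get(e.getKey()).contains(key)) {
                return e.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        BloomIndexSketch index = new BloomIndexSketch();
        index.addRecord("file-1", "user_1");
        index.addRecord("file-2", "user_2");
        System.out.println(index.locate("user_2"));  // file-2 (update goes to file-2)
        System.out.println(index.locate("user_99")); // null (treated as an insert)
    }
}
```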
Why Hudi Is Important for Large‑Scale Near‑Real‑Time Applications
Hudi addresses several needs that existing Hadoop pipelines handle poorly:
Scalability limits of HDFS
Need for faster data presentation in Hadoop
Lack of direct support for updates and deletes on existing data
Fast ETL and modeling
Ability to retrieve all updated records using the latest checkpoint timestamp without scanning the entire source table
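The last point, incremental pull from a checkpoint, can be sketched in Java as follows. Commits are kept on an ordered timeline, so a consumer holding a checkpoint reads only the commits after it instead of scanning the whole table. The class and method names are hypothetical; this is not Hudi's incremental query API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical incremental pull over an ordered commit timeline.
class IncrementalPullSketch {
    // commit time -> keys written by that commit (fixed-width timestamps sort correctly as text)
    private final TreeMap<String, List<String>> timeline = new TreeMap<>();

    void commit(String commitTime, String... keys) {
        timeline.put(commitTime, new ArrayList<>(Arrays.asList(keys)));
    }

    // Keys changed strictly after the checkpoint, in commit order; older commits are never visited.
    List<String> changedSince(String checkpoint) {
        List<String> out = new ArrayList<>();
        for (List<String> keys : timeline.tailMap(checkpoint, false).values()) out.addAll(keys);
        return out;
    }

    // The next checkpoint is the latest commit, or the old checkpoint if nothing is newer.
    String nextCheckpoint(String checkpoint) {
        Map.Entry<String, List<String>> last = timeline.lastEntry();
        return (last == null || last.getKey().compareTo(checkpoint) <= 0) ? checkpoint : last.getKey();
    }

    public static void main(String[] args) {
        IncrementalPullSketch table = new IncrementalPullSketch();
        table.commit("20211020100000", "u1", "u2");
        table.commit("20211020101000", "u2");
        table.commit("20211020102000", "u3");

        String checkpoint = "20211020100000"; // last commit this consumer processed
        System.out.println(table.changedSince(checkpoint));   // [u2, u3]
        System.out.println(table.nextCheckpoint(checkpoint)); // 20211020102000
    }
}
```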
Hudi Advantages
Overcomes HDFS scalability constraints
Provides rapid data presentation in Hadoop
Supports updates and deletes on existing data
Enables fast ETL and modeling
New Architecture with Lakehouse Integration
By combining the lakehouse approach with stream‑batch integration, the architecture unifies data sources, the compute engine, storage, and computation semantics, delivering minute‑level data freshness suitable for a near‑real‑time data warehouse.
Data flow:
MySQL data is captured by Flink CDC and sent to Kafka.
Kafka data is written to Hudi for both offline ODS storage and real‑time DWD‑DWS‑OLAP pipelines.
Historical data correction is performed when schema changes or previous job errors require reprocessing.
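The core of this flow is turning the MySQL change stream into upserts keyed by primary key. The Java sketch below models only that step and leaves Kafka out. Its Kind enum mirrors the four change types of Flink's RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) but is defined locally so the example has no dependencies; it is an illustration, not the connector's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of applying a CDC changelog to a table keyed by primary key.
class ChangelogToUpsertSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final long id;     // primary key, like mysql_users.id
        final String name;
        Change(Kind kind, long id, String name) { this.kind = kind; this.id = id; this.name = name; }
    }

    // Applies the changelog in order; the result is what a keyed upsert table would hold.
    static Map<Long, String> apply(List<Change> changelog) {
        Map<Long, String> table = new TreeMap<>();
        for (Change c : changelog) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.id, c.name); // upsert by primary key
                    break;
                case DELETE:
                    table.remove(c.id);
                    break;
                case UPDATE_BEFORE:
                    break; // superseded by the matching UPDATE_AFTER for the same key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> binlog = Arrays.asList(
                new Change(Kind.INSERT, 1, "alice"),
                new Change(Kind.INSERT, 2, "bob"),
                new Change(Kind.UPDATE_BEFORE, 1, "alice"),
                new Change(Kind.UPDATE_AFTER, 1, "alice2"),
                new Change(Kind.DELETE, 2, "bob"));
        System.out.println(apply(binlog)); // {1=alice2}
    }
}
```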
Best Practices – Version Compatibility
Flink     Hudi
1.12.2    0.9.0
1.13.1    0.10.0
Recommendation: use Hudi built from master with Flink 1.13 for better compatibility with the CDC connector.
Downloading Hudi
https://mvnrepository.com/artifact/org.apache.hudi/hudi-flink-bundle
At the time of writing, the latest version on Maven Central was 0.9.0; version 0.10.0 could be obtained from the community group or built from source.
Running Flink on Hudi
Place hudi-flink-bundle_2.11-0.10.0.jar in flink/lib, then start the SQL client:
bin/sql-client.sh embedded
Example Maven Project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink_hudi_test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.1</flink.version>
        <hudi.version>0.10.0</hudi.version>
        <hadoop.version>2.10.1</hadoop.version>
    </properties>
    <dependencies>
        <!-- dependencies omitted for brevity -->
    </dependencies>
</project>
Insert Data into Hudi
After creating the MySQL source table (mysql_binlog), the following SQL writes its records into the Hudi table t2:
insert into t2 select replace(uuid(),'-',''),id,name,description,now() from mysql_binlog;
Query Hudi Table
package name.lijiaqi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadHudi {
    public static void main(String[] args) throws Exception {
        // Blink planner in streaming mode (Flink 1.13 API)
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Java 8 has no multi-line string literals, so the DDL is concatenated.
        // uuid holds replace(uuid(),'-','') values, which are 32 characters long.
        String sourceDDL = "CREATE TABLE t2 (\n"
                + "  uuid VARCHAR(32),\n"
                + "  id INT NOT NULL,\n"
                + "  name VARCHAR(40),\n"
                + "  description VARCHAR(40),\n"
                + "  ts TIMESTAMP(3)\n"
                + ") WITH (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = 'hdfs://172.19.28.4:9000/hudi_t4/',\n"
                + "  'table.type' = 'MERGE_ON_READ'\n"
                + ")";
        tableEnv.executeSql(sourceDDL);

        // executeSql() submits the query job itself; print() streams the rows to stdout.
        TableResult result2 = tableEnv.executeSql("SELECT * FROM t2");
        result2.print();
        // No env.execute(): the job is built purely with the Table API, and env.execute()
        // would fail because no DataStream operators were defined.
    }
}
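The record key for t2 is produced in the insert statement by replace(uuid(),'-',''). A plain Java equivalent, shown below, makes the resulting shape visible: a random type‑4 UUID with its four dashes removed, always 32 lowercase hex characters. As far as we know, the Hudi 0.10 Flink connector uses a column named uuid as the record key and ts as the precombine field when nothing else is configured, which would explain the shape of t2; this is worth confirming against the Hudi Flink configuration docs.

```java
import java.util.UUID;

// Java equivalent of the SQL expression replace(uuid(),'-','') used to key t2.
class DashlessUuid {
    static String newKey() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        String key = newKey();
        System.out.println(key);          // a random value of 32 lowercase hex characters
        System.out.println(key.length()); // 32
    }
}
```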
Flink CDC 2.0 on Hudi
Starting from the official Flink binary distribution, add the following JARs to $FLINK_HOME/lib:
hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (built from Hudi master against Flink 1.13.2)
hadoop-mapreduce-client-core-2.7.3.jar (resolves Hudi ClassNotFoundException)
flink-sql-connector-mysql-cdc-2.0.0.jar
flink-format-changelog-json-2.0.0.jar
flink-sql-connector-kafka_2.11-1.13.2.jar
Note: CDC 2.0 changed its groupId from com.alibaba.ververica to com.ververica.
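Missing JARs typically surface only as ClassNotFoundException at job submission, so a quick preflight check can save a round trip. The Java sketch below is a hypothetical helper (not part of Flink or Hudi) that reports which of the JARs listed above are absent from $FLINK_HOME/lib; it assumes the file names match the list exactly.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical preflight check for the JARs required by Flink CDC 2.0 on Hudi.
class FlinkLibCheck {
    static final List<String> REQUIRED = Arrays.asList(
            "hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar",
            "hadoop-mapreduce-client-core-2.7.3.jar",
            "flink-sql-connector-mysql-cdc-2.0.0.jar",
            "flink-format-changelog-json-2.0.0.jar",
            "flink-sql-connector-kafka_2.11-1.13.2.jar");

    // Returns the required file names that do not exist as regular files in libDir.
    static List<String> missingJars(Path libDir, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String jar : required) {
            if (!Files.isRegularFile(libDir.resolve(jar))) missing.add(jar);
        }
        return missing;
    }

    public static void main(String[] args) {
        String flinkHome = System.getenv("FLINK_HOME");
        if (flinkHome == null) {
            System.err.println("FLINK_HOME is not set");
            return;
        }
        List<String> missing = missingJars(Paths.get(flinkHome, "lib"), REQUIRED);
        System.out.println(missing.isEmpty() ? "All required JARs found" : "Missing: " + missing);
    }
}
```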
Create MySQL CDC Table
CREATE TABLE mysql_users (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'dafei1288',
'server-time-zone' = 'Asia/Shanghai',
'database-name' = 'test',
'table-name' = 'users'
);
Create Hudi Table
CREATE TABLE hudi_users5 (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
`partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'path' = 'hdfs://localhost:9009/hudi/hudi_users5'
);
Configure the result mode and checkpoint interval in the SQL client:
set execution.result-mode=tableau;
set execution.checkpointing.interval=10sec;
Insert data from MySQL CDC into Hudi:
INSERT INTO hudi_users5(id, name, birthday, ts, `partition`) SELECT id, name, birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;
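The partition value for each row comes from DATE_FORMAT(birthday, 'yyyyMMdd'). The Java equivalent below shows what that expression produces: an 8‑digit day string, so all users with the same birthday land in the same partition of hudi_users5. The class name is hypothetical; the pattern letters mean the same thing in Java's DateTimeFormatter as in the SQL function here.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Java equivalent of DATE_FORMAT(birthday, 'yyyyMMdd') used to fill the `partition` column.
class PartitionValue {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    static String partitionFor(LocalDateTime birthday) {
        return DAY.format(birthday);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(LocalDateTime.of(1990, 5, 17, 8, 30))); // 19900517
    }
}
```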
Query the Hudi table:
select * from hudi_users5;
Execution Plan Stuck Issue
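The job was stuck at the hoodie_stream_write operator. Hudi's Flink writer commits buffered data to the Hudi timeline only when a checkpoint completes, so with checkpointing disabled nothing was ever committed. Enabling checkpointing resolved the problem: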
set execution.checkpointing.interval=10sec;
After the change, the pipeline ran normally.
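Why a missing checkpoint interval looks like a hang can be illustrated with a simplified Java model of a checkpoint‑driven sink. The class and its methods are hypothetical, not Hudi's writer classes: records are buffered and become visible to readers only when a checkpoint completes and the batch is committed as a new instant.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Hudi's classes) of a sink that commits only on checkpoint completion.
class CheckpointCommitSketch {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private final List<Long> instants = new ArrayList<>();

    void write(String record) { buffer.add(record); }

    // Called when a checkpoint completes: flush the buffer and record one commit instant.
    void onCheckpointComplete(long checkpointId) {
        if (buffer.isEmpty()) return; // in this sketch, empty batches are not committed
        committed.addAll(buffer);
        buffer.clear();
        instants.add(checkpointId);
    }

    List<String> visibleRecords() { return new ArrayList<>(committed); }
    int commitCount() { return instants.size(); }

    public static void main(String[] args) {
        CheckpointCommitSketch sink = new CheckpointCommitSketch();
        sink.write("user_1");
        sink.write("user_2");
        // Without checkpointing, onCheckpointComplete is never called:
        System.out.println(sink.visibleRecords()); // []
        sink.onCheckpointComplete(1L); // e.g. triggered every 10 s by the setting above
        System.out.println(sink.visibleRecords()); // [user_1, user_2]
        System.out.println(sink.commitCount());    // 1
    }
}
```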
Conclusion
This completes a prototype of a Flink + Hudi unified lakehouse solution. Thanks to the community for the guidance.
--- Hi, I am Wang Zhiwu, a creator focusing on big data. Follow me for more industry insights.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
