Flink CDC Tutorial: Sync MySQL Data to Hudi Data Lake Using SQL
This article walks through using Flink CDC, which is built on Debezium, to capture MySQL changes. It covers deserialization, adding dependencies for the SQL client and the Java/Scala API, creating source and sink tables, enabling checkpoints, and streaming the data into a Hudi data lake.
1. Introduction
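Flink CDC uses Debezium for change data capture. Its MySQL connector reads the initial snapshot in parallel by dividing the table into chunks, and it does so without taking a global read lock. To keep each chunk consistent, it records the binlog position before reading the chunk (the low position) and after reading it (the high position), then applies the binlog events that fall between the two positions to the rows it read.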
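The chunk division and the low/high position step can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Flink CDC's actual implementation: the class `ChunkSnapshotSketch`, the `Event` type, and the convention that a null value means a delete are all hypothetical, and the key is assumed to be a numeric id well below `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified illustration only; not the connector's real code. */
final class ChunkSnapshotSketch {

    /** Hypothetical binlog event: a row id and its new value (null means the row was deleted). */
    static final class Event {
        final long id;
        final String value;

        Event(long id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    /** Splits the inclusive key range [minId, maxId] into inclusive chunks {start, end}. */
    static List<long[]> splitIntoChunks(long minId, long maxId, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<long[]> chunks = new ArrayList<>();
        for (long start = minId; start <= maxId; start += chunkSize) {
            long end = Math.min(start + chunkSize - 1, maxId);
            chunks.add(new long[] {start, end});
        }
        return chunks;
    }

    /**
     * Applies the events captured between the low and high binlog positions
     * to the rows read for one chunk, ignoring events for keys outside the chunk.
     */
    static Map<Long, String> applyBinlog(Map<Long, String> snapshot, long[] chunk, List<Event> events) {
        Map<Long, String> merged = new LinkedHashMap<>(snapshot);
        for (Event e : events) {
            if (e.id < chunk[0] || e.id > chunk[1]) {
                continue; // this key belongs to another chunk
            }
            if (e.value == null) {
                merged.remove(e.id); // delete seen while the chunk was being read
            } else {
                merged.put(e.id, e.value); // insert or update seen while the chunk was being read
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Print the key ranges that independent readers could scan in parallel.
        for (long[] c : splitIntoChunks(1, 10, 4)) {
            System.out.println("chunk: id between " + c[0] + " and " + c[1]);
        }
    }
}
```

Because each chunk is reconciled against its own slice of the binlog, the chunks can be read by different subtasks without locking the table for the duration of the snapshot.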
2. Deserialization
Change events can be emitted as JSON strings with JsonDebeziumDeserializationSchema. Its constructor can optionally include the schema in each message and accepts custom converter settings, such as the Decimal format:
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(true, customConverterConfigs);
3. Adding Flink CDC Dependencies
3.1 sql‑client
Download flink-sql-connector-mysql-cdc-2.2.0.jar, place it in the lib directory of every Flink node, restart the cluster and start sql-client.sh.
3.2 Java/Scala API
Add the following Maven dependency to pom.xml:
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>2.2.0</version>
</dependency>
4. Sync MySQL Data to Hudi via SQL
4.1 Table definition
MySQL source table info_message is created with columns id, msg_title, msg_ctx, msg_time and primary key id.
CREATE TABLE `info_message` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
  `msg_title` varchar(100) DEFAULT NULL COMMENT 'Message title',
  `msg_ctx` varchar(2048) DEFAULT NULL COMMENT 'Message content',
  `msg_time` datetime DEFAULT NULL COMMENT 'Message send time',
  PRIMARY KEY (`id`)
);
4.2 Enable checkpoint
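Enable checkpointing by setting execution.checkpointing.interval='10s'. Checkpoints are what give the pipeline exactly-once semantics, and the source waits for a complete checkpoint after the snapshot phase before it starts reading the binlog.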
4.3 Create MySQL source table in Flink
create table mysql_source (
database_name string metadata from 'database_name' virtual,
table_name string metadata from 'table_name' virtual,
id decimal(20,0) not null,
msg_title string,
msg_ctx string,
msg_time timestamp(9),
primary key (id) not enforced
) with (
'connector'='mysql-cdc',
'hostname'='192.168.8.124',
'port'='3306',
'username'='hnmqet',
'password'='hnmq123456',
'server-time-zone'='Asia/Shanghai',
'scan.startup.mode'='initial',
'database-name'='d_general',
'table-name'='info_message'
);
4.4 Create Hudi sink table
create table hudi_sink (
database_name string,
table_name string,
id decimal(20,0) not null,
msg_title string,
msg_ctx string,
msg_time timestamp(6),
primary key (database_name, table_name, id) not enforced
) with (
'connector'='hudi',
'path'='hdfs://nnha/user/hudi/warehouse/hudi_db/info_message',
'table.type'='MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field'='database_name,table_name,id',
'write.precombine.field'='msg_time',
'write.rate.limit'='2000',
'write.operation'='upsert',
'compaction.trigger.strategy'='num_commits',
'compaction.delta_commits'='5',
'changelog.enabled'='true'
);
4.5 Streaming insert
Insert data from the MySQL source into the Hudi sink, optionally setting a server‑id for each parallel slot.
insert into hudi_sink
select database_name, table_name, id, msg_title, msg_ctx, msg_time
from mysql_source /*+ OPTIONS('server-id'='5401') */
where msg_time is not null;
Note: Adding a timestamp filter may cause the job to stall at the write_stream stage. When the source parallelism is greater than one, each parallel reader needs its own server-id, so specify a range that covers the parallelism (e.g., '5401-5404' for a parallelism of 4).
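The server-id rule can be checked before a job is submitted. The helper below is a minimal sketch, assuming the value is either a single id such as '5401' or an inclusive range such as '5401-5404'; the class `ServerIdRangeSketch` and its methods are hypothetical and not part of Flink CDC.

```java
/** Hypothetical pre-flight check for the 'server-id' option; not part of Flink CDC. */
final class ServerIdRangeSketch {

    /** Returns how many server ids a value like "5401" or "5401-5404" covers. */
    static int count(String serverId) {
        String[] parts = serverId.trim().split("-");
        if (parts.length == 1) {
            Integer.parseInt(parts[0].trim()); // validates that the single id is numeric
            return 1;
        }
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 'id' or 'low-high', got: " + serverId);
        }
        int low = Integer.parseInt(parts[0].trim());
        int high = Integer.parseInt(parts[1].trim());
        if (high < low) {
            throw new IllegalArgumentException("range end is below range start: " + serverId);
        }
        return high - low + 1;
    }

    /** True when the range offers at least one distinct id per parallel reader. */
    static boolean coversParallelism(String serverId, int parallelism) {
        return count(serverId) >= parallelism;
    }

    public static void main(String[] args) {
        System.out.println(coversParallelism("5401", 4));
        System.out.println(coversParallelism("5401-5404", 4));
    }
}
```

A check like this is cheap to run in a deployment script and catches the case where the parallelism was raised but the hint still carries a single id.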
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
