Flink CDC Tutorial: Sync MySQL Data to Hudi Data Lake Using SQL

This article explains how to use Flink CDC, which builds on Debezium, to capture MySQL changes. It covers deserialization, adding dependencies for the SQL client and the Java/Scala API, creating source and sink tables, enabling checkpoints, and streaming data into a Hudi data lake.

Big Data Technology & Architecture

1. Introduction

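Flink CDC builds on Debezium for change data capture. The MySQL connector divides a table's snapshot into chunks, reads the chunks in parallel, and does not need a global read lock. To keep each chunk consistent, it marks the binlog position before reading the chunk (low watermark) and after reading it (high watermark), then corrects the chunk with the binlog events that fall between the two positions.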
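
The chunk and watermark idea can be illustrated with a small, self-contained Java sketch. This is a conceptual model only, not Flink CDC's implementation: the class ChunkSnapshotSketch, the BinlogEvent type, and the use of list indexes as binlog positions are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model only: every name here is hypothetical, not a Flink CDC class.
class ChunkSnapshotSketch {

    /** One binlog entry for a key: an upsert (value != null) or a delete (value == null). */
    static final class BinlogEvent {
        final long key;
        final String value;

        BinlogEvent(long key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Splits the key range [minKey, maxKey] into half-open chunks [start, end). */
    static List<long[]> splitChunks(long minKey, long maxKey, long chunkSize) {
        List<long[]> chunks = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += chunkSize) {
            chunks.add(new long[] {start, Math.min(start + chunkSize, maxKey + 1)});
        }
        return chunks;
    }

    /**
     * Replays the binlog events at positions [low, high) onto the rows read for one
     * chunk, so the result reflects the chunk as of the high position.
     */
    static Map<Long, String> reconcile(Map<Long, String> chunkRows, long[] chunk,
                                       List<BinlogEvent> binlog, int low, int high) {
        Map<Long, String> result = new TreeMap<>(chunkRows);
        for (int pos = low; pos < high; pos++) {
            BinlogEvent e = binlog.get(pos);
            if (e.key < chunk[0] || e.key >= chunk[1]) {
                continue; // the key belongs to another chunk
            }
            if (e.value == null) {
                result.remove(e.key);
            } else {
                result.put(e.key, e.value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> chunkRows = new TreeMap<>();
        chunkRows.put(1L, "a");
        chunkRows.put(2L, "b");

        List<BinlogEvent> binlog = new ArrayList<>();
        int low = binlog.size();                 // position marked before the chunk read
        binlog.add(new BinlogEvent(2L, "b2"));   // update inside the chunk
        binlog.add(new BinlogEvent(1L, null));   // delete inside the chunk
        binlog.add(new BinlogEvent(5L, "e"));    // insert outside the chunk
        int high = binlog.size();                // position marked after the chunk read

        System.out.println(reconcile(chunkRows, new long[] {1, 3}, binlog, low, high)); // {2=b2}
    }
}
```

Replaying events the snapshot has already seen is harmless in this model, because only the last event per key decides the final row.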

2. Deserialization

JsonDebeziumDeserializationSchema emits each change event as a JSON string. Its constructor can optionally include the schema in every message and accept custom converter settings, such as the Decimal format:

Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(true, customConverterConfigs);

3. Adding Flink CDC Dependencies

3.1 sql‑client

Download flink-sql-connector-mysql-cdc-2.2.0.jar, place it in the lib directory of every Flink node, restart the cluster and start sql-client.sh.

3.2 Java/Scala API

Add the following Maven dependency to pom.xml:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.2.0</version>
</dependency>

4. Sync MySQL Data to Hudi via SQL

4.1 Table definition

The MySQL source table info_message has the columns id, msg_title, msg_ctx, and msg_time, with id as the primary key.

CREATE TABLE `info_message` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'Primary key',
  `msg_title` varchar(100) DEFAULT NULL COMMENT 'Message name',
  `msg_ctx` varchar(2048) DEFAULT NULL COMMENT 'Message content',
  `msg_time` datetime DEFAULT NULL COMMENT 'Message send time',
  PRIMARY KEY (`id`)
);

4.2 Enable checkpoint

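Enable checkpointing before submitting the job, for example with SET 'execution.checkpointing.interval' = '10s'; in the SQL client (quoted syntax, Flink 1.13 and later). Checkpoints give the pipeline exactly-once semantics, and the MySQL CDC source waits for a completed checkpoint after the snapshot phase before it starts reading the binlog.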

4.3 Create MySQL source table in Flink

create table mysql_source (
    database_name string metadata from 'database_name' virtual,
    table_name string metadata from 'table_name' virtual,
    id decimal(20,0) not null,
    msg_title string,
    msg_ctx string,
    msg_time timestamp(9),
    primary key (id) not enforced
) with (
    'connector'='mysql-cdc',
    'hostname'='192.168.8.124',
    'port'='3306',
    'username'='hnmqet',
    'password'='hnmq123456',
    'server-time-zone'='Asia/Shanghai',
    'scan.startup.mode'='initial',
    'database-name'='d_general',
    'table-name'='info_message'
);

4.4 Create Hudi sink table

create table hudi_sink (
    database_name string,
    table_name string,
    id decimal(20,0) not null,
    msg_title string,
    msg_ctx string,
    msg_time timestamp(6),
    primary key (database_name, table_name, id) not enforced
) with (
    'connector'='hudi',
    'path'='hdfs://nnha/user/hudi/warehouse/hudi_db/info_message',
    'table.type'='MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field'='database_name,table_name,id',
    'write.precombine.field'='msg_time',
    'write.rate.limit'='2000',
    'write.operation'='upsert',
    'compaction.trigger.strategy'='num_commits',
    'compaction.delta_commits'='5',
    'changelog.enabled'='true'
);
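
The effect of 'write.operation'='upsert' together with 'write.precombine.field'='msg_time' can be pictured with a short Java sketch: rows that share a record key collapse into one, and the row with the later msg_time wins. This is a simplified mental model under stated assumptions, not Hudi's code. The class UpsertPrecombineSketch, the Row type, and the key layout are hypothetical, and the exact merge behavior in Hudi depends on its version and configured payload class.

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

// Conceptual model only: hypothetical names, not Hudi classes.
class UpsertPrecombineSketch {

    static final class Row {
        final String databaseName;
        final String tableName;
        final long id;
        final String msgTitle;
        final LocalDateTime msgTime;

        Row(String databaseName, String tableName, long id, String msgTitle, LocalDateTime msgTime) {
            this.databaseName = databaseName;
            this.tableName = tableName;
            this.id = id;
            this.msgTitle = msgTitle;
            this.msgTime = msgTime;
        }

        // Illustrative composite key; the real key layout is decided by Hudi's key generator.
        String recordKey() {
            return databaseName + "|" + tableName + "|" + id;
        }
    }

    /** Inserts the row, or replaces the stored row when the incoming msgTime is not older. */
    static void upsert(Map<String, Row> table, Row incoming) {
        Row existing = table.get(incoming.recordKey());
        if (existing == null || !incoming.msgTime.isBefore(existing.msgTime)) {
            table.put(incoming.recordKey(), incoming);
        }
    }

    public static void main(String[] args) {
        Map<String, Row> table = new HashMap<>();
        upsert(table, new Row("d_general", "info_message", 1, "v2", LocalDateTime.of(2022, 1, 1, 10, 5)));
        // An older version of the same key arrives late and is ignored.
        upsert(table, new Row("d_general", "info_message", 1, "v1", LocalDateTime.of(2022, 1, 1, 10, 0)));
        System.out.println(table.size() + " " + table.values().iterator().next().msgTitle); // prints "1 v2"
    }
}
```

The sketch assumes msgTime is never null, which matches the msg_time filter used in the streaming insert.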

4.5 Streaming insert

Insert data from the MySQL source into the Hudi sink, optionally setting a server‑id for each parallel slot.

insert into hudi_sink
select database_name, table_name, id, msg_title, msg_ctx, msg_time
from mysql_source /*+ OPTIONS('server-id'='5401') */
where msg_time is not null;

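Note: Adding a timestamp filter may cause the job to stall at the write_stream stage. When the source runs with a parallelism greater than one, every parallel reader needs its own server-id, so set a range that contains at least as many ids as the parallelism (e.g., '5401-5404' for a parallelism of 4).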
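
The server-id rule is simple arithmetic, shown here as a small Java check. The class ServerIdRangeSketch and its methods are hypothetical helpers written for this illustration and are not part of Flink CDC. The check only assumes that a range such as '5401-5404' is inclusive on both ends.

```java
// Hypothetical helper for illustration; not part of Flink CDC.
class ServerIdRangeSketch {

    /** Parses "5401" or "5401-5404" into an inclusive {start, end} pair. */
    static int[] parseRange(String serverId) {
        String[] parts = serverId.trim().split("-");
        if (parts.length == 1) {
            int id = Integer.parseInt(parts[0].trim());
            return new int[] {id, id};
        }
        if (parts.length == 2) {
            int start = Integer.parseInt(parts[0].trim());
            int end = Integer.parseInt(parts[1].trim());
            if (start > end) {
                throw new IllegalArgumentException("Range start exceeds end: " + serverId);
            }
            return new int[] {start, end};
        }
        throw new IllegalArgumentException("Expected 'id' or 'start-end': " + serverId);
    }

    /** Returns true when the range holds at least one id per parallel reader. */
    static boolean coversParallelism(String serverId, int parallelism) {
        int[] range = parseRange(serverId);
        return range[1] - range[0] + 1 >= parallelism;
    }

    public static void main(String[] args) {
        System.out.println(coversParallelism("5401-5404", 4)); // true
        System.out.println(coversParallelism("5401", 2));      // false
    }
}
```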

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
