Using Flink CDC to Write Data into Apache Hudi and Query with Hive and Spark SQL

This guide walks through preparing the environment, creating a MySQL source table, configuring Flink CDC to ingest data into an Apache Hudi table, and then querying the Hudi data using both Hive and Spark‑SQL, including handling of partitions, realtime input formats, and required configuration settings.

Big Data Technology & Architecture

1. Environment preparation: install Flink 1.12.2 (Scala 2.11), Hudi 0.9.0-SNAPSHOT, Spark 2.4.5, Hadoop 3.1.3, and Hive 3.1.2.

2. Create a MySQL table users and insert sample data.

create table users (
    id bigint auto_increment primary key,
    name varchar(20) null,
    birthday timestamp default CURRENT_TIMESTAMP not null,
    ts timestamp default CURRENT_TIMESTAMP not null
);
insert into users (name) values ('hello');
insert into users (name) values ('world');
insert into users (name) values ('iceberg');
insert into users (id, name) values (4, 'spark');
insert into users (name) values ('hudi');
select * from users;
update users set name = 'hello spark' where id = 5;
delete from users where id = 5;

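3. Start the Flink sql-client and define a CDC source table, mysql_users, that reads from the MySQL users table. The mysql-cdc connector jar must be on the Flink classpath, and the binlog must be enabled in MySQL.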

CREATE TABLE mysql_users (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3)
) WITH (
    'connector'='mysql-cdc',
    'hostname'='localhost',
    'port'='3306',
    'username'='root',
    'password'='123456',
    'server-time-zone'='Asia/Shanghai',
    'database-name'='mydb',
    'table-name'='users'
);

4. Define a Hudi sink table hudi_users2 (MERGE_ON_READ) with a partition column.

CREATE TABLE hudi_users2 (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector'='hudi',
    'table.type'='MERGE_ON_READ',
    'path'='hdfs://localhost:9000/hudi/hudi_users2',
    'read.streaming.enabled'='true',
    'read.streaming.check-interval'='1'
);

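5. Insert the CDC data into Hudi. The extra DATE_FORMAT expression fills the partition column with the birthday formatted as yyyyMMdd.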

INSERT INTO hudi_users2
SELECT *, DATE_FORMAT(birthday,'yyyyMMdd')
FROM mysql_users;

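After submitting the job, check its status in the Flink web UI. The Hudi writer commits data when a checkpoint completes, so checkpointing must be enabled; the data files appear under the table path in HDFS after the first few checkpoints.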
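To predict which HDFS directory a given row ends up in, the partition expression from step 5 can be mirrored outside Flink. The following is a minimal Python sketch, not part of the original pipeline; the function names `partition_for` and `partition_dir` are hypothetical, and it assumes the partition directory is simply the yyyyMMdd value appended to the table path, as the partition locations used in step 6 suggest.

```python
from datetime import datetime


def partition_for(birthday: datetime) -> str:
    """Mirror DATE_FORMAT(birthday, 'yyyyMMdd') for a single timestamp."""
    return birthday.strftime("%Y%m%d")


def partition_dir(table_path: str, birthday: datetime) -> str:
    """Join the table path and the partition value (assumed directory layout)."""
    return f"{table_path.rstrip('/')}/{partition_for(birthday)}"


if __name__ == "__main__":
    sample = datetime(2021, 4, 14, 10, 30)
    print(partition_dir("hdfs://localhost:9000/hudi/hudi_users2", sample))
```

Listing that directory in HDFS is a quick way to confirm that a checkpoint has actually written data for the expected day.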

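6. Query the Hudi table with Hive. Copy hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar into Hive's lib directory, then create an external table over the Hudi path. Use HoodieParquetInputFormat for the read-optimized view (called the static view here), which reads only the Parquet base files, or HoodieParquetRealtimeInputFormat for the real-time view, which also merges the log files. Hive does not discover the partitions of these external tables by itself, so add each partition with ALTER TABLE before running SELECT queries.

-- static (read-optimized) view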
CREATE EXTERNAL TABLE `hudi_users_2` (
    `_hoodie_commit_time` string,
    `_hoodie_commit_seqno` string,
    `_hoodie_record_key` string,
    `_hoodie_partition_path` string,
    `_hoodie_file_name` string,
    `id` bigint,
    `name` string,
    `birthday` bigint,
    `ts` bigint
) PARTITIONED BY (`partition` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:9000/hudi/hudi_users2';

-- realtime view
CREATE EXTERNAL TABLE `hudi_users_2_mor` (
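    `_hoodie_commit_time` string,
    `_hoodie_commit_seqno` string,
    `_hoodie_record_key` string,
    `_hoodie_partition_path` string,
    `_hoodie_file_name` string,
    `id` bigint,
    `name` string,
    `birthday` bigint,
    `ts` bigint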
) PARTITIONED BY (`partition` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:9000/hudi/hudi_users2';

ALTER TABLE hudi_users_2 ADD IF NOT EXISTS PARTITION (`partition`='20210414') LOCATION 'hdfs://localhost:9000/hudi/hudi_users2/20210414';
SELECT * FROM hudi_users_2 WHERE `partition`='20210414';
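Because every partition has to be registered by hand, the ALTER TABLE statement becomes repetitive once more than one day of data exists. The sketch below generates one statement per partition value; it is an illustration only, `add_partition_sql` is a hypothetical helper, and it assumes partition values are eight-digit yyyyMMdd strings stored directly under the table path.

```python
def add_partition_sql(table, base_path, partitions):
    """Build one ALTER TABLE ... ADD PARTITION statement per partition value."""
    base = base_path.rstrip("/")
    statements = []
    for value in sorted(set(partitions)):  # de-duplicate and keep a stable order
        # Reject anything that is not a yyyyMMdd-style value before
        # interpolating it into SQL text.
        if not (len(value) == 8 and value.isdigit()):
            raise ValueError(f"unexpected partition value: {value!r}")
        statements.append(
            f"ALTER TABLE {table} ADD IF NOT EXISTS "
            f"PARTITION (`partition`='{value}') "
            f"LOCATION '{base}/{value}';"
        )
    return statements


if __name__ == "__main__":
    for stmt in add_partition_sql(
        "hudi_users_2",
        "hdfs://localhost:9000/hudi/hudi_users2",
        ["20210414", "20210415"],
    ):
        print(stmt)
```

The printed statements can be pasted into the Hive CLI; the same helper works for the realtime table by passing its table name instead.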

If queries against the realtime table fail, set the Hive input format in the session before querying:

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

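7. For Spark SQL, copy hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar to $SPARK_HOME/jars and hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar to $HADOOP_HOME/share/hadoop/hdfs on every node, restart Hadoop, and create an external table in the same way as for Hive. The example below reads a Hudi table stored under /hudi/hudi_users3; substitute the path of your own table (for the table from step 4, hdfs://localhost:9000/hudi/hudi_users2).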

CREATE EXTERNAL TABLE `hudi_users3_spark` (
    `_hoodie_commit_time` string,
    `_hoodie_commit_seqno` string,
    `_hoodie_record_key` string,
    `_hoodie_partition_path` string,
    `_hoodie_file_name` string,
    `id` bigint,
    `name` string,
    `birthday` bigint,
    `ts` bigint
) PARTITIONED BY (`partition` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:9000/hudi/hudi_users3';

ALTER TABLE hudi_users3_spark ADD IF NOT EXISTS PARTITION (`partition`='20210414') LOCATION 'hdfs://localhost:9000/hudi/hudi_users3/20210414';

SET spark.sql.hive.convertMetastoreParquet=false;
SELECT * FROM hudi_users3_spark WHERE `partition`='20210414';
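The birthday and ts columns come back from these queries as bigint values rather than timestamps. The sketch below converts such a value to a readable datetime on the client side. It assumes the values are epoch milliseconds in UTC, which the source does not state; check one known row before relying on it. The name `millis_to_datetime` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Reference point for epoch-based values.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_datetime(value: int) -> datetime:
    """Interpret a bigint as milliseconds since the Unix epoch (assumption)."""
    return EPOCH + timedelta(milliseconds=value)


if __name__ == "__main__":
    print(millis_to_datetime(1618358400000).isoformat())
```

If the converted dates look wrong by a factor of 1000, the column is probably stored in a different unit (seconds or microseconds) and the helper needs adjusting.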

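8. Make sure the column types in the external table match the Hudi schema. Declaring a BIGINT column as STRING, for example, causes queries to fail. Note that birthday and ts are TIMESTAMP(3) in the Flink tables but are declared as bigint in the Hive and Spark tables above.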
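A small check run before creating the external table can catch this kind of mismatch early. The sketch below compares the column types you intend to declare against the types used in the DDL above; `EXPECTED_TYPES` and `type_mismatches` are hypothetical names, and the expected mapping is copied from this guide's tables rather than read from the Hudi schema itself.

```python
# Data column types as declared in the external tables of steps 6 and 7.
EXPECTED_TYPES = {
    "id": "bigint",
    "name": "string",
    "birthday": "bigint",
    "ts": "bigint",
}


def type_mismatches(declared, expected=EXPECTED_TYPES):
    """Return a readable message for every column whose type differs."""
    problems = []
    for column, want in expected.items():
        got = declared.get(column)
        if got is None:
            problems.append(f"{column}: missing (expected {want})")
        elif got.lower() != want:
            problems.append(f"{column}: declared {got.lower()}, expected {want}")
    return problems


if __name__ == "__main__":
    draft = {"id": "bigint", "name": "string", "birthday": "string", "ts": "bigint"}
    for problem in type_mismatches(draft):
        print(problem)
```

An empty result means the draft DDL agrees with the mapping; it does not prove that the mapping itself matches the files on disk.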

9. Future work: test the pipeline with production‑scale data to evaluate performance and stability of Flink‑CDC writing to Hudi.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.