Step-by-Step Guide to Setting Up Flink CDC with MySQL, Hudi, and Hive Integration on a Hadoop Cluster
This comprehensive tutorial walks through configuring a Hadoop-based environment, compiling Hudi, setting up Flink and the MySQL binlog, creating CDC source and Hudi sink tables, running the Flink job, and synchronizing the results to Hive partitions for querying via Hive and Presto.
The guide starts by listing the software versions used: Flink 1.13.1, Scala 2.11, CDH 6.2.0, Hadoop 3.0.0, Hive 2.1.1, Hudi 0.10 (master), PrestoDB 0.256, and MySQL 5.7.
Cluster preparation includes installing Maven and the JDK, setting the Hadoop environment variables

export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

and configuring the Maven mirrors used for the Hudi build:
<mirrors>
  <mirror>
    <id>alimaven</id>
    <mirrorOf>central,!cloudera</mirrorOf>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  </mirror>
</mirrors>

The Hudi source code is cloned (git clone https://github.com/apache/hudi.git) and compiled against the required Hadoop and Hive versions with:
mvn clean install -DskipTests=true -Drat.skip=true -Dscala-2.11 -Dhadoop.version=3.0.0 -Pflink-bundle-shade-hive2

The first compilation takes roughly 50 minutes; subsequent builds finish in about 15 minutes. Compilation errors are fixed by adding the CDH repository to Hudi's pom.xml.
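For reference, a sketch of the repository entry added to pom.xml (the standard Cloudera repository URL is assumed here, not taken from the original):

<repository>
  <!-- assumed Cloudera repository so CDH-versioned Hadoop/Hive artifacts resolve -->
  <id>cloudera</id>
  <name>cloudera repository</name>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>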
Flink 1.13.1 (Scala 2.11) is configured with state.backend=rocksdb, incremental checkpoints, and an HDFS checkpoint directory. The required connector JARs (MySQL CDC, Oracle CDC, Kafka, Hadoop client libraries, and the compiled hudi-flink-bundle JAR) are placed in $FLINK_HOME/lib.
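A minimal flink-conf.yaml sketch of that configuration (the checkpoint path and interval are assumptions, not taken from the original):

# RocksDB state backend with incremental checkpoints to HDFS
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://nameservice/flink/checkpoints
execution.checkpointing.interval: 30s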
The Flink session is then started on YARN:

$FLINK_HOME/bin/yarn-session.sh -s 2 -jm 2048 -tm 2048 -nm ys-hudi01 -d

MySQL binlog is enabled by creating a log directory, setting its ownership, and editing /etc/my.cnf:
server-id=2
log-bin=/mysqldata/logs/mysql-bin
binlog_format=row
expire_logs_days=15
binlog_row_image=full

After MySQL is restarted so the binlog settings take effect, the source database and table are prepared on the MySQL side; a sketch follows.
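A minimal sketch of verifying the binlog and creating a source table whose schema mirrors the Flink CDC definition below (the exact DDL is assumed, not shown in the original):

-- confirm binary logging is active and row-based
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

-- assumed source table matching the Flink CDC column types
CREATE DATABASE IF NOT EXISTS luo;
CREATE TABLE luo.users_cdc (
  id BIGINT NOT NULL PRIMARY KEY,
  name VARCHAR(64),
  birthday DATETIME(3),
  ts DATETIME(3)
);

The CDC source table is then defined in Flink SQL: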
CREATE TABLE mysql_users (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector'='mysql-cdc',
'hostname'='127.0.0.1',
'port'='3306',
'username'='',
'password'='',
'server-time-zone'='Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name'='luo',
'table-name'='users_cdc'
);

A temporary view adds a partition column:
CREATE VIEW mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') AS `partition` FROM mysql_users;

The Hudi sink table (MERGE_ON_READ) with Hive synchronization is created:
CREATE TABLE mysqlcdc_sync_hive01 (
id BIGINT,
name STRING,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
`partition` VARCHAR(20),
PRIMARY KEY(id) NOT ENFORCED
) PARTITIONED BY (`partition`) WITH (
'connector'='hudi',
'path'='hdfs://nameservice/luo/hudi/mysqlcdc_sync_hive01',
'hoodie.datasource.write.recordkey.field'='id',
'write.precombine.field'='ts',
'write.tasks'='1',
'compaction.tasks'='1',
'write.rate.limit'='2000',
'table.type'='MERGE_ON_READ',
'compaction.async.enabled'='true',
'compaction.trigger.strategy'='num_commits',
'compaction.delta_commits'='1',
'changelog.enabled'='true',
'read.streaming.enabled'='true',
'read.streaming.check-interval'='3',
'hive_sync.enable'='true',
'hive_sync.mode'='hms',
'hive_sync.metastore.uris'='thrift://hadoop:9083',
'hive_sync.table'='mysqlcdc_sync_hive01',
'hive_sync.db'='luo',
'hive_sync.username'='',
'hive_sync.password'='',
'hive_sync.support_timestamp'='true'
);

Data is inserted into the source MySQL table to generate change events (a hypothetical example is sketched below).
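A minimal source-side insert, with assumed values (the original does not show the exact rows):

-- hypothetical row in the MySQL source table
INSERT INTO luo.users_cdc (id, name, birthday, ts)
VALUES (1, 'luo', '1990-10-01 12:00:00', NOW(3));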
The Flink job that streams the captured changes into the Hudi table is then submitted:

INSERT INTO mysqlcdc_sync_hive01 SELECT id, name, birthday, ts, `partition` FROM mycdc_v;

The tutorial shows the resulting Hudi file layout on HDFS, the automatic creation of the Hive read-optimized (RO) and real-time (RT) tables, and how to query them via Hive and Presto. It also covers handling of delete operations, checkpoint configuration, and troubleshooting Hive count issues by adding hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar and setting:
hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat
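In a Hive (Beeline) session this looks roughly as follows; the _ro/_rt table names are the ones the Hive sync registers for a MERGE_ON_READ table, and the queries themselves are assumed examples:

-- apply the input format for the current session, then query the synced tables
SET hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
SELECT COUNT(*) FROM luo.mysqlcdc_sync_hive01_ro;
SELECT COUNT(*) FROM luo.mysqlcdc_sync_hive01_rt;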
Finally, the guide demonstrates using the Hudi CLI to inspect commits and compactions, and provides PrestoDB configuration notes for querying the synchronized Hive tables.
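A rough sketch of that inspection and query step (the path, host, and port are assumptions; the commands are standard Hudi CLI and presto-cli usage):

# Hudi CLI (e.g. launched via hudi-cli/hudi-cli.sh from the compiled source tree)
connect --path hdfs://nameservice/luo/hudi/mysqlcdc_sync_hive01
commits show
compactions show all

# PrestoDB: query the synchronized Hive tables through the hive catalog
presto-cli --server hadoop:8080 --catalog hive --schema luo
select count(*) from mysqlcdc_sync_hive01_rt;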