Big Data 15 min read

Step-by-Step Guide to Setting Up Flink CDC with MySQL, Hudi, and Hive Integration on a Hadoop Cluster

This comprehensive tutorial walks through configuring a Hadoop‑based environment (Flink 1.13.1, Scala 2.11, CDH 6.2.0, Hive 2.1.1, Hudi 0.10), compiling Hudi, setting up Flink and MySQL binlog, creating CDC source and Hudi sink tables, running Flink jobs, and synchronizing the results to Hive partitions for query via Hive and Presto.

Big Data Technology Architecture
Big Data Technology Architecture
Big Data Technology Architecture
Step-by-Step Guide to Setting Up Flink CDC with MySQL, Hudi, and Hive Integration on a Hadoop Cluster

The guide starts by listing the software versions used: Flink 1.13.1, Scala 2.11, CDH 6.2.0, Hadoop 3.0.0, Hive 2.1.1, Hudi 0.10 (master), PrestoDB 0.256, and MySQL 5.7.

Cluster preparation includes installing Maven and JDK, setting Hadoop environment variables ( export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop and export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`), and configuring Hudi Maven mirrors:

<mirrors>
  <mirror>
    <id>alimaven</id>
    <mirrorOf>central,!cloudera</mirrorOf>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  </mirror>
</mirrors>

Hudi source code is cloned ( git clone https://github.com/apache/hudi.git) and compiled with the required Hadoop and Hive versions using:

mvn clean install -DskipTests=true -Drat.skip=true -Dscala-2.11 -Dhadoop.version=3.0.0 -Pflink-bundle-shade-hive2

After a first compilation (≈50 min) subsequent builds are faster (≈15 min). Compilation errors are fixed by adding the CDH repository to Hudi’s pom.xml.

Flink is configured (Flink 1.13.1, Scala 2.11) with state.backend=rocksdb, incremental checkpoints, and HDFS checkpoint directory. Required connector JARs (MySQL CDC, Oracle CDC, Kafka, Hadoop client libs, and the compiled Hudi‑flink‑bundle JAR) are placed in $FLINK_HOME/lib. The Flink session is started on YARN:

$FLINK_HOME/bin/yarn-session.sh -s 2 -jm 2048 -tm 2048 -nm ys-hudi01 -d

MySQL binlog is enabled by creating a log directory, setting ownership, and editing /etc/my.cnf:

server-id=2
log-bin=/mysqldata/logs/mysql-bin
binlog_format=row
expire_logs_days=15
binlog_row_image=full

The CDC source table is defined in Flink SQL:

CREATE TABLE mysql_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector'='mysql-cdc',
  'hostname'='127.0.0.1',
  'port'='3306',
  'username'='',
  'password'='',
  'server-time-zone'='Asia/Shanghai',
  'debezium.snapshot.mode'='initial',
  'database-name'='luo',
  'table-name'='users_cdc'
);

A temporary view adds a partition column:

CREATE VIEW mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') AS `partition` FROM mysql_users;

The Hudi sink table (MERGE_ON_READ) with Hive synchronization is created:

CREATE TABLE mysqlcdc_sync_hive01 (
  id BIGINT,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  PRIMARY KEY(id) NOT ENFORCED
) PARTITIONED BY (`partition`) WITH (
  'connector'='hudi',
  'path'='hdfs://nameservice/luo/hudi/mysqlcdc_sync_hive01',
  'hoodie.datasource.write.recordkey.field'='id',
  'write.precombine.field'='ts',
  'write.tasks'='1',
  'compaction.tasks'='1',
  'write.rate.limit'='2000',
  'table.type'='MERGE_ON_READ',
  'compaction.async.enabled'='true',
  'compaction.trigger.strategy'='num_commits',
  'compaction.delta_commits'='1',
  'changelog.enabled'='true',
  'read.streaming.enabled'='true',
  'read.streaming.check-interval'='3',
  'hive_sync.enable'='true',
  'hive_sync.mode'='hms',
  'hive_sync.metastore.uris'='thrift://hadoop:9083',
  'hive_sync.table'='mysqlcdc_sync_hive01',
  'hive_sync.db'='luo',
  'hive_sync.username'='',
  'hive_sync.password'='',
  'hive_sync.support_timestamp'='true'
);

Data is inserted into the source MySQL table, and Flink streams the changes into Hudi:

INSERT INTO mysqlcdc_sync_hive01 SELECT id, name, birthday, ts, `partition` FROM mycdc_v;

The tutorial shows the resulting Hudi file layout on HDFS, the automatic creation of Hive read‑optimized (RO) and real‑time (RT) tables, and how to query them via Hive and Presto. It also covers handling of delete operations, checkpoint configuration, and troubleshooting Hive count issues by adding hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar and setting

hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat

.

Finally, the guide demonstrates using the Hudi CLI to inspect commits and compactions, and provides PrestoDB configuration notes for querying the synchronized Hive tables.

FlinkHiveMySQLCDCHudi
Big Data Technology Architecture
Written by

Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.