Flink 1.12 Enhancements: Full SQL Support, Hive Integration, and Streaming Write to Hive
The article reviews Flink 1.12's major enhancements, including comprehensive SQL capabilities, deep integration with Hive via catalog and streaming support, and a practical code example that demonstrates how to write streaming data into Hive tables while handling partition commits and small‑file merging.
Continuing from the previous discussion on Flink's production‑grade on‑Kubernetes high‑availability solution and DataStream API batch execution support, this article introduces several new feature enhancements in Flink 1.12.
Feature 3 – Full SQL Support
Early discussions in the Flink community highlighted the need for SQL support; after version 0.9 the Table API and code‑generation tools laid the groundwork. The Blink branch’s SQL engine was merged into the main line, and Flink 1.12 further optimizes it with features such as Upsert Kafka connector, Temporal Table Join, join optimizations, and user‑defined function (UDF) support, making Flink SQL comparable to Spark SQL.
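As a rough illustration of two of these features, the sketch below holds an Upsert Kafka table definition and an event-time temporal table join as SQL strings, in the same style as the job later in this article. The table names (`currency_rates`, `orders`), the columns, the topic, and the broker address are hypothetical. `orders` is assumed to be an existing append-only table with an event-time attribute `order_time`.

```java
class UpsertKafkaSketch {
    // Hypothetical versioned table backed by the upsert-kafka connector.
    // The primary key lets Flink interpret each Kafka record as an upsert for that key.
    static final String UPSERT_DDL = String.join("\n",
            "CREATE TABLE currency_rates (",
            "  currency STRING,",
            "  rate DECIMAL(10, 4),",
            "  update_time TIMESTAMP(3),",
            "  WATERMARK FOR update_time AS update_time,",
            "  PRIMARY KEY (currency) NOT ENFORCED",
            ") WITH (",
            "  'connector' = 'upsert-kafka',",
            "  'topic' = 'currency_rates',",
            "  'properties.bootstrap.servers' = 'node1:9092',",
            "  'key.format' = 'json',",
            "  'value.format' = 'json'",
            ")");

    // Event-time temporal join: each order is matched with the rate
    // that was valid at the order's own timestamp.
    static final String TEMPORAL_JOIN = String.join("\n",
            "SELECT o.order_id, o.price * r.rate AS converted",
            "FROM orders AS o",
            "JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r",
            "ON o.currency = r.currency");

    public static void main(String[] args) {
        System.out.println(UPSERT_DDL);
        System.out.println(TEMPORAL_JOIN);
    }
}
```

In a real job each string would be passed to `tEnv.executeSql(...)`; here they are only printed so the sketch runs without a Flink dependency.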
Feature 4 – Full Hive Integration
Integration with Hive marks Flink’s ability to compete directly with Spark in the Hadoop ecosystem. Two integration layers are provided: (1) using Hive Metastore as a persistent catalog via HiveCatalog, allowing Flink metadata to be stored and reused; (2) enabling Flink to read and write Hive tables directly. HiveCatalog offers out‑of‑the‑box compatibility without modifying the existing Hive Metastore or table locations.
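The first integration layer can also be set up in SQL rather than in Java. The sketch below builds a `CREATE CATALOG` statement with the `type`, `default-database`, and `hive-conf-dir` options; the helper name `createCatalogDdl` and the path `/opt/hive-conf` are hypothetical, and the catalog and database names are taken from the job later in this article.

```java
class HiveCatalogDdlSketch {
    // Builds the DDL that registers a Hive Metastore as a Flink catalog.
    // hiveConfDir is assumed to be a local directory containing hive-site.xml.
    static String createCatalogDdl(String name, String defaultDatabase, String hiveConfDir) {
        return "CREATE CATALOG " + name + " WITH (\n"
                + "  'type' = 'hive',\n"
                + "  'default-database' = '" + defaultDatabase + "',\n"
                + "  'hive-conf-dir' = '" + hiveConfDir + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        // "/opt/hive-conf" is a placeholder path for illustration only.
        System.out.println(createCatalogDdl("myhive", "flink", "/opt/hive-conf"));
        System.out.println("USE CATALOG myhive");
    }
}
```

Tables created while this catalog is active are persisted in the Hive Metastore, so later sessions can reuse their definitions without repeating the DDL.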
Key Hive‑related capabilities include streaming writes to Hive and advanced partition‑commit policies. The following code demonstrates a streaming job that reads from a Kafka source and writes into a Hive table using the FileSystem connector:
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class StreamingWriteHive {
    // Kafka source table. r_t is an event-time attribute derived from the epoch-seconds column ts.
    private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
            " code STRING,\n" +
            " total_emp INT,\n" +
            " ts BIGINT,\n" +
            " r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),\n" +
            " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'flink_dwd_test4',\n" +
            " 'properties.bootstrap.servers' = 'node1:9092',\n" +
            " 'properties.group.id' = 'test1',\n" +
            " 'format' = 'json',\n" +
            " 'scan.startup.mode' = 'latest-offset'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // Files are finalized and partitions committed on checkpoints, so checkpointing must be enabled.
        bsEnv.enableCheckpointing(5000);
        // Deprecated in Flink 1.12, where event time is already the default; kept from the original job.
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String name = "myhive";
        String defaultDatabase = "flink";
        String hiveConfDir = ""; // a local path (the directory containing hive-site.xml)
        String version = "1.1.0";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog(name, hive);
        tEnv.useCatalog(name);

        // Create the Kafka source with the default (Flink) dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("DROP TABLE IF EXISTS kafkaTable");
        tEnv.executeSql(KAFKA_SQL);

        // Create the partitioned Hive sink with the Hive dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        String hiveSql = "CREATE TABLE IF NOT EXISTS fs_table (\n" +
                " f_random_str STRING,\n" +
                " f_sequence INT\n" +
                ") PARTITIONED BY (dt STRING, hr STRING) " +
                "STORED AS PARQUET " +
                "TBLPROPERTIES (\n" +
                " 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
                " 'sink.partition-commit.delay'='5 s',\n" +
                " 'sink.partition-commit.trigger'='partition-time',\n" +
                " 'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")";
        tEnv.executeSql(hiveSql);

        // Switch back to the default dialect for the streaming INSERT.
        // The table names must match the two CREATE TABLE statements above.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        String insertSql = "INSERT INTO fs_table SELECT code, total_emp, " +
                " DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM kafkaTable";
        tEnv.executeSql(insertSql).print();
    }
}

Hive support also introduces two notable features: flexible partition‑commit triggers and policies, and small‑file merging in the FileSystem connector (enabled by setting 'auto-compaction' = 'true'), which avoids the proliferation of tiny files that frequent checkpoints would otherwise produce.
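To make the `partition-time` trigger more concrete, here is a simplified model of the commit decision. It is not Flink's implementation. It assumes that the partition timestamp is obtained by substituting partition values into the `partition.time-extractor.timestamp-pattern`, and that a partition becomes committable once the watermark passes that timestamp plus `sink.partition-commit.delay`. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

class PartitionCommitSketch {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Replaces each $key in the pattern with its partition value, then parses the result.
    static LocalDateTime extractPartitionTime(String pattern, Map<String, String> partitionValues) {
        String resolved = pattern;
        for (Map.Entry<String, String> e : partitionValues.entrySet()) {
            resolved = resolved.replace("$" + e.getKey(), e.getValue());
        }
        return LocalDateTime.parse(resolved, FMT);
    }

    // A partition is ready once the watermark is strictly after partition time + delay.
    static boolean readyToCommit(LocalDateTime watermark, LocalDateTime partitionTime, Duration delay) {
        return watermark.isAfter(partitionTime.plus(delay));
    }

    public static void main(String[] args) {
        LocalDateTime partitionTime =
                extractPartitionTime("$dt $hr:00:00", Map.of("dt", "2020-12-10", "hr", "09"));
        Duration delay = Duration.ofSeconds(5);
        // Watermark at 09:00:03 has not passed 09:00:05 yet.
        System.out.println(readyToCommit(LocalDateTime.parse("2020-12-10T09:00:03"), partitionTime, delay)); // false
        // Watermark at 09:00:06 has passed 09:00:05.
        System.out.println(readyToCommit(LocalDateTime.parse("2020-12-10T09:00:06"), partitionTime, delay)); // true
    }
}
```

Under this model, the `5 s` delay used in the job makes an hourly partition committable a few seconds after the hour begins. If downstream readers should see a partition only once its hour is complete, a delay equal to the partition granularity (for example `1 h`) is the more usual choice.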
Beyond these core improvements, Flink continues to expand its ecosystem to address real‑world business challenges, promising further innovations.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
