
Flink 1.12 Enhancements: Full SQL Support, Hive Integration, and Streaming Write to Hive

The article reviews Flink 1.12's major enhancements, including comprehensive SQL capabilities, deep integration with Hive via catalog and streaming support, and a practical code example that demonstrates how to write streaming data into Hive tables while handling partition commits and small‑file merging.

Big Data Technology & Architecture

Continuing from the previous discussion on Flink's production‑grade on‑Kubernetes high‑availability solution and DataStream API batch execution support, this article introduces several new feature enhancements in Flink 1.12.

Feature 3 – Full SQL Support

The Flink community recognized the need for SQL support early on; from version 0.9 onward, the Table API and code generation laid the groundwork. The Blink branch's SQL engine was later merged into the main line, and Flink 1.12 builds on it with features such as the Upsert Kafka connector, Temporal Table Join, join optimizations, and user-defined function (UDF) support, bringing Flink SQL closer to Spark SQL in coverage.
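As a rough illustration of two of the features named above, the sketch below only assembles SQL text that could be passed to a table environment's executeSql. The table names (emp_by_code, orders, currency_rates), the topic, and the columns of the join are hypothetical and chosen for the example; the code and total_emp columns are borrowed from the Kafka table used later in this article.

```java
public class Flink112SqlSketch {

    /** Builds DDL for an upsert-kafka table; the primary key decides which record a new message updates. */
    public static String upsertKafkaDdl(String table, String topic, String brokers) {
        return String.join("\n",
                "CREATE TABLE " + table + " (",
                "  code STRING,",
                "  total_emp INT,",
                "  PRIMARY KEY (code) NOT ENFORCED",
                ") WITH (",
                "  'connector' = 'upsert-kafka',",
                "  'topic' = '" + topic + "',",
                "  'properties.bootstrap.servers' = '" + brokers + "',",
                "  'key.format' = 'json',",
                "  'value.format' = 'json'",
                ")");
    }

    /** An event-time temporal join: each order is matched with the rate valid at its order_time. */
    public static String temporalJoinSql() {
        return String.join("\n",
                "SELECT o.order_id, o.amount * r.rate AS converted",
                "FROM orders AS o",
                "JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r",
                "ON o.currency = r.currency");
    }

    public static void main(String[] args) {
        // Hypothetical table, topic and broker names, used only to print the statements.
        System.out.println(upsertKafkaDdl("emp_by_code", "emp_by_code_topic", "node1:9092"));
        System.out.println(temporalJoinSql());
    }
}
```

In a real job the two strings would be handed to tEnv.executeSql; for the temporal join, both tables are assumed to declare event-time attributes, and the versioned side a primary key.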

Feature 4 – Full Hive Integration

Integration with Hive marks Flink’s ability to compete directly with Spark in the Hadoop ecosystem. Two integration layers are provided: (1) using Hive Metastore as a persistent catalog via HiveCatalog, allowing Flink metadata to be stored and reused; (2) enabling Flink to read and write Hive tables directly. HiveCatalog offers out‑of‑the‑box compatibility without modifying the existing Hive Metastore or table locations.
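The first integration layer can also be expressed in SQL rather than through the Java API. The sketch below merely builds a CREATE CATALOG statement; the catalog name, default database and Hive version mirror the values used in this article's example, while the configuration directory /opt/hive-conf is an assumed placeholder.

```java
public class HiveCatalogDdl {

    /** Builds a CREATE CATALOG statement that points Flink at an existing Hive Metastore. */
    public static String createCatalog(String name, String defaultDb,
                                       String hiveConfDir, String hiveVersion) {
        return String.join("\n",
                "CREATE CATALOG " + name + " WITH (",
                "  'type' = 'hive',",
                "  'default-database' = '" + defaultDb + "',",
                "  'hive-conf-dir' = '" + hiveConfDir + "',",
                "  'hive-version' = '" + hiveVersion + "'",
                ")");
    }

    public static void main(String[] args) {
        // '/opt/hive-conf' is a placeholder for the directory that contains hive-site.xml.
        System.out.println(createCatalog("myhive", "flink", "/opt/hive-conf", "1.1.0"));
        System.out.println("USE CATALOG myhive");
    }
}
```

Once the catalog is in use, tables created through Flink are stored in the Metastore and remain visible to later sessions.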

Key Hive-related capabilities include streaming writes to Hive and configurable partition-commit policies. The following code demonstrates a streaming job that reads from a Kafka source and writes into a partitioned Hive table registered through HiveCatalog; the partition-commit options are set as table properties:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class StreamingWriteHive {
    // Kafka source table: r_t is an event-time attribute derived from the epoch-seconds column ts.
    private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
            " code STRING,\n" +
            " total_emp INT,\n" +
            " ts BIGINT,\n" +
            " r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),\n" +
            " WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'flink_dwd_test4',\n" +
            " 'properties.bootstrap.servers' = 'node1:9092',\n" +
            " 'properties.group.id' = 'test1',\n" +
            " 'format' = 'json',\n" +
            " 'scan.startup.mode' = 'latest-offset'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // Files are finalized on checkpoints, so checkpointing must be enabled for the sink to commit data.
        bsEnv.enableCheckpointing(5000);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Register the Hive Metastore as a persistent catalog.
        String name = "myhive";
        String defaultDatabase = "flink";
        String hiveConfDir = ""; // a local path
        String version = "1.1.0";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Create the Kafka source with the default dialect. The catalog is persistent,
        // so drop any definition left over from a previous run first.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("DROP TABLE IF EXISTS kafkaTable");
        tEnv.executeSql(KAFKA_SQL);

        // Create the partitioned Hive sink table with the Hive dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        String hiveSql = "CREATE TABLE IF NOT EXISTS fs_table (\n" +
                "  f_random_str STRING,\n" +
                "  f_sequence INT\n" +
                ") PARTITIONED BY (dt STRING, hr STRING) " +
                "STORED AS PARQUET " +
                "TBLPROPERTIES (\n" +
                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',\n" +
                "  'sink.partition-commit.delay'='5 s',\n" +
                "  'sink.partition-commit.trigger'='partition-time',\n" +
                "  'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")";
        tEnv.executeSql(hiveSql);

        // Stream rows from Kafka into Hive; the last two columns fill the dt and hr partitions.
        String insertSql = "INSERT INTO fs_table SELECT code, total_emp, " +
                " DATE_FORMAT(r_t, 'yyyy-MM-dd'), DATE_FORMAT(r_t, 'HH') FROM kafkaTable";
        tEnv.executeSql(insertSql).print();
    }
}
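
The partition-time trigger configured in the table properties above can be pictured with a small stand-alone sketch. It is not Flink's implementation, only an illustration of the documented rule that a partition is committed once the watermark passes the time extracted from the partition values plus the configured delay. The class and method names are made up for the example.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionCommitSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Fills a pattern such as '$dt $hr:00:00' with partition values and parses the result. */
    public static LocalDateTime partitionTime(String pattern, String dt, String hr) {
        String filled = pattern.replace("$dt", dt).replace("$hr", hr);
        return LocalDateTime.parse(filled, FMT);
    }

    /** A partition is committable once the watermark has passed partition time + delay. */
    public static boolean canCommit(LocalDateTime watermark, LocalDateTime partitionTime,
                                    Duration delay) {
        return watermark.isAfter(partitionTime.plus(delay));
    }

    public static void main(String[] args) {
        LocalDateTime pt = partitionTime("$dt $hr:00:00", "2020-12-21", "12");
        LocalDateTime watermark = LocalDateTime.of(2020, 12, 21, 12, 0, 10);
        System.out.println(canCommit(watermark, pt, Duration.ofSeconds(5))); // prints "true"
        System.out.println(canCommit(watermark, pt, Duration.ofHours(1)));   // prints "false"
    }
}
```

Under this rule, a 5 s delay on an hourly partition makes the partition committable a few seconds into the hour, while rows for that hour are still arriving. A delay that matches the partition granularity, such as 1 h, postpones the commit until the hour is likely complete.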

Hive support also introduces two notable features: flexible partition-commit triggers and policies, and small-file compaction in the FileSystem connector (enabled by setting 'auto-compaction' to 'true'), which merges the many small files that frequent checkpoints would otherwise produce.
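
A minimal sketch of how the compaction switch might sit next to the partition-commit options in a table definition is shown below. It only renders a TBLPROPERTIES clause as text. The target size of 128MB for 'compaction.file-size' is an assumed value for illustration, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class CompactionProps {

    /** Sink options: the partition-commit settings from the example job plus compaction. */
    public static Map<String, String> sinkProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("sink.partition-commit.trigger", "partition-time");
        props.put("sink.partition-commit.policy.kind", "metastore,success-file");
        props.put("auto-compaction", "true");        // merge small files written within a checkpoint
        props.put("compaction.file-size", "128MB");  // assumed target size, tune per workload
        return props;
    }

    /** Renders the options as a TBLPROPERTIES clause, keeping insertion order. */
    public static String toTblProperties(Map<String, String> props) {
        return props.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "'='" + e.getValue() + "'")
                .collect(Collectors.joining(",\n", "TBLPROPERTIES (\n", "\n)"));
    }

    public static void main(String[] args) {
        System.out.println(toTblProperties(sinkProperties()));
    }
}
```

The rendered clause could replace the TBLPROPERTIES section of a Hive DDL statement such as the one used for the sink table in this article.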

Beyond these core improvements, Flink continues to expand its ecosystem to address real‑world business challenges, promising further innovations.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
