
Integrating Flink 1.11 with Hive Streaming, Kafka, and Table API

This article demonstrates how to use Flink 1.11's enhanced Hive integration to stream data from a Kafka source, write it into partitioned Hive tables with checkpoint‑driven commits, and read Hive tables as a continuous source using dynamic table options and table hints.

Big Data Technology & Architecture

The author originally planned a deep dive into the StreamingFileSink source code but decided to showcase the new features of Flink 1.11, focusing on the significantly improved Hive integration that enables true stream‑batch unification.

Adding Dependencies

<properties>
  <scala.bin.version>2.11</scala.bin.version>
  <flink.version>1.11.0</flink.version>
  <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version>
  <hive.version>1.1.0</hive.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  ... (other Flink and Hive dependencies) ...
</dependencies>

Creating the Execution Environment

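import java.time.Duration

import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.config.TableConfigOptions
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)

val tableEnvSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

// Hive partition commits are driven by checkpoints: exactly-once mode, every 20 seconds
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))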

Registering HiveCatalog

val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,              // catalog name
  "default",                // default database
  "/Users/lmagic/develop",  // directory containing hive-site.xml
  "1.1.0"                   // Hive version
)

tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)

Creating the Kafka Source Table

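The Kafka topic contains JSON-encoded event logs. Computed columns derive the processing time (procTime) and the event time (eventTime) from the millisecond timestamp ts, and a watermark is declared on eventTime with a 15-second delay. The DDL syntax is simpler than in Flink 1.10.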

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")

tableEnv.executeSql(
   """
     |CREATE TABLE stream_tmp.analytics_access_log_kafka (
     |  ts BIGINT,
     |  userId BIGINT,
     |  eventType STRING,
     |  fromType STRING,
     |  columnType STRING,
     |  siteId BIGINT,
     |  grouponId BIGINT,
     |  partnerId BIGINT,
     |  merchandiseId BIGINT,
     |  procTime AS PROCTIME(),
     |  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
     |  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
     |) WITH (
     |  'connector' = 'kafka',
     |  'topic' = 'ods_analytics_access_log',
     |  'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092',
     |  'properties.group.id' = 'flink_hive_integration_exp_1',
     |  'scan.startup.mode' = 'latest-offset',
     |  'format' = 'json',
     |  'json.fail-on-missing-field' = 'false',
     |  'json.ignore-parse-errors' = 'true'
     |)
   """.stripMargin)

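The two time-related expressions in the DDL above can be mirrored in plain Scala to see what they compute. This is only a sketch outside Flink: the object and function names are hypothetical, the sample timestamp is made up, and the time zone is passed explicitly because FROM_UNIXTIME renders the epoch second in whatever zone the session uses.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}

object EventTimeSketch {
  // Counterpart of TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')):
  // integer division discards the milliseconds, then the epoch second is
  // rendered as a local date-time in the given zone (the zone is an assumption).
  def eventTime(tsMillis: Long, zone: ZoneId): LocalDateTime =
    LocalDateTime.ofInstant(Instant.ofEpochSecond(tsMillis / 1000), zone)

  // Counterpart of WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND:
  // the watermark trails the largest event time seen so far by 15 seconds.
  def watermarkFor(maxEventTimeSeen: LocalDateTime): LocalDateTime =
    maxEventTimeSeen.minusSeconds(15)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample value, not taken from the article's data.
    val et = eventTime(1594855380999L, ZoneId.of("UTC"))
    println(s"eventTime = $et, watermark = ${watermarkFor(et)}")
  }
}
```

Note that the event time has second precision only, since the millisecond part of ts is dropped before conversion.
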
Creating the Hive Target Table

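The table is created with Hive-compatible DDL (SqlDialect.HIVE) and partitioned by date, hour, and minute. Its TBLPROPERTIES control when a partition becomes visible: with the partition-time trigger, a partition is committed once the watermark passes the time extracted from its partition values plus the 1-minute delay, and the commit policy both registers the partition in the Hive metastore and writes a success file into its directory.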

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")

tableEnv.executeSql(
   """
     |CREATE TABLE hive_tmp.analytics_access_log_hive (
     |  ts BIGINT,
     |  user_id BIGINT,
     |  event_type STRING,
     |  from_type STRING,
     |  column_type STRING,
     |  site_id BIGINT,
     |  groupon_id BIGINT,
     |  partner_id BIGINT,
     |  merchandise_id BIGINT
     |) PARTITIONED BY (
     |  ts_date STRING,
     |  ts_hour STRING,
     |  ts_minute STRING
     |) STORED AS PARQUET
     |TBLPROPERTIES (
     |  'sink.partition-commit.trigger' = 'partition-time',
     |  'sink.partition-commit.delay' = '1 min',
     |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
     |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
     |)
   """.stripMargin)

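To make the partition-time trigger concrete, the sketch below reproduces the commit decision outside Flink. It is an illustration, not Flink's implementation: the object and function names are hypothetical, and treating "passes" as a strict comparison is an assumption.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object PartitionCommitSketch {
  private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Fill a pattern such as '$ts_date $ts_hour:$ts_minute:00' with partition values
  // and parse the result as the partition's time.
  def partitionTime(pattern: String, values: Map[String, String]): LocalDateTime = {
    // Replace longer keys first so a key can never clobber a longer key it prefixes.
    val ordered = values.toSeq.sortBy { case (key, _) => -key.length }
    val filled = ordered.foldLeft(pattern) {
      case (acc, (key, value)) => acc.replace("$" + key, value)
    }
    LocalDateTime.parse(filled, formatter)
  }

  // A partition may be committed once the watermark has passed
  // partition time + delay (strictness of the comparison is an assumption).
  def isCommittable(watermark: LocalDateTime, partTime: LocalDateTime, delay: Duration): Boolean =
    watermark.isAfter(partTime.plus(delay))

  def main(args: Array[String]): Unit = {
    val partTime = partitionTime(
      "$ts_date $ts_hour:$ts_minute:00",
      Map("ts_date" -> "2020-07-15", "ts_hour" -> "23", "ts_minute" -> "23"))
    val watermark = LocalDateTime.of(2020, 7, 15, 23, 24, 30) // hypothetical watermark
    println(isCommittable(watermark, partTime, Duration.ofMinutes(1)))
  }
}
```

With minute-level partitions and a 1-minute delay, a partition closes roughly when the watermark reaches the start of the following minute, which is why the delay is usually chosen to match the partition granularity.
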
Streaming Write to Hive

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(
   """
     |INSERT INTO hive_tmp.analytics_access_log_hive
     |SELECT
     |  ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId,
     |  DATE_FORMAT(eventTime,'yyyy-MM-dd'),
     |  DATE_FORMAT(eventTime,'HH'),
     |  DATE_FORMAT(eventTime,'mm')
     |FROM stream_tmp.analytics_access_log_kafka
     |WHERE merchandiseId > 0
   """.stripMargin)

Because the checkpoint interval is 20 seconds and the parallelism is 3, each checkpoint produces three part files, one per sink subtask. Files that are still being written carry an .inprogress suffix; once a partition is committed, a _SUCCESS marker file appears in its directory.
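A downstream job can use the _SUCCESS marker to decide whether a partition is safe to read. The sketch below checks for it on a local filesystem with java.nio; this is an assumption made for the sake of a runnable example, since a real warehouse usually lives on HDFS and would need the Hadoop FileSystem API instead. The object name, function names, and table root path are hypothetical.

```scala
import java.nio.file.{Files, Path, Paths}

object SuccessMarkerSketch {
  // Hive lays out partitions as nested key=value directories under the table root.
  def partitionDir(tableRoot: Path, partition: Seq[(String, String)]): Path =
    partition.foldLeft(tableRoot) {
      case (dir, (key, value)) => dir.resolve(s"$key=$value")
    }

  // The success-file commit policy writes a file named _SUCCESS by default.
  def isCommitted(dir: Path): Boolean =
    Files.exists(dir.resolve("_SUCCESS"))

  def main(args: Array[String]): Unit = {
    // Hypothetical local path standing in for the table's warehouse location.
    val root = Paths.get("/tmp/hive_tmp.db/analytics_access_log_hive")
    val dir = partitionDir(root, Seq(
      "ts_date" -> "2020-07-15", "ts_hour" -> "23", "ts_minute" -> "23"))
    println(s"$dir committed: ${isCommitted(dir)}")
  }
}
```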

Verifying Data in Hive

hive> SELECT from_unixtime(min(cast(ts/1000 as BIGINT))), from_unixtime(max(cast(ts/1000 as BIGINT)))
      FROM hive_tmp.analytics_access_log_hive
      WHERE ts_date='2020-07-15' AND ts_hour='23' AND ts_minute='23';

The query confirms that the timestamps stored in Hive match the original event times.

Streaming Read from Hive

tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)

val result = tableEnv.sqlQuery(
  """
     |SELECT merchandise_id, count(1) AS pv
     |FROM hive_tmp.analytics_access_log_hive
     |/*+ OPTIONS(
     |  'streaming-source.enable' = 'true',
     |  'streaming-source.monitor-interval' = '1 min',
     |  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
     |) */
     |WHERE event_type = 'shtOpenGoodsDetail'
     |  AND ts_date >= '2020-07-15'
     |GROUP BY merchandise_id
     |ORDER BY pv DESC LIMIT 10
   """.stripMargin)

result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()

The three table‑hint options enable the Hive table as a streaming source, define how often new partitions are detected, and set the start offset for consumption.
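When the same hint is needed in several queries, the OPTIONS clause can be generated from key/value pairs rather than typed by hand. The helper below is a hypothetical convenience, not part of Flink's API, and it assumes that keys and values contain no single quotes.

```scala
object HintSketch {
  // Render key/value pairs as a dynamic table options hint:
  // /*+ OPTIONS('k1' = 'v1', 'k2' = 'v2') */
  def optionsHint(options: Seq[(String, String)]): String =
    options
      .map { case (key, value) => s"'$key' = '$value'" }
      .mkString("/*+ OPTIONS(", ", ", ") */")

  def main(args: Array[String]): Unit = {
    val hint = optionsHint(Seq(
      "streaming-source.enable" -> "true",
      "streaming-source.monitor-interval" -> "1 min",
      "streaming-source.consume-start-offset" -> "2020-07-15 23:30:00"))
    // The hint goes directly after the table name in the FROM clause.
    println(s"SELECT * FROM hive_tmp.analytics_access_log_hive $hint")
  }
}
```

The hint only takes effect when dynamic table options are enabled in the table configuration, as in the snippet above.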

Overall, Flink 1.11's Hive streaming capabilities greatly improve the real‑time characteristics of a Hive data warehouse, making it suitable for ETL pipelines and continuous queries.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
