
Deep Dive into Flink Table & SQL Window Functions, UDFs, and Hive Integration

This article is a guide to window semantics in the Flink Table API and SQL. It covers group windows (tumbling, sliding, and session) and over windows, shows how to define windows in SQL, surveys the built-in functions, demonstrates how to implement scalar, table, aggregate, and table-aggregate UDFs, and describes Flink's integration with Hive, including Maven dependencies and example programs.

Big Data Technology & Architecture

Before diving into this article, readers are encouraged to review related Flink series covering time, windows, stream joins, network flow control, dimension tables, memory models, and Table & SQL fundamentals.

1. Regular Windows

Flink supports two kinds of windows in the Table API and SQL: Group Windows and Over Windows. Group windows aggregate rows over time intervals or row-count intervals and produce one result row per group and window.

1.1 Group Windows

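Group windows are defined with the .window([w: GroupWindow] as $"w") clause, where [w: GroupWindow] is a placeholder for a concrete window definition. The window must be given an alias, and that alias must appear in the groupBy clause like any other grouping field. Example: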

val table = input
  .window([w: GroupWindow] as $"w") // define window with alias w
  .groupBy($"w", $"a")               // group by fields a and window w
  .select($"a", $"b".sum)           // sum field b

Window metadata can also be selected:

val table = input
  .window([w: GroupWindow] as $"w")
  .groupBy($"w", $"a")
  .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count)

Three group window types are supported: tumbling (class Tumble), sliding (class Slide), and session (class Session).

1.2 Tumbling Windows

Tumbling windows use the Tumble class with three methods: over (window length), on (time field), and as (alias). Example:

// Tumbling Event‑time Window
.window(Tumble over 10.minutes on $"rowtime" as $"w")
// Tumbling Processing‑time Window
.window(Tumble over 10.minutes on $"proctime" as $"w")
// Tumbling Row‑count Window
.window(Tumble over 10.rows on $"proctime" as $"w")
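To make the tumbling semantics concrete, here is a minimal plain-Scala sketch (no Flink dependency) of how a timestamp can be mapped to exactly one fixed-size, non-overlapping window. The names TumblingSketch and windowFor are illustrative and not part of Flink's API, and the sketch assumes windows are aligned to epoch 0.

```scala
object TumblingSketch {
  /** Half-open window [start, end) in epoch milliseconds. */
  final case class Window(start: Long, end: Long)

  /** Assigns a timestamp to the single tumbling window that contains it.
    * Assumption of this sketch: windows are aligned to epoch 0 and sizeMs > 0. */
  def windowFor(timestampMs: Long, sizeMs: Long): Window = {
    val start = timestampMs - Math.floorMod(timestampMs, sizeMs)
    Window(start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    val tenMinutes = 10 * 60 * 1000L
    // Minutes 7 and 9 share one window; minute 12 falls into the next one.
    Seq(7, 9, 12).foreach { m =>
      println(s"minute $m -> ${windowFor(m * 60 * 1000L, tenMinutes)}")
    }
  }
}
```

Because every timestamp lands in exactly one window, a tumbling aggregation counts each row once.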

1.3 Sliding Windows

Sliding windows use the Slide class with four methods: over (window length), every (slide step), on (time field), and as (alias). Example:

// Sliding Event‑time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")
// Sliding Processing‑time Window
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")
// Sliding Row‑count Window
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")
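Unlike a tumbling window, a sliding window can place one row in several overlapping windows. The plain-Scala sketch below (no Flink dependency) lists every window that contains a given timestamp. SlidingSketch and windowsFor are illustrative names, and alignment to epoch 0 is an assumption of the sketch.

```scala
object SlidingSketch {
  /** Half-open window [start, end) in epoch milliseconds. */
  final case class Window(start: Long, end: Long)

  /** Returns every sliding window that contains the timestamp, latest first.
    * Assumption of this sketch: aligned to epoch 0, with 0 < slideMs <= sizeMs. */
  def windowsFor(timestampMs: Long, sizeMs: Long, slideMs: Long): List[Window] = {
    // Start of the latest window that still contains the timestamp.
    val lastStart = timestampMs - Math.floorMod(timestampMs, slideMs)
    Iterator
      .iterate(lastStart)(_ - slideMs)                 // walk back one slide at a time
      .takeWhile(start => start > timestampMs - sizeMs) // stop once the window no longer covers it
      .map(start => Window(start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    // With a 10-minute size and a 5-minute slide, minute 7 belongs to two windows.
    windowsFor(7 * minute, 10 * minute, 5 * minute).foreach(println)
  }
}
```

With a window length of 10 minutes and a slide of 5 minutes, each row contributes to two windows, so it is counted in two results.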

1.4 Session Windows

Session windows use the Session class with withGap (gap duration), on (time field), and as (alias). Example:

// Session Event‑time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")
// Session Processing‑time Window
.window(Session withGap 10.minutes on $"proctime" as $"w")
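Session windows have no fixed length: a window stays open as long as rows keep arriving within the gap. The following plain-Scala sketch (no Flink dependency) groups timestamps into sessions. SessionSketch and sessions are illustrative names, and the treatment of an event that arrives exactly one gap after the previous one (merged here) is a choice made in this sketch.

```scala
object SessionSketch {
  /** Half-open session [start, end) with the number of events it contains. */
  final case class Session(start: Long, end: Long, count: Int)

  /** Groups timestamps into sessions. An event extends the current session if it
    * arrives no later than the session end (last event time + gap); otherwise it
    * opens a new session. */
  def sessions(timestampsMs: Seq[Long], gapMs: Long): List[Session] =
    timestampsMs.sorted.foldLeft(List.empty[Session]) {
      case (current :: done, ts) if ts <= current.end =>
        // Extend the open session and push its end out by one gap.
        current.copy(end = ts + gapMs, count = current.count + 1) :: done
      case (acc, ts) =>
        // Gap exceeded (or first event): start a new session.
        Session(ts, ts + gapMs, 1) :: acc
    }.reverse

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    // Events at minutes 0 and 5 form one session; minute 30 starts another.
    sessions(Seq(0L, 5 * minute, 30 * minute), 10 * minute).foreach(println)
  }
}
```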

2. Over Windows

Over windows perform row‑wise aggregations similar to the SQL OVER clause. They are defined with .window([w: OverWindow] as $"w") and referenced in select. Example:

val table = input
  .window([w: OverWindow] as $"w")
  .select($"a" , $"b".sum over $"w", $"c".min over $"w")

Unbounded over windows use UNBOUNDED_RANGE or UNBOUNDED_ROW; bounded windows specify a concrete range.

Unbounded example:

// Unbounded event‑time over window
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"w")

Bounded example (1 minute preceding):

// Bounded event‑time over window
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 1.minutes as $"w")
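The key difference from a group window is that an over window emits one output row per input row. The plain-Scala sketch below (no Flink dependency) imitates a bounded range over window: for each row it sums the values of rows with the same key that fall within the preceding interval. Reading, OverWindowSketch, and rangeSum are hypothetical names used only for illustration.

```scala
object OverWindowSketch {
  final case class Reading(key: String, timeMs: Long, value: Double)

  /** For each row, sums `value` over rows of the same key whose time lies in
    * [row.timeMs - precedingMs, row.timeMs]. Emits one result per input row. */
  def rangeSum(rows: Seq[Reading], precedingMs: Long): Seq[(Reading, Double)] =
    rows.map { row =>
      val sum = rows.iterator
        .filter(r => r.key == row.key)                      // partitionBy
        .filter(r => r.timeMs <= row.timeMs)                // no rows after the current one
        .filter(r => r.timeMs >= row.timeMs - precedingMs)  // bounded preceding range
        .map(_.value)
        .sum
      row -> sum
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Reading("a", 0L, 1.0),
      Reading("a", 30000L, 2.0),
      Reading("a", 90000L, 4.0),
      Reading("b", 30000L, 10.0)
    )
    // One minute preceding, as in the bounded example above.
    rangeSum(rows, 60000L).foreach(println)
  }
}
```

An unbounded over window corresponds to dropping the lower bound, so each row sees every earlier row in its partition.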

3. Defining Windows in SQL

Group windows are expressed with the TUMBLE, HOP, and SESSION functions in the GROUP BY clause. A tumbling window is written as TUMBLE(time_attr, INTERVAL '10' SECOND). Sliding windows use HOP(time_attr, slide_interval, window_interval), and session windows use SESSION(time_attr, gap_interval). Helper functions such as TUMBLE_START, TUMBLE_END, and TUMBLE_ROWTIME retrieve window metadata; HOP and SESSION have matching helpers (for example HOP_START and SESSION_END).
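As an illustration of the three forms, the sketch below holds one query of each kind as a Scala string. The queries are not executed here; in a Flink program each string would be passed to tableEnv.sqlQuery. The table name sensor, the column id, and the processing-time attribute pt are assumptions chosen for the example.

```scala
object WindowSqlSketch {
  // Assumes a registered table `sensor` with a column `id` and a
  // processing-time attribute `pt`; these names are illustrative.

  /** Count per id in 10-second tumbling windows. */
  val tumbling: String =
    """SELECT id, COUNT(id) AS cnt,
      |       TUMBLE_START(pt, INTERVAL '10' SECOND) AS w_start,
      |       TUMBLE_END(pt, INTERVAL '10' SECOND) AS w_end
      |FROM sensor
      |GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)""".stripMargin

  /** Count per id in 10-second windows that slide every 5 seconds.
    * Note the argument order: slide first, then window size. */
  val hopping: String =
    """SELECT id, COUNT(id) AS cnt,
      |       HOP_START(pt, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS w_start
      |FROM sensor
      |GROUP BY id, HOP(pt, INTERVAL '5' SECOND, INTERVAL '10' SECOND)""".stripMargin

  /** Count per id in session windows closed by a 30-second gap. */
  val session: String =
    """SELECT id, COUNT(id) AS cnt,
      |       SESSION_START(pt, INTERVAL '30' SECOND) AS w_start
      |FROM sensor
      |GROUP BY id, SESSION(pt, INTERVAL '30' SECOND)""".stripMargin

  def main(args: Array[String]): Unit =
    Seq(tumbling, hopping, session).foreach(q => println(q + "\n"))
}
```

The window function in GROUP BY and the helper functions in SELECT must be called with identical arguments so that they refer to the same window.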

4. Code Exercise – Counting Sensors in a Tumbling Window

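The Scala example below counts readings of sensor_1 in 10-second tumbling processing-time windows, using both the Table API and SQL. SensorSource is a user-defined source that is not shown here; it is assumed to emit records with id, timestamp, and temperature fields.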

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object TumblingWindowTempCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // SensorSource is user-defined and not shown in this article
    val stream = env.addSource(new SensorSource).filter(r => r.id == "sensor_1")
    // Rename timestamp to ts and append a processing-time attribute pt
    val table = tableEnv.fromDataStream(stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime)
    // Table API: count rows per id in 10-second tumbling windows
    val tableResult = table
      .window(Tumble over 10.seconds on $"pt" as $"w")
      .groupBy($"id", $"w")
      .select($"id", $"id".count)
    tableEnv.toRetractStream[Row](tableResult).print()
    // SQL: the same aggregation, plus window start and end
    tableEnv.createTemporaryView("sensor", stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime)
    val sqlResult = tableEnv.sqlQuery(
      "SELECT id, count(id), TUMBLE_START(pt, INTERVAL '10' SECOND), TUMBLE_END(pt, INTERVAL '10' SECOND) FROM sensor GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)"
    )
    tableEnv.toRetractStream[Row](sqlResult).print()
    env.execute()
  }
}

5. System Built‑in Functions

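Flink provides built-in comparison, logical, arithmetic, string, and time functions that can be used directly in the Table API or SQL. The two interfaces sometimes spell the same operation differently: equality is === in the Scala Table API and = in SQL, and logical OR is || in the Table API and OR in SQL. Other examples include POWER, UPPER, and time helpers such as currentTime() or NUMERIC.minutes.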

5.1 User‑Defined Functions (UDF)

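User-defined functions (UDFs) extend the built-in function set. A UDF is registered with createTemporarySystemFunction() or with the older registerFunction(), which is deprecated as of Flink 1.11. Once registered, it can be invoked in the Table API or SQL.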

5.2 Scalar Functions

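import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}

class HashCodeFunction extends ScalarFunction {
  private var factor: Int = 0
  // Called once before evaluation; reads the factor from the job parameters (default 12)
  override def open(context: FunctionContext): Unit = {
    factor = context.getJobParameter("hashcode_factor", "12").toInt
  }
  // Called once per input row
  def eval(s: String): Int = s.hashCode * factor
}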

The original source shows this function being registered and called from both the Table API and SQL.

5.3 Table Functions

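import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Emits one (word, length) row per '#'-separated token of the input string
@FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
class SplitFunction extends TableFunction[Row] {
  def eval(str: String): Unit = {
    str.split("#").foreach(s => collect(Row.of(s, Int.box(s.length))))
  }
}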

Table functions are invoked with joinLateral or leftOuterJoinLateral in Table API, and with LATERAL TABLE(...) in SQL.
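The difference between the two join variants shows up only when the function emits no rows for an input. The plain-Scala sketch below (no Flink dependency) imitates both behaviours. LateralJoinSketch and its methods are hypothetical stand-ins, and, unlike SplitFunction above, the stand-in split deliberately skips empty tokens so that an empty input produces no rows.

```scala
object LateralJoinSketch {
  /** Stand-in for a table function: emits (word, length) per non-empty token. */
  def split(str: String): Seq[(String, Int)] =
    str.split("#").toSeq.filter(_.nonEmpty).map(w => (w, w.length))

  /** Inner lateral join: input rows for which the function emits nothing are dropped. */
  def joinLateral(rows: Seq[String]): Seq[(String, String, Int)] =
    for {
      row         <- rows
      (word, len) <- split(row)
    } yield (row, word, len)

  /** Left outer lateral join: such rows are kept and padded with None (SQL NULL). */
  def leftOuterJoinLateral(rows: Seq[String]): Seq[(String, Option[String], Option[Int])] =
    rows.flatMap { row =>
      val out = split(row)
      if (out.isEmpty) Seq((row, Option.empty[String], Option.empty[Int]))
      else out.map { case (word, len) => (row, Option(word), Option(len)) }
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq("a#bc", "")
    println(joinLateral(rows))          // the empty row disappears
    println(leftOuterJoinLateral(rows)) // the empty row is kept with None values
  }
}
```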

5.4 Aggregate Functions

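import org.apache.flink.table.functions.AggregateFunction

// Accumulator: running sum and count of temperatures
class AvgTempAcc { var sum: Double = 0.0; var count: Int = 0 }
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
  // Final result for a group (NaN if no rows were accumulated)
  override def getValue(acc: AvgTempAcc): Double = acc.sum / acc.count
  override def createAccumulator(): AvgTempAcc = new AvgTempAcc
  // Called once per input row
  def accumulate(acc: AvgTempAcc, temp: Double): Unit = {
    acc.sum += temp; acc.count += 1
  }
}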

Both Table API and SQL can use the custom aggregate to compute average temperature per sensor.
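The three methods form a small lifecycle: one accumulator is created per group, accumulate is called for every row, and getValue produces the result. The plain-Scala sketch below (no Flink dependency) walks through that lifecycle by hand. AvgLifecycleSketch and avgPerSensor are illustrative names, and the loop is only a simplified stand-in for what the runtime does.

```scala
import scala.collection.mutable

object AvgLifecycleSketch {
  /** Accumulator holding the running sum and count. */
  final class Acc { var sum: Double = 0.0; var count: Int = 0 }

  def createAccumulator(): Acc = new Acc
  def accumulate(acc: Acc, temp: Double): Unit = { acc.sum += temp; acc.count += 1 }
  def getValue(acc: Acc): Double = acc.sum / acc.count

  /** Mimics a grouped aggregation: one accumulator per sensor id, updated per row. */
  def avgPerSensor(rows: Seq[(String, Double)]): Map[String, Double] = {
    val accs = mutable.Map.empty[String, Acc]
    rows.foreach { case (id, temp) =>
      // Create the accumulator on first sight of a key, then fold the row in.
      accumulate(accs.getOrElseUpdate(id, createAccumulator()), temp)
    }
    accs.iterator.map { case (id, acc) => id -> getValue(acc) }.toMap
  }

  def main(args: Array[String]): Unit =
    println(avgPerSensor(Seq(("sensor_1", 20.0), ("sensor_1", 30.0), ("sensor_2", 10.0))))
}
```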

5.5 Table Aggregate Functions

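import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector

// Double.MinValue marks a slot that has not been filled yet
class Top2TempAcc { var highestTemp: Double = Double.MinValue; var secondHighestTemp: Double = Double.MinValue }
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {
  override def createAccumulator(): Top2TempAcc = new Top2TempAcc
  // Called once per input row; keeps the two largest values seen so far
  def accumulate(acc: Top2TempAcc, temp: Double): Unit = {
    if (temp > acc.highestTemp) { acc.secondHighestTemp = acc.highestTemp; acc.highestTemp = temp }
    else if (temp > acc.secondHighestTemp) { acc.secondHighestTemp = temp }
  }
  // Emits (temperature, rank) rows, skipping slots that were never filled
  def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
    if (acc.highestTemp != Double.MinValue) out.collect((acc.highestTemp, 1))
    if (acc.secondHighestTemp != Double.MinValue) out.collect((acc.secondHighestTemp, 2))
  }
}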

This function emits the two highest temperature values per sensor, each paired with its rank.
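What sets a table aggregate apart from an ordinary aggregate is the shape of its output: several rows per group instead of a single value. The plain-Scala sketch below (no Flink dependency) computes the expected result directly so the output shape is easy to see. Top2Sketch and top2 are illustrative names, and sorting each group is a simplification rather than the incremental accumulator logic.

```scala
object Top2Sketch {
  /** Returns up to two (temperature, rank) rows per sensor id, highest first. */
  def top2(rows: Seq[(String, Double)]): Map[String, List[(Double, Int)]] =
    rows.groupBy(_._1).map { case (id, group) =>
      val ranked = group
        .map(_._2)
        .sortWith(_ > _)                               // descending by temperature
        .take(2)                                       // keep at most two values
        .zipWithIndex
        .map { case (temp, index) => (temp, index + 1) } // ranks start at 1
      id -> ranked.toList
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(("sensor_1", 20.0), ("sensor_1", 35.0), ("sensor_1", 30.0), ("sensor_2", 10.0))
    // sensor_1 yields two rows, sensor_2 only one.
    top2(rows).foreach(println)
  }
}
```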

6. Flink SQL Integration with Hive

Flink can use Hive's Metastore via HiveCatalog and read and write Hive tables directly. Maven dependencies include flink-connector-hive, flink-table-api-java-bridge (Scala programs such as the one below use flink-table-api-scala-bridge), and the Hive and Hadoop libraries.

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <scope>provided</scope>
</dependency>

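The example program registers a HiveCatalog, switches to the Hive dialect, exposes a DataStream as a temporary view, inserts its rows into the Hive table t_user, and then queries that table. It assumes the database mydb and the table t_user already exist in Hive.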

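import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog

object TestHiveStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    val stream = env.fromElements(("10", "haha"), ("11", "hehe"))
    // Arguments: catalog name, default database, Hive conf directory, Hive version
    val hive = new HiveCatalog("myhive", "mydb", "/path/to/hive/conf", "3.1.2")
    tableEnv.registerCatalog("myhive", hive)
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("mydb")
    // Expose the stream as a temporary view with columns id and name
    tableEnv.createTemporaryView("users", stream, $"id", $"name")
    tableEnv.executeSql("insert into t_user select id, name from users")
    // Print the query result instead of discarding it
    tableEnv.executeSql("select * from t_user").print()
  }
}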

A more complex example shows how to write partitioned ORC tables with timestamp‑based partitions.

Overall, the article equips readers with the knowledge to leverage Flink's windowing capabilities, extend functionality via custom functions, and integrate seamlessly with Hive for batch‑streaming workloads.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
