Deep Dive into Flink Table & SQL Window Functions, UDFs, and Hive Integration
This article provides a comprehensive guide to Flink Table and SQL window semantics—including group, tumbling, sliding, and session windows—covers over windows, demonstrates how to define windows in SQL, explains built‑in functions, shows how to implement scalar, table, aggregate and table‑aggregate UDFs, and details Flink's integration with Hive, complete with Maven dependencies and runnable examples.
Before diving into this article, readers are encouraged to review related Flink series covering time, windows, stream joins, network flow control, dimension tables, memory models, and Table & SQL fundamentals.
1. Regular Windows
Flink supports two main window types in the Table API and SQL: Group Windows and Over Windows. Group windows aggregate rows based on time or row-count intervals.
1.1 Group Windows
Group windows are defined with the .window([w: GroupWindow] as $"w") clause and must be referenced in a groupBy clause. Example:
val table = input
  .window([w: GroupWindow] as $"w") // define window with alias w
  .groupBy($"w", $"a") // group by fields a and window w
  .select($"a", $"b".sum) // sum field b
Window metadata can also be selected:
val table = input
  .window([w: GroupWindow] as $"w")
  .groupBy($"w", $"a")
  .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count)
Supported window classes are Tumbling, Sliding, and Session.
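The three window classes differ mainly in how a row's timestamp maps to windows. The following plain-Scala sketch illustrates that mapping under simplifying assumptions: timestamps are non-negative milliseconds, the object and function names are hypothetical, and boundary handling for sessions is simplified. It is meant to build intuition, not to mirror Flink's internal window assigners.

```scala
object WindowAssignmentSketch {
  // Tumbling: each timestamp falls into exactly one window [start, start + size).
  def tumblingStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Sliding: a timestamp falls into every window whose start is a multiple
  // of the slide step and lies in the range (ts - size, ts].
  def slidingStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - (ts % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Session: a timestamp that arrives before the current session's end
  // (last timestamp + gap) extends that session; otherwise it opens a new one.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] = {
    val newestFirst = timestamps.sorted.foldLeft(List.empty[(Long, Long)]) {
      case ((start, end) :: rest, ts) if ts < end => (start, ts + gap) :: rest
      case (acc, ts)                              => (ts, ts + gap) :: acc
    }
    newestFirst.reverse
  }

  def main(args: Array[String]): Unit = {
    println(tumblingStart(12500L, 10000L))
    println(slidingStarts(12000L, 10000L, 5000L))
    println(sessions(Seq(1000L, 3000L, 20000L, 22000L, 50000L), 10000L))
  }
}
```

With a 10-second size and a 5-second slide, a row at 12 seconds belongs to two sliding windows but only one tumbling window, which is why sliding windows can emit the same row's contribution more than once.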
1.2 Tumbling Windows
Tumbling windows use the Tumble class with three methods: over (window length), on (time field), and as (alias). Example:
// Tumbling Event‑time Window
.window(Tumble over 10.minutes on $"rowtime" as $"w")
// Tumbling Processing‑time Window
.window(Tumble over 10.minutes on $"proctime" as $"w")
// Tumbling Row‑count Window
.window(Tumble over 10.rows on $"proctime" as $"w")
1.3 Sliding Windows
Sliding windows use the Slide class with four methods: over (window length), every (slide step), on (time field), and as (alias). Example:
// Sliding Event‑time Window
.window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w")
// Sliding Processing‑time Window
.window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w")
// Sliding Row‑count Window
.window(Slide over 10.rows every 5.rows on $"proctime" as $"w")
1.4 Session Windows
Session windows use the Session class with withGap (gap duration), on (time field), and as (alias). Example:
// Session Event‑time Window
.window(Session withGap 10.minutes on $"rowtime" as $"w")
// Session Processing‑time Window
.window(Session withGap 10.minutes on $"proctime" as $"w")
2. Over Windows
Over windows perform row‑wise aggregations similar to the SQL OVER clause. They are defined with .window([w: OverWindow] as $"w") and referenced in select. Example:
val table = input
  .window([w: OverWindow] as $"w")
  .select($"a", $"b".sum over $"w", $"c".min over $"w")
Unbounded over windows use UNBOUNDED_RANGE or UNBOUNDED_ROW; bounded windows specify a concrete range.
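Unlike a group window, an over window produces one output row per input row. The plain-Scala sketch below illustrates the difference between an unbounded and a bounded preceding range under stated assumptions: the Reading case class and both function names are hypothetical, and the code models only the semantics of a range-based over window, not Flink's execution.

```scala
object OverWindowSketch {
  final case class Reading(key: String, rowtime: Long, value: Double)

  // Unbounded preceding: each row aggregates itself and all earlier rows
  // of the same partition, i.e. a running sum ordered by rowtime.
  def unboundedSum(rows: Seq[Reading]): Seq[(Reading, Double)] =
    rows.groupBy(_.key).values.toSeq.flatMap { partition =>
      val ordered = partition.sortBy(_.rowtime)
      val runningSums = ordered.scanLeft(0.0)(_ + _.value).tail
      ordered.zip(runningSums)
    }

  // Bounded preceding: each row aggregates only the rows of its partition
  // whose rowtime lies within rangeMs before (or at) its own rowtime.
  def boundedSum(rows: Seq[Reading], rangeMs: Long): Seq[(Reading, Double)] =
    rows.groupBy(_.key).values.toSeq.flatMap { partition =>
      partition.map { row =>
        val inRange = partition.filter { r =>
          r.rowtime <= row.rowtime && r.rowtime >= row.rowtime - rangeMs
        }
        row -> inRange.map(_.value).sum
      }
    }
}
```

In both functions the number of output rows equals the number of input rows; only the set of rows feeding each aggregate changes.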
Unbounded example:
// Unbounded event‑time over window
.window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"w")
Bounded example (1 minute preceding):
// Bounded event‑time over window
.window(Over partitionBy $"a" orderBy $"rowtime" preceding 1.minutes as $"w")
3. Defining Windows in SQL
Group windows are expressed with the TUMBLE, HOP, and SESSION functions in the GROUP BY clause. A tumbling window is written as TUMBLE(time_attr, INTERVAL '10' SECOND). Sliding windows use HOP(time_attr, slide_interval, window_interval), and session windows use SESSION(time_attr, gap_interval). Helper functions such as TUMBLE_START, TUMBLE_END, and TUMBLE_ROWTIME retrieve window metadata.
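As an illustration of the HOP and SESSION forms, the sketch below holds two query strings that could be passed to tableEnv.sqlQuery. It assumes a registered view named sensor with an id column and a processing-time attribute pt, as in the code exercise in this article; the object name, value names, and column aliases are hypothetical.

```scala
object SqlWindowQueries {
  // Sliding window: 10-second windows that advance every 5 seconds.
  // HOP takes the slide interval first and the window size second.
  val hopQuery: String =
    """SELECT id, COUNT(id) AS cnt,
      |       HOP_START(pt, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS w_start,
      |       HOP_END(pt, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS w_end
      |FROM sensor
      |GROUP BY id, HOP(pt, INTERVAL '5' SECOND, INTERVAL '10' SECOND)""".stripMargin

  // Session window: a session closes after 10 seconds without rows for that id.
  val sessionQuery: String =
    """SELECT id, COUNT(id) AS cnt,
      |       SESSION_START(pt, INTERVAL '10' SECOND) AS w_start,
      |       SESSION_END(pt, INTERVAL '10' SECOND) AS w_end
      |FROM sensor
      |GROUP BY id, SESSION(pt, INTERVAL '10' SECOND)""".stripMargin
}
```

The arguments of each helper function (HOP_START, SESSION_END, and so on) must repeat the arguments of the window function in the GROUP BY clause exactly.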
4. Code Exercise – Counting Sensors in a Tumbling Window
A complete Scala example creates a 10‑second tumbling window to count sensor occurrences, showing both Table API and SQL approaches.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// SensorSource is assumed to be a custom source of sensor readings defined elsewhere (not shown here).
object TumblingWindowTempCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
    val stream = env.addSource(new SensorSource).filter(r => r.id.equals("sensor_1"))
    // pt is appended as a processing-time attribute
    val table = tableEnv.fromDataStream(stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime)
    // Table API
    val tableResult = table
      .window(Tumble over 10.seconds on $"pt" as $"w")
      .groupBy($"id", $"w")
      .select($"id", $"id".count)
    tableEnv.toRetractStream[Row](tableResult).print()
    // SQL
    tableEnv.createTemporaryView("sensor", stream, $"id", $"timestamp" as "ts", $"temperature", $"pt".proctime)
    val sqlResult = tableEnv.sqlQuery(
      "SELECT id, count(id), TUMBLE_START(pt, INTERVAL '10' SECOND), TUMBLE_END(pt, INTERVAL '10' SECOND) FROM sensor GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)"
    )
    tableEnv.toRetractStream[Row](sqlResult).print()
    env.execute()
  }
}
5. System Built‑in Functions
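Flink provides comparison, logical, arithmetic, string, and time functions that can be used directly in the Table API or SQL. Some operators differ between the two: equality is === in the Table API but = in SQL, and logical OR is || in the Table API but the OR keyword in SQL, where || concatenates strings. Other examples include POWER, UPPER, and time helpers such as currentTime() or NUMERIC.minutes.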
5.1 User‑Defined Functions (UDF)
UDFs extend Flink's capabilities. After registration via the older registerFunction() or the newer createTemporarySystemFunction(), they can be invoked in the Table API or SQL.
5.2 Scalar Functions
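import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}

class HashCodeFunction extends ScalarFunction {
  private var factor: Int = 0
  // Read the factor from the job parameters once, before any eval call; defaults to 12
  override def open(context: FunctionContext): Unit = {
    factor = context.getJobParameter("hashcode_factor", "12").toInt
  }
  // Called once per input row
  def eval(s: String): Int = s.hashCode * factor
}
Usage examples in the Table API and SQL are shown in the original source.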
5.3 Table Functions
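import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

@FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
class SplitFunction extends TableFunction[Row] {
  // Emits one (word, length) row per '#'-separated token
  def eval(str: String): Unit = {
    str.split("#").foreach(s => collect(Row.of(s, Int.box(s.length))))
  }
}
Table functions are invoked with joinLateral or leftOuterJoinLateral in the Table API, and with LATERAL TABLE(...) in SQL.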
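The two lateral join variants differ only in how they treat input rows for which the table function emits nothing. The plain-Scala sketch below models that difference without a Flink dependency: split reuses the splitting logic of SplitFunction but returns a collection, and the object, the Joined type alias, and the two join functions are hypothetical stand-ins for the Table API operators of the same name.

```scala
object LateralJoinSketch {
  // Input row, then the (nullable) word and length columns from the table function.
  type Joined = (String, Option[String], Option[Int])

  // Same splitting logic as SplitFunction.eval, returned instead of collected.
  def split(str: String): Seq[(String, Int)] =
    str.split("#").toSeq.map(s => (s, s.length))

  // Inner lateral join: input rows with no function output are dropped.
  def joinLateral(rows: Seq[String]): Seq[Joined] =
    rows.flatMap { row =>
      split(row).map { case (word, len) => (row, Option(word), Option(len)) }
    }

  // Left outer lateral join: input rows with no function output are kept,
  // with the function's columns left empty (NULL in SQL terms).
  def leftOuterJoinLateral(rows: Seq[String]): Seq[Joined] =
    rows.flatMap { row =>
      val out = split(row)
      if (out.isEmpty) Seq[Joined]((row, None, None))
      else out.map { case (word, len) => (row, Option(word), Option(len)) }
    }
}
```

The input "#" is a convenient test case here because splitting it on "#" yields no tokens at all, so it disappears from the inner join and survives only in the left outer join.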
5.4 Aggregate Functions
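import org.apache.flink.table.functions.AggregateFunction

// Accumulator: running sum and count of the temperatures seen so far
class AvgTempAcc { var sum: Double = 0.0; var count: Int = 0 }
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
  // Final result; evaluates to NaN if no row has been accumulated yet
  override def getValue(acc: AvgTempAcc): Double = acc.sum / acc.count
  override def createAccumulator(): AvgTempAcc = new AvgTempAcc
  // Called once per input row
  def accumulate(acc: AvgTempAcc, temp: Double): Unit = {
    acc.sum += temp; acc.count += 1
  }
}
Both the Table API and SQL can use the custom aggregate to compute the average temperature per sensor.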
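To make the accumulator lifecycle concrete, the following plain-Scala sketch imitates what a runtime does with such a function: create one accumulator per group, call accumulate for every row, and read the result with getValue. It has no Flink dependency; the object, the Acc class, and averagePerSensor are hypothetical names, and a real streaming job would typically emit an updated result as rows arrive rather than once at the end.

```scala
import scala.collection.mutable

object AggregateLifecycleSketch {
  // Mirrors AvgTempAcc: running sum and count.
  final class Acc { var sum: Double = 0.0; var count: Int = 0 }

  def createAccumulator(): Acc = new Acc

  def accumulate(acc: Acc, temp: Double): Unit = {
    acc.sum += temp
    acc.count += 1
  }

  def getValue(acc: Acc): Double = acc.sum / acc.count

  // Drives the three methods the way a grouped aggregation would:
  // one accumulator per sensor id, one accumulate call per reading.
  def averagePerSensor(readings: Seq[(String, Double)]): Map[String, Double] = {
    val accumulators = mutable.Map.empty[String, Acc]
    readings.foreach { case (id, temp) =>
      accumulate(accumulators.getOrElseUpdate(id, createAccumulator()), temp)
    }
    accumulators.map { case (id, acc) => id -> getValue(acc) }.toMap
  }
}
```

Keeping only a sum and a count in the accumulator means the state per group stays constant in size no matter how many readings arrive.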
5.5 Table Aggregate Functions
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector

// Accumulator: the two highest temperatures seen so far
class Top2TempAcc {
  var highestTemp: Double = Double.MinValue
  var secondHighestTemp: Double = Double.MinValue
}
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {
  override def createAccumulator(): Top2TempAcc = new Top2TempAcc
  def accumulate(acc: Top2TempAcc, temp: Double): Unit = {
    if (temp > acc.highestTemp) { acc.secondHighestTemp = acc.highestTemp; acc.highestTemp = temp }
    else if (temp > acc.secondHighestTemp) { acc.secondHighestTemp = temp }
  }
  // Emits (temperature, rank) for rank 1 and rank 2
  def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
    out.collect((acc.highestTemp, 1))
    out.collect((acc.secondHighestTemp, 2))
  }
}
This function emits the top‑2 temperature values per sensor.
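One detail of the listing is that emitValue always emits two rows, so a sensor with fewer than two readings would emit the initial placeholder value. The plain-Scala sketch below shows one possible variation that tracks the two slots as Option values and emits only the ranks that exist. It is an illustrative alternative with hypothetical names (Top2Sketch, Acc, emit), not the article's implementation and not Flink API code.

```scala
object Top2Sketch {
  // None means "no reading seen yet for this slot".
  final case class Acc(highest: Option[Double] = None, second: Option[Double] = None)

  // Same ordering logic as Top2Temp.accumulate, expressed immutably.
  def accumulate(acc: Acc, temp: Double): Acc = acc match {
    case Acc(None, _)                              => Acc(Some(temp), None)
    case Acc(Some(h), _) if temp > h               => Acc(Some(temp), Some(h))
    case Acc(Some(h), s) if s.forall(temp > _)     => Acc(Some(h), Some(temp))
    case _                                         => acc
  }

  // Emits (temperature, rank) pairs only for the slots that are filled.
  def emit(acc: Acc): Seq[(Double, Int)] =
    acc.highest.map(t => (t, 1)).toSeq ++ acc.second.map(t => (t, 2)).toSeq

  def top2(temps: Seq[Double]): Seq[(Double, Int)] =
    emit(temps.foldLeft(Acc())(accumulate))
}
```

For example, top2(Seq(20.0, 35.0, 28.0, 31.0)) yields rank 1 for 35.0 and rank 2 for 31.0, while a single reading yields only a rank 1 row.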
6. Flink SQL Integration with Hive
Flink can use Hive's Metastore via HiveCatalog and read/write Hive tables directly. Maven dependencies include flink-connector-hive, flink-table-api-java-bridge, and Hive/Hadoop libraries.
<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <scope>provided</scope>
</dependency>
The example program registers a HiveCatalog, writes a DataStream into the Hive table t_user, and runs a simple query (the statements that create the Hive database and table are not shown in this listing).
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog

object TestHiveStreaming {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    val stream = env.fromElements(("10", "haha"), ("11", "hehe"))
    // Arguments: catalog name, default database, Hive conf directory, Hive version
    val hive = new HiveCatalog("myhive", "mydb", "/path/to/hive/conf", "3.1.2")
    tableEnv.registerCatalog("myhive", hive)
    tableEnv.useCatalog("myhive")
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("mydb")
    tableEnv.createTemporaryView("users", stream, 'id, 'name)
    tableEnv.executeSql("insert into t_user select id, name from users")
    // print() displays the query result; without it the result is discarded
    tableEnv.executeSql("select * from t_user").print()
  }
}
A more complex example shows how to write partitioned ORC tables with timestamp‑based partitions.
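As a rough idea of what such a partitioned write might look like, the sketch below holds two statements that could be passed to tableEnv.executeSql, the first under the Hive dialect. Everything named here is an assumption for illustration: the table t_user_log, the source view user_events, and its timestamp column ts are hypothetical, and the partition-commit properties are the streaming sink options documented for Flink 1.11, chosen here as one plausible configuration.

```scala
object HivePartitionedSinkSketch {
  // Hive-dialect DDL: an ORC table partitioned by day (dt) and hour (hr).
  // The timestamp pattern tells Flink how to rebuild a partition's time
  // from its dt and hr values when deciding that the partition is complete.
  val createTable: String =
    """CREATE TABLE IF NOT EXISTS t_user_log (
      |  id STRING,
      |  name STRING
      |) PARTITIONED BY (dt STRING, hr STRING)
      |STORED AS ORC
      |TBLPROPERTIES (
      |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
      |  'sink.partition-commit.trigger' = 'partition-time',
      |  'sink.partition-commit.delay' = '1 h',
      |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
      |)""".stripMargin

  // The partition columns come last in the SELECT list and are derived
  // from the (hypothetical) event timestamp column ts.
  val insertInto: String =
    """INSERT INTO t_user_log
      |SELECT id, name, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')
      |FROM user_events""".stripMargin
}
```

The partition-time trigger relies on watermarks, so the source behind user_events would need an event-time attribute with a watermark definition for partitions to be committed.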
Overall, the article equips readers with the knowledge to leverage Flink's windowing capabilities, extend functionality via custom functions, and integrate seamlessly with Hive for batch‑streaming workloads.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
