Apache Flink Table API Tutorial and End‑to‑End Examples
This article provides a comprehensive tutorial on Apache Flink's Table API, explaining its concepts, core features, and a wide range of operators such as SELECT, WHERE, GROUP BY, UNION, JOIN, and various window functions, while offering complete Scala code examples, custom sources, sinks, and an end‑to‑end job that computes page‑view counts per region using event‑time tumbling windows.
What is Table API
Table API and SQL are the top‑level analytical APIs in Apache Flink; they share the same semantics, but Table API is expressed in Java/Scala code while SQL is textual.
Features of Table API
Declarative – users specify what to compute, not how.
High performance – the query optimizer selects an efficient execution plan.
Unified batch and stream – the same logic runs on both.
Standard‑compliant – follows SQL standards.
Extended expression capabilities – custom functions, map/flatMap, etc.
HelloWorld Example
First, add Maven dependencies for Flink 1.7.0:
<properties>
  <table.version>1.7.0</table.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${table.version}</version>
  </dependency>
  ...
</dependencies>
Program structure:
External source (Kafka, CSV, etc.)
Query logic (e.g., simple SELECT, join, window aggregation)
External sink (Kafka, Cassandra, CSV, …)
Running mode selection (streaming vs. batch) is done by creating the appropriate execution environment:
// Scala DataStream and Table API imports (Flink 1.7)
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
Build a test source:
val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
val source = env.fromCollection(data).toTable(tEnv, 'word)
WordCount logic:
val result = source.groupBy('word).select('word, 'word.count)
Custom retract sink to collect results:
class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] { ... }
Table API Operators
SELECT
Projects columns or expressions. Example:
// concat joins its arguments in order; concat_ws would treat the first argument as a separator
val result = customer.select('c_name, concat('c_name, " come ", 'c_desc))
WHERE
Filters rows based on predicates. Example:
val result = customer.where("c_id = 'c_001' || c_id = 'c_003'").select('c_id, 'c_name, 'c_desc)
GROUP BY
Aggregates data per key. Example (order count per customer):
val result = order.groupBy('c_id).select('c_id, 'o_id.count)
Time‑based grouping (e.g., per minute):
val result = order.select('o_id, 'c_id, 'o_time.substring(1,16) as 'o_time_min)
  .groupBy('o_time_min)
  .select('o_time_min, 'o_id.count)
UNION ALL and UNION
Combine two tables. UNION ALL keeps duplicates, UNION removes them.
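The difference between the two operators can be sketched without Flink, using plain Scala collections. The `Customer` case class and the sample rows are hypothetical and only stand in for the customer table; this is an illustration of the semantics, not of how Flink executes the operators.

```scala
object UnionSketch {
  // Hypothetical row type and data, used only for illustration.
  final case class Customer(cId: String, cName: String)

  val customers: Seq[Customer] = Seq(
    Customer("c_001", "Kevin"),
    Customer("c_002", "Sunny")
  )

  // UNION ALL: plain concatenation, duplicates are kept.
  def unionAll[A](left: Seq[A], right: Seq[A]): Seq[A] = left ++ right

  // UNION: concatenation followed by duplicate elimination.
  def union[A](left: Seq[A], right: Seq[A]): Seq[A] = (left ++ right).distinct
}
```

Applied to the same two-row input on both sides, `unionAll` yields four rows and `union` yields two. In the Table API the two operators are written as follows: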
val resultAll = customer.unionAll(customer)
val resultDistinct = customer.union(customer)
JOIN
Various join types are supported (INNER, LEFT, RIGHT, FULL). Example (inner join customers with orders):
val result = customer.join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
LEFT JOIN keeps all rows from the left side and fills missing right‑side columns with NULL.
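A Flink-free sketch may help contrast the two join types. The case classes and helper names here are hypothetical, and `None` stands in for the NULL padding on the right side; the code models the result sets only, not Flink's join implementation.

```scala
object JoinSketch {
  // Hypothetical row types, reduced to the join key and one payload column.
  final case class Customer(cId: String, cName: String)
  final case class Order(oId: String, cId: String)

  // INNER JOIN: only customer/order pairs whose keys match are emitted.
  def innerJoin(cs: Seq[Customer], os: Seq[Order]): Seq[(Customer, Order)] =
    for (c <- cs; o <- os if o.cId == c.cId) yield (c, o)

  // LEFT OUTER JOIN: every customer appears at least once;
  // a customer without orders is paired with None (the NULL side).
  def leftOuterJoin(cs: Seq[Customer], os: Seq[Order]): Seq[(Customer, Option[Order])] =
    cs.flatMap { c =>
      val matches = os.filter(_.cId == c.cId)
      if (matches.isEmpty) Seq((c, Option.empty[Order]))
      else matches.map(o => (c, Option(o)))
    }
}
```

With two customers, of whom only the first has placed orders, `innerJoin` drops the second customer while `leftOuterJoin` keeps it with an empty right side. The Table API form of the left outer join is: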
val result = customer.leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
Time‑Interval JOIN
Bounded join based on event‑time intervals:
val result = left.join(right)
  .where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
  ...
LATERAL JOIN
Joins a table with a user‑defined table function (UDTF):
val udtf = new MyUDTF
val result = source.join(udtf('c) as ('d, 'e))
Temporal Table JOIN
Joins a stream with a versioned table:
val rates = tEnv.scan("versionedTable").createTemporalTableFunction('rowtime, 'r_currency)
val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency)
Window Functions
OVER Window
Two kinds: ROWS (each row defines a new window) and RANGE (rows sharing the same timestamp belong to the same window). Example (bounded ROWS OVER to get max price of the current and two preceding items):
// A ROWS bound is a row count, written as 2.rows in the Scala expression DSL
val result = item.window(Over.partitionBy('itemType).orderBy('rowtime).preceding(2.rows).following(CURRENT_ROW) as 'w)
  .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
Bounded RANGE OVER (2‑minute range):
val result = item.window(Over.partitionBy('itemType).orderBy('rowtime).preceding(2.minute).following(CURRENT_RANGE) as 'w)
  .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
Group Window
Supported bounded group windows:
Tumble – fixed‑size, non‑overlapping windows.
Hop – fixed‑size windows that slide, may overlap.
Session – windows defined by inactivity gaps.
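How rows are assigned to the three window types can be sketched with plain arithmetic on epoch-millisecond timestamps. The function names are hypothetical, and the session boundary rule (a gap of exactly `gap` starts a new session) is a simplifying assumption of this sketch rather than a statement about Flink's merging logic.

```scala
object WindowSketch {
  // Tumble: each timestamp falls into exactly one window [start, start + size).
  def tumbleStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Hop: a timestamp belongs to every window of length `size` that starts
  // on a multiple of `slide` and still covers it; returns those window starts.
  def hopStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Session: consecutive timestamps stay in one session while the distance
  // to the previous timestamp is below `gap` (input must be sorted ascending).
  def sessions(sortedTs: Seq[Long], gap: Long): List[List[Long]] =
    sortedTs.foldLeft(List.empty[List[Long]]) {
      case (current :: done, ts) if ts - current.head < gap => (ts :: current) :: done
      case (acc, ts) => List(ts) :: acc
    }.map(_.reverse).reverse
}
```

For example, with a 10-minute size and a 5-minute slide, a row at minute 7 belongs to the two hop windows starting at minutes 5 and 0, whereas a 2-minute tumbling window places it only in the window starting at minute 6.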
Example (tumbling window of 2 minutes counting page views per region):
val result = pageAccess.window(Tumble.over(2.minute).on('rowtime) as 'w)
  .groupBy('w, 'region)
  .select('region, 'w.start, 'w.end, 'region.count as 'pv)
Hop window (5‑minute slide, 10‑minute size) on page‑access‑count table:
val result = pageAccessCount.window(Slide.over(10.minute).every(5.minute).on('rowtime) as 'w)
  .groupBy('w)
  .select('w.start, 'w.end, 'accessCount.sum as 'accessCount)
Session window (3‑minute gap) on page‑access‑session table:
val result = pageAccessSession.window(Session.withGap(3.minute).on('rowtime) as 'w)
  .groupBy('w, 'region)
  .select('region, 'w.start, 'w.end, 'region.count as 'pv)
Source & Sink
A custom event‑time source is implemented by extending SourceFunction and emitting Watermarks. The table source built on top of it returns a DataStream[Row] with a defined row‑time attribute.
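To make the role of watermarks concrete, here is a small Flink-free sketch that replays a list of events and watermarks and reports which events would count as late. It assumes the usual event-time rule that a watermark with timestamp t asserts no further events with timestamp at or before t; `Element` and `lateEvents` are hypothetical names.

```scala
object WatermarkSketch {
  // Left((timestamp, value)) is an event; Right(timestamp) is a watermark.
  type Element[T] = Either[(Long, T), Long]

  // Replays the elements in order and returns the values that arrive late,
  // i.e. whose timestamp is not greater than the highest watermark seen so far.
  def lateEvents[T](elements: Seq[Element[T]]): List[T] = {
    val (_, late) = elements.foldLeft((Long.MinValue, List.empty[T])) {
      case ((wm, acc), Left((ts, value))) => (wm, if (ts <= wm) value :: acc else acc)
      case ((wm, acc), Right(newWm)) => (math.max(wm, newWm), acc)
    }
    late.reverse
  }
}
```

An event stamped 1500 that arrives after a watermark of 2000 is reported as late, while one stamped 3000 is not. The Flink source function below uses the same Either encoding to interleave events and watermarks.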
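import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark
// Left((timestamp, value)) is emitted as a timestamped record, Right(timestamp) as a watermark
class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }
  override def cancel(): Unit = {}
}
The corresponding StreamTableSource registers the schema and row‑time descriptor.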
class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
  val fieldNames = Array("accessTime", "region", "userId")
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], fieldNames)
  val data = Seq(
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
    Right(1510365660000L),
    ...
  )
  override def getRowtimeAttributeDescriptors = Collections.singletonList(
    new RowtimeAttributeDescriptor("accessTime", new ExistingField("accessTime"), PreserveWatermarks.INSTANCE))
  override def getDataStream(execEnv: StreamExecutionEnvironment) =
    execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
  override def getReturnType = rowType
  override def getTableSchema = schema
}
The sink uses Flink's built‑in CsvTableSink to write results to a temporary CSV file.
def getCsvTableSink: TableSink[Row] = {
  val tempFile = File.createTempFile("csv_sink_", "tmp")
  println("Sink path : " + tempFile)
  new CsvTableSink(tempFile.getAbsolutePath)
    .configure(Array("region", "winStart", "winEnd", "pv"),
      Array(Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}
End‑to‑End Job
The main program creates the execution environment, registers the custom source and CSV sink, defines a tumbling window of 2 minutes on the accessTime field, groups by window and region, and writes the page‑view count to the sink.
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)
  val sourceTable = "mySource"
  tEnv.registerTableSource(sourceTable, new MyTableSource)
  val sinkTable = "csvSink"
  tEnv.registerTableSink(sinkTable, getCsvTableSink)
  val result = tEnv.scan(sourceTable)
    .window(Tumble.over(2.minute).on('accessTime) as 'w)
    .groupBy('w, 'region)
    .select('region, 'w.start, 'w.end, 'region.count as 'pv)
  result.insertInto(sinkTable)
  env.execute()
}
Running the job prints the temporary CSV file path; inspecting the file shows page‑view counts per region for each 2‑minute window.
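As a rough cross-check of what the CSV file should contain, the same aggregation can be reproduced in plain Scala without Flink. This is a sketch under assumptions: `Access`, `Pv`, and `pageViews` are hypothetical names, timestamps are kept as epoch milliseconds rather than SQL timestamps, and any input rows beyond the single one listed in the table source above would be made up for testing.

```scala
object PageViewSketch {
  // Mirrors the source schema (accessTime, region, userId) and the sink
  // schema (region, winStart, winEnd, pv), with timestamps as epoch millis.
  final case class Access(accessTime: Long, region: String, userId: String)
  final case class Pv(region: String, winStart: Long, winEnd: Long, pv: Long)

  // 2-minute tumbling window, in milliseconds.
  val windowSize: Long = 2 * 60 * 1000L

  // Groups rows by (window start, region) and counts them, like
  // groupBy('w, 'region).select('region, 'w.start, 'w.end, 'region.count).
  def pageViews(accesses: Seq[Access]): Seq[Pv] =
    accesses
      .groupBy(a => (a.accessTime - a.accessTime % windowSize, a.region))
      .map { case ((start, region), rows) =>
        Pv(region, start, start + windowSize, rows.size.toLong)
      }
      .toSeq
      .sortBy(p => (p.winStart, p.region))
}
```

Feeding it the same rows as the table source and comparing the result with the CSV output is a cheap way to validate the windowing logic before running the job on real data.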
Conclusion
This tutorial introduced the Table API, its declarative nature, core operators, windowing capabilities, and demonstrated how to build a complete Flink Table API application with custom sources and sinks, providing a solid foundation for developing streaming and batch analytics on Flink.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
