
Apache Flink Table API Tutorial and End‑to‑End Examples

This article provides a comprehensive tutorial on Apache Flink's Table API, explaining its concepts, core features, and a wide range of operators such as SELECT, WHERE, GROUP BY, UNION, JOIN, and various window functions, while offering complete Scala code examples, custom sources, sinks, and an end‑to‑end job that computes page‑view counts per region using event‑time tumbling windows.

Big Data Technology & Architecture

What Is the Table API

The Table API and SQL are the top‑level relational APIs of Apache Flink. They share the same semantics: the Table API is a language‑integrated API embedded in Java/Scala code, while SQL queries are written as text.

Features of the Table API

Declarative – users specify what to compute, not how.

High performance – a query optimizer (based on Apache Calcite) rewrites queries into an optimized execution plan.

Unified batch and stream – the same logic runs on both.

Standards‑compliant – follows standard SQL semantics.

Extended expression capabilities – custom functions, map/flatMap, etc.

HelloWorld Example

First, add Maven dependencies for Flink 1.7.0:

<properties>
    <table.version>1.7.0</table.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>${table.version}</version>
    </dependency>
    ...
</dependencies>

A Table API program typically consists of three parts:

External source (Kafka, CSV, etc.)

Query logic (e.g., simple SELECT, join, window aggregation)

External sink (Kafka, Cassandra, CSV, …)

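The running mode (streaming vs. batch) is selected by creating the appropriate execution environment. The snippets assume that org.apache.flink.streaming.api.scala._, org.apache.flink.table.api.TableEnvironment and org.apache.flink.table.api.scala._ are imported:

// A StreamExecutionEnvironment yields a StreamTableEnvironment; an ExecutionEnvironment would yield a BatchTableEnvironment.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)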

Build a test source:

val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
// Turn the DataStream[String] into a Table with a single column named word.
val source = env.fromCollection(data).toTable(tEnv, 'word)

WordCount logic:

val result = source.groupBy('word).select('word, 'word.count)

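A non‑windowed groupBy keeps updating results it has already emitted, so the result table is converted into a retract stream of (flag, row) pairs: true adds a row, false retracts a previously emitted one. A custom retract sink collects the results:

class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] { ... }
result.toRetractStream[(String, Long)].addSink(new RetractSink)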
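To make the retract protocol concrete, here is a minimal plain‑Scala simulation (no Flink dependency) of the change messages a non‑windowed word count produces, plus one possible way a sink could apply them. The object and method names are hypothetical; this models only the (flag, row) protocol, not Flink's runtime or the elided RetractSink body.

```scala
import scala.collection.mutable

// Plain-Scala model of the retract stream behind groupBy('word).select('word, 'word.count).
object RetractWordCount {
  type Msg = (Boolean, (String, Long))

  // For each incoming word: retract the previous count (if any), then add the new count.
  def changelog(words: Seq[String]): Seq[Msg] = {
    val counts = mutable.Map.empty[String, Long]
    words.flatMap { w =>
      val old = counts.getOrElse(w, 0L)
      counts(w) = old + 1
      val add: Msg = (true, (w, old + 1))
      if (old == 0L) Seq(add) else Seq((false, (w, old)), add)
    }
  }

  // Hypothetical sink logic: keep the latest count per word by removing
  // retracted rows and applying added ones.
  def materialize(msgs: Seq[Msg]): Map[String, Long] =
    msgs.foldLeft(Map.empty[String, Long]) {
      case (view, (true, (w, c)))  => view.updated(w, c)
      case (view, (false, (w, _))) => view - w
    }
}
```

For the seven test words, the changelog has seven additions and three retractions (for the second and third Bob and the second Flink), and the materialized view ends with Bob at 3 and Flink at 2.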

Table API Operators

SELECT

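Projects columns or computed expressions. Note that the first argument of concat_ws is the separator, so this example places c_name between the literal " come " and c_desc. Example: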

val result = customer.select('c_name, concat_ws('c_name, " come ", 'c_desc))
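The argument order of concat_ws is easy to misread. The following plain‑Scala sketch models the documented semantics of CONCAT_WS in Flink: the first argument is the separator, NULL arguments are skipped, and a NULL separator yields NULL. Option stands in for SQL NULL, and the object name is hypothetical.

```scala
// Plain-Scala model of CONCAT_WS(separator, s1, s2, ...); None plays the role of SQL NULL.
object ConcatWs {
  def concatWs(sep: Option[String], parts: Option[String]*): Option[String] =
    sep.map(s => parts.flatten.mkString(s)) // NULL separator -> NULL; NULL parts are dropped
}
```

For example, concatWs(Some("~"), Some("AA"), None, Some("BB"), Some(""), Some("CC")) gives Some("AA~BB~~CC"): the NULL is skipped, but the empty string still gets separators.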

WHERE

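Filters rows with a predicate, given here as a string expression (the Scala expression DSL equivalent is 'c_id === "c_001" || 'c_id === "c_003"). Example: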

val result = customer.where("c_id = 'c_001' || c_id = 'c_003'").select('c_id, 'c_name, 'c_desc)

GROUP BY

Aggregates data per key. Example (order count per customer):

val result = order.groupBy('c_id).select('c_id, 'o_id.count)

Time‑based grouping (e.g., per minute):

val result = order.select('o_id, 'c_id, 'o_time.substring(1,16) as 'o_time_min)
                .groupBy('o_time_min)
                .select('o_time_min, 'o_id.count)
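Flink's substring is 1‑based, so substring(1, 16) keeps the first 16 characters. Assuming o_time is a string in yyyy-MM-dd HH:mm:ss format (the example does not show the format), that prefix is yyyy-MM-dd HH:mm. A plain‑Scala sketch of the same per‑minute bucketing, with hypothetical names; Java's substring is 0‑based, hence substring(0, 16):

```scala
// Plain-Scala model of grouping orders by the minute prefix of a string timestamp.
object PerMinuteCount {
  // Assumes timestamps formatted as "yyyy-MM-dd HH:mm:ss"; keeps "yyyy-MM-dd HH:mm".
  def toMinute(ts: String): String = ts.substring(0, 16)

  // Count orders per minute bucket, like groupBy('o_time_min).select('o_time_min, 'o_id.count).
  def countPerMinute(orderTimes: Seq[String]): Map[String, Int] =
    orderTimes.groupBy(t => toMinute(t)).map { case (minute, ts) => minute -> ts.size }
}
```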

UNION ALL and UNION

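Combine two tables with identical field types. UNION ALL keeps duplicates; UNION removes them. In Flink 1.7 the Table API supports union only on batch tables, while unionAll works on both batch and streaming tables.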

val resultAll = customer.unionAll(customer)
val resultDistinct = customer.union(customer)
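The difference can be sketched in plain Scala by modeling tables as sequences of rows. This is an illustration only: Flink tables are unordered, and the sketch keeps input order just for readability. Names are hypothetical.

```scala
// Plain-Scala model of the two set operations on tables with identical field types.
object UnionSemantics {
  // UNION ALL: rows of both inputs, duplicates kept.
  def unionAll[A](left: Seq[A], right: Seq[A]): Seq[A] = left ++ right

  // UNION: rows of both inputs with all duplicates removed.
  def union[A](left: Seq[A], right: Seq[A]): Seq[A] = (left ++ right).distinct
}
```

Unioning a table with itself therefore doubles its size with unionAll and leaves it unchanged with union, which is what resultAll and resultDistinct compute.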

JOIN

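Inner, left, right, and full outer joins are supported. The two inputs must have distinct field names, which is why the order's c_id is renamed to o_c_id in the example. Example (inner join of customers with orders):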

val result = customer.join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)

LEFT JOIN keeps all rows from the left side and fills missing right side columns with NULL.

val result = customer.leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
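A plain‑Scala sketch of the left outer join semantics, using hypothetical case classes that keep only the join‑relevant fields. None stands in for the NULL‑filled order columns.

```scala
// Plain-Scala model of customer.leftOuterJoin(order, 'c_id === 'o_c_id).
object LeftOuterJoinSketch {
  case class Customer(cId: String, cName: String)
  case class Order(oId: String, oCId: String)

  // Every customer appears at least once; a customer without orders is
  // paired with None, mirroring the NULL-filled right-side columns.
  def leftOuterJoin(customers: Seq[Customer], orders: Seq[Order]): Seq[(Customer, Option[Order])] =
    customers.flatMap { c =>
      val matches = orders.filter(_.oCId == c.cId)
      if (matches.isEmpty) Seq((c, Option.empty[Order]))
      else matches.map(o => (c, Option(o)))
    }
}
```

A customer with two orders yields two joined rows, and a customer with none yields exactly one row with an empty order side.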

Time‑Interval JOIN

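A bounded join on time attributes: besides an equality predicate ('a === 'd), the condition bounds the time attribute of one side relative to the other. Here a left row matches right rows for which f - 5 s <= c < f + 6 s: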

val result = left.join(right)
    .where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
    ...
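The join condition can be checked in isolation with a plain‑Scala sketch, treating 'c and 'f as epoch milliseconds. The names are hypothetical, and the nested loop is only for illustration: Flink's operator instead keeps rows in state and can discard them once the time bound rules out further matches.

```scala
// Plain-Scala model of the interval join condition
//   'a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds
object IntervalJoinSketch {
  val lowerMillis = 5000L // c may be up to 5 s before f (inclusive)
  val upperMillis = 6000L // and must be strictly less than 6 s after f

  def matches(a: String, c: Long, d: String, f: Long): Boolean =
    a == d && c >= f - lowerMillis && c < f + upperMillis

  // Naive nested-loop evaluation over (key, timestamp) rows.
  def join(left: Seq[(String, Long)], right: Seq[(String, Long)]): Seq[((String, Long), (String, Long))] =
    for {
      l <- left
      r <- right
      if matches(l._1, l._2, r._1, r._2)
    } yield (l, r)
}
```

With f = 10000, a left timestamp of 5000 matches (lower bound inclusive) while 16000 does not (upper bound exclusive).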

LATERAL JOIN

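Joins each row of a table with the rows that a user‑defined table function (UDTF) produces for it. Rows for which the function returns nothing are dropped; leftOuterJoin keeps them and fills the function's columns with NULL: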

val udtf = new MyUDTF
val result = source.join(udtf('c) as ('d, 'e))
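MyUDTF is not defined in the example, so the plain‑Scala sketch below substitutes a hypothetical table function that splits 'c on "#" and emits (word, length) pairs. It shows how the inner lateral join pairs each input row with every function output and drops rows without output, and how the left outer variant keeps them.

```scala
// Plain-Scala model of joining a table with a table function (lateral join).
object LateralJoinSketch {
  // Hypothetical stand-in for MyUDTF: one (word, length) row per non-empty '#'-separated token.
  def myUdtf(c: String): Seq[(String, Int)] =
    c.split("#").toSeq.filter(_.nonEmpty).map(w => (w, w.length))

  // join(udtf('c) as ('d, 'e)): rows for which the function emits nothing are dropped.
  def lateralJoin(rows: Seq[(Int, String)]): Seq[(Int, String, String, Int)] =
    for {
      (id, c) <- rows
      (d, e)  <- myUdtf(c)
    } yield (id, c, d, e)

  // leftOuterJoin(udtf('c) as ('d, 'e)): such rows are kept, with None for the function columns.
  def leftLateralJoin(rows: Seq[(Int, String)]): Seq[(Int, String, Option[(String, Int)])] =
    rows.flatMap { case (id, c) =>
      val out = myUdtf(c)
      if (out.isEmpty) Seq((id, c, Option.empty[(String, Int)]))
      else out.map(x => (id, c, Option(x)))
    }
}
```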

Temporal Table JOIN

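Joins an append‑only stream with a temporal table function: each row is joined with the version of the versioned table (here, rates keyed by r_currency) that was valid at the row's time attribute o_rowtime: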

val rates = tEnv.scan("versionedTable").createTemporalTableFunction('rowtime, 'r_currency)
val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency)
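A plain‑Scala sketch of the version lookup behind this join: for an order at time t, use the latest rate whose validity starts at or before t. The rate history, integer rates, and names are hypothetical, and inner‑join semantics drop orders with no valid version.

```scala
// Plain-Scala model of an event-time temporal table join on currency rates.
object TemporalJoinSketch {
  // history: currency -> versions as (validFrom, rate); times are epoch millis.
  def rateAt(history: Map[String, Seq[(Long, Long)]], currency: String, t: Long): Option[Long] =
    history.getOrElse(currency, Seq.empty)
      .filter(_._1 <= t) // versions already valid at time t
      .sortBy(_._1)
      .lastOption        // the most recent of them
      .map(_._2)

  // orders: (orderTime, currency, amount) -> (orderTime, currency, amount * rate)
  def join(orders: Seq[(Long, String, Long)], history: Map[String, Seq[(Long, Long)]]): Seq[(Long, String, Long)] =
    orders.flatMap { case (t, cur, amount) =>
      rateAt(history, cur, t).map(rate => (t, cur, amount * rate))
    }
}
```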

Window Functions

OVER Window

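An over window computes an aggregate for every input row over a range of neighboring rows in the same partition. There are two kinds: ROWS windows, whose bounds are counted in rows, and RANGE windows, whose bounds are defined on the value of the order‑by time attribute, so rows sharing the same timestamp fall into the same window. Example (bounded ROWS OVER returning the maximum price of the current item and the two preceding items of the same type):

val result = item.window(Over.partitionBy('itemType).orderBy('rowtime).preceding(2.rows).following(CURRENT_ROW) as 'w)
    .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)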

Bounded RANGE OVER (2‑minute range):

val result = item.window(Over.partitionBy('itemType).orderBy('rowtime).preceding(2.minute).following(CURRENT_RANGE) as 'w)
    .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
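The difference between the two kinds shows up when rows share a timestamp. The plain‑Scala sketch below computes both maxima for one itemType partition given as (timestamp, price) pairs sorted by time. Names and data are hypothetical, and ties are taken in input order in the ROWS case, an order Flink does not guarantee.

```scala
// Plain-Scala model of bounded ROWS vs. RANGE over windows for one partition.
object OverWindowSketch {
  // ROWS: max over the current row and up to `preceding` earlier rows.
  def rowsMax(prices: Seq[(Long, Int)], preceding: Int): Seq[Int] =
    prices.indices.map { i =>
      prices.slice(math.max(0, i - preceding), i + 1).map(_._2).max
    }

  // RANGE: max over all rows with time in [t - range, t]; rows with the
  // same timestamp see each other and get the same result.
  def rangeMax(prices: Seq[(Long, Int)], rangeMillis: Long): Seq[Int] =
    prices.map { case (t, _) =>
      prices.filter { case (u, _) => u >= t - rangeMillis && u <= t }.map(_._2).max
    }
}
```

For (0, 10), (60000, 20), (60000, 30), the ROWS variant yields 10, 20, 30, while the 2‑minute RANGE variant yields 10, 30, 30 because both rows at 60000 belong to the same window.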

Group Window

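Group windows collect rows into finite groups based on a time attribute. The supported types are:

Tumble – fixed‑size, non‑overlapping windows.

Hop – fixed‑size windows that advance by a slide interval and overlap when the slide is shorter than the size; the Table API defines them with Slide.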

Session – windows defined by inactivity gaps.

Example (tumbling window of 2 minutes counting page views per region):

val result = pageAccess.window(Tumble.over(2.minute).on('rowtime) as 'w)
    .groupBy('w, 'region)
    .select('region, 'w.start, 'w.end, 'region.count as 'pv)
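A plain‑Scala sketch of the same tumbling‑window count, assuming epoch‑millisecond timestamps of at least 0 and no window offset. The events and names are hypothetical; the window end is exclusive, matching 'w.end.

```scala
// Plain-Scala model of a tumbling-window page-view count per region.
object TumbleSketch {
  // Start of the tumbling window that contains ts.
  def windowStart(ts: Long, size: Long): Long = ts - ts % size

  // Key = (region, window start, exclusive window end).
  def pvPerRegion(events: Seq[(Long, String)], size: Long): Map[(String, Long, Long), Int] =
    events
      .groupBy { case (ts, region) =>
        val start = windowStart(ts, size)
        (region, start, start + size)
      }
      .map { case (key, rows) => key -> rows.size }
}
```

With 2‑minute windows, an event at exactly 120000 ms starts the second window [120000, 240000) rather than closing the first.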

Hop window (5‑minute slide, 10‑minute size) on the page‑access‑count table:

val result = pageAccessCount.window(Slide.over(10.minute).every(5.minute).on('rowtime) as 'w)
    .groupBy('w)
    .select('w.start, 'w.end, 'accessCount.sum as 'accessCount)
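With a 10‑minute size and a 5‑minute slide, every row belongs to two windows. The plain‑Scala sketch below follows the assignment rule of Flink's sliding‑window assigner as we understand it (zero offset, timestamps of at least 0): start from the latest slide‑aligned start and step back while the window still covers the timestamp. Names and data are hypothetical.

```scala
// Plain-Scala model of hop (sliding) window assignment and a per-window sum.
object HopSketch {
  // All windows [start, start + size) that contain ts.
  def windowsFor(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = ts - ts % slide
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => (start, start + size))
      .toList
  }

  // Like groupBy('w).select('w.start, 'w.end, 'accessCount.sum).
  def sumPerWindow(events: Seq[(Long, Int)], size: Long, slide: Long): Map[(Long, Long), Int] =
    events
      .flatMap { case (ts, n) => windowsFor(ts, size, slide).map(w => (w, n)) }
      .groupBy(_._1)
      .map { case (w, xs) => w -> xs.map(_._2).sum }
}
```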

Session window (3‑minute gap) on the page‑access‑session table:

val result = pageAccessSession.window(Session.withGap(3.minute).on('rowtime) as 'w)
    .groupBy('w, 'region)
    .select('region, 'w.start, 'w.end, 'region.count as 'pv)
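Session windows have no fixed size: a session ends once no row arrives within the gap, and its end is the last row's time plus the gap. The plain‑Scala sketch below groups one region's event times into sessions; the real query does this per region. Names and data are hypothetical, and treating an event exactly gap after the previous one as part of the same session is a simplifying choice of this sketch.

```scala
// Plain-Scala model of session windows for one region's event times (epoch millis).
object SessionSketch {
  // Returns (start, end, count) per session, where end = last event time + gap.
  // An event at or before the current session's end extends it;
  // otherwise it opens a new session.
  def sessions(times: Seq[Long], gap: Long): List[(Long, Long, Int)] =
    times.sorted
      .foldLeft(List.empty[(Long, Long, Int)]) {
        case ((start, stop, n) :: rest, t) if t <= stop => (start, t + gap, n + 1) :: rest
        case (acc, t)                                   => (t, t + gap, 1) :: acc
      }
      .reverse
}
```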

Source & Sink

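A custom event‑time source is implemented by extending SourceFunction: it emits each record together with its timestamp via collectWithTimestamp and advances event time by emitting watermarks. Its input is a list in which Left((timestamp, record)) denotes a record and Right(timestamp) a watermark.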

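import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark

// Replays a fixed script: Left((ts, value)) emits a timestamped record, Right(ts) emits a watermark.
// A plain (non-parallel) SourceFunction always runs with parallelism 1.
class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left((ts, value)) => ctx.collectWithTimestamp(value, ts)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }
  // Nothing to interrupt: run() returns after replaying the finite list.
  override def cancel(): Unit = {}
}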

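The corresponding StreamTableSource declares the table schema and, through DefinedRowtimeAttributes, marks accessTime as the row‑time attribute. ExistingField reads the timestamp from the Long field accessTime, which the schema exposes as a SQL TIMESTAMP, and PreserveWatermarks keeps the watermarks emitted by the source function.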

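import java.lang.{Long => JLong}
import java.util.Collections
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamEnv}
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
import org.apache.flink.types.Row

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
  val fieldNames = Array("accessTime", "region", "userId")
  // accessTime is a Long in the physical rows but a TIMESTAMP rowtime attribute in the table schema.
  val schema = new TableSchema(fieldNames, Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(Array[TypeInformation[_]](Types.LONG, Types.STRING, Types.STRING), fieldNames)
  // Left((timestamp, row)) is a record, Right(timestamp) is a watermark.
  val data: Seq[Either[(Long, Row), Long]] = Seq(
    Left((1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010"))),
    Right(1510365660000L),
    ...
  )
  override def getRowtimeAttributeDescriptors = Collections.singletonList(
    new RowtimeAttributeDescriptor("accessTime", new ExistingField("accessTime"), PreserveWatermarks.INSTANCE))
  override def getDataStream(execEnv: JStreamEnv): DataStream[Row] =
    execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
  override def getReturnType = rowType
  override def getTableSchema = schema
}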

The sink uses Flink's built‑in CsvTableSink to write results to a temporary CSV file.

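import java.io.File
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.table.api.Types
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

def getCsvTableSink: TableSink[Row] = {
  val tempFile = File.createTempFile("csv_sink_", "tmp")
  println("Sink path : " + tempFile)
  new CsvTableSink(tempFile.getAbsolutePath, fieldDelim = ",", numFiles = 1, writeMode = WriteMode.OVERWRITE) // the temp file already exists
    .configure(Array("region", "winStart", "winEnd", "pv"),
               Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}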

End‑to‑End Job

The main program creates the execution environment, registers the custom source and CSV sink, defines a tumbling window of 2 minutes on the accessTime field, groups by window and region, and writes the page‑view count to the sink.

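import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// MyTableSource and getCsvTableSink are the definitions shown above.
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  // Required because accessTime is used as an event-time (rowtime) attribute.
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)

  val sourceTable = "mySource"
  tEnv.registerTableSource(sourceTable, new MyTableSource)

  val sinkTable = "csvSink"
  tEnv.registerTableSink(sinkTable, getCsvTableSink)

  // 2-minute tumbling windows on accessTime; page views counted per region.
  val result = tEnv.scan(sourceTable)
    .window(Tumble.over(2.minute).on('accessTime) as 'w)
    .groupBy('w, 'region)
    .select('region, 'w.start, 'w.end, 'region.count as 'pv)

  // insertInto adds the sink to the job; nothing runs until env.execute().
  result.insertInto(sinkTable)
  env.execute()
}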

Running the job prints the temporary CSV file path; inspecting the file shows page‑view counts per region for each 2‑minute window.

Conclusion

This tutorial introduced the Table API, its declarative nature, core operators, windowing capabilities, and demonstrated how to build a complete Flink Table API application with custom sources and sinks, providing a solid foundation for developing streaming and batch analytics on Flink.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
