Understanding Flink Table API and SQL: Dependencies, Planners, and Practical Usage

This article provides a comprehensive guide to Apache Flink's Table API and SQL, covering required dependencies, the differences between old and Blink planners, program structure, table environment creation, catalog registration, query execution, conversion between DataStream and Table, update modes, and time attribute handling, with Scala code examples throughout.

Big Data Technology & Architecture

What Are the Table API and Flink SQL?

Flink is a unified batch‑and‑stream processing framework, and the Table API and SQL are its high‑level relational APIs for both kinds of workload. The Table API is a query API embedded in Java and Scala, while Flink SQL lets you write SQL statements directly; both rely on Apache Calcite for query planning and optimization.

Required Dependencies

For the Scala API, add the following Maven dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>

If you run the program locally in an IDE, add the Blink planner dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>

For custom Kafka formats or user‑defined functions, add:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.11.0</version>
  <scope>provided</scope>
</dependency>

Old Planner vs. Blink Planner

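The Blink planner treats batch jobs as a special case of streaming. As a result, it does not support conversion between Table and DataSet, and batch jobs are translated into DataStream programs rather than DataSet programs, just like streaming jobs.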

Blink does not support BatchTableSource; it uses bounded StreamTableSource instead.

Blink only supports the new catalog API; the old ExternalCatalog is deprecated.

FilterableTableSource implementations differ between the two planners.

String‑based configuration options are only available for Blink.

PlannerConfig implementations differ.

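Blink optimizes multiple sinks into a single DAG. In Flink 1.11 this applies to both TableEnvironment and StreamTableEnvironment (in 1.10 it was limited to TableEnvironment); the old planner always builds a separate, independent DAG for each sink.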

Blink supports catalog statistics, which the old planner lacks.

Basic Program Structure

val tableEnv = ... // create TableEnvironment
// create tables
tableEnv.connect(...).createTemporaryTable("table1")
tableEnv.connect(...).createTemporaryTable("outputTable")
// Table API query
val tapiResult = tableEnv.from("table1").select(...)
// SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
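// write the result to the sink table; executeInsert() submits the job itself
val tableResult = tapiResult.executeInsert("outputTable")
// a separate tableEnv.execute() call is not needed here: executeInsert() has
// already submitted the job, and tableEnv.execute() is deprecated in Flink 1.11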

Creating a Table Environment

The TableEnvironment is the core of Flink's Table API & SQL. It registers tables and catalogs, loads pluggable modules, executes SQL queries, registers user‑defined functions, converts between DataStream/DataSet and Table, and holds a reference to the underlying execution environment.

You can create it via BatchTableEnvironment.create() or StreamTableEnvironment.create(), or via the unified TableEnvironment.create(), which takes EnvironmentSettings. Use EnvironmentSettings to choose the planner (old or Blink) and the mode (batch or streaming), or pass a TableConfig to tune the environment.
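
As a minimal sketch of these options, assuming Flink 1.11 with the Blink planner on the classpath (in 1.11 the Scala bridge classes live under org.apache.flink.table.api.bridge.scala). The object and method names are illustrative and do not come from the original article:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object TableEnvSketch {
  // Streaming mode on the Blink planner, bridged to the DataStream API.
  def blinkStreaming(): StreamTableEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    StreamTableEnvironment.create(env, settings)
  }

  // Batch mode on the Blink planner goes through the unified TableEnvironment,
  // because this planner has no DataSet bridge.
  def blinkBatch(): TableEnvironment = {
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    TableEnvironment.create(settings)
  }

  def main(args: Array[String]): Unit = {
    // Both environments start in the built-in catalog and database.
    println(blinkStreaming().getCurrentCatalog)
    println(blinkBatch().getCurrentDatabase)
  }
}
```

Choosing the planner and mode in one EnvironmentSettings object keeps the rest of the program independent of that decision.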

Registering Tables in a Catalog

Tables are identified by a three‑part name: catalog, database, and object name. They can be temporary (session‑scoped) or permanent (visible across sessions). Permanent tables require an external catalog such as Hive.

// register a temporary view
val projTable: Table = tableEnv.from("X").select(...)
tableEnv.createTemporaryView("projectedTable", projTable)
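
The three‑part naming can be sketched as follows, assuming Flink 1.11 and the built‑in in‑memory catalog. The view names and the placeholder data are hypothetical:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

object CatalogNameSketch {
  def registerViews(): StreamTableEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Placeholder data standing in for a real table.
    val table: Table = tableEnv.fromDataStream(env.fromElements(("sensor_1", 35.8)))

    // Short name: resolved against the current catalog and database.
    tableEnv.createTemporaryView("exampleView", table)
    // Fully qualified three-part name: catalog.database.object.
    tableEnv.createTemporaryView("default_catalog.default_database.qualifiedView", table)
    tableEnv
  }

  def main(args: Array[String]): Unit = {
    val tableEnv = registerViews()
    println(s"${tableEnv.getCurrentCatalog}.${tableEnv.getCurrentDatabase}")
    // Lists the temporary views of the current catalog and database.
    tableEnv.listTemporaryViews().foreach(println)
  }
}
```

Both views end up in the same namespace here because the qualified name spells out the defaults that the short name picks up implicitly.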

Querying Tables

Flink supports both Table API and SQL. Table API uses method chaining (e.g., select, filter), while SQL uses string statements.

// Table API example
// requires: import org.apache.flink.table.api._ (provides the $"..." interpolator)
val orders = tableEnv.from("Orders")
val revenue = orders.filter($"cCountry" === "FRANCE")
  .groupBy($"cID", $"cName")
  .select($"cID", $"cName", $"revenue".sum as "revSum")

// SQL example
val revenue = tableEnv.sqlQuery("""
  SELECT cID, cName, SUM(revenue) AS revSum
  FROM Orders
  WHERE cCountry = 'FRANCE'
  GROUP BY cID, cName
""".stripMargin)
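
To try both styles end to end, the following sketch registers a small in‑memory Orders view and runs the same aggregation through the Table API and through SQL. It assumes Flink 1.11 with the Blink planner; the CustomerOrder case class and the sample rows are hypothetical, chosen only to match the column names used above:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical record type matching the column names in the queries.
case class CustomerOrder(cID: Long, cName: String, cCountry: String, revenue: Double)

object RevenueQuerySketch {
  def setup(): (StreamExecutionEnvironment, StreamTableEnvironment) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
    val orders: DataStream[CustomerOrder] = env.fromElements(
      CustomerOrder(1L, "Alice", "FRANCE", 10.0),
      CustomerOrder(1L, "Alice", "FRANCE", 5.0),
      CustomerOrder(2L, "Bob", "GERMANY", 7.0))
    tableEnv.createTemporaryView("Orders", orders)
    (env, tableEnv)
  }

  def tableApiQuery(tableEnv: TableEnvironment): Table =
    tableEnv.from("Orders")
      .filter($"cCountry" === "FRANCE")
      .groupBy($"cID", $"cName")
      .select($"cID", $"cName", $"revenue".sum as "revSum")

  def sqlQuery(tableEnv: TableEnvironment): Table =
    tableEnv.sqlQuery(
      """
        |SELECT cID, cName, SUM(revenue) AS revSum
        |FROM Orders
        |WHERE cCountry = 'FRANCE'
        |GROUP BY cID, cName
        |""".stripMargin)

  def main(args: Array[String]): Unit = {
    val (env, tableEnv) = setup()
    // A grouped aggregation updates earlier results, so it is emitted as a retract stream.
    tableEnv.toRetractStream[Row](tableApiQuery(tableEnv)).print("table-api")
    tableEnv.toRetractStream[Row](sqlQuery(tableEnv)).print("sql")
    env.execute("revenue-sketch")
  }
}
```

Both queries return a Table, so the two styles can be mixed freely within one program.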

Converting DataStream to Table

Use tableEnv.fromDataStream(stream). You can map fields by name or position, and define processing or event‑time attributes.

// name-based mapping: reference existing field names; fields can be reordered and renamed with "as"
val sensorTable = tableEnv.fromDataStream(dataStream, $"timestamp" as "ts", $"id" as "myId", $"temperature")

// position-based mapping: new names are assigned to the fields in order
// and must not match any existing field name
val sensorTable = tableEnv.fromDataStream(dataStream, $"myId", $"ts")
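
The difference between the two mappings is easiest to see in the resulting schemas. The sketch below assumes Flink 1.11 and a hypothetical SensorReading case class with the fields id, timestamp, and temperature; the article does not show the stream's actual element type:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical element type of the input stream.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object FieldMappingSketch {
  def tables(): (Table, Table) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val dataStream: DataStream[SensorReading] = env.fromElements(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_6", 1547718201L, 15.4))

    // Name-based: every expression refers to an existing field, so order is free.
    val byName = tableEnv.fromDataStream(
      dataStream, $"timestamp" as "ts", $"id" as "myId", $"temperature")

    // Position-based: neither name exists in SensorReading, so the first two
    // fields (id, timestamp) are renamed in order and temperature is dropped.
    val byPosition = tableEnv.fromDataStream(dataStream, $"myId", $"ts")

    (byName, byPosition)
  }

  def main(args: Array[String]): Unit = {
    val (byName, byPosition) = tables()
    byName.printSchema()
    byPosition.printSchema()
  }
}
```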

Creating Temporary Views

tableEnv.createTemporaryView("sensorView", dataStream)
tableEnv.createTemporaryView("sensorView", dataStream, $"id", $"temperature", $"timestamp" as "ts")

Outputting Tables

Flink defines three update modes for streaming queries:

Append Mode – only insert messages.

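Retract Mode – add and retract messages. An insert is encoded as an add message, a delete as a retract message, and an update as a retract of the old row followed by an add of the new row.

Upsert Mode – upsert and delete messages. It requires a unique key, and because an update is encoded as a single message it is more efficient than retract mode.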

Converting Table to DataStream

Use toAppendStream for append‑only tables or toRetractStream for tables that may produce updates.

val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
val aggResultStream: DataStream[(Boolean, (String, Long))] = tableEnv.toRetractStream[(String, Long)](aggResultTable)
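
A runnable sketch of both conversions, assuming Flink 1.11 with the Blink planner. The SensorReading case class and the sample data are hypothetical. In the retract stream, the Boolean flag is true for an added row and false for a retracted one:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical element type of the input stream.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object StreamConversionSketch {
  // Returns (append-only result, updating result) for a sensor table.
  def queries(sensorTable: Table): (Table, Table) = {
    // Projection and filter only ever add rows, so the result is append-only.
    val resultTable = sensorTable
      .select($"id", $"temperature")
      .filter($"id" === "sensor_1")
    // A grouped count revises earlier results as new rows arrive.
    val aggResultTable = sensorTable
      .groupBy($"id")
      .select($"id", $"id".count as "cnt")
    (resultTable, aggResultTable)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val readings = env.fromElements(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_1", 1547718201L, 36.2),
      SensorReading("sensor_6", 1547718202L, 15.4))
    val (resultTable, aggResultTable) = queries(tableEnv.fromDataStream(readings))

    val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
    val aggResultStream: DataStream[(Boolean, (String, Long))] =
      tableEnv.toRetractStream[(String, Long)](aggResultTable)

    resultStream.print("append")
    aggResultStream.print("retract")
    env.execute("conversion-sketch")
  }
}
```

Calling toAppendStream on the aggregated table would fail, because that table produces updates rather than inserts only.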

Time Attributes

Processing time (proctime) is derived from the machine clock and requires no timestamps or watermarks. Event time (rowtime) uses timestamps embedded in records and requires watermarks.

// define processing time in DataStream‑to‑Table conversion
val sensorTable = tableEnv.fromDataStream(stream, $"id", $"timestamp", $"temperature", $"pt".proctime())

// define event time in DataStream‑to‑Table conversion
val sensorTable = tableEnv.fromDataStream(stream, $"id", $"timestamp".rowtime(), $"temperature")
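
For the event‑time variant to work, the stream needs timestamps and watermarks before it is converted. The sketch below assumes Flink 1.11, where processing time is the default time characteristic, and a hypothetical SensorReading case class whose timestamp field holds ascending epoch milliseconds:

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical element type; timestamp is assumed to be epoch milliseconds.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object TimeAttributeSketch {
  def tables(): (Table, Table) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Enable event time explicitly; Flink 1.11 defaults to processing time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val stream = env
      .fromElements(
        SensorReading("sensor_1", 1547718199000L, 35.8),
        SensorReading("sensor_1", 1547718201000L, 36.2))
      // Assumes timestamps never decrease; watermarks are derived from them.
      .assignAscendingTimestamps(_.timestamp)

    // Processing time: an extra column appended at the end of the schema.
    val withProctime = tableEnv.fromDataStream(
      stream, $"id", $"timestamp", $"temperature", $"pt".proctime)
    // Event time: the existing timestamp field is replaced by the rowtime attribute.
    val withRowtime = tableEnv.fromDataStream(
      stream, $"id", $"timestamp".rowtime, $"temperature")

    (withProctime, withRowtime)
  }

  def main(args: Array[String]): Unit = {
    val (withProctime, withRowtime) = tables()
    withProctime.printSchema()
    withRowtime.printSchema()
  }
}
```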

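DDL can also define time attributes. The example below declares a processing‑time column with PROCTIME(); an event‑time attribute is declared with a WATERMARK clause.

val tableDDL = """
  CREATE TABLE dataTable (
    id VARCHAR(20) NOT NULL,
    ts BIGINT,
    temperature DOUBLE,
    pt AS PROCTIME()
  ) WITH (
    'connector.type' = 'filesystem',
    'connector.path' = 'sensor.txt',
    'format.type' = 'csv'
  )
""".stripMargin
// executeSql() replaces sqlUpdate(), which is deprecated in Flink 1.11
tableEnv.executeSql(tableDDL)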
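
An event‑time counterpart of this DDL might look like the sketch below. It assumes Flink 1.11 with the Blink planner and assumes that ts holds epoch seconds; the table name sensorEvents, the column name rt, and the one‑second watermark delay are illustrative choices, not taken from the original article:

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object EventTimeDdlSketch {
  def register(): TableEnvironment = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(settings)

    val eventTimeDDL =
      """
        |CREATE TABLE sensorEvents (
        |  id VARCHAR(20) NOT NULL,
        |  ts BIGINT,
        |  temperature DOUBLE,
        |  -- computed column: epoch seconds converted to a TIMESTAMP(3)
        |  rt AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
        |  -- marks rt as the event-time attribute, tolerating 1 second of lateness
        |  WATERMARK FOR rt AS rt - INTERVAL '1' SECOND
        |) WITH (
        |  'connector.type' = 'filesystem',
        |  'connector.path' = 'sensor.txt',
        |  'format.type' = 'csv'
        |)
        |""".stripMargin

    // Registers the table in the current catalog; the file is not read yet.
    tableEnv.executeSql(eventTimeDDL)
    tableEnv
  }

  def main(args: Array[String]): Unit = {
    register().listTables().foreach(println)
  }
}
```

The watermark expression must be based on a TIMESTAMP(3) column, which is why the BIGINT field is converted through a computed column first.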
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
