Understanding Flink Table API and SQL: Dependencies, Planners, and Practical Usage
This article provides a comprehensive guide to Apache Flink's Table API and SQL, covering required dependencies, the differences between old and Blink planners, program structure, table environment creation, catalog registration, query execution, conversion between DataStream and Table, update modes, and time attribute handling, with Scala code examples throughout.
What are Table API and Flink SQL
Flink is a unified batch‑and‑stream processing framework, and Table API and SQL are its high‑level APIs for unified processing. Table API is an embedded query API in Java/Scala, while Flink SQL allows writing SQL statements directly; both rely on Apache Calcite.
Required Dependencies
For the Scala API, add the following Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
</dependency>
If you run the program locally in an IDE, add the Blink planner dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
</dependency>
For custom Kafka formats or user‑defined functions, add:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
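</dependency>
Old Planner vs. Blink Planner
The two planners differ in the following ways:
Blink treats batch jobs as a special case of streaming. It therefore does not support conversion between Table and DataSet, and batch jobs are translated into DataStream programs rather than DataSet programs.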
Blink does not support BatchTableSource; it uses bounded StreamTableSource instead.
Blink only supports the new catalog API; the old ExternalCatalog is deprecated.
FilterableTableSource implementations differ between the two planners.
String‑based configuration options are only available for Blink.
PlannerConfig implementations differ.
Blink can optimize multiple sinks into a single DAG; this is supported only on TableEnvironment, not on StreamTableEnvironment.
Blink supports catalog statistics, which the old planner lacks.
Basic Program Structure
val tableEnv = ... // create TableEnvironment
// create tables
tableEnv.connect(...).createTemporaryTable("table1")
tableEnv.connect(...).createTemporaryTable("outputTable")
// Table API query
val tapiResult = tableEnv.from("table1").select(...)
// SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
// write the result; in Flink 1.11 executeInsert() submits the job itself,
// so a separate tableEnv.execute("scala_job") call is not needed
val tableResult: TableResult = tapiResult.executeInsert("outputTable")
Creating a Table Environment
The TableEnvironment is the core of Flink's Table API & SQL. It registers tables and catalogs, loads modules, executes SQL queries, registers user‑defined functions, converts a DataStream or DataSet into a Table, and holds a reference to the underlying execution environment.
You can create it via BatchTableEnvironment.create() or StreamTableEnvironment.create(), optionally passing EnvironmentSettings or TableConfig to configure planner, mode, etc.
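The creation step can be sketched as follows. This is a minimal example, assuming Flink 1.11 with the Blink planner and the dependencies listed earlier on the classpath; the object name TableEnvSketch and the variable names are illustrative and not from the original article.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Hypothetical wrapper object, used only to make the sketch self-contained.
object TableEnvSketch {
  def main(args: Array[String]): Unit = {
    // Blink planner in streaming mode, layered on a DataStream environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val streamTableEnv = StreamTableEnvironment.create(env, streamSettings)

    // Blink planner in batch mode uses the unified TableEnvironment,
    // because Blink does not convert between Table and DataSet.
    val batchSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val batchTableEnv = TableEnvironment.create(batchSettings)
  }
}
```

Replacing useBlinkPlanner() with useOldPlanner() selects the old planner instead.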
Registering Tables in a Catalog
Tables are identified by a three‑part name: catalog, database, and object name. They can be temporary (session‑scoped) or permanent (visible across sessions). Permanent tables require an external catalog such as Hive.
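To illustrate the three‑part naming, the sketch below expands a short table path into a fully qualified name. It is a plain Scala model rather than Flink's own resolution code: QualifiedName and qualify are hypothetical names, and the defaults default_catalog and default_database are assumed to be the current catalog and database, as in a freshly created TableEnvironment. Escaped identifiers that contain dots are not handled.

```scala
object TableIdentifierSketch {
  // Hypothetical model of a fully qualified table identifier.
  final case class QualifiedName(catalog: String, database: String, name: String) {
    override def toString: String = s"$catalog.$database.$name"
  }

  // Expand a one-, two- or three-part path using the current catalog and database.
  def qualify(
      path: String,
      currentCatalog: String = "default_catalog",
      currentDatabase: String = "default_database"): QualifiedName =
    path.split('.') match {
      case Array(name)          => QualifiedName(currentCatalog, currentDatabase, name)
      case Array(db, name)      => QualifiedName(currentCatalog, db, name)
      case Array(cat, db, name) => QualifiedName(cat, db, name)
      case _ =>
        throw new IllegalArgumentException(s"Expected 1 to 3 name parts: $path")
    }
}
```

For example, TableIdentifierSketch.qualify("Orders") yields default_catalog.default_database.Orders, while a three‑part path is taken as given.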
// register a temporary view
val projTable: Table = tableEnv.from("X").select(...)
tableEnv.createTemporaryView("projectedTable", projTable)
Querying Tables
Flink supports both Table API and SQL. Table API uses method chaining (e.g., select, filter), while SQL uses string statements.
// Table API example
val orders = tableEnv.from("Orders")
val revenue = orders
  .filter($"cCountry" === "FRANCE")
  .groupBy($"cID", $"cName")
  .select($"cID", $"cName", $"revenue".sum as "revSum")
// SQL example
val revenue = tableEnv.sqlQuery("""
SELECT cID, cName, SUM(revenue) AS revSum
FROM Orders
WHERE cCountry = 'FRANCE'
GROUP BY cID, cName
""".stripMargin)
Converting DataStream to Table
Use tableEnv.fromDataStream(stream). You can map fields by name or position, and define processing or event‑time attributes.
// name‑based mapping: fields are referenced by name and may be reordered or renamed
val sensorTable = tableEnv.fromDataStream(dataStream, $"timestamp" as "ts", $"id" as "myId", $"temperature")
// position‑based mapping: new names are assigned to the fields in order
val sensorTable = tableEnv.fromDataStream(dataStream, $"myId", $"ts")
Creating Temporary Views
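// register a DataStream directly as a temporary view
tableEnv.createTemporaryView("sensorView", dataStream)
// alternatively, register it with an explicit field mapping (use one or the other; the view name must be unique)
tableEnv.createTemporaryView("sensorView", dataStream, $"id", $"temperature", $"timestamp" as "ts")
Outputting Tables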
Flink defines three update modes for streaming queries:
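Append Mode – the table and the external connector exchange only insert messages.
Retract Mode – add and retract messages are exchanged: an insert is encoded as an add, a delete as a retract, and an update as a retract of the old row followed by an add of the new row.
Upsert Mode – upsert and delete messages are exchanged; this mode requires a unique key, and an update needs only a single message.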
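The difference between retract and upsert encoding can be sketched with a running count per key. This is a plain Scala model under simplifying assumptions, not Flink's implementation: the message types Add, Retract and Upsert and the object UpdateModeSketch are hypothetical names, and delete messages are left out because a running count never removes a row.

```scala
import scala.collection.mutable

object UpdateModeSketch {
  // Hypothetical message types for retract mode.
  sealed trait RetractMsg
  final case class Add(key: String, count: Long) extends RetractMsg
  final case class Retract(key: String, count: Long) extends RetractMsg

  // Hypothetical message type for upsert mode (the key identifies the row).
  final case class Upsert(key: String, count: Long)

  // Retract mode: an update is a retract of the old row plus an add of the new row.
  def retractStream(keys: Seq[String]): Seq[RetractMsg] = {
    val counts = mutable.Map.empty[String, Long]
    keys.flatMap { k =>
      val previous = counts.get(k)
      val next = previous.getOrElse(0L) + 1
      counts(k) = next
      val retraction: Seq[RetractMsg] = previous.map(c => Retract(k, c)).toSeq
      retraction :+ Add(k, next)
    }
  }

  // Upsert mode: every insert or update is a single message for its key.
  def upsertStream(keys: Seq[String]): Seq[Upsert] = {
    val counts = mutable.Map.empty[String, Long]
    keys.map { k =>
      val next = counts.getOrElse(k, 0L) + 1
      counts(k) = next
      Upsert(k, next)
    }
  }
}
```

For the input keys a, b, a, the retract encoding emits four messages (the second a produces a retract and an add), while the upsert encoding emits three.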
Converting Table to DataStream
Use toAppendStream for append‑only tables or toRetractStream for tables that may produce updates.
val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
// each element is (flag, row): true marks an add message, false a retract message
val aggResultStream: DataStream[(Boolean, (String, Long))] = tableEnv.toRetractStream[(String, Long)](aggResultTable)
Time Attributes
Processing time (proctime) is derived from the machine clock and requires no timestamps or watermarks. Event time (rowtime) uses timestamps embedded in records and requires watermarks.
// define processing time in DataStream‑to‑Table conversion
val sensorTable = tableEnv.fromDataStream(stream, $"id", $"timestamp", $"temperature", $"pt".proctime())
// define event time in DataStream‑to‑Table conversion
val sensorTable = tableEnv.fromDataStream(stream, $"id", $"timestamp".rowtime(), $"temperature")
DDL can also define time attributes (and, for event time, watermarks). The example below declares a processing‑time column:
val sinkDDL = """
CREATE TABLE dataTable (
id VARCHAR(20) NOT NULL,
ts BIGINT,
temperature DOUBLE,
pt AS PROCTIME()
) WITH (
'connector.type' = 'filesystem',
'connector.path' = 'sensor.txt',
'format.type' = 'csv'
)
""".stripMargin
tableEnv.executeSql(sinkDDL) // executeSql() replaces sqlUpdate(), which is deprecated as of Flink 1.11
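Event time depends on watermarks, so a small model of how a watermark advances may help. The sketch below is plain Scala and deliberately simplified: it assumes the watermark trails the largest timestamp seen so far by a fixed delay, and it treats a record as late when its timestamp is below the watermark that was current on arrival. Reading, trace and maxDelayMs are hypothetical names, and Flink's own watermark generators may differ in detail.

```scala
object WatermarkSketch {
  // ts is the event time in epoch milliseconds.
  final case class Reading(id: String, ts: Long)

  // For each record, return (record, watermark after the record, arrivedLate).
  def trace(readings: Seq[Reading], maxDelayMs: Long): Seq[(Reading, Long, Boolean)] = {
    var maxTs = Long.MinValue // no record seen yet
    readings.map { r =>
      // Watermark in effect before this record arrives.
      val watermarkBefore =
        if (maxTs == Long.MinValue) Long.MinValue else maxTs - maxDelayMs
      val late = r.ts < watermarkBefore
      maxTs = math.max(maxTs, r.ts)
      (r, maxTs - maxDelayMs, late)
    }
  }
}
```

With timestamps 1000, 3000, 1500, 4000 and a delay of 1000 ms, the watermark moves through 0, 2000, 2000, 3000, and only the third record arrives late.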
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
