Big Data 23 min read

Comprehensive Guide to FlinkSQL and Table API: Background, Dependencies, Planners, and Usage

This article provides a detailed introduction to FlinkSQL, covering its background, the Table API, required dependencies, differences between old and Blink planners, various API usage patterns, connector configurations for CSV, Kafka, Elasticsearch, MySQL, and how to convert between DataStream and Table in Flink's unified batch‑stream processing model.

Big Data Technology Architecture
Big Data Technology Architecture
Big Data Technology Architecture
Comprehensive Guide to FlinkSQL and Table API: Background, Dependencies, Planners, and Usage

1. Introduction

After a brief apology for the recent inactivity, the author announces a series of tutorials on FlinkSQL and real‑time data warehousing, inviting readers to follow for technical content.

2. Background of FlinkSQL

FlinkSQL is a SQL‑compatible language designed to simplify Flink's real‑time computation model, lowering the entry barrier for users. It originated from Alibaba's research on stream processing engines, leading to the open‑source Blink project, which contributed the implementation of FlinkSQL.

Compared with traditional stream APIs such as Storm or Spark Streaming, FlinkSQL offers a declarative, optimizable, easy‑to‑learn, stable, and unified batch‑stream interface.

Declarative language: users express requirements without implementation details.

Built‑in optimizers generate optimal execution plans.

Low learning cost across industries.

Stable syntax with decades of SQL history.

Unified batch and stream processing at the runtime level.

3. Overall Introduction

3.1 What are Table API and FlinkSQL?

Flink provides a unified batch‑stream framework; Table API and FlinkSQL are higher‑level APIs that operate on this unified engine. Table API is an embedded query API in Java/Scala, while FlinkSQL allows writing standard SQL statements directly.

3.2 Required Dependencies

Two Maven dependencies are needed for Table API and SQL:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

The planner provides the core planning capabilities, while the bridge connects Table API with DataStream/DataSet APIs.

3.3 Differences Between Old Planner and Blink Planner

Blink treats batch jobs as a special case of stream jobs.

Blink does not support BatchTableSource; it uses bounded StreamTableSource instead.

Blink does not support the deprecated ExternalCatalog.

FilterableTableSource implementations differ between the two planners.

String‑based configuration options are only available in Blink.

PlannerConfig implementations differ.

Blink can optimize multiple sinks into a single DAG (only for TableEnvironment).

Blink supports catalog statistics, while the old planner does not.

4. API Calls

4.1 Basic Program Structure

The typical steps are: create an execution environment, define source, transformation, and sink.

val tableEnv = ... // create table execution environment

// create a temporary source table
tableEnv.connect(...).createTemporaryTable("inputTable")

// create a temporary sink table
tableEnv.connect(...).createTemporaryTable("outputTable")

val result = tableEnv.from("inputTable").select(...)
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")

result.insertInto("outputTable")

4.2 Creating Table Environment

Several ways exist to create a TableEnvironment:

val tableEnv = StreamTableEnvironment.create(env) // default

val settings = EnvironmentSettings.newInstance()
    .useOldPlanner()
    .inStreamingMode()
    .build()
val oldPlannerEnv = StreamTableEnvironment.create(env, settings)

val blinkSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val blinkEnv = StreamTableEnvironment.create(env, blinkSettings)

TableEnvironment manages catalogs, registers tables, executes SQL, registers UDFs, and bridges DataStream/DataSet with tables.

4.3 Registering Tables in a Catalog

4.3.1 Table Concept

A table is identified by catalog, database, and object name. It can be a regular table (backed by files, databases, or streams) or a view.

4.3.2 CSV Connector

Register a CSV file as a temporary table:

tableEnv
  .connect(new FileSystem().path("sensor.txt"))
  .withFormat(new OldCsv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE()))
  .createTemporaryTable("inputTable")

The old CSV descriptor will be deprecated; the new Csv() descriptor requires the flink-csv dependency.

4.3.3 Kafka Connector

Register a Kafka source table:

tableEnv.connect(
  new Kafka()
    .version("0.11")
    .topic("sensor")
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092"))
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE()))
  .createTemporaryTable("kafkaInputTable")

Similar connectors exist for Elasticsearch, MySQL, HBase, Hive, etc.

4.4 Table Queries

4.4.1 Table API

Table API uses method chaining:

val sensorTable: Table = tableEnv.from("inputTable")
val resultTable: Table = sensorTable
  .select("id, temperature")
  .filter("id = 'sensor_1'")

4.4.2 SQL

SQL queries are expressed as strings:

val sqlResult: Table = tableEnv.sqlQuery("SELECT id, temperature FROM inputTable WHERE id = 'sensor_1'")

val aggResult: Table = tableEnv.sqlQuery("SELECT id, COUNT(id) AS cnt FROM inputTable GROUP BY id")

4.5 Converting DataStream to Table

Use tableEnv.fromDataStream() to create a Table from a DataStream, optionally renaming fields:

val inputStream: DataStream[String] = env.readTextFile("sensor.txt")
val dataStream: DataStream[SensorReading] = inputStream.map(...)
val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'timestamp as 'ts, 'temperature)

4.6 Creating Temporary Views

tableEnv.createTemporaryView("sensorView", dataStream)
tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature, 'timestamp as 'ts)
tableEnv.createTemporaryView("sensorView", sensorTable)

4.7 Output Tables

4.7.1 File Sink

tableEnv.connect(new FileSystem().path(".../out.txt"))
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("temp", DataTypes.DOUBLE()))
  .createTemporaryTable("outputTable")

resultSqlTable.insertInto("outputTable")

4.7.2 Update Modes

Flink supports Append, Retract, Upsert, and Update modes for dynamic tables, each defining how insert, delete, and update messages are exchanged with external connectors.

4.7.3 Kafka Sink

tableEnv.connect(new Kafka()
    .version("0.11")
    .topic("sinkTest")
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092"))
  .withFormat(new Csv())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("temp", DataTypes.DOUBLE()))
  .createTemporaryTable("kafkaOutputTable")

resultTable.insertInto("kafkaOutputTable")

4.7.4 Elasticsearch Sink

tableEnv.connect(new Elasticsearch()
    .version("6")
    .host("localhost", 9200, "http")
    .index("sensor")
    .documentType("temp"))
  .inUpsertMode()
  .withFormat(new Json())
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("count", DataTypes.BIGINT()))
  .createTemporaryTable("esOutputTable")

aggResultTable.insertInto("esOutputTable")

4.7.5 MySQL Sink (DDL)

val sinkDDL = """
  |create table jdbcOutputTable (
  |  id varchar(20) not null,
  |  cnt bigint not null
  |) with (
  |  'connector.type' = 'jdbc',
  |  'connector.url' = 'jdbc:mysql://localhost:3306/test',
  |  'connector.table' = 'sensor_count',
  |  'connector.driver' = 'com.mysql.jdbc.Driver',
  |  'connector.username' = 'root',
  |  'connector.password' = '123456'
  |)
  """.stripMargin

tableEnv.sqlUpdate(sinkDDL)
aggResultSqlTable.insertInto("jdbcOutputTable")

4.7.6 Converting Table to DataStream

val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
val aggResultStream: DataStream[(Boolean, (String, Long))] =
  tableEnv.toRetractStream[(String, Long)](aggResultTable)

resultStream.print("result")
aggResultStream.print("aggResult")

4.7.7 Explain and Execute Query

val explanation: String = tableEnv.explain(resultTable)
println(explanation)

The explanation shows the unoptimized logical plan, the optimized logical plan, and the physical execution plan. Blink always generates a DataStream program, while the old planner may produce DataSet programs for batch queries.

5. References

http://www.atguigu.com/

https://www.bilibili.com/video/BV12k4y1z7LM

https://blog.csdn.net/u013411339/article/details/93267838

6. Summary

The article delivers a thorough, over‑five‑thousand‑word tutorial on FlinkSQL, covering its origin, the Table API, planner differences, API usage, connector configurations, update modes, and how to switch between Table and DataStream representations, aiming to help beginners quickly master real‑time stream processing with Flink.

ConnectorstreamingKafkaFlinkSQLDataStreamScalaTable API
Big Data Technology Architecture
Written by

Big Data Technology Architecture

Exploring Open Source Big Data and AI Technologies

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.