Big Data 7 min read

Apache Flink Table API and SQL Tutorial with Code Examples

This article introduces Apache Flink’s Table API and SQL, explains the TableEnvironment programming model, shows how to register tables and sinks, and provides two complete Java examples—WordCount and a file‑based aggregation—complete with code that can be downloaded for local testing.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Apache Flink Table API and SQL Tutorial with Code Examples

Apache Flink provides two relational APIs—Table API and SQL—that unify stream and batch processing, allowing developers to switch seamlessly between DataStream, DataSet, CEP, and Gelly operations.

The programming model centers on TableEnvironment, which is responsible for registering tables, external catalogs, executing SQL queries, registering user‑defined functions, converting DataStream/DataSet to tables, and holding references to the execution environment.

Tables can be registered from an existing Table, a TableSource (e.g., CsvTableSource), or directly from a DataStream/DataSet. Example registration code:

Table projTable = tableEnv.scan("X").select(...);
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
Table table = tableEnv.fromDataSet(tableset);

Table sinks allow results to be written to external storage such as CSV, Parquet, Avro, or ORC. Example sink registration:

TableSink csvSink = new CsvTableSink("/path/to/file", ...);
String[] fieldNames = {"a","b","c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

Example 1 – WordCount using Flink SQL shows a Java class WordCountSQL that creates an execution environment, registers a DataSet, runs a SQL query

SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word

, converts the result back to a DataSet, and prints it.

public class WordCountSQL {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        // ... build DataSet<WC> and register as "WordCount"
        Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
        DataSet<WC> result = tEnv.toDataSet(table, WC.class);
        result.print();
    }
    // WC POJO definition omitted for brevity
}

Example 2 – File‑based aggregation reads a text file, maps each line to an Orders POJO, registers it as a table, executes a SQL aggregation

SELECT name, SUM(price) AS total FROM Orders GROUP BY name ORDER BY total DESC

, and writes the result to a CSV sink.

public class SQLTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
        DataSource<String> input = env.readTextFile("test.txt");
        DataSet<Orders> topInput = input.map(new MapFunction<String, Orders>() { ... });
        Table order = tableEnv.fromDataSet(topInput);
        tableEnv.registerTable("Orders", order);
        Table sqlQuery = tableEnv.sqlQuery(
            "SELECT name, SUM(price) AS total FROM Orders GROUP BY name ORDER BY total DESC");
        TableSink sink = new CsvTableSink("SQLTEST.txt", "|");
        String[] fieldNames = {"name","total"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.DOUBLE};
        tableEnv.registerTableSink("SQLTEST", fieldNames, fieldTypes, sink);
        sqlQuery.insertInto("SQLTEST");
        env.execute();
    }
    // Orders and Result POJO definitions omitted for brevity
}

All code snippets can be downloaded by replying “Flink” to the public account, enabling local execution and debugging.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaBig DataFlinkSQLDataStreamTable API
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.