Apache Flink Table API and SQL Tutorial with Code Examples
This article introduces Apache Flink’s Table API and SQL, explains the TableEnvironment programming model, shows how to register tables and sinks, and walks through two Java examples (WordCount and a file-based aggregation) whose code can be downloaded for local testing.
Apache Flink provides two relational APIs, the Table API and SQL, that unify stream and batch processing. Both integrate with the DataStream and DataSet APIs, so a program can move between relational queries and the libraries built on those APIs, such as CEP and Gelly.
The programming model centers on TableEnvironment, which is responsible for registering tables and external catalogs, executing SQL queries, registering user-defined functions, converting a DataStream or DataSet to a Table, and holding a reference to the execution environment.
Tables can be registered from an existing Table, a TableSource (e.g., CsvTableSource), or directly from a DataStream/DataSet. Example registration code:
Table projTable = tableEnv.scan("X").select(...);
tableEnv.registerTable("projectedTable", projTable);   // "projectedTable" is an example name
TableSource csvSource = new CsvTableSource("/path/to/file", ...);
tableEnv.registerTableSource("CsvTable", csvSource);   // "CsvTable" is an example name
Table table = tableEnv.fromDataSet(tableset);          // convert a DataSet to a Table
Table sinks allow results to be written to external storage such as CSV, Parquet, Avro, or ORC. Example sink registration:
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
Example 1 – WordCount using Flink SQL: the Java class WordCountSQL creates an execution environment, registers a DataSet as the table WordCount, runs the SQL query SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word, converts the result back to a DataSet, and prints it.
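To make the semantics of that query concrete, here is a minimal plain-Java sketch with no Flink dependency. It is not the article's code: the class name WordCountSketch and the method sumByWord are hypothetical, and the WC class is only an assumed stand-in for the omitted POJO, with its two fields taken from the column names in the query. The sorted map is used purely to make the printed order deterministic; GROUP BY itself guarantees no ordering.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WordCountSketch {

    // Hypothetical stand-in for the omitted WC POJO. Flink POJOs are expected to be
    // public, to have a public no-argument constructor, and to expose their fields
    // publicly or through getters and setters.
    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

    // Plain-Java equivalent of:
    //   SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word
    public static Map<String, Long> sumByWord(List<WC> rows) {
        return rows.stream().collect(
            Collectors.groupingBy(
                wc -> wc.word,                               // GROUP BY word
                TreeMap::new,                                // sorted only for stable output
                Collectors.summingLong(wc -> wc.frequency))); // SUM(frequency)
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(
            new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));
        // Prints "WC Ciao 1" followed by "WC Hello 2"
        sumByWord(input).forEach((word, total) -> System.out.println(new WC(word, total)));
    }
}
```

The Flink program that follows expresses the same grouping declaratively and leaves execution planning to the table environment.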
public class WordCountSQL {
    public static void main(String[] args) throws Exception {
        // Batch execution environment and the matching table environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        // ... build DataSet<WC> and register as "WordCount"
        // Sum the frequency of each word
        Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word");
        // Convert the result back to a DataSet; print() triggers execution
        DataSet<WC> result = tEnv.toDataSet(table, WC.class);
        result.print();
    }
    // WC POJO definition omitted for brevity
}
Example 2 – File-based aggregation: the program reads a text file, maps each line to an Orders POJO, registers the result as a table, executes the SQL aggregation SELECT name, SUM(price) AS total FROM Orders GROUP BY name ORDER BY total DESC, and writes the result to a CSV sink.
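The same pipeline can be sketched in plain Java, without Flink, to show what the aggregation is expected to produce. Everything here is an assumption made for illustration: the class OrdersAggregationSketch, the methods parse and aggregate, the Orders fields (inferred from the column names in the query), and above all the "name,price" line format, since the article does not show the layout of test.txt. The rows are joined with "|" to resemble the field delimiter configured on the CSV sink; the exact text Flink writes may differ.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrdersAggregationSketch {

    // Hypothetical stand-in for the omitted Orders POJO.
    public static class Orders {
        public String name;
        public double price;

        public Orders() {}

        public Orders(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    // Assumed input format: one "name,price" pair per line.
    public static Orders parse(String line) {
        String[] parts = line.split(",");
        return new Orders(parts[0].trim(), Double.parseDouble(parts[1].trim()));
    }

    // Plain-Java equivalent of:
    //   SELECT name, SUM(price) AS total FROM Orders GROUP BY name ORDER BY total DESC
    // Each result row is rendered as "name|total".
    public static List<String> aggregate(List<String> lines) {
        Map<String, Double> totals = lines.stream()
            .map(OrdersAggregationSketch::parse)
            .collect(Collectors.groupingBy(
                o -> o.name,                                // GROUP BY name
                Collectors.summingDouble(o -> o.price)));   // SUM(price)
        return totals.entrySet().stream()
            .sorted(Map.Entry.<String, Double>comparingByValue().reversed()) // ORDER BY total DESC
            .map(e -> e.getKey() + "|" + e.getValue())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("alice,10.5", "bob,3.0", "alice,4.5", "bob,20.0");
        // Prints "bob|23.0" followed by "alice|15.0"
        aggregate(lines).forEach(System.out::println);
    }
}
```

In the Flink version, the parsing step corresponds to the MapFunction, and the grouping, summing, and ordering are all carried by the SQL string.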
public class SQLTest {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // Read the input file and map each line to an Orders POJO
        DataSource<String> input = env.readTextFile("test.txt");
        DataSet<Orders> topInput = input.map(new MapFunction<String, Orders>() { ... });
        // Convert the DataSet to a Table and register it as "Orders"
        Table order = tableEnv.fromDataSet(topInput);
        tableEnv.registerTable("Orders", order);
        // Total price per name, highest total first
        Table sqlQuery = tableEnv.sqlQuery(
            "SELECT name, SUM(price) AS total FROM Orders GROUP BY name ORDER BY total DESC");
        // CSV sink using "|" as the field delimiter
        TableSink sink = new CsvTableSink("SQLTEST.txt", "|");
        String[] fieldNames = {"name", "total"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.DOUBLE};
        tableEnv.registerTableSink("SQLTEST", fieldNames, fieldTypes, sink);
        // Emit the query result to the registered sink, then run the job
        sqlQuery.insertInto("SQLTEST");
        env.execute();
    }
    // Orders and Result POJO definitions omitted for brevity
}
All code snippets can be downloaded by replying “Flink” to the public account, enabling local execution and debugging.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
