Streaming SQL with Apache Flink: Theory, Platform Optimizations, and Real‑Time Use Cases
This article introduces Apache Flink's Streaming SQL, explains its theoretical foundations such as the table‑stream relationship and watermark semantics, describes the platform's practical enhancements—including source/sink wrappers, built‑in functions, and native Retract Stream support—and showcases several real‑time computation examples.
1. Streaming SQL Related
Streaming SQL uses SQL syntax to express stream processing, essentially applying relational algebra to continuous data streams.
1.1 Table and Stream Relationship
Traditional relational databases write changes to an Append‑Only write‑ahead log (WAL) before persisting data. Viewing the WAL as a stream makes the duality clear: replaying the log rebuilds the table, and a table's change log is itself a stream.
Table: a static snapshot of data at a point in time.
Stream: dynamic data that evolves over time.
Stream ⇒ Table: integrate ("sum") data changes over time to produce a table.
Table ⇒ Stream: differentiate ("diff") a table snapshot to obtain the underlying change stream.
These operations are inverses in many cases.
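The inverse relationship can be made concrete with a toy changelog. The following is a hypothetical illustration, not platform code: `apply_changelog` integrates a stream of changes into a table snapshot, and `diff_snapshots` differentiates two snapshots back into a change stream.

```python
def apply_changelog(table, changes):
    """Stream -> Table: integrate a changelog into a snapshot."""
    table = dict(table)
    for op, key, value in changes:
        if op == "upsert":
            table[key] = value
        elif op == "delete":
            table.pop(key, None)
    return table

def diff_snapshots(old, new):
    """Table -> Stream: differentiate two snapshots into a changelog."""
    changes = []
    for key, value in new.items():
        if old.get(key) != value:
            changes.append(("upsert", key, value))
    for key in old:
        if key not in new:
            changes.append(("delete", key, None))
    return changes
```

Round-tripping shows the inverse property: applying the diff of two snapshots to the first snapshot reproduces the second.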
1.2 Stream‑Batch Computing and Watermark
Time is a core dimension of stream processing. A watermark provides a completeness guarantee: when the watermark reaches time t, all events with timestamps ≤ t have been observed.
In practice, watermarks are approximated, balancing accuracy and latency.
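A common approximation is the bounded-out-of-orderness strategy: the watermark trails the largest event timestamp seen so far by a fixed delay. A minimal sketch with assumed names (not Flink's actual `WatermarkStrategy` API):

```python
class BoundedOutOfOrdernessWatermark:
    """Watermark = max observed event timestamp - allowed lateness.
    Events with timestamp <= watermark are assumed complete."""

    def __init__(self, max_out_of_orderness_ms):
        self.delay = max_out_of_orderness_ms
        self.max_ts = None

    def on_event(self, event_ts):
        # Track the largest event-time timestamp observed so far.
        if self.max_ts is None or event_ts > self.max_ts:
            self.max_ts = event_ts

    def current_watermark(self):
        if self.max_ts is None:
            return float("-inf")
        return self.max_ts - self.delay
```

A larger delay tolerates more out-of-order data (accuracy) at the cost of later results (latency), which is exactly the trade-off described above.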
1.3 Streaming Relational Algebra
SQL relies on relational algebra, which is closed over tables. Extending it to streams adds explicit operators for converting between tables and streams, slightly sacrificing closure.
1.3.1 Time‑Varying Property
Each table snapshot corresponds to a specific timestamp; a sequence of snapshots forms a time‑varying series, and streaming relational algebra applies relational operators to each snapshot.
Traditional relational algebra operators remain valid in streaming relational algebra: applying a relational operator to each snapshot of a time‑varying table yields another time‑varying table, so closure is preserved for relational operators applied to streams.
1.3.2 Limitations and Execution Boundary
Streaming SQL cannot express every computation because:
SQL's relational model imposes constraints on expressiveness.
Stream‑to‑table conversion requires that stream records conform to the table schema (i.e., the stream must carry a compatible Record type).
Consequently, a valid Streaming SQL job must satisfy three conditions:
The Source can emit a stream that satisfies relational constraints.
The Sink can consume data with relational constraints.
The Transformation can be fully expressed by relational algebra.
1.4 Stream‑Table Conversion and Stream Operation Semantics
Traditional DML operations (INSERT, DELETE, UPDATE) map to stream operation types:
INSERT/Accumulate – add or aggregate.
DELETE/Retract – remove or retract.
For example, MySQL binlog events map as follows:
1. WRITE_ROWS → INSERT
2. UPDATE_ROWS → UPDATE (implemented as DELETE + INSERT)
3. DELETE_ROWS → DELETE

1.5 Streaming SQL @ Flink
Flink implements Streaming SQL using a three‑stage pipeline: Source → Transformation → Sink. The SQL is parsed, optimized, and translated into a series of Flink operators.
1.5.1 Window Semantics
TUMBLE – fixed windows.
HOP – sliding windows.
SESSION – session windows.
1.5.2 Stream‑Batch Conversion Operators
fromDataStream – convert a DataStream to a Table.
toAppendStream, toRetractStream – convert a Table back to a DataStream.
1.5.3 Input and Output
Flink provides TableSource and TableSink abstractions for external systems.
1.5.4 Aggregation
Aggregations are expressed with GROUP BY and functions such as SUM, COUNT, or user‑defined aggregate functions (UDAFs).
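A UDAF follows an accumulator pattern: create an accumulator, fold values into it, optionally retract them, and extract the result. A schematic average UDAF in Python, mirroring the style of Flink's AggregateFunction interface (illustrative only; the platform's actual UDAFs are implemented in Java/Scala):

```python
class AvgAggregate:
    """Schematic UDAF: maintains (sum, count) and supports retraction,
    in the accumulator style of Flink's AggregateFunction."""

    def create_accumulator(self):
        return [0.0, 0]  # [running sum, row count]

    def accumulate(self, acc, value):
        acc[0] += value
        acc[1] += 1

    def retract(self, acc, value):
        # Inverse of accumulate; needed when the input is a retract stream.
        acc[0] -= value
        acc[1] -= 1

    def get_value(self, acc):
        return acc[0] / acc[1] if acc[1] else None
```

The `retract` method is what allows the same aggregate to stay correct over the Retract Streams discussed in 1.5.5.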
1.5.5 Stream Types
| Type           | INSERT | DELETE | Key Required |
|----------------|--------|--------|--------------|
| Append Stream  | ✓      |        |              |
| Retract Stream | ✓      | ✓      |              |
| Upsert Stream  | ✓      | ✓      | ✓            |
1.5.6 Stream Join
Flink supports several stream join types, but users must weigh state size against accuracy: a regular stream join keeps both inputs in state, which grows without bound unless a retention time is configured.
2. Platform Practice and Optimizations
2.1 Source & Sink Encapsulation and Optimization
The platform wraps common sources/sinks (Kafka, MySQL/TiDB, Redis, Elasticsearch, HTTP, etc.) and provides configuration parameters for performance tuning.
2.1.1 Kafka Source
Built on Flink's official FlinkKafkaConsumerBase, the source adds monitoring metrics and supports multiple data formats (JSON, SPLIT, Protobuf, RAW, BINLOG).
Example JSON format:
{
"action": "click",
"dt": "2020-01-20",
"device_id": "edbdf_ce424452_2"
}

Corresponding DDL:
CREATE TABLE `kafka_source_demo` (
`action` STRING,
`dt` STRING,
`device_id` STRING
) WITH (
`type` = 'kafka',
`servers` = '***',
`deserializationType` = 'JSON',
`topicName` = 'test'
);

2.1.2 MySQL (TiDB) Sink
The custom MysqlTableSink offers many tunable options:
| Option                 | Description                                               |
|------------------------|-----------------------------------------------------------|
| batchSize              | Number of rows per batch write.                           |
| flushInterval          | Time interval between flushes.                            |
| isAsync                | Enable asynchronous writes.                               |
| isFlushOnCheckpoint    | Flush buffered rows on checkpoint.                        |
| isKeyByBeforeSink      | Key‑by before the sink to reduce write conflicts.         |
| oneSink                | Force sink parallelism = 1.                               |
| isUpdateOnDuplicateKey | Use INSERT … ON DUPLICATE KEY UPDATE instead of REPLACE.  |
| mergeByKey             | Merge rows by key before writing.                         |
| ignoreDelete           | Skip DELETE events.                                       |
| ignoreException        | Ignore write exceptions.                                  |
2.2 Built‑in Function Support
More than 100 built‑in functions (UDF/UDTF/UDAF) are provided, covering JSON parsing, time conversion, type casting, statistical metrics, window simulation, and string manipulation.
2.2.1 Statistical Functions
count_with_external_redis() – initializes a counter from Redis.
hll_distinct_count(key) – HyperLogLog based approximate distinct count.
udf_tp99(v) – 99th percentile estimation.
Monotonic sequence counters that reset when a supplied compare value increases.
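The idea behind hll_distinct_count can be sketched with a plain HyperLogLog: hash each value, use the first p bits to select a register, and keep the maximum leading-zero rank of the remaining bits. This is a textbook implementation for illustration, not the platform's function:

```python
import hashlib
import math

def hll_distinct_count(values, p=14):
    """Approximate distinct count with HyperLogLog (2**p registers)."""
    m = 1 << p
    registers = [0] * m
    for v in values:
        # 64-bit hash of the value.
        h = int.from_bytes(hashlib.sha1(str(v).encode()).digest()[:8], "big")
        idx = h >> (64 - p)                      # first p bits pick the register
        rest = h & ((1 << (64 - p)) - 1)         # remaining 64 - p bits
        rank = (64 - p) - rest.bit_length() + 1  # leading zeros + 1
        registers[idx] = max(registers[idx], rank)
    alpha = 0.7213 / (1 + 1.079 / m)
    estimate = alpha * m * m / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if estimate <= 2.5 * m and zeros:            # small-range correction
        estimate = m * math.log(m / zeros)
    return int(estimate)
```

The appeal for streaming is that the state is a fixed-size register array per key rather than the full set of observed values.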
2.2.2 Window Simulation Functions
Scalar function tumble_window_group(ts, windowSize) returns the window start timestamp for a fixed window. Table function slide_window_group(ts, windowSize, slideSize) returns a list of overlapping window timestamps for a sliding window.
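The two functions can be sketched directly. Assumed semantics (both return window start timestamps in epoch milliseconds); this is an illustration of the idea, not the platform's implementation:

```python
def tumble_window_group(ts, window_size):
    """Scalar: start of the fixed window containing ts."""
    return ts - ts % window_size

def slide_window_group(ts, window_size, slide_size):
    """Table function: starts of every sliding window covering ts."""
    latest = ts - ts % slide_size        # last window start <= ts
    starts = []
    start = latest
    while start > ts - window_size:      # window is [start, start + window_size)
        starts.append(start)
        start -= slide_size
    return starts
```

Grouping by the returned timestamp emulates TUMBLE/HOP with a plain GROUP BY, which is the basis of replacing window aggregation with regular aggregation in 2.4.2.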
2.3 Dimension Table Support
Dimension tables are identified by a special flag in the DDL; during job submission the platform rewrites dimension‑table joins into FlatMap operators.
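The rewrite can be pictured as a lookup step inserted into the pipeline. A schematic FlatMap with assumed names (the platform's actual operator also handles caching and async I/O, omitted here):

```python
def dim_join_flatmap(record, dim_lookup, join_key, fields):
    """Schematic FlatMap for a dimension-table join: look up the key and
    emit the enriched record, or nothing if the key is missing (inner join)."""
    dim_row = dim_lookup.get(record[join_key])
    if dim_row is None:
        return []                 # inner-join semantics: drop the record
    enriched = dict(record)
    for f in fields:
        enriched[f] = dim_row[f]  # copy requested dimension attributes
    return [enriched]
```

Because each stream record triggers at most one lookup and zero or more outputs, FlatMap is a natural operator shape for this join.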
2.4 Optimization Tools and Strategies
2.4.1 Practical Tools
Interactive SQL console with instant result preview.
Full‑lifecycle job management (deployment, checkpointing, monitoring, alerting).
Integration with the AutoBI visualization system.
2.4.2 Optimization Strategies
SQL Pre‑parsing: the platform parses and validates SQL before Flink's native parser, allowing custom rule injection.
Submit Client Refactoring: a wrapper around Flink's client provides REST submission, YARN/K8s support, JAR management, built‑in function registration, and load‑balancing.
Source Reuse: identical sources are materialized once to avoid repeated reads.
Temporary View Syntax: supports CREATE VIEW within a session.
Replace Window Aggregation with Regular Aggregation: use time‑handling and window‑simulation functions to achieve lower latency and memory usage.
Field Charset Extension: modifies the expression parser to allow special characters (e.g., @timestamp) in field names.
Micro‑batch: optionally batch writes before the sink to improve throughput when latency requirements allow.
2.5 Embracing Flink 1.9+
After extensive testing, the platform migrated to Flink 1.9+, leveraging its enhanced SQL capabilities and performance improvements.
2.6 Native Loading of Retract Stream
Standard Flink loads Kafka data as an Append Stream, requiring explicit CASE logic for INSERT/UPDATE/DELETE. The platform introduced a native BINLOG_RET deserialization that directly produces a Retract Stream, where:
INSERT → INSERT.
DELETE → RETRACT.
UPDATE → RETRACT of the old row + INSERT of the new row.
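With a Retract Stream, a downstream aggregate stays correct under updates by applying the inverse operation for retractions. A toy running SUM over the (op, value) pairs such a source would emit (illustrative of the semantics, not the BINLOG_RET decoder itself):

```python
def retract_sum(events):
    """Running SUM over a retract stream: INSERT adds, RETRACT subtracts.
    An UPDATE arrives as a RETRACT of the old row plus an INSERT of the new."""
    total = 0
    for op, value in events:
        if op == "INSERT":
            total += value
        elif op == "RETRACT":
            total -= value
    return total
```

For example, an UPDATE of a row from 10 to 25 arrives as RETRACT(10) followed by INSERT(25), so the sum moves by the net +15 with no CASE logic in the SQL.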
DDL example:
CREATE TABLE `kafka_source_demo_ret_1` (
`value` STRING,
`op` STRING
) WITH (
`type` = 'kafka',
`servers` = '***',
`deserializationType` = 'BINLOG_RET',
`topicName` = 'test'
);

SQL becomes a simple aggregation:

SELECT SUM(text_to_long(`value`)) FROM `kafka_source_demo_ret_1`;

3. Real‑Time Use Cases
3.1 Classic Cases (Simplified)
3.1.1 Real‑Time Metric Calculation
Goal: compute hourly PV and UV per action from Kafka logs and write results to Redis.
CREATE TABLE `kafka_source_demo` (
`action` STRING,
`dt` STRING,
`device_id` STRING
) WITH (
`type` = 'kafka',
`servers` = '***',
`deserializationType` = 'JSON',
`topicName` = 'test'
);
CREATE TABLE `redis_sink_demo` (
`action` STRING,
`dt` STRING,
`pv` BIGINT,
`uv` BIGINT
) WITH (
`type` = 'redis',
`server` = '***',
`valueNames` = 'pv,uv',
`keyType` = 'string',
`keyTemplate` = 'demo_key_${action}_${dt}_'
);
INSERT INTO `redis_sink_demo`
SELECT `action`, `dt`,
COUNT(1) AS `pv`,
hll_distinct_count(`device_id`) AS `uv`
FROM `kafka_source_demo`
GROUP BY `action`, `dt`;

3.1.2 Real‑Time ETL
Goal: sync selected fields from a Kafka binlog to a TiDB table with < 5 s latency.
CREATE TABLE `kafka_source_demo_2` (
`biz_type` STRING,
`biz_id` STRING,
`property` STRING
) WITH (
`type` = 'kafka',
`servers` = '***',
`deserializationType` = 'JSON',
`topicName` = 'test2'
);
CREATE TABLE `ti_sink_demo` (
`biz_type` STRING,
`biz_id` STRING,
`property` STRING
) WITH (
`type` = 'mysql',
`url` = 'xxxxx',
`mysqlTableName` = 'test',
`username` = 'xx',
`password` = 'xxxxx',
`mysqlDatabaseName` = 'xx',
`sinkKeyFieldNames` = 'biz_id,biz_type',
`batchSize` = 200,
`flushInterval` = 3000,
`needMerge` = false,
`ignoreDelete` = true,
`specificMysqlSinkExecutionLogicClassName` = 'duplicateKeyUpdate',
`isKeyByBeforeSink` = true
);
INSERT INTO `ti_sink_demo`
SELECT * FROM `kafka_source_demo_2`;

3.1.3 Real‑Time Online User Count with Sliding Window
Goal: every 30 s, compute the number of distinct online users per room and write to Redis.
CREATE TABLE `kafka_source_demo_3` (
`room_id` STRING,
`ts` BIGINT,
`device_id` STRING
) WITH (
`type` = 'kafka',
`servers` = '***',
`deserializationType` = 'JSON',
`topicName` = 'test3'
);
CREATE TABLE `redis_sink_demo_3` (
`room_id` STRING,
`window_ts` BIGINT,
`uv` BIGINT
) WITH (
`type` = 'redis',
`server` = '***',
`valueNames` = 'uv',
`noValueName` = true,
`keyType` = 'string',
`keyTemplate` = 'demo_key2_${room_id}_${window_ts}'
);
INSERT INTO `redis_sink_demo_3`
SELECT `room_id`, `window_ts`, COUNT(DISTINCT `device_id`) AS `uv`
FROM `kafka_source_demo_3`,
LATERAL TABLE(`slide_window_group`(`ts`, 60000, 30000)) AS T(`window_ts`)
GROUP BY `room_id`, `window_ts`;

Conclusion
The article first clarifies the theoretical basis of Streaming SQL, then details the platform's practical enhancements—including source/sink encapsulation, extensive built‑in functions, native Retract Stream support, and various optimization techniques—and finally demonstrates several concise real‑time scenarios, illustrating how SQL can be used to rapidly solve streaming computation problems and empower users with a Flink‑based real‑time data warehouse.
HomeTech tech sharing