Understanding Flink SQL Window Functions: Types, Implementation, and Emit Triggers
This article provides a comprehensive overview of Flink SQL window functions, detailing time‑based window types, their underlying implementation in the StreamExecGroupWindowAggregate operator, the processing flow of WindowOperator, timer handling, emit/trigger strategies, and practical code examples for Tumble, Hop, and Session windows.
Overview
Windows are a core mechanism for processing unbounded streams: they split the stream into finite buckets over which aggregations are computed, which lets users run statistical analysis on continuously arriving data.
Window Types
Flink supports two main categories of windows: TimeWindow (time-driven) and CountWindow (count-driven). This article focuses on TimeWindow, which has three sub-types:
Tumble Window (fixed, non‑overlapping)
Hop Window (sliding, potentially overlapping)
Session Window (gap‑based, non‑overlapping)
Tumble Window
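The assigner places each element into exactly one fixed-size, non-overlapping window. The DDL below declares a source table with both an event-time (ROWTIME) attribute, ctime, and a processing-time (PROCTIME) attribute, proc; the INSERT statement then aggregates over a 5-minute tumbling window on ctime.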
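The assignment rule described above can be sketched in plain Java, independent of Flink. This is a minimal illustration that assumes windows are aligned to the epoch with no offset; the class and method names are hypothetical and are not part of Flink's API.

```java
class TumblingWindowSketch {
    /** Start (inclusive) of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result aligned for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of the same window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        long ts = 7 * 60 * 1000L + 30_000L; // 7 minutes 30 seconds after the epoch
        System.out.println(windowStart(ts, fiveMinutes)); // prints 300000
        System.out.println(windowEnd(ts, fiveMinutes));   // prints 600000
    }
}
```

Every timestamp maps to exactly one window, which is what makes tumbling windows non-overlapping. The SQL that follows expresses the same 5-minute bucketing declaratively with TUMBLE(ctime, INTERVAL '5' MINUTE).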
<code style="font-family: SFMono-Regular, Consolas, 'Liberation Mono', Menlo, Courier, monospace; font-size: 14px; color: wheat;">
CREATE TABLE sessionOrderTableRowtime (
  ctime TIMESTAMP,
  categoryName VARCHAR,
  shopName VARCHAR,
  itemName VARCHAR,
  userId VARCHAR,
  price FLOAT,
  action BIGINT,
  WATERMARK FOR ctime AS withOffset(ctime, 1000),
  proc AS PROCTIME()
) WITH (
  `type` = 'kafka',
  format = 'json',
  updateMode = 'append',
  `group.id` = 'groupId',
  `bootstrap.servers` = 'xxxxx:9092',
  version = '0.10',
  `zookeeper.connect` = 'xxxxx:2181',
  startingOffsets = 'latest',
  topic = 'sessionsourceproctime'
);
INSERT INTO popwindowsink
SELECT COUNT(*),
  TUMBLE_START(ctime, INTERVAL '5' MINUTE),
  DATE_FORMAT(TUMBLE_END(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
  DATE_FORMAT(TUMBLE_ROWTIME(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
  categoryName,
  SUM(price)
FROM sessionOrderTableRowtime
GROUP BY TUMBLE(ctime, INTERVAL '5' MINUTE), categoryName;
</code>
Hop Window
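Sliding (hop) windows have a fixed size and advance by a fixed slide. When the slide is smaller than the size, the windows overlap and each element is assigned to several of them. The example defines a 10-minute window that slides every 5 minutes.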
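To make the overlap concrete, here is a small Java sketch that lists every window an element falls into. It assumes epoch-aligned windows with no offset, and its names are hypothetical rather than Flink's own.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindowSketch {
    /** Start timestamps of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // size = 10 minutes, slide = 5 minutes, element at minute 7
        System.out.println(windowStarts(7 * minute, 10 * minute, 5 * minute));
        // prints [300000, 0]
    }
}
```

With a 10-minute size and a 5-minute slide, each element belongs to size / slide = 2 windows. In the query that follows, the first interval passed to HOP is the slide and the second is the window size.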
<code style="font-family: SFMono-Regular, Consolas, 'Liberation Mono', Menlo, Courier, monospace; font-size: 14px; color: wheat;">
INSERT INTO popwindowsink
SELECT COUNT(*),
  HOP_START(ctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
  DATE_FORMAT(HOP_END(ctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
  DATE_FORMAT(HOP_ROWTIME(ctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
  categoryName,
  SUM(price)
FROM sessionOrderTableRowtime
GROUP BY HOP(ctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), categoryName;
</code>
Session Window
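Session windows group elements by activity: a session stays open while elements keep arriving and closes once no element arrives within the configured gap (5 minutes in the example). Unlike tumble and hop windows, a session has no fixed size or start time.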
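The gap rule can be sketched as follows. This is a simplified batch illustration over sorted timestamps, not Flink's incremental window-merging logic; the names are hypothetical, and treating an event that lands exactly on the session end as the start of a new session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SessionWindowSketch {
    /** Groups timestamps into sessions; each entry is {start, end} with end = last event + gap. */
    static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                // The event arrives before the open session times out: extend the session.
                last[1] = ts + gapMs;
            } else {
                // First event, or the gap was exceeded: open a new session.
                result.add(new long[] {ts, ts + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] events = {0, 3 * minute, 7 * minute, 20 * minute};
        for (long[] s : sessions(events, 5 * minute)) {
            System.out.println(Arrays.toString(s));
        }
        // prints [0, 720000] then [1200000, 1500000]
    }
}
```

The events at minutes 0, 3, and 7 are each less than 5 minutes apart, so they form one session; the event at minute 20 opens a second one.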
<code style="font-family: SFMono-Regular, Consolas, 'Liberation Mono', Menlo, Courier, monospace; font-size: 14px; color: wheat;">
INSERT INTO popwindowsink
SELECT COUNT(*),
  SESSION_START(ctime, INTERVAL '5' MINUTE),
  DATE_FORMAT(SESSION_END(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
  DATE_FORMAT(SESSION_ROWTIME(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
  categoryName,
  SUM(price)
FROM sessionOrderTableRowtime
GROUP BY SESSION(ctime, INTERVAL '5' MINUTE), categoryName;
</code>
Window Operator Creation and Processing
The StreamExecGroupWindowAggregate#createWindowOperator method builds a WindowOperator based on the window type, time attribute (ROWTIME or PROCTIME), and aggregation logic. It configures the appropriate WindowAssigner, trigger, and retraction behavior.
<code style="font-family: SFMono-Regular, Consolas, 'Liberation Mono', Menlo, Courier, monospace; font-size: 14px; color: wheat;">
class StreamExecGroupWindowAggregate {
  private def createWindowOperator(...): WindowOperator[_, _] = {
    val builder = WindowOperatorBuilder.builder()
      .withInputFields(inputFields.toArray)
    // configure based on window type
    val newBuilder = window match {
      case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) =>
        builder.tumble(toDuration(size), timeZoneOffset).withProcessingTime()
      case TumblingGroupWindow(_, timeField, size) if isRowtimeAttribute(timeField) =>
        builder.tumble(toDuration(size), timeZoneOffset).withEventTime(timeIdx)
      case SlidingGroupWindow(_, timeField, size, slide) if isProctimeAttribute(timeField) =>
        builder.sliding(toDuration(size), toDuration(slide), timeZoneOffset).withProcessingTime()
      // ... other cases
    }
    if (emitStrategy.produceUpdates) {
      newBuilder.withSendRetraction().triggering(emitStrategy.getTrigger)
    }
    newBuilder
      .aggregate(aggsHandler, recordEqualiser, accTypes, aggValueTypes, windowPropertyTypes)
      .withAllowedLateness(Duration.ofMillis(emitStrategy.getAllowLateness))
      .build()
  }
}
</code>
The WindowOperator#processElement method extracts the timestamp, assigns the element to affected windows, updates aggregation state, registers cleanup timers, and invokes the trigger to decide whether to emit results.
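As a toy model of the state-update step only, the sketch below keeps one accumulator per window and applies accumulate and retract messages to it. It assumes a tumbling window and a COUNT/SUM aggregate; the class and its methods are hypothetical and stand in for Flink's window state and aggregation handler. Timers and triggers are left out.

```java
import java.util.HashMap;
import java.util.Map;

class WindowStateSketch {
    private final long sizeMs;
    // window start -> {row count, sum of price}
    private final Map<Long, double[]> state = new HashMap<>();

    WindowStateSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Applies one input row to the accumulator of the tumbling window it falls into. */
    void process(long timestampMs, double price, boolean isAccumulate) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, sizeMs);
        // Create the accumulator on first use, mirroring the null check in the operator.
        double[] acc = state.computeIfAbsent(windowStart, w -> new double[2]);
        int sign = isAccumulate ? 1 : -1; // a retraction undoes an earlier accumulate
        acc[0] += sign;
        acc[1] += sign * price;
    }

    long count(long windowStart) {
        double[] acc = state.get(windowStart);
        return acc == null ? 0L : (long) acc[0];
    }

    double sum(long windowStart) {
        double[] acc = state.get(windowStart);
        return acc == null ? 0.0 : acc[1];
    }

    public static void main(String[] args) {
        WindowStateSketch op = new WindowStateSketch(300_000L);
        op.process(1_000L, 10.0, true);
        op.process(2_000L, 5.0, true);
        op.process(1_000L, 10.0, false); // retract the first row
        System.out.println(op.count(0L) + " rows, sum " + op.sum(0L));
    }
}
```

The operator excerpt that follows has the same shape: look up the accumulator for each affected window, accumulate or retract depending on the message type, and write the accumulator back.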
<code style="font-family: SFMono-Regular, Consolas, 'Liberation Mono', Menlo, Courier, monospace; font-size: 14px; color: wheat;">
public void processElement(StreamRecord<BaseRow> record) throws Exception {
    BaseRow inputRow = record.getValue();
    // Event time: read the timestamp from the rowtime column; otherwise use the current processing time.
    long timestamp = windowAssigner.isEventTime() ? inputRow.getLong(rowtimeIndex)
        : internalTimerService.currentProcessingTime();
    // Windows whose state this row must be applied to.
    Collection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);
    for (W window : affectedWindows) {
        // Scope the keyed state to this window and load (or create) its accumulators.
        windowState.setCurrentNamespace(window);
        BaseRow acc = windowState.value();
        if (acc == null) acc = windowAggregator.createAccumulators();
        windowAggregator.setAccumulators(window, acc);
        // Accumulate messages add the row; retract messages undo a previously accumulated row.
        if (BaseRowUtil.isAccumulateMsg(inputRow)) {
            windowAggregator.accumulate(inputRow);
        } else {
            windowAggregator.retract(inputRow);
        }
        // Write the updated accumulators back to state.
        acc = windowAggregator.getAccumulators();
        windowState.update(acc);
    }
    // trigger handling and timer registration omitted for brevity
}
</code>
Emit (Trigger) Strategies
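Flink 1.9 introduces the EMIT clause, which lets users trade output latency against result accuracy. Two strategies can be configured, via TableConfig or directly in SQL: early-fire (BEFORE WATERMARK), which emits partial results before the watermark passes the end of the window, and late-fire (AFTER WATERMARK), which emits updated results for elements that arrive afterwards.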
Examples:
INSERT INTO result SELECT * FROM tumble_window EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK: emits partial results every minute until the watermark passes the end of the window.
INSERT INTO result SELECT * FROM tumble_window EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK, WITHOUT DELAY AFTER WATERMARK: emits partial results every minute before the watermark passes the end of the window, then emits an updated result immediately for each late element.
The underlying implementation creates appropriate Trigger instances (e.g., ProcessingTimeTriggers.every for periodic emission or ElementTriggers.every for per‑element emission).
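A delay-based early-fire policy can be modeled in a few lines of Java. This sketch assumes a timer is armed one delay after the first element that arrives since the previous firing, and it ignores the window end and the watermark entirely; it is an illustration with hypothetical names, not the code of Flink's trigger classes.

```java
import java.util.ArrayList;
import java.util.List;

class EarlyFireSketch {
    /**
     * Processing times at which partial results would be emitted, given element
     * arrival times in ascending order and an emit delay.
     */
    static List<Long> firingTimes(long[] arrivalTimesMs, long delayMs) {
        List<Long> firings = new ArrayList<>();
        long nextFiring = -1; // -1 means no timer is currently armed
        for (long arrival : arrivalTimesMs) {
            if (nextFiring >= 0 && arrival >= nextFiring) {
                // The armed timer went off before this element arrived.
                firings.add(nextFiring);
                nextFiring = -1;
            }
            if (nextFiring < 0) {
                // First element since the last firing: arm a new timer.
                nextFiring = arrival + delayMs;
            }
        }
        if (nextFiring >= 0) {
            firings.add(nextFiring); // the last armed timer still fires
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 10_000L, 70_000L, 75_000L};
        System.out.println(firingTimes(arrivals, 60_000L)); // prints [60000, 130000]
    }
}
```

The four elements produce only two emissions, which is the point of a delayed emit: downstream systems see fewer updates at the cost of results that can be up to one delay stale. A per-element policy (WITHOUT DELAY) would instead emit once per arrival.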
Trigger Class Hierarchy
Flink provides several trigger implementations, such as AfterEndOfWindow (default), EveryElement, AfterEndOfWindowNoLate, and AfterFirstElementPeriodic, each corresponding to different EMIT configurations.
Overall, the article walks through the complete lifecycle of a Flink SQL window query: SQL definition, physical plan generation, operator creation, element processing, timer handling, and result emission, with conceptual explanations and concrete code snippets at each stage.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
