Understanding Flink Window Table-Valued Functions (TVF) and Incremental Optimization
This article explains the concept of window table-valued functions in Flink, compares the old grouped‑window syntax with the new TVF syntax, details the physical and execution plans, introduces sliced windows for state reduction, and presents a small incremental‑output improvement with code examples.
Table-valued functions (TVFs) are functions that return a table rather than a scalar value. They are common in databases such as Oracle and SQL Server. Flink 1.13 introduced window TVFs via FLIP-145 to replace the older grouped-window syntax.
Before 1.13, a 10-second tumbling-window aggregation was written with the grouped-window functions, as in SELECT TUMBLE_START(...), ... GROUP BY TUMBLE(...). Since 1.13 the same logic can be expressed with the TUMBLE window TVF:
```sql
SELECT window_start, window_end, ... FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(procTime), INTERVAL '10' SECONDS)) GROUP BY window_start, window_end, merchandiseId
```
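To make the TVF semantics concrete, the following plain-Java sketch mimics what TUMBLE(TABLE source, DESCRIPTOR(...), INTERVAL '10' SECONDS) attaches to each row and what GROUP BY window_start, window_end, merchandiseId then computes. It is not Flink code. The Event and Windowed records, the SUM(amount) aggregate, the zero window offset and the treatment of the time column as plain epoch milliseconds are all illustrative assumptions. window_time is set to window_end minus 1 ms, which matches how Flink defines the window's time attribute.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumbleTvfSketch {

    // Hypothetical input row: merchandise id, amount, and a time value in epoch millis.
    record Event(String merchandiseId, long amount, long timeMillis) {}

    // Hypothetical TVF output row: the input row plus the three window columns.
    record Windowed(Event event, long windowStart, long windowEnd, long windowTime) {}

    // Start of the tumbling window containing ts (offset 0); floorMod keeps negative ts correct.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // TUMBLE: every row is assigned to exactly one window [start, start + size).
    static List<Windowed> tumble(List<Event> source, long size) {
        List<Windowed> out = new ArrayList<>();
        for (Event e : source) {
            long start = windowStart(e.timeMillis(), size);
            long end = start + size;
            // window_time is the window's time attribute: window_end - 1 ms
            out.add(new Windowed(e, start, end, end - 1));
        }
        return out;
    }

    // GROUP BY window_start, window_end, merchandiseId with SUM(amount).
    static Map<String, Long> sumPerWindow(List<Windowed> rows) {
        Map<String, Long> result = new TreeMap<>();
        for (Windowed w : rows) {
            String groupKey = w.windowStart() + "|" + w.windowEnd() + "|" + w.event().merchandiseId();
            result.merge(groupKey, w.event().amount(), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> source = List.of(
                new Event("apple", 3, 1_000),
                new Event("apple", 4, 9_999),
                new Event("apple", 5, 10_000),
                new Event("pear", 2, 12_500));
        // prints {0|10000|apple=7, 10000|20000|apple=5, 10000|20000|pear=2}
        System.out.println(sumPerWindow(tumble(source, 10_000)));
    }
}
```

The rows at 1 s and 9.999 s fall into [0 s, 10 s), while the row at exactly 10 s starts the next window, because window_end is exclusive.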
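Flink's window TVFs build on Calcite's SqlWindowTableFunction. Their output row type is the input row type plus three columns: window_start, window_end, and window_time. Operand validation, which checks the table, descriptor, and interval arguments, is performed by AbstractOperandMetadata and its concrete implementations such as OperandMetadataImpl.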
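The next sketch illustrates the kind of checks operand metadata performs for TUMBLE, and how the output row type is derived. It is a simplified model, not Flink's AbstractOperandMetadata or OperandMetadataImpl. The OperandKind enum, the TumbleCall record, the string-typed schema, and the rule that any TIMESTAMP-typed column counts as a time attribute are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class WindowTvfSignatureSketch {

    // Simplified operand kinds; the real validation inspects Calcite SqlNodes and types.
    enum OperandKind { TABLE, DESCRIPTOR, INTERVAL }

    // A TUMBLE call reduced to its operand kinds, the descriptor column and the size in millis.
    record TumbleCall(List<OperandKind> operands, String timeColumn, long sizeMillis) {}

    private static final List<OperandKind> EXPECTED =
            List.of(OperandKind.TABLE, OperandKind.DESCRIPTOR, OperandKind.INTERVAL);

    // Returns null if the call is valid for the given input schema, otherwise an error message.
    static String validate(TumbleCall call, Map<String, String> inputSchema) {
        if (!call.operands().equals(EXPECTED)) {
            return "expected (TABLE, DESCRIPTOR, INTERVAL) but got " + call.operands();
        }
        String type = inputSchema.get(call.timeColumn());
        if (type == null) {
            return "descriptor column not found: " + call.timeColumn();
        }
        // Simplification: treat any TIMESTAMP-typed column as a time attribute.
        if (!type.startsWith("TIMESTAMP")) {
            return "descriptor column is not a time attribute: " + call.timeColumn();
        }
        if (call.sizeMillis() <= 0) {
            return "window size must be positive";
        }
        return null;
    }

    // Output row type: every input column, followed by the three window columns.
    static List<String> outputColumns(List<String> inputColumns) {
        List<String> out = new ArrayList<>(inputColumns);
        out.addAll(List.of("window_start", "window_end", "window_time"));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> schema = Map.of("merchandiseId", "STRING", "procTime", "TIMESTAMP_LTZ(3)");
        TumbleCall call = new TumbleCall(EXPECTED, "procTime", 10_000);
        System.out.println(validate(call, schema)); // prints null (valid call)
        // prints [merchandiseId, procTime, window_start, window_end, window_time]
        System.out.println(outputColumns(List.of("merchandiseId", "procTime")));
    }
}
```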
When a window TVF feeds an aggregation (window TVFs can also feed a window Top-N), Flink first generates a logical plan that contains a LogicalTableFunctionScan for the TVF and a LogicalAggregate above it. Three rules then transform the plan: the converter rules StreamPhysicalWindowTableFunctionRule and StreamPhysicalWindowAggregateRule, and the RelOptRule PullUpWindowTableFunctionIntoWindowAggregateRule, which pulls the window TVF up into the window aggregate. The optimized physical plan and execution plan contain StreamPhysicalWindowAggregate and StreamExecWindowAggregate nodes, respectively.
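As a toy model of this rewrite, the sketch below represents plan nodes as Java records and applies a single rule. An Aggregate that groups by window_start and window_end directly over a window TVF becomes a WindowAggregate over the TVF's input. The WindowAggregate carries the window definition and the remaining grouping keys. The node records and pullUpWindowTvf are hypothetical. The real planner works on Calcite RelNodes through the three rules named above, and must handle cases this model ignores, such as a projection between the TVF and the aggregate.

```java
import java.util.List;

public class WindowAggRewriteSketch {

    // Toy plan nodes, loosely named after the Flink nodes; not the planner's classes.
    interface Node {}
    record Scan(String table) implements Node {}
    record WindowTvf(Node input, String timeColumn, long sizeMillis) implements Node {}
    record Aggregate(Node input, List<String> groupKeys, String aggCall) implements Node {}
    record WindowAggregate(Node input, String timeColumn, long sizeMillis,
                           List<String> groupKeys, String aggCall) implements Node {}

    // Toy rule: Aggregate(WindowTvf(x)) grouped by window_start and window_end becomes
    // WindowAggregate(x). The window TVF is folded into the aggregate, which then
    // assigns windows itself; the window columns leave the grouping keys.
    static Node pullUpWindowTvf(Node node) {
        if (node instanceof Aggregate agg
                && agg.input() instanceof WindowTvf tvf
                && agg.groupKeys().containsAll(List.of("window_start", "window_end"))) {
            List<String> keys = agg.groupKeys().stream()
                    .filter(k -> !k.equals("window_start") && !k.equals("window_end"))
                    .toList();
            return new WindowAggregate(tvf.input(), tvf.timeColumn(), tvf.sizeMillis(),
                    keys, agg.aggCall());
        }
        return node; // no match: leave the plan unchanged
    }

    public static void main(String[] args) {
        Node logical = new Aggregate(
                new WindowTvf(new Scan("source"), "procTime", 10_000),
                List.of("window_start", "window_end", "merchandiseId"),
                "SUM(amount)");
        System.out.println(pullUpWindowTvf(logical));
    }
}
```

Folding the TVF into the aggregate is what lets a single operator own window assignment, which is the precondition for the slicing described next.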
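Sliding windows whose slide is small relative to their size can cause state explosion. To avoid it, Flink uses sliced windows. A slice is a sub-window that several windows can share, so each element is accumulated once into its slice instead of once into every window it belongs to. For example, a HOP window of size 1 hour with a 1-minute slide places each element in 60 windows. With 1-minute slices, the element is accumulated once, and each window result is obtained by merging its 60 slices. The SliceAssigner hierarchy (including CumulativeSliceAssigner) determines slice boundaries.
The SlicingWindowOperator wraps a SlicingWindowProcessor, which has three key components: WindowBuffer (an in-memory cache), WindowValueState (state schema [key, window_end, accumulator]), and a NamespaceAggsHandleFunction generated by AggsHandlerCodeGenerator.
The processor's processElement method assigns each element to a slice, registers timers, and buffers the element. In event time, a late element (one whose slice has already fired) is dropped only if the last window containing that slice has also fired. The merge method combines slice accumulators into the final window state.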
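The slice bookkeeping for a CUMULATE window can be sketched in plain Java. This is a simplified model of the description above, not Flink's CumulativeSliceAssigner or SlicingWindowProcessor. It assumes a single key, a SUM aggregate, zero offset, no WindowBuffer or timers, and an in-memory map standing in for WindowValueState. Slices are step-sized. When the window ending at windowEnd fires, the slice ending there is folded into the window's first slice, which therefore carries the running total. This is assumed to mirror CumulativeSliceAssigner.mergeSlices; treat it as an assumption rather than the exact implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class CumulativeSlicingSketch {

    private final long step;    // CUMULATE step, also the slice size
    private final long maxSize; // CUMULATE max window size
    // Hypothetical stand-in for WindowValueState of one key: slice end -> SUM accumulator.
    private final Map<Long, Long> sliceState = new HashMap<>();

    CumulativeSlicingSketch(long step, long maxSize) {
        if (step <= 0 || maxSize % step != 0) {
            throw new IllegalArgumentException("maxSize must be a positive multiple of step");
        }
        this.step = step;
        this.maxSize = maxSize;
    }

    private static long floorTo(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Each element belongs to exactly one slice, identified by its end.
    long assignSliceEnd(long ts) {
        return floorTo(ts, step) + step;
    }

    // Start of the max-size window that contains the slice or window ending at windowEnd.
    long windowStart(long windowEnd) {
        return floorTo(windowEnd - 1, maxSize);
    }

    // The largest window that still uses this slice; after it fires, the slice is obsolete.
    long lastWindowEnd(long sliceEnd) {
        return windowStart(sliceEnd) + maxSize;
    }

    // processElement without buffering or timers: accumulate into the element's slice.
    void add(long ts, long amount) {
        sliceState.merge(assignSliceEnd(ts), amount, Long::sum);
    }

    // fireWindow + merge: fold the slice ending at windowEnd into the first slice of the
    // window and return the cumulative value. Removing the merged slice immediately is a
    // simplification of state cleanup.
    long fire(long windowEnd) {
        long firstSliceEnd = windowStart(windowEnd) + step;
        if (windowEnd != firstSliceEnd) {
            Long sliceAcc = sliceState.remove(windowEnd);
            if (sliceAcc != null) {
                sliceState.merge(firstSliceEnd, sliceAcc, Long::sum);
            }
        }
        return sliceState.getOrDefault(firstSliceEnd, 0L);
    }

    public static void main(String[] args) {
        // CUMULATE(step = 1 min, max size = 3 min), timestamps in millis
        CumulativeSlicingSketch w = new CumulativeSlicingSketch(60_000, 180_000);
        w.add(10_000, 5); // slice [0, 1 min)
        w.add(70_000, 2); // slice [1 min, 2 min)
        w.add(75_000, 1); // same slice
        // nothing arrives in [2 min, 3 min)
        for (long end = 60_000; end <= 180_000; end += 60_000) {
            System.out.println("window [0, " + end + ") -> " + w.fire(end));
        }
    }
}
```

Running main reports 5, 8 and 8 for the windows ending at 1, 2 and 3 minutes. The last two values are equal because no element arrived in the third slice, yet that window still fires.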
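```java
@Override
public boolean processElement(RowData key, RowData element) throws Exception {
    // returns true if the element is late and dropped, false otherwise
    long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
    if (!isEventTime) {
        // processing time: register a timer at the slice end for every element
        windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
    }
    if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
        // the assigned slice has already fired, so the element is late
        long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
        if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
            // every window that contains this slice has fired: drop the element
            return true;
        } else {
            // a window containing this slice is still open: accumulate into the merge target
            windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
            // register a timer for the first window that has not fired yet
            long unfiredFirstWindow = sliceEnd;
            while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
                unfiredFirstWindow += windowInterval;
            }
            windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
            return false;
        }
    } else {
        // the slice has not fired yet: buffer the element under its own slice end
        windowBuffer.addElement(key, sliceEnd, element);
        return false;
    }
}
```

```java
@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
    // base accumulator: a fresh one if the merge target is not kept in state (null)
    // or has no state yet, otherwise the target's accumulator read from state
    final RowData acc;
    if (mergeResult == null) {
        acc = aggregator.createAccumulators();
    } else {
        RowData stateAcc = windowState.value(mergeResult);
        if (stateAcc == null) {
            acc = aggregator.createAccumulators();
        } else {
            acc = stateAcc;
        }
    }
    aggregator.setAccumulators(mergeResult, acc);
    // fold each slice accumulator into the base accumulator
    for (Long slice : toBeMerged) {
        RowData sliceAcc = windowState.value(slice);
        if (sliceAcc != null) {
            aggregator.merge(slice, sliceAcc);
        }
    }
    // write the merged accumulator back if the merge target lives in state
    if (mergeResult != null) {
        windowState.update(mergeResult, aggregator.getAccumulators());
    }
}
```

A CUMULATE window fires once per step until it reaches its maximum size, so a key whose aggregate did not change during a step is emitted again with the same value. To reduce this downstream traffic, a small but invasive change (it patches Flink's own code) adds a boolean INCREMENTAL flag to the cumulative window TVF. When the flag is enabled, the fireWindow method emits a result only if it differs from the value read back from window state: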
```java
@Override
public void fireWindow(Long windowEnd) throws Exception {
    // merge the slices that make up this window into the accumulator
    sliceSharedAssigner.mergeSlices(windowEnd, this);
    RowData aggResult = aggregator.getValue(windowEnd);
    if (!isWindowEmpty()) {
        // isIncremental() is added by the patch; stock CumulativeSliceAssigner has no such flag
        if (sliceSharedAssigner instanceof CumulativeSliceAssigner
                && ((CumulativeSliceAssigner) sliceSharedAssigner).isIncremental()) {
            // INCREMENTAL enabled: emit only if the result differs from the stored value
            RowData stateValue = windowState.value(windowEnd);
            if (stateValue == null || !stateValue.equals(aggResult)) {
                collect(aggResult);
            }
        } else {
            // default behavior: always emit a non-empty window
            collect(aggResult);
        }
    }
    // register next window timer if needed
}
```

This optimization adds an extra state read for every fired window; its overhead is still to be evaluated with performance testing.
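The intended emission rule can be isolated in a small plain-Java sketch. The IncrementalEmitSketch class, its lastEmitted map and the string-based downstream list are hypothetical. The sketch differs from the patch in one design choice: the previous result is kept in its own map instead of being read back with windowState.value(windowEnd). WindowValueState holds accumulators rather than final aggregate values. Also, CumulativeSliceAssigner appears to merge each slice into the first slice of the window, so the state stored under windowEnd may not be the previously emitted result. Comparing against an explicitly stored last emission avoids both issues, at the cost of one more piece of state per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class IncrementalEmitSketch<K, V> {

    private final boolean incremental;
    // Hypothetical extra state: the last result emitted per key in the current max window.
    private final Map<K, V> lastEmitted = new HashMap<>();
    // Stand-in for the collector that forwards results downstream.
    private final List<String> downstream = new ArrayList<>();

    IncrementalEmitSketch(boolean incremental) {
        this.incremental = incremental;
    }

    // Called when a cumulative window fires for a key.
    void fireWindow(K key, long windowEnd, V aggResult) {
        if (incremental && lastEmitted.containsKey(key)
                && Objects.equals(lastEmitted.get(key), aggResult)) {
            return; // unchanged since the previous step: suppress the duplicate
        }
        lastEmitted.put(key, aggResult);
        downstream.add(key + "@" + windowEnd + "=" + aggResult);
    }

    // After the max-size window has fired, the next cumulative window starts from scratch.
    void clearKey(K key) {
        lastEmitted.remove(key);
    }

    List<String> emitted() {
        return downstream;
    }

    public static void main(String[] args) {
        IncrementalEmitSketch<String, Long> emitter = new IncrementalEmitSketch<>(true);
        emitter.fireWindow("apple", 60_000, 5L);
        emitter.fireWindow("apple", 120_000, 8L);
        emitter.fireWindow("apple", 180_000, 8L); // suppressed: same value as the previous step
        emitter.clearKey("apple");
        System.out.println(emitter.emitted()); // [apple@60000=5, apple@120000=8]
    }
}
```

With incremental output, a downstream consumer sees only changes, so it has to treat the latest value received for a key and window start as current until a newer one arrives.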
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
