Understanding Flink Window Table-Valued Functions (TVF) and Incremental Optimization

This article explains the concept of window table-valued functions in Flink, compares the old grouped‑window syntax with the new TVF syntax, details the physical and execution plans, introduces sliced windows for state reduction, and presents a small incremental‑output improvement with code examples.

Big Data Technology & Architecture

Table‑valued functions (TVFs) return a table rather than a scalar value and are common in databases such as Oracle and SQL Server. Flink 1.13 introduced window TVFs via FLIP‑145 to replace the older grouped‑window syntax.

Before 1.13, a 10‑second tumbling window aggregation required a SQL statement of the form SELECT TUMBLE_START(...), ... GROUP BY TUMBLE(...). Since 1.13 the same logic can be expressed more concisely with a window TVF:

SELECT window_start, window_end, ...
FROM TABLE(
    TUMBLE(TABLE source, DESCRIPTOR(procTime), INTERVAL '10' SECONDS))
GROUP BY window_start, window_end, merchandiseId

The window TVF extends Calcite's SqlWindowTableFunction by adding three columns: window_start, window_end, and window_time. Validation of TVF operands is performed by AbstractOperandMetadata and its implementation OperandMetadataImpl.
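To make the three window columns concrete, here is a minimal plain-Java sketch of how they relate for a tumbling window. It is not Flink code: the class name TumbleColumns and its of method are hypothetical, timestamps are assumed to be epoch milliseconds with no window offset or time-zone shift, and window_time is taken to be window_end minus 1 ms, as the Flink documentation describes it.

```java
/** Illustrative only: the three window columns for one row of a tumbling window. */
final class TumbleColumns {
    final long windowStart; // inclusive lower bound, epoch millis
    final long windowEnd;   // exclusive upper bound, epoch millis
    final long windowTime;  // time attribute of the window: windowEnd - 1 ms

    private TumbleColumns(long windowStart, long windowEnd) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.windowTime = windowEnd - 1;
    }

    /** Assigns a timestamp to its tumbling window (assumption: zero offset). */
    static TumbleColumns of(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result non-negative, so negative timestamps also
        // land in the window that actually contains them.
        long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
        return new TumbleColumns(start, start + sizeMillis);
    }
}
```

For a row with timestamp 25000 and a 10-second window, TumbleColumns.of(25_000L, 10_000L) yields window_start 20000, window_end 30000, and window_time 29999.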

When a window TVF is used together with aggregation or Top‑N, Flink generates a logical plan that includes a LogicalTableFunctionScan and a LogicalAggregate. The plan is transformed by three rules: StreamPhysicalWindowTableFunctionRule (converter), StreamPhysicalWindowAggregateRule (converter), and PullUpWindowTableFunctionIntoWindowAggregateRule (rel‑opt). The optimized physical and execution plans contain StreamPhysicalWindowAggregate and StreamExecWindowAggregate nodes.

To avoid state explosion from fine‑grained sliding windows, Flink adopts a sliced‑window approach. A slice is a sub‑window that can be shared across multiple windows; the SliceAssigner hierarchy (including CumulativeSliceAssigner) determines slice boundaries. The SlicingWindowOperator wraps a SlicingWindowProcessor, which consists of three key components: WindowBuffer (in‑memory cache), WindowValueState (state schema [key, window_end, accumulator]), and NamespaceAggsHandleFunction generated by AggsHandlerCodeGenerator. The processor’s processElement method assigns slices, registers timers, and buffers elements, while the merge method combines slice accumulators into the final window state.

@Override
public boolean processElement(RowData key, RowData element) throws Exception {
    long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
    if (!isEventTime) {
        // processing time: register a timer for every element
        windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
    }
    if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
        // the assigned slice has already fired, so the element is late,
        // but it may still belong to a window that has not fired yet
        long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
        if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
            // every window containing this slice has fired: drop the element
            return true;
        } else {
            windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
            // register a timer for the first window that has not fired yet
            long unfiredFirstWindow = sliceEnd;
            while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
                unfiredFirstWindow += windowInterval;
            }
            windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
            return false;
        }
    } else {
        // the assigned slice has not fired: accumulate into it
        windowBuffer.addElement(key, sliceEnd, element);
        return false;
    }
}

@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
    // pick the base accumulator
    final RowData acc;
    if (mergeResult == null) {
        // no merge target in state: start from a fresh accumulator
        acc = aggregator.createAccumulators();
    } else {
        RowData stateAcc = windowState.value(mergeResult);
        if (stateAcc == null) {
            acc = aggregator.createAccumulators();
        } else {
            acc = stateAcc;
        }
    }
    aggregator.setAccumulators(mergeResult, acc);
    // fold each slice accumulator into the base accumulator
    for (Long slice : toBeMerged) {
        RowData sliceAcc = windowState.value(slice);
        if (sliceAcc != null) {
            aggregator.merge(slice, sliceAcc);
        }
    }
    // write the merged accumulator back only when the target lives in state
    if (mergeResult != null) {
        windowState.update(mergeResult, aggregator.getAccumulators());
    }
}
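The slice-sharing idea behind processElement and merge can be tried outside Flink with a small sketch. The class below is hypothetical and heavily simplified: it assumes a single key, a SUM aggregate, a hopping window whose slice length is the greatest common divisor of window size and slide, and no timers, lateness handling, or state cleanup. A HashMap stands in for WindowValueState.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: one key, SUM aggregate, hopping windows built from shared slices. */
final class SliceSketch {
    private final long windowSize;
    private final long sliceSize;
    // sliceEnd -> partial sum; stands in for the per-slice accumulator state
    private final Map<Long, Long> sliceSums = new HashMap<>();

    SliceSketch(long windowSizeMillis, long slideMillis) {
        this.windowSize = windowSizeMillis;
        // assumption: slices are as long as the gcd of size and slide
        this.sliceSize = gcd(windowSizeMillis, slideMillis);
    }

    static long gcd(long a, long b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    /** Each element belongs to exactly one slice, identified by its end. */
    long assignSliceEnd(long timestampMillis) {
        long sliceStart = timestampMillis - Math.floorMod(timestampMillis, sliceSize);
        return sliceStart + sliceSize;
    }

    /** Accumulate into the element's slice only, never into every window. */
    void add(long timestampMillis, long value) {
        sliceSums.merge(assignSliceEnd(timestampMillis), value, Long::sum);
    }

    /** Merge the slices covered by the window ending at windowEnd. */
    long fireWindow(long windowEnd) {
        long total = 0;
        for (long end = windowEnd; end > windowEnd - windowSize; end -= sliceSize) {
            total += sliceSums.getOrDefault(end, 0L);
        }
        return total;
    }

    int storedSlices() {
        return sliceSums.size();
    }
}
```

With a 30-second window sliding every 10 seconds, adding the values 5, 7, 1, and 2 at timestamps 1000, 12000, 25000, and 31000 stores four slice accumulators. fireWindow(30_000L) then returns 13 and fireWindow(40_000L) returns 10, even though each element was written to state only once.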

To reduce downstream traffic, a small but invasive change adds a boolean INCREMENTAL flag to the cumulative window TVF. When the flag is enabled, the fireWindow method emits a result only if it differs from the previously stored state, as shown below.

@Override
public void fireWindow(Long windowEnd) throws Exception {
    sliceSharedAssigner.mergeSlices(windowEnd, this);
    RowData aggResult = aggregator.getValue(windowEnd);
    if (!isWindowEmpty()) {
        if (sliceSharedAssigner instanceof CumulativeSliceAssigner && ((CumulativeSliceAssigner) sliceSharedAssigner).isIncremental()) {
            RowData stateValue = windowState.value(windowEnd);
            if (stateValue == null || !stateValue.equals(aggResult)) {
                collect(aggResult);
            }
        } else {
            collect(aggResult);
        }
    }
    // register next window timer if needed
}

This optimization introduces additional state access overhead, which will be evaluated with performance testing.
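The effect of such a flag on output volume can be illustrated with a plain-Java sketch. It is not the patch described above: the class IncrementalEmitSketch and its fireAll method are hypothetical, the aggregate is reduced to a single long per fired window, and the comparison is made against the last emitted value held in a local variable rather than against Flink window state.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: suppress cumulative-window results that did not change. */
final class IncrementalEmitSketch {

    /**
     * Fires one cumulative window per entry of cumulativeResults, where entry i
     * is the aggregate at window end (i + 1) * stepMillis, and returns the rows
     * that would be sent downstream.
     */
    static List<String> fireAll(long[] cumulativeResults, long stepMillis, boolean incremental) {
        List<String> emitted = new ArrayList<>();
        Long lastEmitted = null; // nothing emitted yet
        for (int i = 0; i < cumulativeResults.length; i++) {
            long windowEnd = (i + 1) * stepMillis;
            long aggResult = cumulativeResults[i];
            boolean unchanged = lastEmitted != null && lastEmitted.longValue() == aggResult;
            if (incremental && unchanged) {
                continue; // same value as last time: skip the downstream write
            }
            emitted.add("window_end=" + windowEnd + ", value=" + aggResult);
            lastEmitted = aggResult;
        }
        return emitted;
    }
}
```

For the cumulative results 3, 3, 5, 5, 5, 9 with a one-minute step, the non-incremental mode emits six rows, while the incremental mode emits three: the rows for window ends 60000, 180000, and 360000. The price is one extra comparison per fired window, which in the real operator means an additional state read.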

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
