
Understanding Flink AggregateFunction, Session Windows, and Timer Mechanisms

This article explains how Flink's DataStream API uses AggregateFunction and session windows, details the MergingWindowAssigner and MergingWindowSet implementations, and demonstrates timer registration and processing with KeyedProcessFunction, with code excerpts and an analysis of the internal workflow.

Big Data Technology & Architecture

When using Flink's DataStream API, the aggregate() operator, AggregateFunction, and KeyedProcessFunction are common building blocks for stateful stream processing.

The AggregateFunction interface requires four methods:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

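The merge() method is primarily used by session windows, which group events separated by less than a configured inactivity gap instead of using fixed time intervals. When two session windows merge, their accumulators must be combined, and merge() is what combines them.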
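To make the four methods concrete, here is a minimal sketch of an average aggregation. So that it compiles without the Flink jar, it declares a hypothetical local interface `MiniAggregateFunction` with the same four method signatures as Flink's `AggregateFunction`; in a real job you would implement `org.apache.flink.api.common.functions.AggregateFunction` and pass it to `aggregate()` on a windowed stream. The `long[]` accumulator holding {sum, count} is an illustrative choice.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>.
interface MiniAggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Average of longs; the accumulator is {sum, count}.
class AverageAggregate implements MiniAggregateFunction<Long, long[], Double> {
    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    @Override
    public long[] add(Long value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    @Override
    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    // Called when two session windows merge: combine the two partial aggregates.
    @Override
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }
}

class AverageAggregateDemo {
    // Aggregates a list of values into a fresh accumulator.
    static long[] aggregate(AverageAggregate agg, List<Long> values) {
        long[] acc = agg.createAccumulator();
        for (Long v : values) {
            acc = agg.add(v, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        AverageAggregate agg = new AverageAggregate();
        // Two windows aggregated separately, then merged as a session window would be.
        long[] left = aggregate(agg, Arrays.asList(1L, 2L));
        long[] right = aggregate(agg, Arrays.asList(3L, 6L));
        System.out.println(agg.getResult(agg.merge(left, right))); // prints 3.0
    }
}
```

The property that matters for session windows is that merging two partial accumulators yields the same result as aggregating all of their elements into one accumulator.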

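Session windows are assigned with EventTimeSessionWindows.withGap(Time.seconds(gap)). EventTimeSessionWindows is itself a MergingWindowAssigner: each element first gets its own window, and overlapping windows are then merged through the mergeWindows() method: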

public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);
    public interface MergeCallback<W> {
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}

For session windows, mergeWindows() delegates to the static helper TimeWindow.mergeWindows, which contains the actual merge logic:

public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
    // sort windows by start time and merge overlapping windows
    List<TimeWindow> sortedWindows = new ArrayList<>(windows);
    Collections.sort(sortedWindows, (o1, o2) -> Long.compare(o1.getStart(), o2.getStart()));
    // ... merging logic using intersects() and cover()
}
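The elided part of TimeWindow.mergeWindows is a single sort-and-sweep pass. The sketch below reproduces that idea under stated assumptions: `Win`, `MergeCallback`, and `SessionMerging` are hypothetical stand-ins for TimeWindow and MergingWindowAssigner.MergeCallback, written so the code runs without Flink. It also shows session assignment, where each element starts in its own window [timestamp, timestamp + gap).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for Flink's TimeWindow: covers [start, end), end exclusive.
final class Win {
    final long start;
    final long end;

    Win(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Same rule as TimeWindow.intersects(): windows that only touch also intersect.
    boolean intersects(Win other) {
        return start <= other.end && end >= other.start;
    }

    // Smallest window covering both, like TimeWindow.cover().
    Win cover(Win other) {
        return new Win(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Win && ((Win) o).start == start && ((Win) o).end == end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

// Same shape as MergingWindowAssigner.MergeCallback.
interface MergeCallback {
    void merge(Collection<Win> toBeMerged, Win mergeResult);
}

final class SessionMerging {
    // Session assignment: every element starts in its own window [timestamp, timestamp + gap).
    static Win assign(long timestamp, long gap) {
        return new Win(timestamp, timestamp + gap);
    }

    // Sort by start, sweep once, and report each group of two or more intersecting windows.
    static void mergeWindows(Collection<Win> windows, MergeCallback callback) {
        List<Win> sorted = new ArrayList<>(windows);
        sorted.sort((a, b) -> Long.compare(a.start, b.start));

        Win current = null;               // cover of the group being built
        Set<Win> group = new HashSet<>(); // members of that group
        for (Win candidate : sorted) {
            if (current != null && current.intersects(candidate)) {
                current = current.cover(candidate);
                group.add(candidate);
            } else {
                if (group.size() > 1) {
                    callback.merge(group, current);
                }
                current = candidate;
                group = new HashSet<>();
                group.add(candidate);
            }
        }
        if (group.size() > 1) {
            callback.merge(group, current);
        }
    }

    public static void main(String[] args) {
        List<Win> windows = new ArrayList<>();
        for (long ts : new long[] {20_000L, 1_000L, 4_000L}) {
            windows.add(assign(ts, 5_000L));
        }
        mergeWindows(windows, (toBeMerged, result) ->
                System.out.println(result + " from " + toBeMerged.size() + " windows"));
        // prints "[1000, 9000) from 2 windows"
    }
}
```

Because intersects() compares inclusively, two events exactly one gap apart still end up in the same session. Groups of size one are not reported, so the callback only fires for real merges.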

To handle both temporal and state merging, Flink uses MergingWindowSet, which maps each window to a state‑holding window:

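// Mapping from window to the window that keeps the window state...
private final Map<W, W> mapping;
// Snapshot of the mapping taken at construction; persist() compares against it to skip unnecessary writes
private final Map<W, W> initialMapping;
// Keyed list state that stores the (window, state window) pairs so the mapping survives checkpoints
private final ListState<Tuple2<W, W>> state;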

The core addWindow method adds a new window and triggers merges when necessary:

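public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
    // collect existing windows and the new one
    List<W> windows = new ArrayList<>(this.mapping.keySet());
    windows.add(newWindow);
    // let the MergingWindowAssigner decide what merges; record mergeResult -> merged windows
    final Map<W, Collection<W>> mergeResults = new HashMap<>();
    windowAssigner.mergeWindows(windows, (toBeMerged, mergeResult) -> mergeResults.put(mergeResult, toBeMerged));
    // the new window is returned unchanged unless it took part in a merge
    W resultWindow = newWindow;
    // ... for each merge: pick mergedStateWindow, collect mergedStateWindows, update the mapping,
    //     and call mergeFunction.merge(mergeResult, mergedWindows, mergedStateWindow, mergedStateWindows)
    // ... if the new window merged with nothing, it becomes its own state window
    return resultWindow;
}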

Key variables in the merge process are:

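mergeResult – the window that covers all merged windows (the temporal result);

mergedWindows – the existing windows that are absorbed into mergeResult;

mergedStateWindow – the state window of one of the merged windows, reused as the state window of mergeResult;

mergedStateWindows – the state windows of the other merged windows, whose contents are merged into mergedStateWindow.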
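The following sketch models how the window-to-state-window mapping evolves and where the four variables above come from. It is a simplified re-implementation for illustration, not Flink's code: `MiniMergingWindowSet`, `SWin`, and `StateMerge` are hypothetical names, and instead of calling a MergingWindowAssigner it relies on a session-window invariant (windows already in the set never intersect, so a new window can only merge with the windows it intersects).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical [start, end) window with the same intersects/cover rules as TimeWindow.
final class SWin {
    final long start;
    final long end;

    SWin(long start, long end) {
        this.start = start;
        this.end = end;
    }

    boolean intersects(SWin o) {
        return start <= o.end && end >= o.start;
    }

    SWin cover(SWin o) {
        return new SWin(Math.min(start, o.start), Math.max(end, o.end));
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SWin && ((SWin) o).start == start && ((SWin) o).end == end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }
}

// Hypothetical callback with the same four arguments as MergingWindowSet.MergeFunction.
interface StateMerge {
    void merge(SWin mergeResult, List<SWin> mergedWindows,
               SWin mergedStateWindow, List<SWin> mergedStateWindows);
}

final class MiniMergingWindowSet {
    // window -> window whose namespace actually holds the accumulator
    final Map<SWin, SWin> mapping = new HashMap<>();

    SWin addWindow(SWin newWindow, StateMerge mergeFunction) {
        // Simplification for session windows: windows already in the set never intersect,
        // so newWindow can only merge with the existing windows it intersects.
        List<SWin> mergedWindows = new ArrayList<>();
        SWin mergeResult = newWindow;
        for (SWin w : mapping.keySet()) {
            if (w.intersects(newWindow)) {
                mergedWindows.add(w);
                mergeResult = mergeResult.cover(w);
            }
        }
        if (mergedWindows.isEmpty()) {
            mapping.put(newWindow, newWindow); // new session: it is its own state window
            return newWindow;
        }
        // Keep the state window of one merged window; the others will be merged into it.
        SWin mergedStateWindow = mapping.get(mergedWindows.get(0));
        List<SWin> mergedStateWindows = new ArrayList<>();
        for (SWin w : mergedWindows) {
            SWin stateWindow = mapping.remove(w);
            if (!stateWindow.equals(mergedStateWindow)) {
                mergedStateWindows.add(stateWindow);
            }
        }
        mapping.put(mergeResult, mergedStateWindow);
        // Nothing changed if newWindow fell entirely inside a single existing window.
        if (!(mergedWindows.size() == 1 && mergedWindows.get(0).equals(mergeResult))) {
            mergeFunction.merge(mergeResult, mergedWindows, mergedStateWindow, mergedStateWindows);
        }
        return mergeResult;
    }
}
```

For example, with [0, 10) and [30, 40) already in the set, adding [14, 31) bridges both sessions: mergeResult is [0, 40), one of the two original windows stays the state window, and the other appears in mergedStateWindows. Reusing an existing namespace means only the other windows' state has to be merged, instead of moving everything to a namespace named after the larger window.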

In WindowOperator.processElement, Flink obtains the windows for each element, calls MergingWindowSet.addWindow, and updates triggers and state:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    Collection<W> elementWindows = windowAssigner.assignWindows(...);
    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();
        for (W window : elementWindows) {
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
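                public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception {
                    // let the trigger react (triggerContext.onMerge), then clear the triggers
                    // and cleanup timers of the windows that were merged away
                    // merge the accumulators: windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows)
                }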
            });
            // handle late windows, update state, fire triggers, etc.
        }
        mergingWindows.persist();
    }
}

The accumulator state itself is merged by the state backend's merging state implementation (InternalMergingState.mergeNamespaces), which ultimately calls AggregateFunction.merge(). For heap-based state:

@Override
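public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    if (sources == null || sources.isEmpty()) {
        return; // nothing to merge
    }
    SV merged = null;
    for (N source : sources) {
        // read and remove the source namespace's accumulator
        SV sourceState = map.removeAndGetOld(source);
        if (merged != null && sourceState != null) {
            merged = mergeState(merged, sourceState);
        } else if (merged == null) {
            // first non-empty source; an empty source must not discard what was merged so far
            merged = sourceState;
        }
    }
    // fold the result into the target, which may already hold an accumulator
    if (merged != null) {
        map.transform(target, merged, mergeTransformation);
    }
}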

@Override
protected ACC mergeState(ACC a, ACC b) {
    return aggregateTransformation.aggFunction.merge(a, b);
}
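The null handling in mergeNamespaces is easy to get wrong. A minimal sketch of the same algorithm, with a plain HashMap standing in for Flink's StateTable and a BinaryOperator standing in for AggregateFunction.merge (the class `MiniHeapMergingState` is hypothetical), shows the intended behavior: sources are removed as they are read, an empty source is skipped without losing what has been merged so far, and the result is folded into any accumulator the target already holds.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical heap state: one accumulator per namespace (window) for the current key.
final class MiniHeapMergingState<N, ACC> {
    final Map<N, ACC> table = new HashMap<>();
    private final BinaryOperator<ACC> merge; // plays the role of AggregateFunction.merge

    MiniHeapMergingState(BinaryOperator<ACC> merge) {
        this.merge = merge;
    }

    void mergeNamespaces(N target, Collection<N> sources) {
        ACC merged = null;
        for (N source : sources) {
            ACC sourceState = table.remove(source); // read and delete in one step
            if (sourceState == null) {
                continue;                         // empty source: keep what we have
            }
            merged = (merged == null) ? sourceState : merge.apply(merged, sourceState);
        }
        if (merged != null) {
            // Map.merge calls merge(targetState, merged) if the target exists, else stores merged.
            table.merge(target, merged, merge);
        }
    }
}
```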

For RocksDB-backed state, the same steps operate on serialized bytes:

@Override
public void mergeNamespaces(N target, Collection<N> sources) {
    ACC current = null;
    for (N source : sources) {
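        // read the source's serialized accumulator; if present, delete it,
        // deserialize it, and combine it with current via aggFunction.merge(...)
    }
    // if current != null: read the target's existing value (if any), merge it in,
    // then serialize current and write it back to the target namespace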
}
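Because RocksDB stores only bytes, every step above involves a serialization round trip. The sketch below imitates that with a HashMap<String, byte[]> in place of the column family and a hand-written serializer for a {sum, count} accumulator. `MiniBytesMergingState` and its methods are hypothetical and do not use the RocksDB API.

```java
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a RocksDB column family: namespace -> serialized accumulator.
final class MiniBytesMergingState {
    final Map<String, byte[]> db = new HashMap<>();

    // Accumulator {sum, count} serialized as two big-endian longs.
    static byte[] serialize(long[] acc) {
        return ByteBuffer.allocate(16).putLong(acc[0]).putLong(acc[1]).array();
    }

    static long[] deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new long[] {buf.getLong(), buf.getLong()};
    }

    // Plays the role of aggFunction.merge(...).
    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    void mergeNamespaces(String target, Collection<String> sources) {
        long[] current = null;
        for (String source : sources) {
            byte[] valueBytes = db.get(source);
            if (valueBytes == null) {
                continue;
            }
            db.remove(source);                       // delete the source entry
            long[] value = deserialize(valueBytes);
            current = (current == null) ? value : merge(current, value);
        }
        if (current != null) {
            byte[] targetBytes = db.get(target);     // the target may already hold state
            if (targetBytes != null) {
                current = merge(current, deserialize(targetBytes));
            }
            db.put(target, serialize(current));      // write the merged value back
        }
    }
}
```

Each merged source therefore costs a read, a delete, and a deserialization, which is one reason session windows with many small merges are more expensive on RocksDB than on the heap backend.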

In user code, timers are accessed through the TimerService returned by ctx.timerService() in a KeyedProcessFunction. A processing-time timer fires once the machine's clock reaches the registered timestamp, while an event-time timer fires once the watermark reaches it.

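// Example KeyedProcessFunction that registers a processing-time timer.
// CountWithTimestamp is a POJO with public fields key (String), count (long), lastModified (long).
static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {
    private ValueState<CountWithTimestamp> state;
    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }
    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }
        current.count++;
        // use the processing-time clock; ctx.timestamp() is the element's event time and may be null
        current.lastModified = ctx.timerService().currentProcessingTime();
        state.update(current);
        ctx.timerService().registerProcessingTimeTimer(current.lastModified + 10000L);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        CountWithTimestamp result = state.value();
        // emit only if no newer element re-registered the timeout after this timer was set
        if (timestamp == result.lastModified + 10000L) {
            out.collect(new Tuple2<>(result.key, result.count));
        }
    }
}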
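The `timestamp == lastModified + timeout` check matters because an earlier timer is not cancelled when a new element arrives; it still fires and must be ignored. This single-threaded simulation (the class `TimeoutCounter` and its methods are hypothetical, and time is advanced by hand instead of by a real clock) reproduces that pattern with a TreeMap of timestamp to keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, single-threaded simulation of CountWithTimeoutFunction's logic.
final class TimeoutCounter {
    static final long TIMEOUT = 10_000L;

    static final class KeyState {
        long count;
        long lastModified;
    }

    final Map<String, KeyState> state = new HashMap<>();        // per-key ValueState
    final TreeMap<Long, Set<String>> timers = new TreeMap<>();  // timestamp -> keys (deduplicated)
    final List<String> output = new ArrayList<>();

    // Like processElement: bump the count and register a timer TIMEOUT after "now".
    void processElement(String key, long now) {
        KeyState s = state.computeIfAbsent(key, k -> new KeyState());
        s.count++;
        s.lastModified = now;
        timers.computeIfAbsent(now + TIMEOUT, t -> new TreeSet<>()).add(key);
    }

    // Like the timer service: fire every timer with timestamp <= now, earliest first.
    void advanceTime(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    // Like onTimer: emit only if no newer element re-armed the timeout.
    void onTimer(String key, long timestamp) {
        KeyState s = state.get(key);
        if (timestamp == s.lastModified + TIMEOUT) {
            output.add(key + "=" + s.count);
        }
    }
}
```

With elements for key "a" at 0 and 4,000 ms, the timer at 10,000 ms fires but emits nothing, and only the timer at 14,000 ms emits "a=2". Calling deleteProcessingTimeTimer on the old timestamp is an alternative to this check.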

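Internally, InternalTimerServiceImpl keeps timers in a priority queue (a min-heap ordered by timestamp) that also deduplicates timers with the same key, namespace, and timestamp. If a newly added timer becomes the new head of the heap, that is, it is earlier than every pending timer, Flink cancels the currently scheduled trigger and schedules a new delayed task through the ProcessingTimeService, which is backed by a ScheduledThreadPoolExecutor, so that it fires at the correct time.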

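public void registerProcessingTimeTimer(N namespace, long time) {
    InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
        long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
        if (time < nextTriggerTime) { // new head is earlier: cancel and reschedule the executor task
            if (nextTimer != null) nextTimer.cancel(false);
            nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
        }
    }
}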

During execution, InternalTimerServiceImpl.onProcessingTime repeatedly polls the heap, invokes the user’s onTimer callback for each due timer, and re‑schedules the next earliest timer.

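InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    processingTimeTimersQueue.poll();
    keyContext.setCurrentKey(timer.getKey());
    triggerTarget.onProcessingTime(timer); // eventually calls the user's onTimer()
}
// schedule the next timer, if any remain in the heap
if (timer != null && nextTimer == null) {
    nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
}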
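The interplay between the heap and the executor can be simulated deterministically. In this sketch, `MiniTimerService` and its fields are hypothetical: the ScheduledThreadPoolExecutor is replaced by a single `scheduledAt` field recording when the pending task would run, and onProcessingTime is called by hand instead of waiting for wall-clock time. A HashSet mimics the deduplication Flink applies to timers with the same key and timestamp (namespaces are omitted).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical processing-time timer service: a min-heap of timers plus the single
// pending "executor task" that a ScheduledThreadPoolExecutor would run.
final class MiniTimerService {
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Timer && ((Timer) o).timestamp == timestamp && ((Timer) o).key.equals(key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }
    }

    private final PriorityQueue<Timer> heap =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Set<Timer> dedup = new HashSet<>(); // same key + timestamp registers once
    Long scheduledAt = null;                          // when the pending task would run, if any
    int reschedules = 0;
    final List<String> fired = new ArrayList<>();

    void registerProcessingTimeTimer(String key, long time) {
        Timer t = new Timer(time, key);
        if (!dedup.add(t)) {
            return;                                   // duplicate timer: nothing to do
        }
        heap.add(t);
        // Only a new, earlier head requires cancelling and rescheduling the task.
        if (scheduledAt == null || time < scheduledAt) {
            scheduledAt = time;
            reschedules++;
        }
    }

    // What the executor task does when it runs at processing time "time".
    void onProcessingTime(long time) {
        scheduledAt = null;
        Timer t;
        while ((t = heap.peek()) != null && t.timestamp <= time) {
            heap.poll();
            dedup.remove(t);
            fired.add(t.key + "@" + t.timestamp);     // stands in for the user's onTimer()
        }
        if (t != null) {
            scheduledAt = t.timestamp;                // schedule the next earliest timer
            reschedules++;
        }
    }
}
```

Registering timers at 300, 500, and then 100 causes only two schedulings (for 300 and for 100), because the timer at 500 never becomes the head. Keeping one pending task per timer service, rather than one per timer, is what keeps the executor's queue small.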

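Event-time timers are driven by watermarks instead of the executor: when a new watermark arrives, InternalTimerServiceImpl.advanceWatermark polls the event-time queue in the same way and fires every timer whose timestamp is less than or equal to the watermark.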

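Timers are part of the keyed state snapshot, so Flink persists the timer queues during checkpoints and restores them after a failure. Flink also never runs processElement and onTimer concurrently: both are invoked from the task thread (through the mailbox model in recent versions, under the checkpoint lock in older ones), so user code needs no extra synchronization to access keyed state from either method.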

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
