Custom Count Trigger with Timeout for Apache Flink Windows

This article explains how to create a custom Apache Flink trigger that fires a window either when a specified element count is reached or when a time limit expires. It includes the full Java implementation and a usage example with a 10‑second timeout and a 1000‑element threshold.

Big Data Technology & Architecture

Apache Flink provides two basic window types: time windows (TimeWindow), which fire based on time, and count windows (CountWindow), which fire based on element count.

Sometimes a window must fire when its configured time elapses, even if only a few elements have arrived, and must also fire early once a certain number of elements has accumulated. Neither basic window type covers both conditions, so a custom trigger is required.
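The "count or deadline, whichever comes first" rule can be modeled in a few lines of plain Java, independent of Flink. This is only a sketch: the class FireRule, its Decision enum, and the decide method are hypothetical names used for illustration, and the deadline is simply passed in as a millisecond value.

```java
class FireRule {
    /** Outcomes of the decision, loosely mirroring Flink's CONTINUE and FIRE_AND_PURGE. */
    enum Decision { CONTINUE, FIRE_AND_PURGE }

    /**
     * Decides whether a window should fire.
     *
     * @param count      number of elements buffered so far
     * @param maxCount   count threshold that triggers an early fire
     * @param nowMs      current time in milliseconds
     * @param deadlineMs time in milliseconds at which the timeout expires
     */
    static Decision decide(long count, long maxCount, long nowMs, long deadlineMs) {
        if (count >= maxCount) {
            return Decision.FIRE_AND_PURGE; // enough elements: fire early
        }
        if (nowMs >= deadlineMs) {
            return Decision.FIRE_AND_PURGE; // timeout reached: fire with whatever is buffered
        }
        return Decision.CONTINUE;           // keep collecting
    }

    public static void main(String[] args) {
        System.out.println(decide(1000, 1000, 3_000L, 10_000L)); // FIRE_AND_PURGE (count reached)
        System.out.println(decide(10, 1000, 10_000L, 10_000L));  // FIRE_AND_PURGE (deadline reached)
        System.out.println(decide(10, 1000, 3_000L, 10_000L));   // CONTINUE
    }
}
```

In a real trigger the two checks live in different callbacks: the count check runs when an element arrives, and the deadline check runs when a timer fires.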

The following Java class CountTriggerWithTimeout implements such a trigger. It extends Trigger<T, TimeWindow> and takes two settings: a maximum element count and a TimeCharacteristic that selects event time or processing time. The running element count is kept in a ReducingState. The trigger fires and purges the window as soon as the count reaches the maximum or the window's time limit is reached, and it resets the counter every time it fires. The element, processing‑time, and event‑time callbacks each handle their part of this logic.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Trigger for time windows that fires and purges when either the element count
 * reaches maxCount or the window's time limit is reached, whichever comes first.
 */
public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);

    /** Maximum number of elements before the window fires early. */
    private final int maxCount;
    /** Time domain used for the timeout: event time or processing time. */
    private final TimeCharacteristic timeType;
    /** Descriptor of the per-window state that stores the current element count. */
    private final ReducingStateDescriptor<Long> countStateDescriptor =
            new ReducingStateDescriptor<>("counter", new Sum(), LongSerializer.INSTANCE);

    public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {
        this.maxCount = maxCount;
        this.timeType = timeType;
    }

    /** Resets the counter, then tells the operator to emit the window contents and discard them. */
    private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
        clear(window, ctx);
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Register the timeout timer explicitly; registering the same timestamp again is a no-op.
        if (timeType == TimeCharacteristic.ProcessingTime) {
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
        } else if (timeType == TimeCharacteristic.EventTime) {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        countState.add(1L);
        if (countState.get() >= maxCount) {
            LOG.info("fire with count: {}", countState.get());
            return fireAndPurge(window, ctx);
        }
        // Fallback check on the element timestamp itself.
        if (timestamp >= window.getEnd()) {
            LOG.info("fire with time: {}", timestamp);
            return fireAndPurge(window, ctx);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (timeType != TimeCharacteristic.ProcessingTime) {
            return TriggerResult.CONTINUE;
        }
        // The timer is set to window.maxTimestamp(), the last millisecond that belongs to the window.
        if (time >= window.maxTimestamp()) {
            LOG.info("fire with process time: {}", time);
            return fireAndPurge(window, ctx);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (timeType != TimeCharacteristic.EventTime) {
            return TriggerResult.CONTINUE;
        }
        // Fires once the watermark has reached the last millisecond of the window.
        if (time >= window.maxTimestamp()) {
            LOG.info("fire with event time: {}", time);
            return fireAndPurge(window, ctx);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Reset the element counter for this window.
        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        countState.clear();
    }

    /** Reduce function that adds up the per-element increments. Static so it does not capture the trigger. */
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}

Example usage with a 10‑second timeout and a maximum of 1000 elements:

stream
        .timeWindowAll(Time.seconds(10))
        .trigger(new CountTriggerWithTimeout<>(1000, TimeCharacteristic.ProcessingTime))
        .process(new XxxxWindowProcessFunction())
        .addSink(new XxxSinkFunction())
        .name("Xxx");
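To see what this configuration produces, the fire-and-purge behavior can be replayed in plain Java without a Flink cluster. The sketch below is a simplified model, not Flink code: the class BatchSimulation and the method batchSizes are hypothetical, a single window starting at time 0 is assumed, and arrival times are given in ascending order.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSimulation {
    /**
     * Replays ascending arrival times (in ms) against one window [0, windowSizeMs)
     * and returns the size of each batch the trigger would emit.
     */
    static List<Integer> batchSizes(long[] arrivalsMs, int maxCount, long windowSizeMs) {
        List<Integer> batches = new ArrayList<>();
        int count = 0;
        for (long t : arrivalsMs) {
            if (t >= windowSizeMs) {
                break; // this element belongs to a later window
            }
            count++;
            if (count >= maxCount) {
                batches.add(count); // count threshold reached: fire and purge
                count = 0;
            }
        }
        if (count > 0) {
            batches.add(count); // timeout: fire with the remaining elements
        }
        return batches;
    }

    public static void main(String[] args) {
        // 2500 elements, one every 3 ms, so all arrive within the 10-second window.
        long[] arrivals = new long[2500];
        for (int i = 0; i < arrivals.length; i++) {
            arrivals[i] = i * 3L;
        }
        System.out.println(batchSizes(arrivals, 1000, 10_000L)); // prints [1000, 1000, 500]
    }
}
```

With a 1000‑element threshold, a burst of 2500 elements inside one 10‑second window yields two full batches triggered by the count and one partial batch triggered by the timeout.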

That completes the implementation.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Apache Flink, Timeout, Custom Trigger, Count Window
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
