Custom Count Trigger with Timeout for Apache Flink Windows
This article explains how to create a custom Apache Flink trigger that fires a window either when a specified element count is reached or when a time limit expires, includes the full Java implementation and a usage example with a 10‑second timeout and a 1000‑element threshold.
Apache Flink provides two basic window types, TimeWindow (triggered by time) and CountWindow (triggered by element count).
When you need a window that fires as soon as the configured time elapses, but also wants to fire early if a certain number of elements have already arrived, or to fire even if the count is not reached by the deadline, a custom trigger is required.
The following Java class CountTriggerWithTimeout implements such a trigger. It extends Trigger<T, TimeWindow> and uses a ReducingState to keep the current element count, a maximum count threshold, and a TimeCharacteristic (event time or processing time). The trigger fires and purges the window when either the count reaches the maximum or the time limit is passed, handling element, processing‑time, and event‑time callbacks accordingly.
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Trigger with timeout for count windows
*/
public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> {
private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);
/** maximum number of elements */
private int maxCount;
/** event time / process time */
private TimeCharacteristic timeType;
/** state to store current count */
private ReducingStateDescriptor<Long> countStateDescriptor =
new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);
public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {
this.maxCount = maxCount;
this.timeType = timeType;
}
private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
clear(window, ctx);
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.add(1L);
if (countState.get() >= maxCount) {
LOG.info("fire with count: " + countState.get());
return fireAndPurge(window, ctx);
}
if (timestamp >= window.getEnd()) {
LOG.info("fire with time: " + timestamp);
return fireAndPurge(window, ctx);
} else {
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.ProcessingTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
LOG.info("fire with process time: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.EventTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
LOG.info("fire with event time: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.clear();
}
/** counting method */
class Sum implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}Example usage with a 10‑second timeout and a maximum of 1000 elements:
stream
.timeWindowAll(Time.seconds(10))
.trigger(new CountTriggerWithTimeout(1000, TimeCharacteristic.ProcessingTime))
.process(new XxxxWindowProcessFunction())
.addSink(new XxxSinkFunction())
.name("Xxx");That completes the implementation.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
