Understanding ProcessFunction and CoProcessFunction in Apache Flink
This article explains Apache Flink's ProcessFunction and CoProcessFunction, detailing their use of events, state, and timers, compares event‑time and processing‑time semantics, and provides a complete Java example illustrating timer registration, onTimer handling, and debugging observations.
Flink provides two low-level stream operators for fine-grained event processing: ProcessFunction, applied to a DataStream, and KeyedProcessFunction, applied to a KeyedStream (older Flink versions also applied a plain ProcessFunction to keyed streams). When they run on a keyed stream, these functions have access to keyed state and timers.
Key Concepts
Events – the elements flowing through the stream.
State – fault-tolerant, consistent keyed state, available only when the function runs on a keyed stream.
Timers – callbacks that can be registered for event time or processing time, also limited to keyed streams.
The processElement(...) method is called for every element and receives a Context object, which provides the element's timestamp, a TimerService for registering timers, and the ability to emit to side outputs. When a timer fires, onTimer(...) is invoked, and any state accessed there is scoped to the key for which the timer was registered.
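To make the key scoping concrete without a Flink cluster, here is a minimal plain-Java model. It is a sketch under stated assumptions, not Flink's implementation: the class `KeyedTimerModel` and its methods are hypothetical, state is a single counter per key, and the watermark is advanced by hand. Each timer remembers the key that registered it, and that key becomes the current key before `onTimer` runs, so the callback reads only that key's state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical model of key-scoped state and timers; not Flink code. */
final class KeyedTimerModel {
    /** A timer remembers the key that registered it and its firing time. */
    private static final class Timer {
        final String key;
        final long timestamp;
        Timer(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    private final Map<String, Long> countState = new HashMap<>(); // "keyed state": one count per key
    private final List<Timer> timers = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private String currentKey;

    /** Mirrors processElement: update this key's state and register a timer. */
    void processElement(String key, long timestamp, long delay) {
        currentKey = key;
        countState.merge(currentKey, 1L, Long::sum);
        timers.add(new Timer(currentKey, timestamp + delay));
    }

    /** Fires every timer whose timestamp is <= watermark, in timestamp order. */
    void advanceWatermark(long watermark) {
        timers.sort(Comparator.comparingLong(t -> t.timestamp));
        Iterator<Timer> it = timers.iterator();
        while (it.hasNext()) {
            Timer t = it.next();
            if (t.timestamp > watermark) break;
            it.remove();
            currentKey = t.key; // state access is now scoped to the timer's key
            onTimer(t.timestamp);
        }
    }

    /** Mirrors onTimer: it can only read the current key's state. */
    private void onTimer(long timestamp) {
        output.add(currentKey + "@" + timestamp + " count=" + countState.get(currentKey));
    }

    List<String> output() { return output; }
}
```

With two records for "a" and one for "b", advancing the watermark to 1750 fires the "a" timer at 1500 (which sees a count of 2) and the "b" timer at 1700 (count 1), while the second "a" timer at 1800 stays pending.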
CoProcessFunction – implementing a join
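CoProcessFunction operates on two connected inputs: processElement1(...) is called for elements of the first input and processElement2(...) for elements of the second. A typical pattern for a low-level join on keyed, connected streams is: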
Create state for one or both input streams.
Update the state when elements arrive.
When an element arrives from the other stream, look up the matching elements buffered in state for that key and emit the joined result.
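The three steps can be sketched in plain Java. This is a simplified model rather than Flink code: the hypothetical `KeyedJoinModel` keeps one buffer per key for each input in ordinary maps, and its `processElement1`/`processElement2` methods only borrow their names from CoProcessFunction's two callbacks. Buffers are never cleared here; a real join would usually also expire old state, for example with a timer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a keyed two-input join; not Flink's API. */
final class KeyedJoinModel {
    // Step 1: state for both inputs, one buffer per key
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();

    /** Element from the first input: buffer it (step 2), then probe the second input's state (step 3). */
    List<String> processElement1(String key, String value) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> joined = new ArrayList<>();
        for (String other : rightState.getOrDefault(key, Collections.emptyList())) {
            joined.add(key + ":" + value + "|" + other);
        }
        return joined;
    }

    /** Element from the second input: buffer it, then probe the first input's state. */
    List<String> processElement2(String key, String value) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> joined = new ArrayList<>();
        for (String other : leftState.getOrDefault(key, Collections.emptyList())) {
            joined.add(key + ":" + other + "|" + value);
        }
        return joined;
    }
}
```

Because both sides are buffered, the join result does not depend on which input delivers its element first: a later "order" for key k1 still matches the "payment" that arrived earlier.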
KeyedProcessFunction
KeyedProcessFunction is the keyed counterpart of ProcessFunction. Its context objects expose the current key through getCurrentKey(), which is particularly useful inside onTimer(...), where no input element is available.
Timer Types
Internally, Flink keeps event-time and processing-time timers in separate queues ordered by timestamp. Timers are de-duplicated by the pair (key, timestamp): registering the same timestamp twice for the same key results in a single onTimer(...) call.
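The de-duplication rule can be modelled with a sorted set keyed on (timestamp, key). In this plain-Java sketch the class `TimerQueueModel` is hypothetical and covers only an event-time queue: registering the same (key, timestamp) pair twice leaves one entry, and timers fire in timestamp order once the watermark reaches them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of a de-duplicating event-time timer queue; not Flink code. */
final class TimerQueueModel {
    private static final class Timer {
        final long timestamp;
        final String key;
        Timer(long timestamp, String key) { this.timestamp = timestamp; this.key = key; }
        @Override public String toString() { return key + "@" + timestamp; }
    }

    // Ordered by timestamp, then key; equal (timestamp, key) pairs collapse into one entry
    private final TreeSet<Timer> queue = new TreeSet<>(
        Comparator.comparingLong((Timer t) -> t.timestamp).thenComparing(t -> t.key));

    /** Returns false if an identical (key, timestamp) timer was already registered. */
    boolean register(String key, long timestamp) {
        return queue.add(new Timer(timestamp, key));
    }

    /** Removes and returns every timer with timestamp <= watermark, earliest first. */
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.first().timestamp <= watermark) {
            fired.add(queue.pollFirst().toString());
        }
        return fired;
    }

    int size() { return queue.size(); }
}
```

Registering ("a", 100) twice keeps a single timer, whereas ("b", 100) is a different pair and gets its own entry.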
Example: WordCount with Timeout
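The following Java program demonstrates a process function on a keyed stream that counts the occurrences of each key and emits the key with its count once that key has received no update for 9 seconds of event time. It is written against the legacy DataStream source and watermark APIs (RichParallelSourceFunction, BoundedOutOfOrdernessTimestampExtractor), which are deprecated in newer Flink releases.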
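```java
import java.util.Date;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ProcessFunctionExample {
    // 1. ValueState holds the key, its count and the last-modified event time
    // 2. Each incoming record increments the count and updates the timestamp
    // 3. An event-time timer is registered for the record's timestamp + 9 seconds
    // 4. onTimer emits (key, count) only if the firing timer belongs to the latest
    //    update, i.e. no newer record arrived for that key in the meantime
    private static final long TIMEOUT_MS = 9000L;

    /** Emits six (key, 1, eventTimestamp) records, one every 10 s of wall-clock time. */
    private static class StreamDataSource extends RichParallelSourceFunction<Tuple3<String, Long, Long>> {
        private volatile boolean running = true;

        @Override
        @SuppressWarnings("unchecked")
        public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
            Tuple3<String, Long, Long>[] elements = new Tuple3[]{
                Tuple3.of("a", 1L, 1000000050000L),
                Tuple3.of("a", 1L, 1000000054000L),
                Tuple3.of("a", 1L, 1000000079900L),
                Tuple3.of("a", 1L, 1000000115000L),
                Tuple3.of("b", 1L, 1000000100000L),
                Tuple3.of("b", 1L, 1000000108000L)
            };
            int count = 0;
            while (running && count < elements.length) {
                ctx.collect(elements[count]);
                count++;
                Thread.sleep(10000);
            }
        }

        @Override
        public void cancel() { running = false; }
    }

    /** POJO kept in keyed state. */
    public static class CountWithTimestamp {
        public String key;
        public long count;
        public long lastModified;
        @Override
        public String toString() {
            return "CountWithTimestamp{key='" + key + "', count=" + count
                + ", lastModified=" + new Date(lastModified) + "}";
        }
    }

    public static class CountWithTimeoutFunction
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {
        private transient ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> input, Context ctx,
                                   Collector<Tuple2<String, Long>> out) throws Exception {
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = input.f0;
            }
            current.count++;
            current.lastModified = ctx.timestamp(); // event timestamp assigned upstream
            System.out.println("Element " + input.f0 + " event time: " + new Date(current.lastModified));
            state.update(current);
            long timerTs = current.lastModified + TIMEOUT_MS;
            ctx.timerService().registerEventTimeTimer(timerTs);
            System.out.println("Timer set for: " + new Date(timerTs));
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            // state.value() returns the state of ctx.getCurrentKey(), the key that registered this timer
            CountWithTimestamp res = state.value();
            System.out.println("Timer fired at " + new Date(timestamp) + ", state: " + res);
            // Timers from earlier updates of this key were superseded and emit nothing
            if (timestamp == res.lastModified + TIMEOUT_MS) {
                System.out.println("Emitting result for key " + res.key);
                out.collect(new Tuple2<>(res.key, res.count));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Required before Flink 1.12 (event time is the default from 1.12 on); without it
        // no periodic watermarks are emitted and event-time timers wait for the input to end
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // One source instance, so every record is emitted exactly once
        env.setParallelism(1);
        DataStream<Tuple2<String, Long>> data = env.addSource(new StreamDataSource())
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.milliseconds(0)) {
                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Long> element) {
                        return element.f2;
                    }
                })
            .map(t -> new Tuple2<>(t.f0, t.f1))
            // the lambda's generic return type is erased, so declare it explicitly
            .returns(Types.TUPLE(Types.STRING, Types.LONG));
        data.keyBy(t -> t.f0)
            .process(new CountWithTimeoutFunction())
            .print();
        env.execute("ProcessFunctionExample");
    }
}
```

The experiment shows that when the job runs with event time enabled, each timer fires as soon as the watermark passes its registered timestamp. When the job is left in processing-time mode (the default before Flink 1.12) while still registering event-time timers, the timestamp assigner emits no periodic watermarks, so the timers only fire after all elements have been processed, once a watermark derived from the last observed event reaches the function.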
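The following plain-Java sketch replays the example's six records through a simplified event-time model, to show which timers fire and which of them emit. Its assumptions are not guarantees about Flink: the watermark equals the largest timestamp seen so far (zero out-of-orderness) and moves only when a record carries a larger timestamp, timers are checked only when the watermark moves, and the hypothetical `flush()` stands in for the end of input by advancing the watermark past all pending timers. `TimeoutReplay` is a hypothetical name, and timestamps are offsets from 1000000000000 ms.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical replay of the timeout logic under a simplified watermark model; not Flink code. */
final class TimeoutReplay {
    static final long TIMEOUT = 9000L;

    private static final class Timer {
        final long ts;
        final String key;
        Timer(long ts, String key) { this.ts = ts; this.key = key; }
    }

    /** Per-key state, like CountWithTimestamp. */
    private static final class Count {
        long count;
        long lastModified;
    }

    private final Map<String, Count> state = new HashMap<>();
    private final TreeSet<Timer> timers = new TreeSet<>(
        Comparator.comparingLong((Timer t) -> t.ts).thenComparing(t -> t.key));
    private long watermark = Long.MIN_VALUE;
    private final List<String> log = new ArrayList<>();

    /** Mirrors processElement, then lets the watermark follow the largest timestamp seen. */
    void process(String key, long ts) {
        Count c = state.computeIfAbsent(key, k -> new Count());
        c.count++;
        c.lastModified = ts;
        timers.add(new Timer(ts + TIMEOUT, key));
        if (ts > watermark) { // the watermark only moves forward
            advanceTo(ts);
        }
    }

    /** Hypothetical end-of-input step: move the watermark past every pending timer. */
    void flush() { advanceTo(Long.MAX_VALUE); }

    private void advanceTo(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first().ts <= watermark) {
            Timer t = timers.pollFirst();
            Count c = state.get(t.key);
            log.add("fired " + t.key + "@" + t.ts);
            if (t.ts == c.lastModified + TIMEOUT) { // same check as onTimer
                log.add("emit (" + t.key + "," + c.count + ")");
            }
        }
    }

    /** Replays the example's six records in source order. */
    static List<String> replayArticleData() {
        TimeoutReplay r = new TimeoutReplay();
        r.process("a", 50000);
        r.process("a", 54000);
        r.process("a", 79900);
        r.process("a", 115000);
        r.process("b", 100000);
        r.process("b", 108000);
        r.flush();
        return r.log;
    }
}
```

Under these assumptions, the timers for "a" at 59000, 63000 and 88900 fire but emit nothing, because a newer record for "a" had already updated lastModified. The "b" records arrive after the watermark has reached 115000, so both "b" timers wait for flush(), which then emits (b,2) followed by (a,4).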
Key takeaways:
Event‑time semantics require proper watermark generation; otherwise, timers may appear to fire only after the source finishes.
Processing-time timers, registered with registerProcessingTimeTimer(...), do not depend on watermarks; they fire according to the system clock of the machine running the operator.
Understanding the difference is crucial for building correct timeout or join logic in Flink.
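The contrast between the two timer types can be modelled with two independent queues. In this hypothetical `DualTimerModel` (plain Java, not Flink's TimerService, although the registration method names mirror it), event-time timers move only with the watermark, while processing-time timers are compared against a `java.time.Clock`. A fixed clock keeps the example deterministic; a running job would use the machine's wall clock.

```java
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model contrasting event-time and processing-time timers; not Flink code. */
final class DualTimerModel {
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private final PriorityQueue<Long> processingTimeTimers = new PriorityQueue<>();
    private final Clock clock;
    private long watermark = Long.MIN_VALUE;

    DualTimerModel(Clock clock) { this.clock = clock; }

    void registerEventTimeTimer(long ts) { eventTimeTimers.add(ts); }

    void registerProcessingTimeTimer(long ts) { processingTimeTimers.add(ts); }

    /** Event-time timers fire only when a watermark reaches them. */
    List<String> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark); // watermarks never move backwards
        return drain(eventTimeTimers, watermark, "event");
    }

    /** Processing-time timers fire once the clock passes them; no watermark is involved. */
    List<String> onClockTick() {
        return drain(processingTimeTimers, clock.millis(), "processing");
    }

    private static List<String> drain(PriorityQueue<Long> queue, long now, String kind) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek() <= now) {
            fired.add(kind + "@" + queue.poll());
        }
        return fired;
    }
}
```

With the clock already past 5000 ms, the processing-time timer fires on the next tick, while the event-time timer for the same timestamp stays pending until a watermark of at least 5000 arrives.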
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
