Understanding Time Semantics, Windows, and Process Functions in Apache Flink
This article explains how to define time characteristics, assign timestamps and watermarks, use Flink's window API, implement custom process functions, side outputs, and triggers, and handle late events, providing both Scala and Java code examples for real‑time stream processing.
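We start by introducing Flink's three time characteristics, ProcessingTime, EventTime, and IngestionTime, and explain their semantics and when to use each.

ProcessingTime relies on the local clock of the machine running the operator, so results depend on when data happens to reach the operator. EventTime uses timestamps embedded in the events themselves, together with watermarks that track the progress of event time. Results are therefore deterministic even when events arrive late or out of order. IngestionTime is a hybrid: the source operator assigns its current wall-clock time as each record's timestamp and generates watermarks automatically, and downstream operators then treat the records like event-time data.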
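The choice of time characteristic changes which window an event belongs to. The plain-Java sketch below (no Flink dependency) illustrates this. It assigns the same three events to one-second tumbling windows twice: once by the timestamp embedded in each event (event time), and once by the time the event reaches the operator (processing time). The TimedEvent and TimeSemanticsDemo classes and the sample timestamps are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event: the timestamp embedded in the record (event time) and the
// wall-clock time at which the operator sees it (processing time), both in ms.
final class TimedEvent {
    final String id;
    final long eventTime;
    final long arrivalTime;

    TimedEvent(String id, long eventTime, long arrivalTime) {
        this.id = id;
        this.eventTime = eventTime;
        this.arrivalTime = arrivalTime;
    }
}

final class TimeSemanticsDemo {
    static final long WINDOW_MS = 1_000L; // one-second tumbling windows

    // Start of the tumbling window that contains a non-negative timestamp.
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Counts events per window, keyed by either event time or arrival (processing) time.
    static Map<Long, Integer> countPerWindow(List<TimedEvent> events, boolean useEventTime) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (TimedEvent e : events) {
            long ts = useEventTime ? e.eventTime : e.arrivalTime;
            counts.merge(windowStart(ts), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Event "c" happened at 950 ms but was delayed in transit until 1400 ms.
        List<TimedEvent> events = List.of(
                new TimedEvent("a", 900, 950),
                new TimedEvent("b", 1200, 1300),
                new TimedEvent("c", 950, 1400));
        System.out.println(countPerWindow(events, true));  // prints {0=2, 1000=1}
        System.out.println(countPerWindow(events, false)); // prints {0=1, 1000=2}
    }
}
```

With event time, the delayed event "c" is still counted in the window where it happened, and the counts do not depend on network or scheduling delays. With processing time, the same input produces different counts whenever delivery timing changes.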
To set the time characteristic, use:
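```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

From Flink 1.12 on, setStreamTimeCharacteristic is deprecated and event time is the default, so this call is only needed on older versions.

Timestamps are assigned and watermarks generated by a TimestampAssigner, either a custom one or one of Flink's built-in assigners. The following Scala assigner emits periodic watermarks that trail the largest timestamp seen so far by one minute: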
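```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// SensorReading is assumed to be a case class with id, timestamp (ms), and temperature fields
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
  val bound: Long = 60 * 1000 // 1 minute delay
  // Largest timestamp seen so far; the initial value keeps maxTs - bound - 1 from underflowing
  var maxTs: Long = Long.MinValue + bound + 1
  // Called periodically, at the interval set by ExecutionConfig.setAutoWatermarkInterval
  override def getCurrentWatermark: Watermark = new Watermark(maxTs - bound - 1)
  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    maxTs = maxTs.max(r.timestamp)
    r.timestamp
  }
}
```

In Flink 1.11 and later, AssignerWithPeriodicWatermarks is deprecated in favor of WatermarkStrategy (for example WatermarkStrategy.forBoundedOutOfOrderness). If timestamps are monotonically increasing, the Scala DataStream API offers the shortcut assignAscendingTimestamps: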
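```scala
val stream: DataStream[SensorReading] = ...
// The watermark trails the highest timestamp seen by 1 ms; no out-of-order slack is allowed
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps(e => e.timestamp)
```

Flink provides eight low-level process functions: ProcessFunction, KeyedProcessFunction, CoProcessFunction, ProcessJoinFunction, BroadcastProcessFunction, KeyedBroadcastProcessFunction, ProcessWindowFunction, and ProcessAllWindowFunction. KeyedProcessFunction, for example, gives access to each element's timestamp, its key, and the timer service. The following keyed process function emits a warning when a sensor's temperature keeps rising for one second of processing time: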
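```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val warnings = readings.keyBy(r => r.id).process(new TempIncreaseAlertFunction)

class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {
  // Last temperature seen for the current key (0.0 before the first reading)
  lazy val lastTemp: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", Types.of[Double]))
  // Timestamp of the pending processing-time timer (0 if none)
  lazy val timer: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer", Types.of[Long]))

  override def processElement(value: SensorReading, ctx: Context, out: Collector[String]): Unit = {
    val prevTemp = lastTemp.value()
    lastTemp.update(value.temperature)
    val ts = timer.value()
    if (prevTemp == 0.0 || value.temperature < prevTemp) {
      // First reading or temperature dropped: cancel any pending timer
      ctx.timerService().deleteProcessingTimeTimer(ts)
      timer.clear()
    } else if (value.temperature > prevTemp && ts == 0) {
      // Temperature rose and no timer is pending: check again one second from now
      val oneSecondLater = ctx.timerService().currentProcessingTime() + 1000L
      ctx.timerService().registerProcessingTimeTimer(oneSecondLater)
      timer.update(oneSecondLater)
    }
  }

  // Fires only if the temperature did not drop during the last second
  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[String]): Unit = {
    out.collect(s"Sensor ${ctx.getCurrentKey} temperature rose for 1s!")
    timer.clear()
  }
}
```

Side outputs let a process function emit additional records, possibly of a different type, to a separate stream identified by an OutputTag. The following example forwards every reading to the main output and emits a freezing alarm to a side output: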
```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputExample {
  // Tag that identifies the side output; its records are Strings
  val output = new OutputTag[String]("side-output")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(new SensorSource) // SensorSource: a user-defined source of SensorReading
    val warnings = stream.process(new FreezingAlarm)
    warnings.print()                       // main output: every reading
    warnings.getSideOutput(output).print() // side output: freezing alarms
    env.execute()
  }

  class FreezingAlarm extends ProcessFunction[SensorReading, SensorReading] {
    override def processElement(value: SensorReading, ctx: Context, out: Collector[SensorReading]): Unit = {
      // 32 degrees is the freezing point, assuming Fahrenheit readings
      if (value.temperature < 32.0) ctx.output(output, s"Sensor ${value.id} temperature below 32!")
      out.collect(value) // forward every reading to the main output
    }
  }
}
```

Flink's window operators split an unbounded stream into finite buckets of events that can be evaluated one at a time. Common window assigners include tumbling, sliding, and session windows. A one-second tumbling event-time window can be defined as:
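```scala
// TemperatureAverager is a user-defined ProcessWindowFunction
sensorData.keyBy(r => r.id).window(TumblingEventTimeWindows.of(Time.seconds(1))).process(new TemperatureAverager)
```

Window functions fall into two categories:

- Incremental functions (ReduceFunction, AggregateFunction) fold each arriving element into a small running aggregate.
- Full-window functions (ProcessWindowFunction) buffer all elements of a window and process them together, along with window metadata, when the window fires.

The following reduce function computes the minimum temperature per sensor for each 15-second window: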
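```scala
// Tumbling 15 s windows; timeWindow uses the environment's time characteristic
val minTempPerWindow = sensorData.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15)).reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
```

Custom window assigners, triggers, and evictors can be implemented for advanced use cases. Event-time windows fire only when the watermark passes their end, so they depend on a watermark assigner such as the following one, written inline as an anonymous Scala class. It lets (String, Long) records, whose second field is the timestamp, arrive up to 10 seconds out of order: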
```scala
// Applied to a DataStream[(String, Long)]; the second tuple field is the event timestamp in ms
.assignTimestampsAndWatermarks(
  new AssignerWithPeriodicWatermarks[(String, Long)] {
    val bound = 10 * 1000L // tolerate events up to 10 s out of order
    var maxTs = Long.MinValue + bound + 1 // largest timestamp seen; initial value avoids underflow
    override def getCurrentWatermark: Watermark = new Watermark(maxTs - bound - 1)
    override def extractTimestamp(t: (String, Long), l: Long): Long = {
      maxTs = maxTs.max(t._2)
      t._2
    }
  })
```

Triggers decide when a window is evaluated and emits a result. The following trigger fires early at every full second of event time and fires one final time when the window ends:
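```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class OneSecondIntervalTrigger extends Trigger[SensorReading, TimeWindow] {
  private val firstSeenDesc = new ValueStateDescriptor[Boolean]("firstSeen", classOf[Boolean])
  private def nextSecond(ctx: Trigger.TriggerContext): Long = // next full second after the watermark
    ctx.getCurrentWatermark + (1000 - ctx.getCurrentWatermark % 1000)
  override def onElement(r: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    val firstSeen = ctx.getPartitionedState(firstSeenDesc)
    if (!firstSeen.value()) { // first element of this window: register early and final timers
      ctx.registerEventTimeTimer(nextSecond(ctx))
      ctx.registerEventTimeTimer(window.getEnd)
      firstSeen.update(true)
    }
    TriggerResult.CONTINUE
  }
  override def onEventTime(timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    if (timestamp == window.getEnd) TriggerResult.FIRE_AND_PURGE // final firing at window end
    else { // early firing; schedule the next one if it is still inside the window
      if (nextSecond(ctx) < window.getEnd) ctx.registerEventTimeTimer(nextSecond(ctx))
      TriggerResult.FIRE
    }
  override def onProcessingTime(timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ctx.getPartitionedState(firstSeenDesc).clear()
}
```

Late events arrive after the watermark has already passed the end of their window. Flink can handle them in three ways:

- Drop them, which is the default.
- Redirect them to a side output with sideOutputLateData.
- Use them to update earlier results by setting an allowed lateness.

With an allowed lateness of five seconds, a window keeps its state until the watermark is five seconds past its end: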
```scala
// UpdatingWindowCountFunction: a user-defined ProcessWindowFunction, invoked again for late elements
readings.keyBy(_.id).timeWindow(Time.seconds(10)).allowedLateness(Time.seconds(5)).process(new UpdatingWindowCountFunction)
```

Overall, the article provides a comprehensive guide to time handling, windowing, process functions, and related APIs in Apache Flink, illustrated with Scala and Java code snippets.
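The late-event rules described above can be modeled without Flink. The plain-Java sketch below is a simplified model, not Flink's own code. It shows what an event-time tumbling window with the default event-time trigger does with an incoming element, given the current watermark and the allowed lateness; the LatenessModel class and its Fate values are hypothetical names. An element has three possible outcomes:

- ON_TIME: its window has not fired yet, so it is processed normally.
- LATE_UPDATE: its window has already fired but is within the allowed lateness, so the element triggers an updated firing.
- DROPPED: the watermark has passed the window's last timestamp plus the allowed lateness, so the element is dropped, or routed to a late-data side output if one is configured.

```java
// Hypothetical model of how an event-time tumbling window (default event-time
// trigger) treats an element, given the current watermark and allowed lateness.
final class LatenessModel {
    enum Fate { ON_TIME, LATE_UPDATE, DROPPED }

    // Start of the tumbling window (offset 0) containing a non-negative timestamp.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    static Fate classify(long ts, long size, long allowedLateness, long watermark) {
        // Last millisecond that still belongs to the window [start, start + size)
        long maxTimestamp = windowStart(ts, size) + size - 1;
        if (maxTimestamp + allowedLateness <= watermark) {
            return Fate.DROPPED;      // window state has already been cleaned up
        }
        if (maxTimestamp <= watermark) {
            return Fate.LATE_UPDATE;  // window already fired; fires again with the late element
        }
        return Fate.ON_TIME;          // window has not fired yet
    }

    public static void main(String[] args) {
        long size = 10_000L, lateness = 5_000L; // 10 s windows, 5 s allowed lateness
        long ts = 3_000L;                       // belongs to window [0, 10000)
        for (long wm : new long[] {8_000L, 12_000L, 15_000L}) {
            // prints ON_TIME, then LATE_UPDATE, then DROPPED
            System.out.println("watermark " + wm + ": " + classify(ts, size, lateness, wm));
        }
    }
}
```

Setting the lateness to zero collapses the middle case: any element whose window has already fired is then dropped, which matches Flink's default behavior.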
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
