Understanding Flink Event‑Time Windows, Watermarks, and Allowed Lateness with Scala Examples
This article explains how Apache Flink uses event‑time windows, watermarks, and allowed lateness to handle out‑of‑order and late data, and provides complete Scala code examples that demonstrate timestamp assignment, watermark generation, window triggering, and side‑output of late records.
When a job runs in event time, every record must carry a timestamp, and Flink relies on watermarks flowing through the stream to track event-time progress and to handle out-of-order data.
The watermark is defined as the largest event time seen so far minus maxOutOfOrderness (e.g., 3 seconds); it only advances when a record with a newer timestamp arrives, which lets the system wait for delayed records within a bounded time.
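A tumbling event-time window of 5 seconds (left-closed, right-open) groups records; the window first fires when the watermark passes the window's end time. If an allowed lateness is configured, the window's state is kept for that extra period, and every late record that arrives within it makes the window fire again with the updated contents.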
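Allowed lateness (e.g., 2 seconds) extends the period during which late records can still be added to their window: a record is accepted as long as the watermark has not yet passed the window's end time plus the allowed lateness. Records that arrive after that point are dropped from the window and can be captured via sideOutputLateData.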
Two complete Scala examples demonstrate assigning timestamps and watermarks, defining a 5‑second tumbling window, setting allowed lateness, and handling late data with side‑output.
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
/**
 * Watermark test.
 *
 * Reads `name:timestamp` lines from a socket, assigns event-time timestamps and periodic
 * watermarks that trail the largest timestamp seen so far by 3 seconds, groups the records per
 * key into 5-second tumbling event-time windows, and prints every window that fires.
 *
 * Usage: `WaterMarkFunc01 [host] [port]` -- both arguments are optional and default to `s102`
 * and `9000`.
 */
object WaterMarkFunc01 {
  // thread-safe date formatter
  val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS")

  /**
   * Parses one input line of the form `name:timestamp`.
   *
   * @param line raw line read from the socket
   * @return `Some((name, timestamp))` when the line has at least two `:`-separated fields and the
   *         second one is a valid `Long`; `None` (after printing a diagnostic) otherwise
   */
  private def parseRecord(line: String): Option[(String, Long)] =
    try {
      val items = line.split(":")
      Some((items(0), items(1).toLong))
    } catch {
      case _: Exception =>
        println("Invalid input: " + line)
        None
    }

  /**
   * Builds and runs the streaming job.
   *
   * @param args optional `host` and `port` of the socket source; a non-numeric port fails fast
   *             with a `NumberFormatException`
   */
  def main(args: Array[String]): Unit = {
    val hostName = args.headOption.getOrElse("s102")
    val port = args.drop(1).headOption.map(_.toInt).getOrElse(9000)
    // records on the socket are separated by a line feed
    val delimiter = '\n'

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val streams: DataStream[String] = env.socketTextStream(hostName, port, delimiter)

    // brings the implicit TypeInformation instances required by the Scala API into scope
    import org.apache.flink.api.scala._

    // Malformed lines are dropped here instead of being mapped to a sentinel record, so a valid
    // record whose key is "0" or whose timestamp is 0 is no longer filtered out by accident.
    val data: DataStream[(String, Long)] = streams.flatMap(line => parseRecord(line).toList)

    // assign timestamps and watermarks (max out-of-order = 3s)
    val waterStream: DataStream[(String, Long)] =
      data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
        var currentMaxTimestamp = 0L
        val maxOutOfOrderness = 3000L
        var lastEmittedWatermark: Long = Long.MinValue

        // The watermark trails the largest timestamp seen so far and never moves backwards.
        override def getCurrentWatermark: Watermark = {
          val potentialWM = currentMaxTimestamp - maxOutOfOrderness
          if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM
          }
          new Watermark(lastEmittedWatermark)
        }

        override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
          val time = element._2
          if (time > currentMaxTimestamp) {
            currentMaxTimestamp = time
          }
          // trace every record together with the watermark it produces
          val currentWatermark = sdf.format(getCurrentWatermark.getTimestamp)
          println(s"key: ${element._1} EventTime: ${sdf.format(time)} waterMark: $currentWatermark")
          time
        }
      })

    val result: DataStream[String] = waterStream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
        // Emits one line per fired window: the key, all event times in the window, and its bounds.
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          val eventTimes = input.map(record => sdf.format(record._2)).mkString("-")
          val windowStart = sdf.format(window.getStart)
          val windowEnd = sdf.format(window.getEnd)
          out.collect(s"key: ${key.toString} data: $eventTimes startTime: $windowStart endTime: $windowEnd")
        }
      })

    result.print("window result:")
    env.execute(this.getClass.getName)
  }
}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer
/**
 * Late data test.
 *
 * Same pipeline as the plain watermark example -- `name:timestamp` lines from a socket, periodic
 * watermarks trailing the largest timestamp by 3 seconds, 5-second tumbling event-time windows
 * per key -- but the windows additionally accept late records for 2 more seconds, and records
 * that are later than that are routed to a side output and printed separately.
 *
 * Usage: `WaterMarkFunc02 [host] [port]` -- both arguments are optional and default to `s102`
 * and `9000`.
 */
object WaterMarkFunc02 {
  // thread-safe date formatter
  val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS")

  /**
   * Parses one input line of the form `name:timestamp`.
   *
   * @param line raw line read from the socket
   * @return `Some((name, timestamp))` when the line has at least two `:`-separated fields and the
   *         second one is a valid `Long`; `None` (after printing a diagnostic) otherwise
   */
  private def parseRecord(line: String): Option[(String, Long)] =
    try {
      val items = line.split(":")
      Some((items(0), items(1).toLong))
    } catch {
      case _: Exception =>
        println("Invalid input: " + line)
        None
    }

  /**
   * Builds and runs the streaming job.
   *
   * @param args optional `host` and `port` of the socket source; a non-numeric port fails fast
   *             with a `NumberFormatException`
   */
  def main(args: Array[String]): Unit = {
    val hostName = args.headOption.getOrElse("s102")
    val port = args.drop(1).headOption.map(_.toInt).getOrElse(9000)
    // records on the socket are separated by a line feed
    val delimiter = '\n'

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val streams: DataStream[String] = env.socketTextStream(hostName, port, delimiter)

    // brings the implicit TypeInformation instances required by the Scala API into scope
    import org.apache.flink.api.scala._

    // Malformed lines are dropped here instead of being mapped to a sentinel record, so a valid
    // record whose key is "0" or whose timestamp is 0 is no longer filtered out by accident.
    val data: DataStream[(String, Long)] = streams.flatMap(line => parseRecord(line).toList)

    // assign timestamps and watermarks (max out-of-order = 3s)
    val waterStream: DataStream[(String, Long)] =
      data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
        var currentMaxTimestamp = 0L
        val maxOutOfOrderness = 3000L
        var lastEmittedWatermark: Long = Long.MinValue

        // The watermark trails the largest timestamp seen so far and never moves backwards.
        override def getCurrentWatermark: Watermark = {
          val potentialWM = currentMaxTimestamp - maxOutOfOrderness
          if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM
          }
          new Watermark(lastEmittedWatermark)
        }

        override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
          val time = element._2
          if (time > currentMaxTimestamp) {
            currentMaxTimestamp = time
          }
          // trace every record together with the watermark it produces
          val currentWatermark = sdf.format(getCurrentWatermark.getTimestamp)
          println(s"key: ${element._1} EventTime: ${sdf.format(time)} waterMark: $currentWatermark")
          time
        }
      })

    // tag identifying the side output that receives records too late for their window
    val lateData = new OutputTag[(String, Long)]("late")

    val result: DataStream[String] = waterStream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
      // keep window state for 2 more seconds so late records can still update the window
      .allowedLateness(Time.seconds(2L))
      // anything later than that goes to the side output instead of being silently dropped
      .sideOutputLateData(lateData)
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
        // Emits one line per firing: the key, all event times in the window, and its bounds.
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          val eventTimes = input.map(record => sdf.format(record._2)).mkString("-")
          val windowStart = sdf.format(window.getStart)
          val windowEnd = sdf.format(window.getEnd)
          out.collect(s"key: ${key.toString} data: $eventTimes startTime: $windowStart endTime: $windowEnd")
        }
      })

    result.print("window result:")

    // the side output is read from the stream produced by the window operation
    val late = result.getSideOutput(lateData)
    late.print("late data:")

    env.execute(this.getClass.getName)
  }
}
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
