Big Data 17 min read

Understanding Flink Event‑Time Windows, Watermarks, and Allowed Lateness with Scala Examples

This article explains how Apache Flink uses event‑time windows, watermarks, and allowed lateness to handle out‑of‑order and late data, and provides complete Scala code examples that demonstrate timestamp assignment, watermark generation, window triggering, and side‑output of late records.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Understanding Flink Event‑Time Windows, Watermarks, and Allowed Lateness with Scala Examples

When Flink processes a stream in event time, each record must carry a timestamp, and the stream must carry watermarks that tell operators how far event time has progressed so that out‑of‑order data can be handled.

The watermark is defined as the maximum event time seen so far minus maxOutOfOrderness (e.g., 3 seconds); it advances only when a record with a newer timestamp arrives, allowing the system to wait for delayed records within a bounded time.

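A tumbling event‑time window of 5 seconds (left‑closed, right‑open) groups records; the window fires when the watermark passes the window’s end time. If an allowed lateness is configured, the window’s state is kept for that additional period, and the window fires again for each late record that arrives before the watermark passes the end time plus the allowed lateness.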

Allowed lateness (e.g., 2 seconds) extends the period during which late records can still be processed; beyond this, records are considered late and can be captured via sideOutputLateData.

Two complete Scala examples demonstrate assigning timestamps and watermarks, defining a 5‑second tumbling window, setting allowed lateness, and handling late data with side‑output.

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

/**
  * Watermark test.
  *
  * Reads `name:timestamp` lines from a socket, assigns event-time timestamps and
  * periodic watermarks (maximum out-of-orderness of 3 seconds), groups the records
  * into 5-second tumbling event-time windows per key, and prints one summary line
  * for every window that fires.
  */
object WaterMarkFunc01 {
  // thread-safe date formatter, shared by the watermark assigner and the window function
  val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS")

  /**
    * Builds and runs the streaming job.
    *
    * @param args command-line arguments (unused; host and port are hard-coded below)
    */
  def main(args: Array[String]): Unit = {
    val hostName = "s102"
    val port = 9000
    // record delimiter for the socket source: one record per line
    val delimiter = '\n'
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // parallelism 1 so that a single assigner instance sees every record
    env.setParallelism(1)
    val streams: DataStream[String] = env.socketTextStream(hostName, port, delimiter)
    import org.apache.flink.api.scala._
    val data = streams.map(line => {
      // format: name:timestamp
      try {
        val items = line.split(":")
        (items(0), items(1).toLong)
      } catch {
        case _: Exception =>
          println("Invalid input: " + line)
          // placeholder for unparsable lines; removed by the filter below
          ("0", 0L)
      }
    })
      // NOTE: besides the placeholder, this also drops any genuine record whose
      // key is "0" or whose timestamp is 0.
      .filter(record => record._1 != "0" && record._2 != 0L)

    // assign timestamps and watermarks (max out-of-order = 3s)
    val waterStream: DataStream[(String, Long)] = data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
      // largest event timestamp seen so far
      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 3000L
      // last watermark handed out; used to keep the watermark from moving backwards
      var lastEmittedWatermark: Long = Long.MinValue

      // Watermark = largest timestamp seen - maxOutOfOrderness, never decreasing.
      override def getCurrentWatermark: Watermark = {
        val potentialWM = currentMaxTimestamp - maxOutOfOrderness
        if (potentialWM >= lastEmittedWatermark) {
          lastEmittedWatermark = potentialWM
        }
        new Watermark(lastEmittedWatermark)
      }

      // Uses the record's second field as its event time and logs the record
      // together with the watermark computed after seeing it.
      override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
        val time = element._2
        if (time > currentMaxTimestamp) {
          currentMaxTimestamp = time
        }
        val outData = String.format("key: %s    EventTime: %s    waterMark:  %s", element._1, sdf.format(time), sdf.format(getCurrentWatermark.getTimestamp))
        println(outData)
        time
      }
    })

    // key by tuple field 0 (the name), then collect 5-second tumbling event-time windows
    val result: DataStream[String] = waterStream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
        // Emits one line per fired window: the key, the formatted event times of
        // all records in the window joined by "-", and the window bounds.
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          val times = input.map(record => sdf.format(record._2)).mkString("-")
          val outData = String.format("key: %s    data: %s    startTime:  %s    endTime:  %s", key.toString, times, sdf.format(window.getStart), sdf.format(window.getEnd))
          out.collect(outData)
        }
      })
    result.print("window result:")
    env.execute(this.getClass.getName)
  }
}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer

/**
  * Late data test.
  *
  * Same pipeline as the plain watermark test (socket source of `name:timestamp`
  * lines, 3-second bounded out-of-orderness, 5-second tumbling event-time windows),
  * but with an allowed lateness of 2 seconds and a side output that receives the
  * records that are too late to be added to their window.
  */
object WaterMarkFunc02 {
  // date formatter shared by the watermark assigner and the window function
  val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss:SSS")

  /**
    * Builds and runs the streaming job.
    *
    * @param args command-line arguments (unused; host and port are hard-coded below)
    */
  def main(args: Array[String]): Unit = {
    val hostName = "s102"
    val port = 9000
    // record delimiter for the socket source: one record per line
    val delimiter = '\n'
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // parallelism 1 so that a single assigner instance sees every record
    env.setParallelism(1)
    val streams: DataStream[String] = env.socketTextStream(hostName, port, delimiter)
    import org.apache.flink.api.scala._
    val data = streams.map(line => {
      // format: name:timestamp
      try {
        val items = line.split(":")
        (items(0), items(1).toLong)
      } catch {
        case _: Exception =>
          println("Invalid input: " + line)
          // placeholder for unparsable lines; removed by the filter below
          ("0", 0L)
      }
    })
      // NOTE: besides the placeholder, this also drops any genuine record whose
      // key is "0" or whose timestamp is 0.
      .filter(record => record._1 != "0" && record._2 != 0L)

    // assign timestamps and watermarks (max out-of-order = 3s)
    val waterStream: DataStream[(String, Long)] = data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
      // largest event timestamp seen so far
      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 3000L
      // last watermark handed out; used to keep the watermark from moving backwards
      var lastEmittedWatermark: Long = Long.MinValue

      // Watermark = largest timestamp seen - maxOutOfOrderness, never decreasing.
      override def getCurrentWatermark: Watermark = {
        val potentialWM = currentMaxTimestamp - maxOutOfOrderness
        if (potentialWM >= lastEmittedWatermark) {
          lastEmittedWatermark = potentialWM
        }
        new Watermark(lastEmittedWatermark)
      }

      // Uses the record's second field as its event time and logs the record
      // together with the watermark computed after seeing it.
      override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
        val time = element._2
        if (time > currentMaxTimestamp) {
          currentMaxTimestamp = time
        }
        val outData = String.format("key: %s    EventTime: %s    waterMark:  %s", element._1, sdf.format(time), sdf.format(getCurrentWatermark.getTimestamp))
        println(outData)
        time
      }
    })

    // tag identifying the side output that receives records too late for their window
    val lateData = new OutputTag[(String, Long)]("late")
    // key by tuple field 0 (the name); windows accept late records for 2 more seconds,
    // anything later than that is routed to the `lateData` side output
    val result: DataStream[String] = waterStream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(5L)))
      .allowedLateness(Time.seconds(2L))
      .sideOutputLateData(lateData)
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow] {
        // Emits one line per window firing: the key, the formatted event times of
        // all records in the window joined by "-", and the window bounds.
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          val times = input.map(record => sdf.format(record._2)).mkString("-")
          val outData = String.format("key: %s    data: %s    startTime:  %s    endTime:  %s", key.toString, times, sdf.format(window.getStart), sdf.format(window.getEnd))
          out.collect(outData)
        }
      })
    result.print("window result:")
    // print the raw (name, timestamp) records that ended up in the side output
    val late = result.getSideOutput(lateData)
    late.print("late data:")
    env.execute(this.getClass.getName)
  }
}
Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

FlinkStreamProcessingScalaWindowEventTimeAllowedLateness
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.