
Understanding Watermarks, Event Time, and Processing Time in Apache Flink

This article explains the three notions of time in Flink (Processing Time, Event Time, and Ingestion Time), illustrates their impact on windowed computations with examples, introduces watermarks and allowed lateness for handling out‑of‑order data, and provides complete Scala code for both processing‑time and event‑time streaming applications.

Big Data Technology & Architecture

In real‑world streaming applications, data may arrive late, so window calculations based purely on processing time can produce incorrect results; Flink introduces watermarks to address this issue. Before discussing watermarks, the article defines Flink's three notions of time: Processing Time, Event Time, and Ingestion Time.

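Event Time is the timestamp embedded in each event when it is generated. Because it does not depend on the clock of the machine doing the processing, it allows correct handling of out‑of‑order, delayed, or replayed data. In exchange, it requires watermarks: a watermark with timestamp t tells the system that no more events with a timestamp at or before t are expected, which bounds how long a window waits for late events.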

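Ingestion Time records the moment an event enters Flink at the source operator. It sits conceptually between event time and processing time and gives each event a stable timestamp once it is in the pipeline, but because that timestamp is assigned on arrival, it cannot handle delayed or out‑of‑order events. In return, the program does not have to specify how watermarks are generated.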

Processing Time uses the machine's current clock when processing each event, offering minimal latency and highest performance but no determinism in distributed or asynchronous environments.
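To make the distinction concrete, the following is a minimal sketch in plain Scala rather than Flink code. The names `Record`, `TimeNotion`, and `timestampFor` are hypothetical and exist only for illustration; the sketch simply shows which of the available timestamps a window operator would look at under each notion.

```scala
// Illustrative model only; none of these names are part of the Flink API.
sealed trait TimeNotion
case object EventTime extends TimeNotion
case object IngestionTime extends TimeNotion
case object ProcessingTime extends TimeNotion

// One record together with the timestamps it picks up along the way (milliseconds).
final case class Record(
  payload: String,
  createdAt: Long,  // stamped by the producer when the event is generated
  ingestedAt: Long  // stamped when the source operator reads the event
)

object TimeNotions {
  // Selects the timestamp used for windowing under each notion.
  // `operatorClock` stands in for the machine clock at the moment of processing.
  def timestampFor(notion: TimeNotion, r: Record, operatorClock: Long): Long =
    notion match {
      case EventTime      => r.createdAt
      case IngestionTime  => r.ingestedAt
      case ProcessingTime => operatorClock
    }
}
```

Only the event-time value travels with the record itself, which is why replaying the same data can reproduce the same windows under event time but not under the other two notions.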

The article then presents a processing‑time example in Scala, ignoring timestamps:

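    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    // senv is assumed to be a StreamExecutionEnvironment (e.g. the one predefined in the Flink Scala shell)
    val text = senv.socketTextStream("localhost", 9999)
    val counts = text.map { (m: String) => (m.split(",")(0), 1) } // key = first comma-separated field
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5)) // 10-second windows sliding every 5 seconds
      .sum(1)
    counts.print
    senv.execute("ProcessingTime processing example")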

Two scenarios are examined: (1) messages arriving without delay, showing expected window counts, and (2) a message delayed by six seconds, which falls into later windows and causes incorrect results for earlier windows.
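The effect of the delay can be reproduced without a cluster. The sketch below is plain Scala, not Flink code: it assigns timestamps (in seconds) to 10-second windows sliding every 5 seconds and counts the elements per window. The concrete timestamps used in the usage note (13, 13, and 16, with one message arriving at 19) are assumed values chosen to match the six-second delay described above, and `SlidingWindowSketch` is a hypothetical name.

```scala
object SlidingWindowSketch {
  val SizeSec = 10L  // window length, as in timeWindow(Time.seconds(10), Time.seconds(5))
  val SlideSec = 5L  // slide interval

  // Start times of every sliding window that contains `ts` (non-negative seconds).
  def windowStarts(ts: Long): Seq[Long] = {
    val lastStart = ts - (ts % SlideSec)
    Iterator.iterate(lastStart)(_ - SlideSec).takeWhile(_ > ts - SizeSec).toSeq
  }

  // Number of elements per window, keyed by window start and ordered by start.
  def countsPerWindow(timestamps: Seq[Long]): Seq[(Long, Int)] =
    timestamps
      .flatMap(windowStarts)
      .groupBy(identity)
      .map { case (start, hits) => start -> hits.size }
      .toSeq
      .sortBy(_._1)
}
```

Calling `countsPerWindow(Seq(13L, 13L, 16L))` with the generation times gives counts of 2, 3, and 1 for the windows starting at 5, 10, and 15. Calling it with the arrival times `Seq(13L, 19L, 16L)`, which is what a processing-time job effectively sees, gives 1, 3, and 2: the delayed message is missing from the first window and counted in the last one instead.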

To solve this, the article switches to an Event‑Time system, setting the stream time characteristic and adding a timestamp extractor:

    import org.apache.flink.streaming.api.TimeCharacteristic

    // Switch the job from processing time to event time
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val text = senv.socketTextStream("localhost", 9999)
      .assignTimestampsAndWatermarks(new TimestampExtractor) // timestamps come from the message itself
    val counts = text.map { (m: String) => (m.split(",")(0), 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)
    counts.print
    senv.execute("EventTime processing example")

The required TimestampExtractor implements AssignerWithPeriodicWatermarks to extract the event timestamp and generate watermarks:

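    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark

    class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
      // The event timestamp is the second comma-separated field of the message
      override def extractTimestamp(e: String, prevElementTimestamp: Long): Long = {
        e.split(",")(1).toLong
      }
      // The watermark follows the current system time, with no allowance for delay
      override def getCurrentWatermark(): Watermark = {
        new Watermark(System.currentTimeMillis)
      }
    }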

Running this code shows improved results: windows 2 and 3 produce correct counts, but window 1 still misses the delayed event because it was evaluated before the event arrived. The article then makes the watermark lag the current system time by a fixed delay (e.g., 5 seconds), so that each window is evaluated later and late events can still be incorporated:

    override def getCurrentWatermark(): Watermark = {
      new Watermark(System.currentTimeMillis - 5000)
    }
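Why the five-second lag helps can be checked with a little arithmetic. The sketch below is plain Scala with hypothetical names (`WatermarkLagSketch`, `arrivesInTime`), not Flink code. It assumes the usual rule that an event-time window covering [start, end) is evaluated once the watermark reaches its last timestamp, end - 1, and it ignores the fact that periodic watermarks are only emitted at intervals.

```scala
object WatermarkLagSketch {
  // Watermark as produced by getCurrentWatermark above: wall clock minus a fixed lag (ms).
  def watermarkAt(clockMs: Long, lagMs: Long): Long = clockMs - lagMs

  // Largest timestamp that still belongs to a window ending (exclusively) at endMs.
  def maxTimestamp(endMs: Long): Long = endMs - 1

  // An element still makes it into a window if, at the moment it arrives,
  // the watermark has not yet reached the window's last timestamp.
  def arrivesInTime(arrivalClockMs: Long, windowEndMs: Long, lagMs: Long): Boolean =
    watermarkAt(arrivalClockMs, lagMs) < maxTimestamp(windowEndMs)
}
```

As an assumed example, take a message that arrives when the clock reads 19000 ms and belongs to the windows ending at 15000 ms and 20000 ms. With no lag, `arrivesInTime(19000L, 15000L, 0L)` is false while `arrivesInTime(19000L, 20000L, 0L)` is true, which mirrors the first window missing the event while later windows are correct. With a lag of 5000 ms, `arrivesInTime(19000L, 15000L, 5000L)` becomes true because the watermark is only at 14000 ms.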

With this watermark configuration, all three windows emit the expected counts (a,2), (a,3), and (a,1). The concept of allowedLateness is also explained: it permits a window to accept late elements for a configurable period after the watermark has passed the window’s end, triggering additional computations instead of discarding the data.
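The allowed-lateness rule can be sketched the same way. The code below is plain Scala with hypothetical names (`AllowedLatenessSketch`, `classify`), not Flink code. It assumes that a window's state is kept until the watermark reaches the window's last timestamp plus the allowed lateness, and that elements arriving after that point are discarded.

```scala
object AllowedLatenessSketch {
  sealed trait Outcome
  case object OnTime extends Outcome          // counted in the window's first result
  case object LateButAccepted extends Outcome // causes an additional, updated result
  case object Dropped extends Outcome         // window state has already been cleaned up

  // Classifies an element for the window ending (exclusively) at windowEndMs,
  // given the watermark at the moment the element arrives. All values in ms.
  def classify(watermarkMs: Long, windowEndMs: Long, allowedLatenessMs: Long): Outcome = {
    val maxTs = windowEndMs - 1
    if (watermarkMs < maxTs) OnTime
    else if (watermarkMs < maxTs + allowedLatenessMs) LateButAccepted
    else Dropped
  }
}
```

For instance, with a window ending at 15000 ms and the watermark already at 19000 ms, the element is dropped when the allowed lateness is 0 but accepted, with a second firing of the window, when the allowed lateness is 5000 ms. Downstream consumers then see two results for that window and need to treat the later one as an update.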

Finally, a complete Scala program is provided that reads from Kafka, assigns timestamps and watermarks, applies a count trigger, and writes results to Redis, demonstrating a production‑ready Flink streaming job with event‑time semantics.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
