Understanding Event Time and Watermarks in Apache Flink

This article explains how Apache Flink uses event-time timestamps and watermarks to handle out-of-order and late data. It walks through the assignTimestampsAndWatermarks() API with its periodic and punctuated watermark assigners, and gives code examples for allowed lateness and for routing late data to a side output.

Big Data Technology & Architecture

Introduction: This article revisits the concepts of event time, watermarks, out-of-order data, and late elements in Flink.

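Event time is the timestamp carried by each data element. It records when the event actually occurred, independent of when Flink processes it. Elements can arrive out of order and with unpredictable delays, so an operator cannot tell from the timestamps alone how far event time has progressed. Watermarks provide that progress signal.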

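A watermark is a special element in the stream that carries a timestamp T. It asserts that all events with timestamps ≤ T have arrived, so no more such events are expected. An element that still arrives afterwards with a timestamp ≤ T is late. An event-time window fires once the watermark reaches the window's last millisecond (its end timestamp minus 1), so watermarks are the basis for both detecting late data and triggering windows.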
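
The following is a minimal plain-Java simulation of these rules for a single stream; it is not Flink code, and the class WatermarkWindowSim and its methods are hypothetical. It assumes tumbling windows with no offset and zero allowed lateness. A window fires once the watermark reaches its last millisecond (end - 1), and an element whose window has already fired is recorded as late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical single-stream simulation of event-time tumbling windows; not Flink API.
final class WatermarkWindowSim {
    private final long windowSize;
    private long currentWatermark = Long.MIN_VALUE;
    // Window start -> number of elements, for windows that have not fired yet.
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    final List<String> fired = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    WatermarkWindowSim(long windowSize) {
        this.windowSize = windowSize;
    }

    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        long maxTimestamp = start + windowSize - 1; // last millisecond of the window
        if (maxTimestamp <= currentWatermark) {
            late.add(timestamp); // the window already fired: the element is late
            return;
        }
        openWindows.merge(start, 1, Integer::sum);
    }

    void onWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return; // watermarks never move backwards
        }
        currentWatermark = watermark;
        // Fire, in order, every open window whose last millisecond is covered by the watermark.
        while (!openWindows.isEmpty()) {
            long start = openWindows.firstKey();
            if (start + windowSize - 1 > watermark) {
                break;
            }
            int count = openWindows.remove(start);
            fired.add("[" + start + "," + (start + windowSize) + ") count=" + count);
        }
    }
}
```

Suppose the simulation receives elements 1, 7 and 12, then a watermark of 8, then element 4, then a watermark of 9. It fires only the window [0,10), which contains three elements. An element with timestamp 5 that arrives after that is recorded as late.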

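Illustrative diagrams show how watermarks flow through a stream with parallelism 1. They also show what happens when an operator receives several parallel input streams: it tracks the latest watermark from each input and takes their minimum as its own event time, so the slowest input determines when windows fire.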
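
The multi-input rule can be sketched as a hypothetical plain-Java model; MinWatermarkTracker is not Flink API. The model tracks each input channel's watermark separately. The operator's event time is the minimum across channels and never moves backwards.

```java
import java.util.Arrays;

// Hypothetical model of an operator with several input channels; not Flink API.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long operatorWatermark = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no watermark seen on any channel yet
    }

    /** Records a watermark from one channel and returns the operator's resulting watermark. */
    long onWatermark(int channel, long watermark) {
        // A channel's watermark only moves forward.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator can only be as far along as its slowest channel.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > operatorWatermark) {
            operatorWatermark = min;
        }
        return operatorWatermark;
    }
}
```

Consider two channels. A watermark of 100 on channel 0 leaves the operator at Long.MIN_VALUE until channel 1 reports a watermark. After channel 1 reports 50, the operator is at 50. Only when channel 1 reaches 200 does the operator advance to 100.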

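Flink provides the DataStream.assignTimestampsAndWatermarks() API, which accepts either an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks. Both extract each element's timestamp and generate watermarks; they differ in when watermarks are produced. (Since Flink 1.11 these two interfaces are deprecated in favor of WatermarkStrategy.)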

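Flink calls a periodic assigner's getCurrentWatermark() at a fixed interval, set with ExecutionConfig.setAutoWatermarkInterval(). It forwards the result only if it is non-null and larger than the previous watermark. Flink includes three built-in periodic assigners; their core methods are excerpted below: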

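AscendingTimestampExtractor: for streams whose timestamps never decrease. Subclasses implement extractAscendingTimestamp(). A timestamp that goes backwards is reported to a MonotonyViolationHandler, which by default logs a warning.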

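// Excerpt from Flink's AscendingTimestampExtractor<T> (fields omitted:
// currentTimestamp starts at Long.MIN_VALUE; violationHandler is a MonotonyViolationHandler).
public abstract long extractAscendingTimestamp(T element);

@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
    final long newTimestamp = extractAscendingTimestamp(element);
    if (newTimestamp >= this.currentTimestamp) {
        // Timestamps are non-decreasing: advance the tracked timestamp.
        this.currentTimestamp = newTimestamp;
        return newTimestamp;
    } else {
        // Timestamp went backwards: report it, but keep the element's own timestamp.
        violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
        return newTimestamp;
    }
}

@Override
public final Watermark getCurrentWatermark() {
    // Trail by 1 ms so later elements carrying the same timestamp are not late.
    return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}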
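
The excerpt's behavior can be checked in isolation with a hypothetical plain-Java mirror of its logic. AscendingWatermarkSim is not part of Flink, and the violation handler is replaced by a simple counter. The watermark trails the latest timestamp by 1 ms, so a second element with the same timestamp is still on time.

```java
// Hypothetical stand-alone mirror of AscendingTimestampExtractor's logic; not Flink API.
final class AscendingWatermarkSim {
    private long currentTimestamp = Long.MIN_VALUE;
    int violations = 0; // stands in for the MonotonyViolationHandler

    long extractTimestamp(long newTimestamp) {
        if (newTimestamp >= currentTimestamp) {
            currentTimestamp = newTimestamp;
        } else {
            violations++; // timestamp went backwards; it is still returned unchanged
        }
        return newTimestamp;
    }

    long currentWatermark() {
        // Trail by 1 ms: another element with timestamp == currentTimestamp is still on time.
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }
}
```

After two elements stamped 1000, the watermark is 999. A third element stamped 900 counts as one violation and leaves the watermark at 999.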

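BoundedOutOfOrdernessTimestampExtractor: for streams where elements can arrive out of order, but by no more than a known bound (maxOutOfOrderness). The watermark trails the largest timestamp seen so far by that bound and never moves backwards.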

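// Excerpt from Flink's BoundedOutOfOrdernessTimestampExtractor<T> (fields omitted:
// maxOutOfOrderness and currentMaxTimestamp are longs in ms; lastEmittedWatermark starts at Long.MIN_VALUE).
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
    if (maxOutOfOrderness.toMilliseconds() < 0) {
        throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
    }
    this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
    // Start at MIN_VALUE + bound so that (currentMaxTimestamp - maxOutOfOrderness) cannot underflow.
    this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}

// Implemented by the user: returns the event-time timestamp of an element.
public abstract long extractTimestamp(T element);

@Override
public final Watermark getCurrentWatermark() {
    // Watermark = largest timestamp seen so far minus the allowed out-of-orderness.
    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
    // Only update when it does not decrease, so the watermark never goes backwards.
    if (potentialWM >= lastEmittedWatermark) {
        lastEmittedWatermark = potentialWM;
    }
    return new Watermark(lastEmittedWatermark);
}

@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
    long timestamp = extractTimestamp(element);
    // Track the maximum timestamp; out-of-order elements do not lower it.
    if (timestamp > currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;
    }
    return timestamp;
}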
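
The arithmetic is clearer with a worked example. Below is a hypothetical plain-Java mirror of the excerpt; BoundedOutOfOrdernessSim is not Flink API, and it takes the bound directly in milliseconds instead of a Flink Time object. It is applied to an out-of-order sequence with a 30 s bound.

```java
// Hypothetical stand-alone mirror of BoundedOutOfOrdernessTimestampExtractor's logic; not Flink API.
final class BoundedOutOfOrdernessSim {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSim(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Offset the start value so the subtraction in currentWatermark() cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    long extractTimestamp(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp); // remember the largest timestamp
        return timestamp;
    }

    long currentWatermark() {
        long potential = currentMaxTimestamp - maxOutOfOrderness;
        if (potential >= lastEmittedWatermark) {
            lastEmittedWatermark = potential; // never let the watermark go backwards
        }
        return lastEmittedWatermark;
    }
}
```

With a 30,000 ms bound, timestamps 100,000 and then 80,000 yield a watermark of 70,000. The 80,000 element is out of order but not late. After 130,000 arrives, the watermark becomes 100,000, so a subsequent element stamped 95,000 would be late.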

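IngestionTimeExtractor: ignores any timestamp in the data and stamps each element with the current wall-clock time when it passes through the assigner. As a result, event-time operators effectively run on ingestion time.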

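// Excerpt from Flink's IngestionTimeExtractor<T> (field omitted: maxTimestamp, the largest timestamp assigned so far).
@Override
public long extractTimestamp(T element, long previousElementTimestamp) {
    // Use the wall-clock time, but never go below an already assigned timestamp
    // (the system clock may be adjusted backwards).
    final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
    maxTimestamp = now;
    return now;
}

@Override
public Watermark getCurrentWatermark() {
    // Same monotonic clock; trail by 1 ms so elements stamped in this millisecond are not late.
    final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
    maxTimestamp = now;
    return new Watermark(now - 1);
}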
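
The excerpt reads System.currentTimeMillis() directly, so its clamping is hard to observe. The hypothetical IngestionTimeSim below is not Flink API. It mirrors the same logic with an injectable clock (a java.util.function.LongSupplier), so a backwards clock adjustment can be simulated.

```java
import java.util.function.LongSupplier;

// Hypothetical mirror of IngestionTimeExtractor's logic with an injectable clock; not Flink API.
final class IngestionTimeSim {
    private final LongSupplier clock;
    private long maxTimestamp = Long.MIN_VALUE;

    IngestionTimeSim(LongSupplier clock) {
        this.clock = clock;
    }

    long extractTimestamp() {
        // Never hand out a timestamp smaller than one already assigned, even if the clock jumps back.
        maxTimestamp = Math.max(clock.getAsLong(), maxTimestamp);
        return maxTimestamp;
    }

    long currentWatermark() {
        maxTimestamp = Math.max(clock.getAsLong(), maxTimestamp);
        // Trail by 1 ms so elements stamped in the current millisecond are not late.
        return maxTimestamp - 1;
    }
}
```

In the following scenario, the clock reads 1000, then is set back to 900. The second element is still stamped 1000, and the watermark is 999.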

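Punctuated watermarks are less common. A punctuated assigner's checkAndGetNextWatermark() is called for every element, right after its timestamp has been extracted. It returns a Watermark to emit one, or null to emit none. The custom example below emits a watermark whenever a user ID ends with "0":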

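import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// sourceStream is a DataStream<UserActionRecord>.
sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {
    // Called after extractTimestamp() for every element; returning null means "no watermark".
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {
        return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;
    }

    // Event time comes from the record's own timestamp field.
    @Override
    public long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {
        return element.getTimestamp();
    }
});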
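
This punctuated assigner can be sketched as a hypothetical plain-Java simulation. PunctuatedWatermarkSim and its nested Event class stand in for the Flink interface and for UserActionRecord; neither is Flink API. As an assumption of this sketch, a candidate watermark that would not move event time forward is ignored, so the emitted watermarks only increase.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a punctuated watermark assigner; not Flink API.
final class PunctuatedWatermarkSim {
    // Stand-in for the article's UserActionRecord.
    static final class Event {
        final String userId;
        final long timestamp;

        Event(String userId, long timestamp) {
            this.userId = userId;
            this.timestamp = timestamp;
        }
    }

    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    void onElement(Event element) {
        long timestamp = element.timestamp; // corresponds to extractTimestamp()
        // Corresponds to checkAndGetNextWatermark(): only "marker" elements produce a watermark.
        if (element.userId.endsWith("0")) {
            long candidate = timestamp - 1;
            if (candidate > currentWatermark) { // only emit if event time moves forward
                currentWatermark = candidate;
                emitted.add(candidate);
            }
        }
    }
}
```

The simulation receives elements from users u11 (t=1000), u20 (t=2000), u30 (t=1500) and u40 (t=3000). It emits watermarks 1999 and 2999. The u30 marker would have moved event time backwards, so it is ignored.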

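Additional recommendations: avoid generating watermarks too frequently, because every watermark is an extra record that downstream operators must process. For periodic assigners the frequency is set by the watermark interval; for punctuated assigners, it depends on how often checkAndGetNextWatermark() returns a non-null value. Assign timestamps and watermarks as early as possible, directly after the source or after simple transformations, so that every downstream operator sees event time. If the job does not actually need event-time semantics, consider processing time instead, which needs no watermarks.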

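Even with an out-of-orderness bound, some elements can arrive after the watermark has already passed the end of their window. Flink offers two strategies for such late data: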

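Window allowed lateness: WindowedStream.allowedLateness() keeps a window's state for the given duration after the watermark passes the end of the window (the default is 0). With the default event-time trigger, each late element that arrives within that period is added to the window and makes it fire again. Downstream operators may therefore receive several progressively updated results for the same window.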

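Side output of late data: WindowedStream.sideOutputLateData() takes an OutputTag and diverts elements that arrive after the allowed lateness has expired into a separate stream; without it, such elements are silently dropped. That stream is retrieved with getSideOutput() on the window operator's result, a SingleOutputStreamOperator, for further handling.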
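
How the two strategies split incoming elements can be sketched with the hypothetical classifier below; LatenessClassifier is not Flink API. It assumes a tumbling window with no offset and the default event-time trigger. An element is on time until the watermark reaches the window's last millisecond. After that, it triggers a late firing while that millisecond plus the allowed lateness is still above the watermark. Otherwise it goes to the side output, or is dropped if none is configured.

```java
// Hypothetical classifier modeling how a tumbling event-time window treats an element; not Flink API.
final class LatenessClassifier {
    enum Outcome { ON_TIME, LATE_REFIRE, SIDE_OUTPUT }

    static Outcome classify(long timestamp, long watermark, long windowSize, long allowedLateness) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        long maxTimestamp = start + windowSize - 1; // last millisecond of the element's window
        if (maxTimestamp > watermark) {
            return Outcome.ON_TIME; // the window has not fired yet
        }
        if (maxTimestamp + allowedLateness > watermark) {
            return Outcome.LATE_REFIRE; // state still kept: element is added and the window fires again
        }
        return Outcome.SIDE_OUTPUT; // state already cleaned up: side output, or dropped
    }
}
```

Take a one-minute window [0, 60000) with 30 s of allowed lateness and an element stamped 10000. The element is on time while the watermark is below 59999. It triggers a late firing while the watermark is between 59999 and 89998. From watermark 89999 on, it goes to the side output.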

Example of allowed lateness:

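import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

sourceStream.assignTimestampsAndWatermarks(
    // Watermark trails the largest timestamp seen by 30 s.
    new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(UserActionRecord record) { return record.getTimestamp(); }
    })
    .keyBy("platform")
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // Keep each window's state 30 s (event time) past its end; late elements re-fire it.
    .allowedLateness(Time.seconds(30))
    .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc());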
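
The two 30 s settings in this example combine as follows. The hypothetical helper below (not Flink API) works on a window [start, start + size). It computes the smallest "largest timestamp seen so far" at which the window first fires, and the smallest at which its state is discarded. It ignores the delay until the next periodic watermark emission.

```java
// Hypothetical arithmetic helper for a bounded-out-of-orderness assigner plus allowed lateness; not Flink API.
final class LatenessTimeline {
    /** Smallest max-seen timestamp at which the window [start, start + size) first fires. */
    static long firstFireAtMaxTimestamp(long start, long size, long outOfOrderness) {
        // Fires when watermark >= start + size - 1, where watermark = maxSeen - outOfOrderness.
        return start + size - 1 + outOfOrderness;
    }

    /** Smallest max-seen timestamp at which the window's state is discarded. */
    static long cleanupAtMaxTimestamp(long start, long size, long outOfOrderness, long allowedLateness) {
        // Cleaned up when watermark >= start + size - 1 + allowedLateness.
        return start + size - 1 + allowedLateness + outOfOrderness;
    }
}
```

For the first one-minute window this gives 89999 ms and 119999 ms. The window's first result appears once an element stamped at least 89999 ms has been seen. Late elements can keep updating that result until an element stamped at least 119999 ms has been seen.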

Example of side‑outputting late data:

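import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Anonymous subclass ({}) so Flink can capture the generic element type despite erasure.
OutputTag<UserActionRecord> lateOutputTag = new OutputTag<UserActionRecord>("late_data_output_tag") {};

// Keep a reference to the windowed result: side outputs are read from it.
// Its element type is whatever ViewSumWindowFunc emits.
SingleOutputStreamOperator<?> viewStream = sourceStream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(UserActionRecord record) { return record.getTimestamp(); }
    })
    .keyBy("platform")
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.seconds(30))
    // Elements too late even for the allowed lateness go here instead of being dropped.
    .sideOutputLateData(lateOutputTag)
    .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc());

viewStream.getSideOutput(lateOutputTag).addSink(lateDataSink);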

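Together, these mechanisms form a second layer of protection beyond the watermark's bounded out-of-orderness interval. The bound delays the watermark, allowed lateness keeps windows updatable for a while longer, and the side output captures data that is late even beyond that instead of dropping it.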

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Big Data Technology & Architecture

Wang Zhiwu, a big data expert dedicated to sharing big data technology.
