Understanding Watermarks in Apache Flink for Handling Out-of-Order Events
This article explains how Apache Flink uses Watermarks to manage event‑time windows, describes the three time semantics, details periodic and punctuated Watermark generation methods with their Java interfaces, and shows practical DDL examples for handling late and out‑of‑order data in stream processing.
The article begins with a common out‑of‑order problem in stream processing: an event may reach the job several seconds after the moment it actually occurred (its event time). If a tumbling window has already been closed when such an event arrives, the event is missed and the window's aggregate is wrong.
It then introduces Apache Flink's three time semantics—ProcessingTime, IngestionTime, and EventTime—highlighting that the Watermark mechanism operates on EventTime.
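A short plain-Java sketch (not Flink API code) shows why the choice of time semantics matters. It assumes 1-minute tumbling windows aligned to the epoch and a hypothetical Event class that carries both the time an event occurred and the time it reached the job. The same delayed event lands in different windows depending on which of the two timestamps is used.

```java
import java.util.List;

class TimeSemanticsDemo {
    // Assumption for this example: 1-minute tumbling windows.
    static final long WINDOW_SIZE_MS = 60_000L;

    /** Hypothetical event: when it happened vs. when it reached the job. */
    static final class Event {
        final String id;
        final long eventTimeMs;
        final long arrivalTimeMs;

        Event(String id, long eventTimeMs, long arrivalTimeMs) {
            this.id = id;
            this.eventTimeMs = eventTimeMs;
            this.arrivalTimeMs = arrivalTimeMs;
        }
    }

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        // Event "b" occurred at 59 s but reached the job 3 s late, at 62 s.
        List<Event> events = List.of(
                new Event("a", 10_000L, 10_000L),
                new Event("b", 59_000L, 62_000L));
        for (Event e : events) {
            System.out.println(e.id
                    + ": EventTime window starts at " + windowStart(e.eventTimeMs)
                    + " ms, ProcessingTime window starts at " + windowStart(e.arrivalTimeMs) + " ms");
        }
    }
}
```

Under ProcessingTime, event b is counted in the [60 s, 120 s) window although it occurred in the first minute; under EventTime it stays in [0 s, 60 s), but the job then needs a rule for how long to wait for it, which is what Watermarks provide.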
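Watermarks are monotonically increasing timestamps that flow through the stream alongside the events; they are emitted by sources or by custom generators. A Watermark with timestamp t tells downstream operators that no more events with a timestamp of t or earlier are expected, so event‑time windows that only cover timestamps up to t can be triggered.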
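The trigger rule can be sketched in plain Java. This is not Flink's WindowOperator, and the class and method names are hypothetical; the sketch mirrors Flink's default behavior for a tumbling event-time window [start, end) with zero allowed lateness: the window fires once the Watermark reaches end - 1 (its maxTimestamp()), and an event that arrives for a window the Watermark has already passed is dropped as late.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink's WindowOperator) of one tumbling
// event-time window [start, end) driven by watermarks.
class WindowFiringSketch {
    final long start;
    final long end; // exclusive
    final List<Long> buffered = new ArrayList<>();
    boolean fired = false;

    WindowFiringSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** Last timestamp that belongs to the window, like Flink's TimeWindow#maxTimestamp(). */
    long maxTimestamp() {
        return end - 1;
    }

    /** Buffers an event of this window; returns false if it is late and therefore dropped. */
    boolean onEvent(long eventTime, long currentWatermark) {
        if (eventTime < start || eventTime >= end) {
            throw new IllegalArgumentException("event does not belong to this window");
        }
        if (currentWatermark >= maxTimestamp()) {
            return false; // the watermark already passed the window: the event is late
        }
        buffered.add(eventTime);
        return true;
    }

    /** Fires the window once the watermark reaches its last timestamp. */
    void onWatermark(long watermark) {
        if (!fired && watermark >= maxTimestamp()) {
            fired = true;
            System.out.println("window [" + start + ", " + end + ") fires with "
                    + buffered.size() + " events");
        }
    }

    public static void main(String[] args) {
        WindowFiringSketch window = new WindowFiringSketch(0L, 10_000L);
        window.onEvent(3_000L, Long.MIN_VALUE); // accepted
        window.onEvent(9_000L, 2_000L);         // accepted: watermark still below 9999
        window.onWatermark(9_999L);             // prints: window [0, 10000) fires with 2 events
        boolean accepted = window.onEvent(7_000L, 9_999L);
        System.out.println("late event accepted: " + accepted); // false
    }
}
```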
Two Watermark generation strategies are described:
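Periodic: emits a Watermark at regular intervals, e.g. after a fixed period of time or after a certain number of records; in Flink's API the period is time‑based and set with ExecutionConfig#setAutoWatermarkInterval.
Punctuated: inspects every event and may emit a Watermark based on it, for example whenever an event carries an EventTime larger than any seen before.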
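The two strategies can be contrasted with a plain-Java sketch. It does not use the Flink interfaces; the class names, the fixed 5-second delay and the "timer tick" are assumptions for illustration. The periodic generator only records the largest event time per event and derives a Watermark (largest seen minus a fixed delay, the same idea as the offset in the DDL examples later in the article) when a timer fires; the punctuated generator decides per event, here emitting whenever EventTime advances. Both suppress non-ascending values, matching the contract described in the Flink Javadoc.

```java
// Plain-Java sketches of the two strategies; NOT the Flink interfaces.
// A Long stands in for a Watermark, and null means "emit nothing".
class PeriodicGenerator {
    private final long maxDelayMs; // assumed fixed tolerance for out-of-order events
    private long maxSeen = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    PeriodicGenerator(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    /** Called for every event: only remembers the largest event time seen. */
    void onEvent(long eventTime) {
        maxSeen = Math.max(maxSeen, eventTime);
    }

    /** Called on each timer tick; returns a new watermark, or null if there is no progress. */
    Long onTimer() {
        if (maxSeen == Long.MIN_VALUE) {
            return null; // no event seen yet
        }
        long candidate = maxSeen - maxDelayMs;
        if (candidate <= lastEmitted) {
            return null; // keep watermarks strictly ascending
        }
        lastEmitted = candidate;
        return candidate;
    }
}

class PunctuatedGenerator {
    private long lastEmitted = Long.MIN_VALUE;

    /** Called right after each event's timestamp is extracted. */
    Long onEvent(long eventTime) {
        if (eventTime <= lastEmitted) {
            return null; // event time did not advance
        }
        lastEmitted = eventTime;
        return eventTime;
    }
}

class GeneratorDemo {
    public static void main(String[] args) {
        long[] eventTimes = {1_000L, 4_000L, 2_000L, 7_000L, 6_000L}; // in arrival order
        PeriodicGenerator periodic = new PeriodicGenerator(5_000L);
        PunctuatedGenerator punctuated = new PunctuatedGenerator();
        for (long t : eventTimes) {
            periodic.onEvent(t);
            Long w = punctuated.onEvent(t);
            if (w != null) {
                System.out.println("punctuated watermark: " + w); // 1000, 4000, 7000
            }
        }
        System.out.println("periodic watermark at next tick: " + periodic.onTimer()); // 2000
    }
}
```

The design trade-off: the periodic generator does almost no work per event and emits on a clock, while the punctuated one can react to each event immediately; emitting many Watermarks adds downstream work, which Flink's documentation warns can degrade performance.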
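The corresponding Java interfaces from Flink's legacy DataStream API are shown below: AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, and their common parent TimestampAssigner. (Since Flink 1.11 the two assigner interfaces are deprecated in favor of WatermarkStrategy and WatermarkGenerator.)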
// Imports these excerpts rely on (in Flink, each interface lives in its own source file).
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.watermark.Watermark;

// Periodic strategy: the system polls the assigner at a fixed interval.
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return {@code null} to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method. If a null value is returned, or the timestamp
     * of the returned watermark is smaller than that of the last emitted one, then no
     * new watermark will be generated.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     *
     * @return {@code null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark getCurrentWatermark();
}

// Punctuated strategy: the assigner is asked after every element.
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Asks this implementation if it wants to emit a watermark. This method is called right after
     * the {@link #extractTimestamp(Object, long)} method.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If a null value is returned, or the timestamp of the returned
     * watermark is smaller than that of the last emitted one, then no new watermark will
     * be generated.
     *
     * <p>For an example of how to use this method, see the documentation of
     * {@link AssignerWithPunctuatedWatermarks this class}.
     *
     * @return {@code null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

// Common parent: extracts the event-time timestamp of each element.
public interface TimestampAssigner<T> extends Function {

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch.
     *
     * <p>The method is passed the previously assigned timestamp of the element.
     * That previous timestamp may have been assigned by a previous assigner
     * or by ingestion time. If the element did not carry a timestamp before, this value is
     * {@code Long.MIN_VALUE}.
     *
     * @param element The element that the timestamp will be assigned to.
     * @param previousElementTimestamp The previous internal timestamp of the element,
     *     or a negative value, if no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    long extractTimestamp(T element, long previousElementTimestamp);
}
The article then shows how Watermarks are declared in SQL DDL. The first example sets the Watermark to the largest EventTime seen so far with an offset of 0 ms, so there is no tolerance for out‑of‑order events:
CREATE TABLE source(
...,
Event_time TIMESTAMP,
WATERMARK wk1 FOR Event_time as withOffset(Event_time, 0)
) with (
...
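);
To tolerate late events, the second example uses an offset of 5000 ms: the Watermark now trails the largest EventTime seen by 5 seconds, so each window is triggered 5 seconds later in event time and events that arrive up to 5 seconds out of order are still included: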
CREATE TABLE source(
...,
Event_time TIMESTAMP,
WATERMARK wk1 FOR Event_time as withOffset(Event_time, 5000)
) with (
...
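);
The withOffset(...) syntax in these examples is the Alibaba Blink (Realtime Compute) SQL dialect; open-source Flink SQL, since version 1.10, expresses the same 5‑second bound as WATERMARK FOR Event_time AS Event_time - INTERVAL '5' SECOND.
The article also discusses Watermark handling across multiple streams. When streams are merged (for example with union or connect), an operator with several inputs tracks the latest Watermark from each input and advances its own event‑time clock to the minimum of them. Taking the minimum keeps the forwarded Watermark monotonic and ensures that events still in flight on the slowest input are not treated as late and dropped.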
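The minimum rule can be sketched in plain Java. It is a simplification, not Flink's internal implementation, and the class name is hypothetical: the operator keeps the latest Watermark per input and forwards a new one only when the minimum across inputs advances. Flink additionally ignores inputs that are marked idle, which this sketch leaves out.

```java
import java.util.Arrays;

// Plain-Java sketch (not Flink's internal code) of a multi-input operator's
// event-time clock: the minimum of the latest watermark seen on each input.
class MultiInputWatermark {
    private final long[] latestPerInput;
    private long combined = Long.MIN_VALUE; // watermark forwarded downstream

    MultiInputWatermark(int inputs) {
        latestPerInput = new long[inputs];
        Arrays.fill(latestPerInput, Long.MIN_VALUE);
    }

    /** Records a watermark from one input; returns the new combined watermark, or null if it did not advance. */
    Long onWatermark(int input, long watermark) {
        latestPerInput[input] = Math.max(latestPerInput[input], watermark);
        long min = Arrays.stream(latestPerInput).min().getAsLong();
        if (min <= combined) {
            return null; // the slowest input has not moved forward
        }
        combined = min;
        return combined;
    }

    public static void main(String[] args) {
        MultiInputWatermark op = new MultiInputWatermark(2);
        System.out.println(op.onWatermark(0, 10_000L)); // null: input 1 has not reported yet
        System.out.println(op.onWatermark(1, 4_000L));  // 4000: the slower input decides
        System.out.println(op.onWatermark(1, 12_000L)); // 10000: now input 0 is the slower one
    }
}
```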
Overall, the piece demonstrates that Watermarks are essential for solving out‑of‑order and late‑event challenges in Apache Flink's event‑time window processing.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
