Enabling Early‑Fire Window Computation in Flink SQL for Real‑Time Metrics

This article explains how to configure Flink SQL to emit early‑fire results for tumbling windows, allowing real‑time aggregation of metrics like PV and UV, and provides complete example code, execution output, and a discussion of current limitations.

Big Data Technology & Architecture

Introduction

In typical online analytics, windows are large (an hour, a day, a week) and results are emitted only when the window closes, which is too late for real‑time metrics such as page views (PV) and unique visitors (UV). This article shows how to make Flink SQL windows fire early so that intermediate results are available before the window ends.

Implementation Overview

Two scenarios are presented: one without early‑fire and one with early‑fire. Diagrams illustrate the data flow.

Without Early‑Fire Window

The following demo reads data from Kafka and aggregates PV and UV in a processing‑time tumbling window. In production the window would cover one day; it is shortened to one minute here for testing.

-- kafka source
DROP TABLE IF EXISTS user_log;
CREATE TABLE user_log(
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    proc_time AS PROCTIME(),
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_log',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'user_log',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

DROP TABLE IF EXISTS user_log_sink_1;
CREATE TABLE user_log_sink_1 (
    wCurrent STRING,
    wStart STRING,
    wEnd STRING,
    pv BIGINT,
    uv BIGINT,
    PRIMARY KEY (wCurrent, wStart, wEnd) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user_log_sink',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'user_log',
    'key.format' = 'json',
    'value.format' = 'json'
);

INSERT INTO user_log_sink_1
SELECT DATE_FORMAT(NOW(), 'yyyy-MM-dd HH:mm:ss') AS wCurrent,
       DATE_FORMAT(TUMBLE_START(proc_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS wStart,
       DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS wEnd,
       COUNT(1) AS pv,
       COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE);

Sample output (window size set to 1 minute for demonstration):

+I[2022-06-01 17:14:00, 2022-06-01 17:13:00, 2022-06-01 17:14:00, 29449, 9999]
+I[2022-06-01 17:15:00, 2022-06-01 17:14:00, 2022-06-01 17:15:00, 29787, 9999]
...
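
To make the semantics of the query above concrete, here is a minimal Java sketch of what a tumbling window computes when it fires only once, at window close. It does not use Flink: the Event class, the method names, and the sample timestamps are all hypothetical, and the window alignment assumes non-negative timestamps and no window offset.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class TumblingPvUvSketch {

    /** Hypothetical event: who acted, and when (milliseconds, non-negative). */
    static final class Event {
        final String userId;
        final long timeMillis;

        Event(String userId, long timeMillis) {
            this.userId = userId;
            this.timeMillis = timeMillis;
        }
    }

    /** Aligns a timestamp to the start of the tumbling window that contains it. */
    static long windowStart(long timeMillis, long sizeMillis) {
        return timeMillis - (timeMillis % sizeMillis);
    }

    /** Returns windowStart -> {pv, uv}, one final result per window. */
    static Map<Long, long[]> aggregate(List<Event> events, long sizeMillis) {
        Map<Long, Long> pv = new TreeMap<>();
        Map<Long, Set<String>> users = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timeMillis, sizeMillis);
            pv.merge(start, 1L, Long::sum);                                    // COUNT(1)
            users.computeIfAbsent(start, k -> new HashSet<>()).add(e.userId);  // COUNT(DISTINCT user_id)
        }
        Map<Long, long[]> result = new TreeMap<>();
        for (Map.Entry<Long, Long> entry : pv.entrySet()) {
            long uv = users.get(entry.getKey()).size();
            result.put(entry.getKey(), new long[] {entry.getValue(), uv});
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("u1", 5_000L),
                new Event("u2", 30_000L),
                new Event("u1", 59_000L),
                new Event("u3", 61_000L));
        for (Map.Entry<Long, long[]> w : aggregate(events, 60_000L).entrySet()) {
            System.out.println("window " + w.getKey()
                    + " pv=" + w.getValue()[0] + " uv=" + w.getValue()[1]);
        }
    }
}
```

Each window yields exactly one row, which is why a one-day window would stay silent until midnight.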

Enabling Early‑Fire Window

To emit intermediate results every 10 seconds, two table configuration parameters are added.

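// tenv is the job's TableEnvironment (or StreamTableEnvironment).
TableConfig config = tenv.getConfig();
// Both options are experimental planner settings.
config.getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
config.getConfiguration().setString("table.exec.emit.early-fire.delay", "10s"); // emit every 10 seconds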
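
Conceptually, these two options make the window publish a running aggregate at a fixed interval instead of waiting for the window to close. The following Java sketch approximates that behaviour outside Flink. It is a simplification under stated assumptions, not Flink's actual trigger logic: firings are aligned to the window start, the window size is a multiple of the delay, and a firing is skipped when nothing has changed since the previous one. The class name, method name, and sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class EarlyFireSketch {

    /**
     * Simulates early firing for a single window [windowStart, windowStart + windowSize).
     * Returns one {fireTime, pv, uv} entry per firing that saw new data.
     */
    static List<long[]> simulate(long[] times, String[] users,
                                 long windowStart, long windowSize, long delay) {
        List<long[]> emissions = new ArrayList<>();
        long windowEnd = windowStart + windowSize;
        long lastPv = 0;
        for (long fireAt = windowStart + delay; fireAt <= windowEnd; fireAt += delay) {
            long pv = 0;
            Set<String> distinct = new HashSet<>();
            // Recompute the running aggregate over everything seen before this firing.
            for (int i = 0; i < times.length; i++) {
                if (times[i] >= windowStart && times[i] < fireAt) {
                    pv++;
                    distinct.add(users[i]);
                }
            }
            // Assumption: stay silent when no new event arrived since the last firing.
            if (pv != lastPv) {
                emissions.add(new long[] {fireAt, pv, distinct.size()});
                lastPv = pv;
            }
        }
        return emissions;
    }

    public static void main(String[] args) {
        long[] times = {1_000L, 4_000L, 12_000L, 35_000L};
        String[] users = {"u1", "u2", "u1", "u3"};
        for (long[] e : simulate(times, users, 0L, 60_000L, 10_000L)) {
            System.out.println("fire@" + e[0] + " pv=" + e[1] + " uv=" + e[2]);
        }
    }
}
```

The sketch recomputes the aggregate on every firing for clarity; a real operator would keep incremental state instead.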

The rest of the job definition remains the same. The execution now produces incremental updates:

+I[2022-06-01 17:54:35, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 2527, 2500]
-U[2022-06-01 17:54:40, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 2527, 2500]
+U[2022-06-01 17:54:40, 2022-06-01 17:54:00, 2022-06-01 17:55:00, 5027, 5000]
... (subsequent updates omitted for brevity)
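
The row prefixes in this output are changelog kinds: +I inserts a row, -U retracts the previous version of a row, and +U supplies its replacement. A consumer that wants the latest PV/UV per window can fold the stream into a keyed view, roughly as in the sketch below. The class and method names are hypothetical, and keying by window start alone is an assumption made for the illustration (the sink table above uses a wider primary key).

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ChangelogViewSketch {

    /** Applies one changelog row to a view of windowKey -> {pv, uv}. */
    static void apply(Map<String, long[]> view, String kind,
                      String windowKey, long pv, long uv) {
        switch (kind) {
            case "+I": // insert
            case "+U": // update-after: the new version of the row
                view.put(windowKey, new long[] {pv, uv});
                break;
            case "-U": // update-before: retract the old version
            case "-D": // delete
                view.remove(windowKey);
                break;
            default:
                throw new IllegalArgumentException("unknown row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        Map<String, long[]> view = new LinkedHashMap<>();
        // The three rows shown in the output above, keyed by window start.
        apply(view, "+I", "2022-06-01 17:54:00", 2527, 2500);
        apply(view, "-U", "2022-06-01 17:54:00", 2527, 2500);
        apply(view, "+U", "2022-06-01 17:54:00", 5027, 5000);
        long[] latest = view.get("2022-06-01 17:54:00");
        System.out.println("pv=" + latest[0] + " uv=" + latest[1]);
    }
}
```

After the three rows are applied, the view holds only the most recent values for the window, which is the figure a real-time dashboard would display.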

Note: Early‑fire does not work with window table‑valued functions (e.g., TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES)). Flink currently does not support the early‑fire configuration for aggregates built on them, and the job fails with the following exception:

org.apache.flink.table.api.TableException: Currently, window table function based aggregate doesn't support early-fire and late-fire configuration 'table.exec.emit.early-fire.enabled' and 'table.exec.emit.late-fire.enabled'.
    at org.apache.flink.table.planner.plan.utils.WindowUtil$.checkEmitConfiguration(WindowUtil.scala:262)
    ...

Conclusion

Early‑fire can be enabled for group window aggregations (GROUP BY TUMBLE(...)) in Flink SQL by setting the two table configuration options shown above, which yields intermediate results while the window is still open. It is not yet supported for aggregates based on window table‑valued functions, so those jobs need an alternative approach or a future Flink release.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.