Using Temporal Table JOIN in Flink SQL for Real-Time Stream Enrichment

The article explains how to use Flink SQL's temporal table join to enrich a real-time traffic-log stream with versioned tag data. It covers the required DDL, the time-versioned join syntax, and the watermark and idle-timeout settings that prevent join stalls and delays at hour boundaries.

DaTaobao Tech

In real-time development, a dual-stream join is often implemented with Flink's temporal join, which looks up the attribute value that was valid at a record's event time.

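The article describes a scenario in which a traffic-log stream (TT-A) is enriched with product-tag information from another stream (TT-B). A temporal join has three key characteristics: (1) it is driven by a single stream: only records from the main (log) stream trigger output, while updates on the tag stream only add new versions; (2) the tag side is a versioned table that records how each key's attributes change over time; and (3) the lookup is time-versioned: each log record is matched with the tag version valid at its event time, and the result is emitted once the watermarks have passed that time.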

Example DDL for the versioned tag table:

```sql
CREATE TEMPORARY TABLE `tag_ri` (
  `id` VARCHAR,
  `tag` VARCHAR,
  `time` VARCHAR,
  `ts` AS TO_TIMESTAMP(`time`, 'yyyy-MM-dd HH:mm:ss'),
  WATERMARK FOR `ts` AS withOffset(`ts`, 0)
) WITH (
  'connector' = 'tt',
  'router' = '******',
  'topic' = 'tag_ri',
  'lineDelimiter' = '\n',
  'fieldDelimiter' = '\u0001',
  'encoding' = 'utf-8'
);
-- versioned view: keep the latest row per id.
-- ROW_NUMBER must be computed in a subquery (its alias cannot be filtered in the same WHERE),
-- and it must be ordered by the event-time attribute `ts` for Flink to treat the view as versioned.
CREATE TEMPORARY VIEW `tag` AS
SELECT `id`, `tag`, `time`, `ts`
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `ts` DESC) AS `rownum`
  FROM `tag_ri`
)
WHERE `rownum` = 1;
```

DDL for the traffic‑log stream:

```sql
CREATE TEMPORARY TABLE `log_ri` (
  `id` VARCHAR,
  `time` VARCHAR,
  ...,
  `ts` AS TO_TIMESTAMP(`time`, 'yyyy-MM-dd HH:mm:ss'),
  WATERMARK FOR `ts` AS withOffset(`ts`, 0)
) WITH (
  'connector' = 'tt',
  'router' = '******',
  'topic' = 'log_ri',
  'lineDelimiter' = '\n',
  'fieldDelimiter' = '\u0001',
  'encoding' = 'utf-8'
);
```

The join query that enriches the log with the latest tag:

```sql
SELECT a.`id`, a.`time`, b.`tag`
FROM (SELECT * FROM `log_ri`) AS a
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF a.`ts` AS b
ON a.`id` = b.`id`;
```

The output shows the tag value of each log record changing as event time advances: every record picks up the tag version that was valid at its own timestamp, which confirms that the enrichment is temporally correct.

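A common problem arises when the versioned table is sparse. The join emits a log record only after the watermarks of both inputs have passed its event time, and a tag stream that receives no new data does not advance its watermark. Joined records are therefore held back until the next tag update, so output appears only at the top of the hour. Setting table.exec.source.idle-timeout = 10s marks a source as idle after 10 seconds without data, so its watermark no longer blocks the join.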
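As a sketch: on the open-source Flink SQL client (1.13 or later) the option can be set for the session with a SET statement like the one below; on managed platforms the same key is usually entered in the job configuration instead. The 10-second value is the one the article uses.

```sql
-- Mark a source (or source split) as idle after 10 s without records, so that a
-- sparse stream such as the tag stream stops holding back the join's watermark.
-- The default is 0 ms, which disables idle detection.
SET 'table.exec.source.idle-timeout' = '10s';
```

A shorter timeout releases stalled results sooner, but a stream that is merely slow may then be treated as idle and its late records may miss the join.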

A second issue appears near the hour boundary, when a source stream emits records whose timestamps lie ahead of the current time (data "jumping the gun"). Such a record can be joined only after the other stream's watermark catches up with its timestamp, so its output is delayed. The fix is to replace any future timestamp with the current processing time before the temporal join.
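A minimal sketch of that fix, assuming the platform accepts LOCALTIMESTAMP inside a computed column (open-source Flink evaluates it per record in streaming mode) and that the time strings are written in the session time zone. The clamp is placed in the source DDL, before the WATERMARK clause, because the FOR SYSTEM_TIME AS OF expression must remain an event-time attribute; recomputing the timestamp in a later view would turn it into a plain TIMESTAMP. It is shown on tag_ri only as an example; apply the same expression to whichever stream carries future timestamps. Connector options and the watermark clause are copied from the article's DDL.

```sql
-- Sketch only: clamp future event times to "now" before the watermark is derived.
-- Assumption: LOCALTIMESTAMP is allowed in a computed column on your platform.
CREATE TEMPORARY TABLE `tag_ri` (
  `id` VARCHAR,
  `tag` VARCHAR,
  `time` VARCHAR,
  -- Use the parsed event time, unless it lies in the future; then use the current time.
  `ts` AS CASE
            WHEN TO_TIMESTAMP(`time`, 'yyyy-MM-dd HH:mm:ss') > LOCALTIMESTAMP
              THEN LOCALTIMESTAMP
            ELSE TO_TIMESTAMP(`time`, 'yyyy-MM-dd HH:mm:ss')
          END,
  WATERMARK FOR `ts` AS withOffset(`ts`, 0)
) WITH (
  'connector' = 'tt',
  'router' = '******',
  'topic' = 'tag_ri',
  'lineDelimiter' = '\n',
  'fieldDelimiter' = '\u0001',
  'encoding' = 'utf-8'
);
```

The trade-off is that a clamped record is versioned at its arrival time rather than at its labeled time, which is the behavior the fix intends.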

Summary

1. Temporal join suits dual-stream scenarios that are driven by a single stream.
2. Proper watermark handling and an idle-timeout setting are required to avoid stalls on sparse data and delays caused by records that run ahead of the current time.
