Understanding Apache Flink Interval Join: Syntax, Semantics, and Implementation
This article explains how Apache Flink's Interval Join handles time‑bounded join requirements more efficiently than an unbounded join. It covers the syntax, semantics, and state‑management considerations, and ends with a complete Scala example.
The article starts by presenting a real‑world scenario where orders and payments need to be joined only if the payment occurs within one hour of the order, and shows a traditional relational database SQL solution that directly expresses this condition.
It then discusses how the same requirement can be expressed in Apache Flink using an unbounded double‑stream JOIN, providing the corresponding SQL statement:
SELECT
  o.orderId,
  o.productName,
  p.payType,
  o.orderTime,
  payTime
FROM Orders AS o JOIN Payment AS p ON
  o.orderId = p.orderId AND
  p.payTime >= orderTime AND p.payTime < orderTime + 3600
Although functional, the unbounded join keeps all incoming records in state, leading to unnecessary performance overhead for the one‑hour window use case.
To address this, Flink introduced the Interval Join (also called Time‑windowed Join) in version 1.4, which treats each side of the join as bounded by a time interval, allowing state to be cleared once records fall outside the defined window.
The Interval Join syntax is illustrated as:
SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION
Two forms of TIMEBOUND_EXPRESSION are described, e.g., L.time BETWEEN LowerBound(R.time) AND UpperBound(R.time) or a direct comparison of time attributes.
A concrete example joins Orders and Payment tables with the condition that payTime must be between orderTime and orderTime + INTERVAL '1' HOUR. Sample data tables are shown, and the expected result excludes orders whose payment occurs after the one‑hour window.
The article also expands the interval to allow payments one hour before or after the order, demonstrating the flexibility of the join condition.
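The two interval variants can be checked with a small in-memory model. The sketch below is plain Scala with no Flink dependency. It reuses the event times from the sample program's orders and payments, while order 005 and its early payment are hypothetical records added only to show the effect of the symmetric window. The names `IntervalJoinSketch` and `intervalJoin` are illustrative and are not Flink APIs.

```scala
// Plain-Scala model of the interval join condition; not Flink code.
object IntervalJoinSketch {
  final case class Order(orderId: String, productName: String, orderTime: Long)
  final case class Payment(orderId: String, payType: String, payTime: Long)

  val HourMs: Long = 60L * 60L * 1000L

  // Event times are epoch milliseconds taken from the sample program.
  val orders: List[Order] = List(
    Order("001", "iphone", 1545800002000L),
    Order("002", "mac", 1545800003000L),
    Order("003", "book", 1545800004000L),
    Order("004", "cup", 1545800018000L),
    Order("005", "pen", 1545800020000L) // hypothetical record, not in the source data
  )

  val payments: List[Payment] = List(
    Payment("001", "alipay", 1545803501000L),
    Payment("002", "card", 1545803602000L),
    Payment("003", "card", 1545803610000L),
    Payment("004", "alipay", 1545803611000L),
    Payment("005", "card", 1545799420000L) // hypothetical: paid 10 minutes before the order
  )

  // Keeps pairs with equal keys where
  // orderTime + lowerMs <= payTime <= orderTime + upperMs (inclusive, like BETWEEN).
  def intervalJoin(lowerMs: Long, upperMs: Long): List[(Order, Payment)] =
    for {
      o <- orders
      p <- payments
      if o.orderId == p.orderId
      if p.payTime >= o.orderTime + lowerMs && p.payTime <= o.orderTime + upperMs
    } yield (o, p)

  def main(args: Array[String]): Unit = {
    // payTime BETWEEN orderTime AND orderTime + 1 hour
    println(intervalJoin(0L, HourMs).map(_._1.orderId).mkString(", "))
    // payTime BETWEEN orderTime - 1 hour AND orderTime + 1 hour
    println(intervalJoin(-HourMs, HourMs).map(_._1.orderId).mkString(", "))
  }
}
```

With these inputs the forward window yields orders 001, 002 and 004, because order 003 is paid 3,606 seconds after it was placed. The symmetric window additionally yields the hypothetical order 005.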
Implementation details explain that Interval Join still relies on Flink state to store left‑ and right‑hand records, but the state can be purged based on watermarks, reducing memory usage and improving latency.
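The watermark‑based cleanup can be pictured with a simplified model. The sketch below is plain Scala and is not Flink's internal implementation: it assumes no late records arrive after a watermark, keeps both sides in plain vectors, and uses hypothetical names (`IntervalJoinState`, `advanceWatermark`). It models the condition left.time + lower <= right.time <= left.time + upper.

```scala
// Simplified model of interval-join state with watermark-driven cleanup; not Flink code.
object WatermarkCleanupSketch {
  final case class Event(key: String, time: Long)

  final class IntervalJoinState(lower: Long, upper: Long) {
    private var left: Vector[Event] = Vector.empty
    private var right: Vector[Event] = Vector.empty

    private def inRange(l: Event, r: Event): Boolean =
      l.key == r.key && r.time >= l.time + lower && r.time <= l.time + upper

    // Buffer the new left record and emit matches against buffered right records.
    def addLeft(l: Event): Vector[(Event, Event)] = {
      left = left :+ l
      right.filter(r => inRange(l, r)).map(r => (l, r))
    }

    // Buffer the new right record and emit matches against buffered left records.
    def addRight(r: Event): Vector[(Event, Event)] = {
      right = right :+ r
      left.filter(l => inRange(l, r)).map(l => (l, r))
    }

    // Assumption: once the watermark is wm, no record with time < wm will arrive.
    // A left record is then useless when left.time + upper < wm,
    // and a right record is useless when right.time - lower < wm.
    def advanceWatermark(wm: Long): Unit = {
      left = left.filter(l => l.time + upper >= wm)
      right = right.filter(r => r.time - lower >= wm)
    }

    def bufferedCount: Int = left.size + right.size
  }

  def main(args: Array[String]): Unit = {
    val hourMs = 3600000L
    val state = new IntervalJoinState(0L, hourMs)
    state.addLeft(Event("001", 1000L))
    println(state.addRight(Event("001", 2000L)).size) // one matching pair
    state.advanceWatermark(3000L)    // the payment at 2000 can no longer match a future order
    println(state.bufferedCount)
    state.advanceWatermark(3700000L) // the order at 1000 is now past its one-hour window
    println(state.bufferedCount)
  }
}
```

The point of the model is that the bounds tell the operator exactly when a buffered record can no longer contribute a result, which an unbounded join cannot know.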
A complete Scala program (compatible with Flink 1.7.0) is provided, showing how to create the streams, assign timestamps and watermarks, register tables, and execute the Interval Join query. The code snippet is:
import java.sql.Timestamp
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import scala.collection.mutable

object SimpleTimeIntervalJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)
    // use event time so that the rowtime attributes and watermarks drive the join
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // construct orders data: (orderId, productName, orderTime)
    val ordersData = new mutable.MutableList[(String, String, Timestamp)]
    ordersData.+=("001", "iphone", new Timestamp(1545800002000L))
    ordersData.+=("002", "mac", new Timestamp(1545800003000L))
    ordersData.+=("003", "book", new Timestamp(1545800004000L))
    ordersData.+=("004", "cup", new Timestamp(1545800018000L))

    // construct payment data: (orderId, payType, payTime)
    val paymentData = new mutable.MutableList[(String, String, Timestamp)]
    paymentData.+=("001", "alipay", new Timestamp(1545803501000L))
    paymentData.+=("002", "card", new Timestamp(1545803602000L))
    paymentData.+=("003", "card", new Timestamp(1545803610000L))
    paymentData.+=("004", "alipay", new Timestamp(1545803611000L))

    // assign timestamps and watermarks, then expose the third field as the rowtime attribute
    val orders = env.fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
    val payments = env.fromCollection(paymentData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("Payment", payments)

    // interval join: keep a payment only if it falls within one hour after the order
    val sqlQuery = """
      |SELECT
      |  o.orderId,
      |  o.productName,
      |  p.payType,
      |  o.orderTime,
      |  cast(payTime as timestamp) as payTime
      |FROM Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND
      |  p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
      |""".stripMargin
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    result.print()
    env.execute()
  }
}

// reads the event time from the third tuple field and tolerates 10 seconds of out-of-order data
class TimestampExtractor[T1, T2] extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = element._3.getTime
}
The program prints the joined results, confirming that only orders with payments within the one‑hour interval are returned.
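For context on the `Time.seconds(10)` argument passed to `BoundedOutOfOrdernessTimestampExtractor`: as far as I understand that class, the watermark it reports trails the largest timestamp seen so far by the configured bound. The following plain-Scala sketch models only that arithmetic, under that assumption. `WatermarkTracker` and its methods are hypothetical names, and Flink's periodic emission of watermarks is left out.

```scala
// Plain-Scala model of a bounded-out-of-orderness watermark; not Flink code.
object BoundedOutOfOrdernessSketch {
  final class WatermarkTracker(maxOutOfOrdernessMs: Long) {
    // Start low enough that the first watermark is Long.MinValue without underflow.
    private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

    // Record an element's event time; an out-of-order element never lowers the maximum.
    def observe(timestampMs: Long): Unit = {
      maxTimestamp = math.max(maxTimestamp, timestampMs)
    }

    // The watermark trails the largest observed timestamp by the allowed lateness.
    def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMs
  }

  def main(args: Array[String]): Unit = {
    val tracker = new WatermarkTracker(10000L) // 10 seconds, as in the sample program
    // Order timestamps from the sample program.
    Seq(1545800002000L, 1545800003000L, 1545800004000L, 1545800018000L).foreach(tracker.observe)
    println(tracker.currentWatermark) // largest timestamp minus 10 seconds
    tracker.observe(1545800010000L)   // an older element arriving out of order
    println(tracker.currentWatermark) // unchanged
  }
}
```

A larger bound tolerates more disorder but delays the watermark, so join state for each record is held correspondingly longer before it can be purged.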
In conclusion, the article demonstrates how Interval Join provides a more efficient and semantically richer alternative to unbounded joins for time‑bounded correlation tasks in Apache Flink, and supplies a ready‑to‑run example for practitioners.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
