Comprehensive Guide to Flink Join Operations: Interval Join, Window Join, Broadcast, and Temporal Table Function

This article explains Flink's various join mechanisms—including interval‑based joins, window‑based joins, streaming SQL joins, and dimension‑table joins such as preload, hot‑storage, broadcast, and temporal‑table function—provides detailed code examples in Java, discusses state management and performance considerations, and summarizes the four main dimension‑table join patterns.

Big Data Technology & Architecture

Before reading this article, you may want to review the related series on Flink time, windows, and stream joins as well as network flow control and back‑pressure.

Join Concepts in Flink

1. Join Overview

In stream processing, joining two data streams is a common requirement. Flink's DataStream API offers two built-in join operators based on time conditions: the interval join and the window join. If these operators cannot express the required semantics, custom joins can be implemented using CoProcessFunction, BroadcastProcessFunction, or KeyedBroadcastProcessFunction.

Note: Design your join operator with efficient state access patterns and proper state cleanup strategies.

1.1 Interval Join

An interval join matches events from two streams that share the same key and whose timestamps differ by no more than a specified interval. The interval join is symmetric: if events from stream B must occur no earlier than 1 hour before and no later than 15 minutes after an event from stream A, the same condition can be expressed from B's perspective, with A events occurring no earlier than 15 minutes before and no later than 1 hour after the B event.

The diagram below illustrates the interval join between streams A and B.

Interval joins currently support only event time and only INNER JOIN semantics (unmatched events are dropped). The following example defines an interval join:

input1
  .keyBy(...)                             // interval joins operate on keyed streams
  .intervalJoin(input2.keyBy(...))
  .between(<lower-bound>, <upper-bound>)  // bounds relative to input1
  .process(ProcessJoinFunction)           // handle matched event pairs

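The matched pairs are sent to the ProcessJoinFunction. The lower and upper bounds are time intervals relative to the timestamp of the first stream's event, typically one negative and one positive, e.g., between(Time.hours(-1), Time.minutes(15)). The operator must buffer records from both streams for as long as they can still fall within these bounds, so state can grow significantly when the event times of the two streams drift apart.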
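The matching condition can be made concrete with a small plain-Java sketch. It does not use the Flink API: the Event class and the method names are hypothetical, and the bounds are treated as inclusive, which is Flink's default. It also checks the symmetry described earlier by evaluating the same condition from B's side.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the interval-join condition; not Flink API code.
class IntervalJoinSketch {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;

    /** Hypothetical event type: a key plus an event-time timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long ts;

        Event(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    /** True if both events share a key and b.ts lies in [a.ts + lower, a.ts + upper]. */
    static boolean matches(Event a, Event b, long lower, long upper) {
        return a.key.equals(b.key) && b.ts >= a.ts + lower && b.ts <= a.ts + upper;
    }

    /** The same condition seen from B: a.ts lies in [b.ts - upper, b.ts - lower]. */
    static boolean matchesFromB(Event b, Event a, long lower, long upper) {
        return a.key.equals(b.key) && a.ts >= b.ts - upper && a.ts <= b.ts - lower;
    }

    public static void main(String[] args) {
        Event a = new Event("k", 10 * HOUR);              // A event at 10:00
        List<Event> bs = Arrays.asList(
                new Event("k", 9 * HOUR),                 // exactly 1 hour before
                new Event("k", 9 * HOUR - MINUTE),        // 61 minutes before
                new Event("k", 10 * HOUR + 15 * MINUTE),  // exactly 15 minutes after
                new Event("x", 10 * HOUR));               // different key
        for (Event b : bs) {
            boolean fromA = matches(a, b, -HOUR, 15 * MINUTE);
            boolean fromB = matchesFromB(b, a, -HOUR, 15 * MINUTE);
            System.out.println(b.key + "@" + b.ts + " joined=" + fromA + " symmetric=" + (fromA == fromB));
        }
    }
}
```

Events that fail the condition are simply never emitted, which is the INNER JOIN behavior mentioned above.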

1.2 Window Join

Window joins use Flink's window mechanism: elements from both streams are assigned to a common window and joined (or cogrouped) when the window fires.

Example definition of a window join:

input1.join(input2)
  .where(...)       // key selector for input1
  .equalTo(...)     // key selector for input2
  .window(...)      // specify WindowAssigner
  [.trigger(...)]   // optional trigger
  [.evictor(...)]   // optional evictor
  .apply(...)       // JoinFunction

The diagram below shows how the DataStream API performs a window join.

Both streams are partitioned by key, and a common window assigns elements from both streams to the same window. When the window timer fires, the operator iterates over the Cartesian product of the two inputs and invokes the JoinFunction. Custom triggers or evictors can also be used. Cogrouping is similar, but CoGroupFunction receives iterators for each side and is called once per window.
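To illustrate the difference between the two call patterns, here is a plain-Java sketch of what happens for one key when a window fires. It is not Flink code: the class and method names are hypothetical, and the buffers are plain lists of strings. The coGroup variant shows one possible use of seeing both sides at once, namely emitting left elements that have no partner, which is an assumption about how one might use it rather than something the operator does by itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration of join vs. coGroup for a single key and window; not Flink API code.
class WindowFireSketch {

    /** Join semantics: one result per pair in the Cartesian product of the two buffers. */
    static List<String> joinOnFire(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                out.add(l + "+" + r);
            }
        }
        return out;
    }

    /** CoGroup semantics: called once with both buffers, so unmatched left elements can be kept. */
    static List<String> coGroupOnFire(List<String> left, List<String> right) {
        if (right.isEmpty()) {
            List<String> out = new ArrayList<>();
            for (String l : left) {
                out.add(l + "+null");
            }
            return out;
        }
        return joinOnFire(left, right);
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("a1", "a2");
        List<String> b = Arrays.asList("b1", "b2", "b3");
        List<String> none = Collections.<String>emptyList();
        System.out.println(joinOnFire(a, b));       // 2 x 3 pairs
        System.out.println(joinOnFire(a, none));    // empty product, nothing emitted
        System.out.println(coGroupOnFire(a, none)); // left elements kept
    }
}
```

Because the product grows with the size of both buffers, a window that collects many elements per key on both sides produces a correspondingly large number of JoinFunction calls.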

Be aware that window joins can produce unexpected semantics: if a 1‑hour tumbling window is configured, events that fall into different windows—even if they differ by only a second—will not be joined.
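The boundary effect follows directly from how tumbling windows are assigned. The sketch below is plain Java with hypothetical names; it assumes non-negative millisecond timestamps and a zero window offset, and computes the window start by rounding the timestamp down to a multiple of the window size.

```java
// Plain-Java illustration of tumbling-window assignment; not Flink API code.
class TumblingWindowSketch {
    static final long HOUR = 3_600_000L;

    /** Start of the tumbling window containing ts (assumes ts >= 0 and no offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Two events can only be joined if they are assigned to the same window. */
    static boolean sameWindow(long t1, long t2, long size) {
        return windowStart(t1, size) == windowStart(t2, size);
    }

    public static void main(String[] args) {
        long justBefore = 2 * HOUR - 500; // half a second before the 02:00 boundary
        long justAfter = 2 * HOUR + 500;  // half a second after it
        long muchLater = 3 * HOUR - 1;    // last millisecond of the 02:00 window

        // One second apart, but on different sides of the boundary.
        System.out.println(sameWindow(justBefore, justAfter, HOUR));
        // Almost an hour apart, but inside the same window.
        System.out.println(sameWindow(justAfter, muchLater, HOUR));
    }
}
```

When the requirement is "within N minutes of each other" rather than "in the same clock hour", the interval join described earlier is usually the closer fit.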

Streaming SQL Join

Flink SQL also provides join capabilities, illustrated by a series of screenshots (omitted here for brevity) that demonstrate various SQL join patterns.

Flink DataStream Join Examples

Numerous code snippets and screenshots show step‑by‑step implementations of interval joins, window joins, and advanced join techniques using Flink's DataStream API.

Dimension Table Join Practices

Four common dimension‑table join methods are covered:

Pre‑load dimension table into memory.

Hot‑storage dimension table (e.g., Redis, HBase, MySQL).

Broadcast dimension table using Flink's broadcast state.

Temporal table function join.

1. Pre‑load Dimension Table

Implement a RichMapFunction that loads the dimension data in the open() method and performs the join in map():

package join;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.Map;

public class JoinDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    String[] list = p.split(",");
                    return new Tuple2<>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {});
        DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
        result.print();
        env.execute("joinDemo1");
    }

    static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        Map<Integer, String> dim;
        @Override
        public void open(Configuration parameters) throws Exception {
            dim = new HashMap<>();
            dim.put(1001, "beijing");
            dim.put(1002, "shanghai");
            dim.put(1003, "wuhan");
            dim.put(1004, "changsha");
        }
        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            String cityName = dim.getOrDefault(value.f1, "");
            return new Tuple3<>(value.f0, value.f1, cityName);
        }
    }
}

2. Hot‑Storage Dimension Table

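Store the dimension data in an external system (e.g., MySQL or HBase) and query it at runtime. Caching lookups in a Guava LoadingCache reduces pressure on the external store, at the cost of serving values that may be stale until the cache entry expires: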

package join;

import com.google.common.cache.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class JoinDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    String[] list = p.split(",");
                    return new Tuple2<>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {});
        DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
        result.print();
        env.execute("joinDemo1");
    }

    static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        LoadingCache<Integer, String> dim;
        @Override
        public void open(Configuration parameters) throws Exception {
            dim = CacheBuilder.newBuilder()
                    .maximumSize(1000)
                    .expireAfterWrite(10, TimeUnit.MINUTES)
                    .removalListener((RemovalNotification<Integer, String> n) -> {
                        System.out.println(n.getKey() + " removed, value: " + n.getValue());
                    })
                    .build(new CacheLoader<Integer, String>() {
                        @Override
                        public String load(Integer cityId) throws Exception {
                            return readFromHbase(cityId);
                        }
                    });
        }
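        // Placeholder for a real lookup against the external store (HBase, MySQL, ...);
        // an in-memory map stands in for it here.
        private String readFromHbase(Integer cityId) {
            Map<Integer, String> temp = new HashMap<>();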
            temp.put(1001, "beijing");
            temp.put(1002, "shanghai");
            temp.put(1003, "wuhan");
            temp.put(1004, "changsha");
            return temp.getOrDefault(cityId, "");
        }
        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            String cityName = dim.get(value.f1);
            return new Tuple3<>(value.f0, value.f1, cityName == null ? "" : cityName);
        }
    }
}

3. Broadcast Dimension Table

Use Flink's broadcast state to distribute a small dimension stream to all parallel instances of the main stream:

package join;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;

public class JoinDemo4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    String[] list = p.split(",");
                    return new Tuple2<>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {});
        DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                .map(p -> {
                    String[] list = p.split(",");
                    return new Tuple2<>(Integer.valueOf(list[0]), list[1]);
                })
                .returns(new TypeHint<Tuple2<Integer, String>>() {});
        MapStateDescriptor<Integer, String> broadcastDesc = new MapStateDescriptor<>("broad1", Integer.class, String.class);
        BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);
        DataStream<Tuple3<String, Integer, String>> result = textStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                        ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);
                        String cityName = state.get(value.f1);
                        out.collect(new Tuple3<>(value.f0, value.f1, cityName == null ? "" : cityName));
                    }
                    @Override
                    public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                        ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);
                    }
                });
        result.print();
        env.execute("joinDemo");
    }
}
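One consequence of this pattern is visible in the `cityName == null ? "" : cityName` fallback above: a main-stream record that arrives before its dimension record has been broadcast is emitted with an empty city name. The following plain-Java simulation makes that ordering effect explicit. It does not use Flink: the class, its methods, and the idea of addressing subtasks by index are all hypothetical stand-ins for the runtime.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the broadcast pattern; not Flink API code.
class BroadcastJoinSketch {
    // One private copy of the dimension map per simulated parallel subtask.
    private final List<Map<Integer, String>> subtaskState = new ArrayList<>();

    BroadcastJoinSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtaskState.add(new HashMap<Integer, String>());
        }
    }

    /** A dimension record is delivered to every subtask (the broadcast side). */
    void onDimension(int cityId, String cityName) {
        for (Map<Integer, String> state : subtaskState) {
            state.put(cityId, cityName);
        }
    }

    /** A main-stream record is handled by exactly one subtask and only reads the state. */
    String onEvent(int subtask, String userName, int cityId) {
        String cityName = subtaskState.get(subtask).get(cityId);
        return userName + "," + cityId + "," + (cityName == null ? "" : cityName);
    }

    public static void main(String[] args) {
        BroadcastJoinSketch job = new BroadcastJoinSketch(2);
        System.out.println(job.onEvent(0, "user1", 1001)); // dimension not seen yet
        job.onDimension(1001, "beijing");
        System.out.println(job.onEvent(1, "user2", 1001)); // any subtask can now resolve it
    }
}
```

If early records must not be emitted with an empty value, they would have to be buffered until the matching dimension record arrives, which the example above does not attempt.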

4. Temporal Table Function Join

Temporal tables provide a view of a continuously changing dimension table as of a specific point in time. In Flink SQL, the join is written by applying LATERAL TABLE to a registered temporal table function.

Processing‑time example:

package join;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

public class JoinDemo5 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        DataStream<Tuple2<String, Integer>> userStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    String[] list = p.split(",");
                    return new Tuple2<>(list[0], Integer.valueOf(list[1]));
                })
                // Lambdas lose their generic types to erasure, so declare the result type explicitly.
                .returns(new TypeHint<Tuple2<String, Integer>>() {});
        DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                .map(p -> {
                    String[] list = p.split(",");
                    return new Tuple2<>(Integer.valueOf(list[0]), list[1]);
                })
                .returns(new TypeHint<Tuple2<Integer, String>>() {});
        Table userTable = tableEnv.fromDataStream(userStream, "user_name,city_id,ps.proctime");
        Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ps.proctime");
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ps", "city_id");
        tableEnv.registerFunction("dimCity", dimCity);
        Table result = tableEnv.sqlQuery(
                "SELECT u.user_name, u.city_id, d.city_name FROM " + userTable + " AS u, LATERAL TABLE (dimCity(u.ps)) d WHERE u.city_id = d.city_id");
        tableEnv.toAppendStream(result, Row.class).print();
        env.execute("joinDemo");
    }
}

Event‑time and Kafka‑source examples follow the same pattern, demonstrating how the temporal view aligns with the event timestamps of the main stream.
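What "aligns with the event timestamps" means can be sketched in plain Java. The class below is not Flink code and all names are hypothetical: it keeps every version of a dimension row in a sorted map and, for a main-stream event, returns the latest version whose timestamp is not after the event's timestamp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of an "as of" lookup against a versioned dimension table; not Flink API code.
class TemporalLookupSketch {
    // city_id -> (version timestamp -> city_name), versions sorted by time.
    private final Map<Integer, TreeMap<Long, String>> versions = new HashMap<>();

    /** Records that cityId has the given name starting at versionTs. */
    void upsert(int cityId, long versionTs, String cityName) {
        TreeMap<Long, String> history = versions.get(cityId);
        if (history == null) {
            history = new TreeMap<>();
            versions.put(cityId, history);
        }
        history.put(versionTs, cityName);
    }

    /** Name of cityId as of eventTs, or null if no version existed at that time. */
    String asOf(int cityId, long eventTs) {
        TreeMap<Long, String> history = versions.get(cityId);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(eventTs);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        TemporalLookupSketch dim = new TemporalLookupSketch();
        dim.upsert(1001, 100L, "beijing");
        dim.upsert(1001, 200L, "peking");
        System.out.println(dim.asOf(1001, 150L)); // version written at 100 is still current
        System.out.println(dim.asOf(1001, 250L)); // version written at 200 has taken over
        System.out.println(dim.asOf(1001, 50L));  // no version existed yet
    }
}
```

With processing time, as in the example above, only the latest version per key is ever needed; keeping older versions matters only when events are matched by their own timestamps.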

5. Summary of Dimension‑Table Join Methods

The article concludes with a comparative diagram summarizing the advantages and limitations of the four dimension‑table join approaches (pre‑load, hot‑storage, broadcast, temporal table function).

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
