
Comprehensive Guide to Dual‑Stream Join in Flink CDC with Java DataStream API

This article provides a detailed tutorial on implementing various dual‑stream join techniques—including processing‑time, event‑time, and interval joins—using Flink CDC 2.2 and Flink 1.14 with the Java DataStream API, complete with code examples, SQL setup, and execution results.

Big Data Technology & Architecture

This article demonstrates how to perform various dual‑stream join operations using Flink CDC 2.2 and Flink 1.14 with the Java DataStream API.

It first describes the overall workflow, including the two‑stream join process, and shows the logical steps for time‑window joins (processing‑time and event‑time) and interval joins.

Data preparation steps are provided, including MySQL table definitions for teacher and course, the required SQL to create and populate the tables, and notes such as enabling binlog.

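teacher table structure:
[
    t_id varchar(3) primary key COMMENT 'primary key',
    t_name varchar(10) not null COMMENT 'teacher name'
]
course table structure:
[
    c_id varchar(3) primary key COMMENT 'primary key',
    c_name varchar(20) not null COMMENT 'course name',
    c_tid varchar(3) not null COMMENT 'teacher table primary key'
]
teacher stream record format:
[
  t_id:  string,   // primary key id
  t_name: string, // teacher name
  op: string,    // data operation type
  ts_ms: long    // timestamp in milliseconds
]
course stream record format:
[
  c_id:  string,   // primary key id
  c_name: string, // course name
  c_tid: string,  // teacher table primary key id
  op: string,    // data operation type
  ts_ms: long    // timestamp in milliseconds
]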

MySQL preparation SQL (create database, tables and insert data):

# Create the database flinkcdc_etl_test
create database flinkcdc_etl_test;

# Switch to the database flinkcdc_etl_test
use flinkcdc_etl_test;

# Create the teacher table
DROP TABLE IF EXISTS `teacher`;
CREATE TABLE `teacher` (
  `t_id` varchar(3) NOT NULL COMMENT 'primary key',
  `t_name` varchar(10)  NOT NULL COMMENT 'teacher name',
  PRIMARY KEY (`t_id`) USING BTREE
) COMMENT = 'teacher table';

INSERT INTO `teacher` VALUES ('001', '张三');
INSERT INTO `teacher` VALUES ('002', '李四');
INSERT INTO `teacher` VALUES ('003', '王五');

# Create the course table
DROP TABLE IF EXISTS `course`;
CREATE TABLE `course` (
  `c_id` varchar(3)  NOT NULL COMMENT 'primary key',
  `c_name` varchar(20)  NOT NULL COMMENT 'course name',
  `c_tid` varchar(3)  NOT NULL COMMENT 'teacher table primary key',
  PRIMARY KEY (`c_id`) USING BTREE,
  INDEX `c_tid`(`c_tid`) USING BTREE
) COMMENT = 'course table';

INSERT INTO `course` VALUES ('1', '语文', '001');
INSERT INTO `course` VALUES ('2', '数学', '002');

Maven pom.xml configuration:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.mfox</groupId>
    <artifactId>teacher-course-etl-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.14.4</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>
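        <dependency>
            <groupId>org.apache.flink</groupId>
            <!-- From Flink 1.14 the Blink planner artifact is named flink-table-planner; its version should match flink.version -->
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>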
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Utility enum OpEnum.java:

package cn.mfox.enumeration;

/**
 * Operation (op) types carried by CDC change records
 */
public enum OpEnum {
    CREATE("c", "create", "insert"),
    UPDATE("u", "update", "update"),
    DELETE("d", "delete", "delete"),
    READ("r", "read", "initial snapshot read");
    private final String dictCode;
    private final String dictValue;
    private final String description;
    OpEnum(String dictCode, String dictValue, String description) {
        this.dictCode = dictCode;
        this.dictValue = dictValue;
        this.description = description;
    }
    public String getDictCode() { return dictCode; }
    public String getDictValue() { return dictValue; }
    public String getDescription() { return description; }
}

Utility class TransformUtil.java for formatting CDC records:

package cn.mfox.utils;

import cn.mfox.enumeration.OpEnum;
import com.alibaba.fastjson.JSONObject;

/**
 * Transformation utilities
 */
public class TransformUtil {
    /**
     * Flattens an extracted CDC record: drops the redundant before, after and source
     * fields and lifts the row's columns to the top level.
     */
    public static JSONObject formatResult(String extractData) {
        JSONObject formatDataObj = new JSONObject();
        JSONObject rawDataObj = JSONObject.parseObject(extractData);
        formatDataObj.putAll(rawDataObj);
        formatDataObj.remove("before");
        formatDataObj.remove("after");
        formatDataObj.remove("source");
        String op = rawDataObj.getString("op");
        // A delete carries the row only in "before"; every other op carries it in "after".
        if (OpEnum.DELETE.getDictCode().equals(op)) {
            formatDataObj.putAll(rawDataObj.getJSONObject("before"));
        } else {
            formatDataObj.putAll(rawDataObj.getJSONObject("after"));
        }
        return formatDataObj;
    }
}
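
To make the flattening step concrete, here is a minimal JDK-only sketch of the same idea using plain maps instead of fastjson. The class name `CdcFlattenSketch`, the `flatten` helper and the sample `ts_ms` value are hypothetical and not part of the article's code; the envelope keys (`before`, `after`, `source`, `op`, `ts_ms`) are the ones the utility above reads.

```java
import java.util.HashMap;
import java.util.Map;

/** JDK-only illustration of the flattening step; not the article's fastjson-based class. */
final class CdcFlattenSketch {

    /** Copies the envelope, drops before/after/source, then merges the relevant row image. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> flatten(Map<String, Object> envelope) {
        Map<String, Object> flat = new HashMap<>(envelope);
        flat.remove("before");
        flat.remove("after");
        flat.remove("source");
        // A delete carries the row only in "before"; every other op carries it in "after".
        String imageKey = "d".equals(envelope.get("op")) ? "before" : "after";
        Object image = envelope.get(imageKey);
        if (image instanceof Map) {
            flat.putAll((Map<String, Object>) image);
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("t_id", "001");
        after.put("t_name", "张三");

        Map<String, Object> envelope = new HashMap<>();
        envelope.put("before", null);
        envelope.put("after", after);
        envelope.put("source", new HashMap<String, Object>());
        envelope.put("op", "r");
        envelope.put("ts_ms", 1650000000000L); // hypothetical timestamp

        // The result holds t_id, t_name, op and ts_ms at the top level.
        System.out.println(flatten(envelope));
    }
}
```

Unlike the utility above, this sketch skips a missing row image instead of failing on it, which is a design choice of the sketch rather than behavior of the original class.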

Source classes for both tables, with and without watermarks:

package cn.mfox.etl.v2.join.watermark;

import cn.mfox.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Extracts the course table as a source stream, without watermarks
 */
public class CourseDataStreamNoWatermark {
    public static DataStream<JSONObject> getCourseDataStream(StreamExecutionEnvironment env) {
        MySqlSource<String> courseSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flinkcdc_etl_test")
                .tableList("flinkcdc_etl_test.course")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                courseSource,
                WatermarkStrategy.noWatermarks(),
                "CourseDataStreamNoWatermark Source"
        );
        DataStream<JSONObject> courseDataStream = mysqlDataStreamSource.map(rawData -> TransformUtil.formatResult(rawData));
        return courseDataStream;
    }
}

CourseDataStreamWithWatermark.java:

package cn.mfox.etl.v2.join.watermark;

import cn.mfox.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Extracts the course table as a source stream, with watermarks; ts_ms is used as the event timestamp
 */
public class CourseDataStreamWithWatermark {
    public static DataStream<JSONObject> getCourseDataStream(StreamExecutionEnvironment env) {
        MySqlSource<String> courseSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flinkcdc_etl_test")
                .tableList("flinkcdc_etl_test.course")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                courseSource,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(1L))
                        .withTimestampAssigner((extractData, l) -> JSONObject.parseObject(extractData).getLong("ts_ms")),
                "CourseDataStreamWithWatermark Source"
        );
        DataStream<JSONObject> courseDataStream = mysqlDataStreamSource.map(rawData -> TransformUtil.formatResult(rawData));
        return courseDataStream;
    }
}

TeacherDataStreamNoWatermark.java:

package cn.mfox.etl.v2.join.watermark;

import cn.mfox.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Extracts the teacher table as a source stream, without watermarks
 */
public class TeacherDataStreamNoWatermark {
    public static DataStream<JSONObject> getTeacherDataStream(StreamExecutionEnvironment env) {
        MySqlSource<String> teacherSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flinkcdc_etl_test")
                .tableList("flinkcdc_etl_test.teacher")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                teacherSource,
                WatermarkStrategy.noWatermarks(),
                "TeacherDataStreamNoWatermark Source"
        );
        DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> TransformUtil.formatResult(rawData));
        return teacherDataStream;
    }
}

TeacherDataStreamWithWatermark.java:

package cn.mfox.etl.v2.join.watermark;

import cn.mfox.utils.TransformUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Extracts the teacher table as a source stream, with watermarks; ts_ms is used as the event timestamp
 */
public class TeacherDataStreamWithWatermark {
    public static DataStream<JSONObject> getTeacherDataStream(StreamExecutionEnvironment env) {
        MySqlSource<String> teacherSource = MySqlSource.<String>builder()
                .hostname("192.168.18.101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("flinkcdc_etl_test")
                .tableList("flinkcdc_etl_test.teacher")
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .build();
        DataStreamSource<String> mysqlDataStreamSource = env.fromSource(
                teacherSource,
                WatermarkStrategy.<String>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(1L))
                        .withTimestampAssigner((extractData, l) -> JSONObject.parseObject(extractData).getLong("ts_ms")),
                "TeacherDataStreamWithWatermark Source"
        );
        DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> TransformUtil.formatResult(rawData));
        return teacherDataStream;
    }
}
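
The watermark-enabled sources use a bounded out-of-orderness strategy with a 1-second bound. As a rough model of what that means, the sketch below tracks the largest timestamp seen and reports a watermark that trails it by the bound plus one millisecond, which is the arithmetic Flink's built-in generator is understood to apply. The class `BoundedWatermarkSketch` and the sample timestamps are hypothetical; this is a plain-Java illustration, not Flink's implementation.

```java
/** Plain-Java model of a bounded out-of-orderness watermark; illustrative only. */
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Records an event timestamp; older timestamps never move the maximum back. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** The watermark trails the largest timestamp by the bound plus one millisecond. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(1_000L);
        wm.onEvent(10_000L);
        wm.onEvent(9_500L); // arrives out of order; the watermark does not move back
        System.out.println(wm.currentWatermark()); // prints 8999
    }
}
```

One practical consequence for CDC streams: the watermark only advances when new change records arrive, so an event-time window may stay open until a later change pushes the watermark past its end.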

Processing‑time inner join example (WindowInnerJoinByProcessTimeTest.java):

package cn.mfox.etl.v2.join.window.inner;

import cn.mfox.etl.v2.join.watermark.CourseDataStreamNoWatermark;
import cn.mfox.etl.v2.join.watermark.TeacherDataStreamNoWatermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowInnerJoinByProcessTimeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<JSONObject> teacherDataStream = TeacherDataStreamNoWatermark.getTeacherDataStream(env);
        DataStream<JSONObject> courseDataStream = CourseDataStreamNoWatermark.getCourseDataStream(env);
        windowInnerJoinAndPrint(teacherDataStream, courseDataStream);
        env.execute("WindowInnerJoinByProcessTimeTest Job");
    }
    private static void windowInnerJoinAndPrint(DataStream<JSONObject> teacherDataStream, DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .join(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3L)))
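                .apply(new JoinFunction<JSONObject, JSONObject, JSONObject>() {
                    @Override
                    public JSONObject join(JSONObject left, JSONObject right) {
                        // Merge into a fresh object so a teacher record that matches several
                        // courses is not mutated between calls; op and ts_ms take the course values.
                        JSONObject joined = new JSONObject();
                        joined.putAll(left);
                        joined.putAll(right);
                        return joined;
                    }
                });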
        teacherCourseDataStream.print("Window Inner Join By Process Time");
    }
}

Processing‑time outer join using coGroup (WindowOuterJoinByProcessTimeTest.java):

package cn.mfox.etl.v2.join.window.outer;

import cn.mfox.etl.v2.join.watermark.CourseDataStreamNoWatermark;
import cn.mfox.etl.v2.join.watermark.TeacherDataStreamNoWatermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowOuterJoinByProcessTimeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<JSONObject> teacherDataStream = TeacherDataStreamNoWatermark.getTeacherDataStream(env);
        DataStream<JSONObject> courseDataStream = CourseDataStreamNoWatermark.getCourseDataStream(env);
        windowOuterJoinAndPrint(teacherDataStream, courseDataStream);
        env.execute("WindowOuterJoinByProcessTimeTest Job");
    }
    private static void windowOuterJoinAndPrint(DataStream<JSONObject> teacherDataStream, DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .coGroup(courseDataStream)
                .where(teacher -> teacher.getString("t_id"))
                .equalTo(course -> course.getString("c_tid"))
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3L)))
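                .apply(new CoGroupFunction<JSONObject, JSONObject, JSONObject>() {
                    @Override
                    public void coGroup(Iterable<JSONObject> left, Iterable<JSONObject> right, Collector<JSONObject> out) {
                        // Buffer the course side so it can be traversed once per teacher.
                        java.util.List<JSONObject> courses = new java.util.ArrayList<>();
                        right.forEach(courses::add);
                        boolean hasTeacher = false;
                        for (JSONObject teacher : left) {
                            hasTeacher = true;
                            if (courses.isEmpty()) {
                                out.collect(teacher); // teacher with no course in this window
                            }
                            for (JSONObject course : courses) {
                                JSONObject joined = new JSONObject();
                                joined.putAll(teacher);
                                joined.putAll(course);
                                out.collect(joined); // one row per teacher/course pair
                            }
                        }
                        if (!hasTeacher) {
                            courses.forEach(out::collect); // courses with no teacher in this window
                        }
                    }
                });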
        teacherCourseDataStream.print("Window Outer Join By Process Time");
    }
}
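
The per-key, per-window logic of an outer join built on coGroup can be modelled without Flink. The sketch below assumes the two lists already hold the teacher and course records that share one join key inside one window; `WindowOuterJoinSketch` and `outerJoin` are hypothetical names, and the field names follow the record formats described earlier.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of full outer join semantics for one key in one window. */
final class WindowOuterJoinSketch {

    static List<Map<String, String>> outerJoin(List<Map<String, String>> teachers,
                                               List<Map<String, String>> courses) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> teacher : teachers) {
            if (courses.isEmpty()) {
                out.add(new LinkedHashMap<>(teacher)); // left row without a match
            }
            for (Map<String, String> course : courses) {
                Map<String, String> joined = new LinkedHashMap<>(teacher);
                joined.putAll(course); // course fields win on name clashes
                out.add(joined);
            }
        }
        if (teachers.isEmpty()) {
            for (Map<String, String> course : courses) {
                out.add(new LinkedHashMap<>(course)); // right row without a match
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> teacher = new LinkedHashMap<>();
        teacher.put("t_id", "003");
        teacher.put("t_name", "王五");
        List<Map<String, String>> teachers = new ArrayList<>();
        teachers.add(teacher);

        // Teacher 003 has no course in the sample data, so an outer join still emits
        // the teacher row, whereas an inner join would emit nothing for this key.
        System.out.println(outerJoin(teachers, new ArrayList<>()));
    }
}
```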

Event‑time inner and outer join examples (WindowInnerJoinByEventTimeTest.java and WindowOuterJoinByEventTimeTest.java) follow the same pattern but use TumblingEventTimeWindows and the watermark‑enabled sources.
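
With tumbling windows, two records can only be joined when their timestamps fall into the same fixed window. The sketch below shows the window-start arithmetic for a 3-second window with no offset, assuming epoch-aligned windows; `TumblingWindowSketch` and the sample timestamps are hypothetical and the code is a plain-Java illustration rather than Flink's window assigner.

```java
/** Plain-Java model of tumbling window assignment (epoch-aligned, no offset). */
final class TumblingWindowSketch {

    /** Start of the window [start, start + sizeMs) that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Two records are join candidates only if they share a window. */
    static boolean sameWindow(long leftTs, long rightTs, long sizeMs) {
        return windowStart(leftTs, sizeMs) == windowStart(rightTs, sizeMs);
    }

    public static void main(String[] args) {
        long size = 3_000L;
        System.out.println(windowStart(7_200L, size));        // prints 6000
        System.out.println(sameWindow(7_200L, 8_900L, size)); // prints true
        // Only 200 ms apart, but on opposite sides of the 9000 boundary:
        System.out.println(sameWindow(8_900L, 9_100L, size)); // prints false
    }
}
```

The last case is the usual limitation of window joins: records that are close in time but straddle a window boundary are never paired.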

Interval join based on event time (InteralJoinByEventTimeTest.java) keys both streams and uses intervalJoin to match each teacher record with course records whose event time lies within 5 seconds before or after it:

package cn.mfox.etl.v2.join.interal;

import cn.mfox.etl.v2.join.watermark.CourseDataStreamWithWatermark;
import cn.mfox.etl.v2.join.watermark.TeacherDataStreamWithWatermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class InteralJoinByEventTimeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<JSONObject> teacherDataStream = TeacherDataStreamWithWatermark.getTeacherDataStream(env);
        DataStream<JSONObject> courseDataStream = CourseDataStreamWithWatermark.getCourseDataStream(env);
        intervalJoinAndPrint(teacherDataStream, courseDataStream);
        env.execute("TeacherJoinCourseTest Job");
    }
    private static void intervalJoinAndPrint(DataStream<JSONObject> teacherDataStream, DataStream<JSONObject> courseDataStream) {
        DataStream<JSONObject> teacherCourseDataStream = teacherDataStream
                .keyBy(teacher -> teacher.getString("t_id"))
                .intervalJoin(courseDataStream.keyBy(course -> course.getString("c_tid")))
                .between(Time.seconds(-5), Time.seconds(5))
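                .process(new ProcessJoinFunction<JSONObject, JSONObject, JSONObject>() {
                    @Override
                    public void processElement(JSONObject left, JSONObject right, Context ctx, Collector<JSONObject> out) {
                        // Copy before merging: the teacher record stays buffered for later
                        // matches and must not pick up course fields.
                        JSONObject joined = new JSONObject();
                        joined.putAll(left);
                        joined.putAll(right);
                        out.collect(joined);
                    }
                });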
        teacherCourseDataStream.print("Interval Join By Event Time");
    }
}
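
The matching rule behind `between(Time.seconds(-5), Time.seconds(5))` can be written as a single predicate: a course record matches a teacher record when its timestamp lies in the teacher timestamp plus the lower bound through the teacher timestamp plus the upper bound, both ends inclusive by default. The sketch below is a plain-Java illustration with hypothetical names and timestamps, not Flink code.

```java
/** Plain-Java model of the interval join time condition (bounds inclusive). */
final class IntervalJoinBoundsSketch {

    /** True when leftTs + lowerBoundMs <= rightTs <= leftTs + upperBoundMs. */
    static boolean withinInterval(long leftTs, long rightTs, long lowerBoundMs, long upperBoundMs) {
        return rightTs >= leftTs + lowerBoundMs && rightTs <= leftTs + upperBoundMs;
    }

    public static void main(String[] args) {
        long lower = -5_000L;
        long upper = 5_000L;
        System.out.println(withinInterval(10_000L, 15_000L, lower, upper)); // prints true
        System.out.println(withinInterval(10_000L, 15_001L, lower, upper)); // prints false
        // Records 200 ms apart match regardless of any fixed window boundary:
        System.out.println(withinInterval(8_900L, 9_100L, lower, upper));   // prints true
    }
}
```

Because the interval is anchored on each record rather than on fixed boundaries, pairs that a tumbling window would split can still be joined.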

Each program prints the joined JSON records to the console; screenshots in the original article illustrate the output and compare inner vs. outer joins for both processing‑time and event‑time windows, as well as the interval join behavior.

The article concludes with a comparison chart summarising the characteristics of all join methods.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
