
Flink Multi-Stream Union Operations and Event-Time Sorting

This article explains how to use Flink's DataStream.union() to combine multiple streams of the same type, demonstrates Maven project setup and code examples for simple unions and for unions with custom event-time sorting, and shows the resulting ordered output.

Big Data Technology & Architecture

In Flink, multiple streams can be combined with the union operation, which merges streams of the same data type into a single stream. The article covers the basic concept, how to call DataStream.union() (it accepts one or more streams, so several inputs can be merged in one call), and its key constraint: every input stream must have exactly the same element type.

The Maven project consists of a parent pom.xml for the flink-demo project and a child pom.xml for the muti-stream-union module:

<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.antiy</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>muti-stream-union</module>
    </modules>
    ...
</project>

<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
    <parent>
        <artifactId>flink-demo</artifactId>
        <groupId>cn.antiy</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>muti-stream-union</artifactId>
    <packaging>jar</packaging>
    ...
</project>

Case 1 – Simple Union of Odd and Even Integer Streams

package cn.antiy.union.base;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OddUnionEvenDataStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> oddNumStream = env.fromElements(1, 3, 5, 7);
        DataStreamSource<Integer> evenStream = env.fromElements(2, 4, 6, 8);
        oddNumStream.union(evenStream).print();
        env.execute("OddUnionEvenDataStreamTest");
    }
}

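The program prints all eight numbers. union() keeps the relative order of elements within each input (1, 3, 5, 7 and 2, 4, 6, 8), but how the two inputs interleave depends on when their elements arrive, so the merged order is not guaranteed.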
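A plain-Java model can make this ordering guarantee concrete. The sketch below is not Flink code: the hypothetical union method takes an explicit arrival schedule in place of real thread and network timing, and its generic signature mirrors the same-element-type rule of DataStream.union().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java model (not Flink API) of what DataStream.union() guarantees:
// every element of every input appears in the output, each input keeps its own
// relative order, but the interleaving across inputs depends on arrival timing.
public class UnionOrderModel {

    // schedule[i] is the index of the input whose next element "arrives" at step i.
    // The schedule stands in for the timing that makes real union output
    // nondeterministic; entries pointing at an exhausted input are ignored.
    // List<T>... forces all inputs to share one element type, like union(DataStream<T>...).
    @SafeVarargs
    public static <T> List<T> union(int[] schedule, List<T>... inputs) {
        List<Iterator<T>> iterators = new ArrayList<>();
        for (List<T> input : inputs) {
            iterators.add(input.iterator());
        }
        List<T> out = new ArrayList<>();
        for (int source : schedule) {
            Iterator<T> it = iterators.get(source);
            if (it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> odd = List.of(1, 3, 5, 7);
        List<Integer> even = List.of(2, 4, 6, 8);
        // Two different arrival schedules give two different, equally valid results.
        System.out.println(union(new int[]{0, 0, 0, 0, 1, 1, 1, 1}, odd, even)); // [1, 3, 5, 7, 2, 4, 6, 8]
        System.out.println(union(new int[]{1, 0, 1, 0, 1, 0, 1, 0}, odd, even)); // [2, 1, 4, 3, 6, 5, 8, 7]
    }
}
```

In both runs the odd numbers stay in the order 1, 3, 5, 7 and the even numbers in the order 2, 4, 6, 8; only their interleaving changes.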

Case 2 – Union of Login and Download Event Streams

package cn.antiy.union.event.v1.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {
    private Long timestamp;
    private String userId;
    private String ipAddress;
    private String eventType;
    private String remark;
    private String downloadStatus;
    private String filename;
}

package cn.antiy.union.event.v1;

import cn.antiy.union.event.v1.entity.Event;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LoginUnionDownloadDataStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Event> loginEventStream = env.fromElements(
            new Event(1659657901L, "A", "192.168.1.1", "0", "2022-08-05 08:05:01", null, null),
            ...
        );
        DataStream<Event> downloadEventStream = env.fromElements(
            new Event(1659657902L, "A", null, null, "2022-08-05 08:05:02", "O", "西游记"),
            ...
        );
        loginEventStream.union(downloadEventStream).print();
        env.execute("LoginUnionDownloadDataStreamTest");
    }
}

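union() neither concatenates nor sorts: it forwards elements from both inputs as they arrive, so login and download events are interleaved in arrival order rather than in timestamp order.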

Ordered Union Using Event-Time Sorting

package cn.antiy.union.event.v2.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event implements Comparable<Event> {
    private Long timestamp;
    private String userId;
    private String ipAddress;
    private String eventType;
    private String remark;
    private String downloadStatus;
    private String filename;
    public String getKey() { return "1"; }
    @Override
    public int compareTo(Event o) { return Long.compare(this.timestamp, o.timestamp); }
}
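Event.compareTo is what lets a PriorityQueue hand back the earliest event first. The stdlib-only sketch below uses a hypothetical SimpleEvent class in place of the Lombok-generated Event and applies the same ascending-timestamp rule; the timestamps are illustrative values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java illustration (no Flink, no Lombok) of how PriorityQueue uses compareTo:
// poll() always returns the smallest timestamp first, regardless of insertion order.
public class PriorityQueueOrderDemo {

    // Hypothetical stand-in for Event, reduced to the field used for ordering.
    public static final class SimpleEvent implements Comparable<SimpleEvent> {
        final long timestamp;

        public SimpleEvent(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(SimpleEvent o) {
            // Same rule as Event.compareTo: ascending by timestamp.
            return Long.compare(this.timestamp, o.timestamp);
        }
    }

    // Drains the queue with poll(). Iterating a PriorityQueue directly would NOT
    // give sorted order, because only the head is guaranteed to be the minimum.
    public static List<Long> drainTimestamps(PriorityQueue<SimpleEvent> queue) {
        List<Long> out = new ArrayList<>();
        SimpleEvent e;
        while ((e = queue.poll()) != null) {
            out.add(e.timestamp);
        }
        return out;
    }

    public static void main(String[] args) {
        PriorityQueue<SimpleEvent> queue = new PriorityQueue<>();
        queue.add(new SimpleEvent(3L));
        queue.add(new SimpleEvent(1L));
        queue.add(new SimpleEvent(2L));
        System.out.println(drainTimestamps(queue)); // [1, 2, 3]
    }
}
```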
package cn.antiy.union.event.v2;

import cn.antiy.union.event.v2.entity.Event;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.PriorityQueue;

public class SortByTimestampFunction extends KeyedProcessFunction<String, Event, Event> {
    private ValueState<PriorityQueue<Event>> queueState = null;
    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
            "sorted-events",
            TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {}));
        queueState = getRuntimeContext().getState(descriptor);
    }
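    @Override
    public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
        TimerService timerService = context.timerService();
        long currentWatermark = timerService.currentWatermark();
        // Buffer only events still ahead of the watermark; late events
        // (timestamp <= current watermark) are silently dropped by this function.
        if (context.timestamp() > currentWatermark) {
            PriorityQueue<Event> queue = queueState.value();
            if (queue == null) { queue = new PriorityQueue<>(10); }
            queue.add(event);
            queueState.update(queue);
            // Flink keeps at most one timer per key and timestamp, so duplicates are harmless.
            timerService.registerEventTimeTimer(event.getTimestamp());
        }
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
        PriorityQueue<Event> queue = queueState.value();
        if (queue == null) { return; }
        long watermark = context.timerService().currentWatermark();
        // Emit, smallest timestamp first, every buffered event the watermark has passed.
        Event head = queue.peek();
        while (head != null && head.getTimestamp() <= watermark) {
            out.collect(queue.poll());
            head = queue.peek();
        }
        // Write the drained queue back; without update(), state backends that keep
        // serialized copies (e.g. RocksDB) would still hold the emitted events.
        queueState.update(queue);
    }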
}
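The buffering logic of SortByTimestampFunction can be modeled without Flink to see which events it releases and which it drops. The sketch below is plain Java, not Flink API: the class WatermarkSortBuffer and its methods are hypothetical, onWatermark stands in for the event-time timers, and events are reduced to bare timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java model of SortByTimestampFunction (no Flink types):
// - onElement buffers events newer than the current watermark and drops late ones;
// - onWatermark emits, in ascending order, every buffered timestamp <= the watermark.
public class WatermarkSortBuffer {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    private final List<Long> emitted = new ArrayList<>();
    private final List<Long> dropped = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    public void onElement(long timestamp) {
        if (timestamp > currentWatermark) {
            queue.add(timestamp);
        } else {
            dropped.add(timestamp); // late: at or behind the watermark
        }
    }

    public void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark); // watermarks never move back
        Long head = queue.peek();
        while (head != null && head <= currentWatermark) {
            emitted.add(queue.poll());
            head = queue.peek();
        }
    }

    public List<Long> emitted() { return emitted; }

    public List<Long> dropped() { return dropped; }

    public static void main(String[] args) {
        WatermarkSortBuffer buf = new WatermarkSortBuffer();
        buf.onElement(5);
        buf.onElement(2);
        buf.onElement(4);
        buf.onWatermark(3);              // releases 2 only
        buf.onElement(1);                // behind watermark 3, so dropped
        buf.onElement(6);
        buf.onWatermark(Long.MAX_VALUE); // end of a bounded input flushes the rest
        System.out.println(buf.emitted()); // [2, 4, 5, 6]
        System.out.println(buf.dropped()); // [1]
    }
}
```

The final onWatermark(Long.MAX_VALUE) call mirrors what happens when a bounded source such as fromElements finishes.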
package cn.antiy.union.event.v2;

import cn.antiy.union.event.v2.entity.Event;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;

public class UnionAndSortDataStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallelism, as in the earlier cases, keeps "n>" subtask prefixes out of the printed output.
        env.setParallelism(1);
        // Constructor order: timestamp, userId, ipAddress, eventType, remark, downloadStatus, filename
        DataStream<Event> loginEventStream = env.fromElements(
            new Event(1659657901L, "A", "192.168.1.1", "0", "2022-08-05 08:05:01", null, null),
            ...
        ).assignTimestampsAndWatermarks(
            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1L))
                .withTimestampAssigner((event, ts) -> event.getTimestamp())
        );
        DataStream<Event> downloadEventStream = env.fromElements(
            new Event(1659657902L, "A", null, null, "2022-08-05 08:05:02", "O", "西游记"),
            ...
        ).assignTimestampsAndWatermarks(
            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1L))
                .withTimestampAssigner((event, ts) -> event.getTimestamp())
        );
        loginEventStream.union(downloadEventStream)
            .keyBy(r -> r.getKey())
            .process(new SortByTimestampFunction())
            .print();
        env.execute("UnionAndSortDataStreamTest");
    }
}

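After watermarks are assigned to both inputs, the union is keyed by the constant key returned by getKey(), so every event reaches the same SortByTimestampFunction instance (which also means the sort step runs on a single parallel instance). That function buffers each event in a priority queue and registers an event-time timer for its timestamp; when a timer fires, it emits all buffered events whose timestamps are at or below the current watermark, smallest first. Because both sources are bounded, Flink emits a final watermark when the input ends, which flushes any events still buffered, so the merged stream is printed in ascending timestamp order. Two caveats apply: events that arrive at or behind the watermark are dropped, and the sample timestamps are epoch seconds while Flink treats event time as milliseconds, so Duration.ofSeconds(1L) here tolerates 1000 timestamp units of disorder rather than one second.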
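The watermark side can be sketched as arithmetic. Assuming Flink's built-in bounded-out-of-orderness generator, which emits the largest timestamp seen minus the bound minus one, and the rule that an operator fed by several inputs advances to the minimum of their watermarks, the hypothetical helpers below compute both. The input values are illustrative, not taken from the article.

```java
// Plain-Java model (not Flink API) of two watermark rules used by the example:
// 1. forBoundedOutOfOrderness(bound) emits maxTimestampSeen - bound - 1
//    (timestamps and bound in the same unit, which Flink assumes is milliseconds);
// 2. an operator fed by union() uses the minimum of its inputs' watermarks.
public class WatermarkMath {

    public static long boundedOutOfOrderness(long maxTimestampSeen, long boundMillis) {
        return maxTimestampSeen - boundMillis - 1;
    }

    public static long combined(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // Illustrative values: the login input has seen 1659657905, the download
        // input 1659657902, both with a 1-second (1000 ms) bound.
        long login = boundedOutOfOrderness(1659657905L, 1000L);
        long download = boundedOutOfOrderness(1659657902L, 1000L);
        System.out.println(login);                     // 1659656904
        System.out.println(download);                  // 1659656901
        System.out.println(combined(login, download)); // 1659656901
    }
}
```

The slower input holds back the combined watermark, so the sorting function cannot release an event until both inputs have progressed past it.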

Conclusion

The article demonstrates basic union of Flink streams, shows how to set up Maven modules, provides concrete Java examples, and explains how to obtain ordered results by combining union with event-time watermarks and a custom sorting process function.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Wang Zhiwu (Big Data Technology & Architecture), a big data expert dedicated to sharing big data technology.