Hot Goods Top‑N Calculation with Flink Event‑Time Sliding Windows

This article explains how to compute the top‑N hot products or brands within a time window using Apache Flink, covering data modeling, event‑time handling, sliding windows, custom aggregation functions, and result sorting with complete Java code examples.

Big Data Technology & Architecture

The goal is to count the most popular items or brands (Top‑N) over a configurable time interval using Apache Flink's event‑time processing.

The key techniques are Flink's event time, sliding windows (10 minutes long, sliding every minute), keyed state, and event‑time timers.

Step 1 – Data parsing: each raw JSON string is converted into a MyBehavior JavaBean with FastJSON. Records that fail to parse are logged and dropped.

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HotGoodsTopN {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60000);
        env.setParallelism(1);
        DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);
        SingleOutputStreamOperator<MyBehavior> process = lines.process(new ProcessFunction<String, MyBehavior>() {
            @Override
            public void processElement(String input, Context ctx, Collector<MyBehavior> out) throws Exception {
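                try {
                    // FastJSON maps the JSON fields onto MyBehavior's public fields by name
                    MyBehavior behavior = JSON.parseObject(input, MyBehavior.class);
                    if (behavior != null) { // parseObject can return null, e.g. for blank input
                        out.collect(behavior);
                    }
                } catch (Exception e) {
                    // Log and drop malformed records so one bad line does not fail the job
                    e.printStackTrace();
                }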
            }
        });
        // ... further pipeline omitted for brevity
    }
}

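Step 2 – Assign timestamps and watermarks from the event's timestamp field. The extractor below allows zero out‑of‑orderness, so an event that arrives after the watermark has already passed the end of its window is dropped as late.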

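SingleOutputStreamOperator<MyBehavior> behaviorDSWithWaterMark =
    // Time.seconds(0): the watermark tracks the highest timestamp seen, with no tolerance for disorder
    process.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyBehavior>(Time.seconds(0)) {
        @Override
        public long extractTimestamp(MyBehavior element) {
            // Flink interprets the returned value as epoch milliseconds
            return element.timestamp;
        }
    });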
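
To make the watermark arithmetic concrete, here is a minimal plain‑Java sketch of the bounded‑out‑of‑orderness idea: the watermark trails the highest timestamp seen so far by a fixed delay and never moves backwards. The class WatermarkSketch and its method names are hypothetical and not part of Flink; the sketch mirrors only the arithmetic, not Flink's periodic watermark emission.

```java
/** Hypothetical helper (not a Flink class): tracks a bounded-out-of-orderness watermark. */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp; an out-of-order event never lowers the maximum. */
    void onEvent(long timestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
    }

    /** Watermark = highest timestamp seen so far minus the allowed delay. */
    long currentWatermark() {
        if (maxTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events seen yet
        }
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    /** An event-time window [start, end) can fire once the watermark reaches end - 1. */
    boolean windowCanFire(long windowEndMs) {
        return currentWatermark() >= windowEndMs - 1;
    }
}
```

With a delay of zero, as in Step 2, the watermark equals the highest timestamp seen, so a window closes as soon as an event at or beyond its last millisecond arrives. A larger delay trades result latency for tolerance of out‑of‑order events.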

Step 3 – Key by itemId and type so that each product‑action pair is processed separately.

KeyedStream<MyBehavior, Tuple> keyed = behaviorDSWithWaterMark.keyBy("itemId", "type");
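
As an illustration of what keying by two fields means, the following plain‑Java sketch counts events per (itemId, type) pair, so that views and purchases of the same product are tallied separately. It involves no Flink code; the class CompositeKeySketch and the two‑element array representation of an event are assumptions made for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java illustration of composite-key grouping (no Flink involved). */
final class CompositeKeySketch {
    /** Counts events per (itemId, type) pair; each event is a two-element array {itemId, type}. */
    static Map<List<String>, Long> countByItemAndType(List<String[]> events) {
        Map<List<String>, Long> counts = new HashMap<>();
        for (String[] event : events) {
            // Lists compare by content, so a two-element list works as a composite map key
            List<String> key = Arrays.asList(event[0], event[1]);
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```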

Step 4 – Define a sliding window of 10 minutes that slides every minute.

WindowedStream<MyBehavior, Tuple, TimeWindow> window =
    keyed.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));
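
Because the window is ten times longer than the slide, every event is assigned to ten overlapping windows. The plain‑Java sketch below lists the start times of those windows for a given timestamp. The class SlidingWindowSketch is hypothetical and not a Flink class, and it assumes non‑negative timestamps and a zero window offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class): lists the sliding windows that contain a timestamp. */
final class SlidingWindowSketch {
    /** Returns window start times, newest first; each window covers [start, start + sizeMs). */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window containing the timestamp
        // (assumes timestampMs >= 0 and a zero window offset)
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, an event at 1583636525000 with a 10‑minute size and 1‑minute slide falls into ten windows, the newest starting at 1583636520000. This fan‑out is why each event contributes to ten separate counts downstream.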

Step 5 – Aggregate within the window using a custom WindowFunction to emit ItemViewCount records containing the window start/end and count.

SingleOutputStreamOperator<ItemViewCount> result = window.apply(new WindowFunction<MyBehavior, ItemViewCount, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<MyBehavior> input, Collector<ItemViewCount> out) throws Exception {
        String itemId = tuple.getField(0);
        String type = tuple.getField(1);
        long start = window.getStart();
        long end = window.getEnd();
        int count = 0;
        for (MyBehavior b : input) {
            count += 1;
        }
        out.collect(ItemViewCount.of(itemId, type, start, end, count));
    }
});
result.print();
env.execute("HotGoodsTopN");

The supporting data classes are defined as follows:

public class MyBehavior {
    public String userId;
    public String itemId;
    public String categoryId;
    public String type; // pv, buy, cart, fav
    public long timestamp; // event time; Flink expects epoch milliseconds (convert if the source emits seconds)
    public long counts = 1;
    // factory methods and getters omitted for brevity
}

public class ItemViewCount {
    public String itemId;
    public String type;
    public long windowStart;
    public long windowEnd;
    public long viewCount;
    // factory method and toString omitted for brevity
}

The first implementation prints raw counts per window, e.g.:

{itemId='p1001', type='pv', windowStart=1583636520000 , 2020-03-08 11:02:00.0, windowEnd=1583637120000 , 2020-03-08 11:12:00.0, viewCount=3}

Step 6 – Advanced implementation: replace the apply‑based aggregation, which buffers every event of a window until it fires, with an incremental AggregateFunction (MyWindowAggFunction) that keeps only a running counter, plus a WindowFunction (MyWindowFunction) that attaches the key and window boundaries to the final count.

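// Requires: import org.apache.flink.api.common.functions.AggregateFunction;
public static class MyWindowAggFunction implements AggregateFunction<MyBehavior, Long, Long> {
    // The accumulator is a single running count per key and window
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(MyBehavior input, Long accumulator) { return accumulator + input.counts; }
    @Override public Long getResult(Long accumulator) { return accumulator; }
    // Combine two partial counts; returning null here would break merging window assigners
    @Override public Long merge(Long a, Long b) { return a + b; }
}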

public static class MyWindowFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
        String itemId = tuple.getField(0);
        String type = tuple.getField(1);
        long start = window.getStart();
        long end = window.getEnd();
        Long count = input.iterator().next();
        out.collect(ItemViewCount.of(itemId, type, start, end, count));
    }
}

SingleOutputStreamOperator<ItemViewCount> windowAggregate = window.aggregate(new MyWindowAggFunction(), new MyWindowFunction());
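
The accumulator contract behind incremental aggregation can be sketched without Flink. The class CountAccumulatorSketch below is hypothetical and merely mirrors the four method names of AggregateFunction using primitive longs: each event updates a single running value, and merging two partial counts gives the same result as counting everything in one pass.

```java
/** Hypothetical mini version of the accumulator contract (not Flink's AggregateFunction). */
final class CountAccumulatorSketch {
    static long createAccumulator() { return 0L; }

    /** Folds one event's weight into the running count. */
    static long add(long weight, long accumulator) { return accumulator + weight; }

    /** Combines two partial counts into one. */
    static long merge(long a, long b) { return a + b; }

    static long getResult(long accumulator) { return accumulator; }

    /** Counts a batch one element at a time, holding only a single long of state. */
    static long countAll(long[] weights) {
        long accumulator = createAccumulator();
        for (long weight : weights) {
            accumulator = add(weight, accumulator);
        }
        return getResult(accumulator);
    }
}
```

The state held per key and window is one number regardless of how many events arrive, which is the memory advantage over iterating a buffered Iterable. Flink calls merge only for merging window assigners such as session windows, but implementing it correctly keeps the function reusable.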

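Step 7 – Sort the aggregated results: key the ItemViewCount stream by type and window boundaries, buffer each group's records in a ValueState that holds a list, and register an event‑time timer at windowEnd + 1. Once the watermark passes that timestamp, every count for the window has arrived, so the timer callback sorts the buffered list by view count in descending order and emits it.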

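// Additional imports needed by this step (place them with the others at the top of the file):
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

KeyedStream<ItemViewCount, Tuple> sortedKeyed = windowAggregate.keyBy("type", "windowStart", "windowEnd");
sortedKeyed.process(new KeyedProcessFunction<Tuple, ItemViewCount, List<ItemViewCount>>() {
    // Per-key buffer of all counts that belong to one (type, window) group
    private transient ValueState<List<ItemViewCount>> valueState;
    @Override public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<List<ItemViewCount>> descriptor = new ValueStateDescriptor<>("list-state", TypeInformation.of(new TypeHint<List<ItemViewCount>>() {}));
        valueState = getRuntimeContext().getState(descriptor);
    }
    @Override public void processElement(ItemViewCount input, Context ctx, Collector<List<ItemViewCount>> out) throws Exception {
        List<ItemViewCount> buffer = valueState.value();
        if (buffer == null) { buffer = new ArrayList<>(); }
        buffer.add(input);
        valueState.update(buffer);
        // Timers with the same key and timestamp are deduplicated, so this fires once per group
        ctx.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }
    @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemViewCount>> out) throws Exception {
        List<ItemViewCount> buffer = valueState.value();
        if (buffer == null) { return; }
        // Highest count first; Long.compare avoids the overflow of casting a long difference to int
        buffer.sort((o1, o2) -> Long.compare(o2.viewCount, o1.viewCount));
        valueState.clear();
        // Emits the full sorted list; truncate it here if only the first N entries are needed
        out.collect(buffer);
    }
}).print();
env.execute("HotGoodsTopNAdv");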
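
The sort‑then‑truncate step at the heart of Top‑N can be shown in isolation. The sketch below works on bare view counts rather than ItemViewCount objects to stay short; the class TopNSketch and the parameter n are hypothetical. It also includes a comparator that casts a long difference to int, purely to show how that pattern can report two very different counts as equal.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical plain-Java illustration of the Top-N step (no Flink involved). */
final class TopNSketch {
    /** Returns the n largest counts in descending order without mutating the input. */
    static List<Long> topN(List<Long> viewCounts, int n) {
        List<Long> sorted = new ArrayList<>(viewCounts);
        sorted.sort(Comparator.reverseOrder());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    /** Descending comparison via a long difference cast to int; shown only to expose its flaw. */
    static int castCompare(long a, long b) {
        return -(int) (a - b);
    }
}
```

When the difference between two counts is a multiple of 2^32, the cast truncates it to zero and the comparator treats the counts as equal, whereas Long.compare orders them correctly. Counts that large are unlikely in a 10‑minute window, but the safe comparator costs nothing.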

The article concludes with sample output showing per‑window counts for different item‑type combinations and encourages readers to follow the associated WeChat public accounts for more big‑data content.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
