
Implementing Real-Time TopN Rankings with Apache Flink

This article demonstrates how to develop a real-time TopN ranking feature in Apache Flink, covering stream setup, word count aggregation, global and grouped TopN calculations, and nested TopN strategies to mitigate hotspot issues, complete with Java code examples.

Big Data Technology & Architecture

TopN is a common feature in statistical reports and dashboards, used to compute real-time ranking lists by counting occurrences of items.

Using a word‑frequency example, we show how to quickly develop a TopN program with Apache Flink.

Flink supports various streaming sources; this demo uses the built‑in socketTextStream as the data source.

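StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use processing time. Note: setStreamTimeCharacteristic is deprecated since Flink 1.12,
// where the window assigner alone determines the time semantics.
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// hostName and port identify the socket to read from; they are assumed to be defined by the caller.
DataStream<String> text = env.socketTextStream(hostName, port); // listen to socket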

Similar to offline word count, the program splits each incoming sentence into words and aggregates counts per word.

DataStream<Tuple2<String, Integer>> ds = text
    .flatMap(new LineSplitter()); // split lines into words, initialize count=1

private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        String[] tokens = value.toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
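DataStream<Tuple2<String, Integer>> wcount = ds
    .keyBy(value -> value.f0) // key by the word; positional keyBy(0) is deprecated in newer Flink releases
    .window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(20))) // 600 s window, sliding every 20 s
    .sum(1); // sum counts per word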
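
To make the 600-second window with a 20-second slide more concrete, the following plain Java sketch computes which sliding windows contain a given timestamp. It is an illustration of the window arithmetic only, not Flink's API: the class name `SlidingWindowSketch` and the method `windowStarts` are hypothetical, and the sketch assumes non-negative millisecond timestamps and a zero window offset.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    /** Returns the start timestamps (ms) of every sliding window that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the newest window that begins at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 600_000L; // 600 s window, as in the article
        long slide = 20_000L; // 20 s slide, as in the article
        List<Long> starts = windowStarts(1_000_000L, size, slide);
        System.out.println(starts.size()); // prints 30
        System.out.println(starts.get(0)); // prints 1000000
    }
}
```

With these settings every word occurrence contributes to 600 / 20 = 30 overlapping windows, and one of those windows closes every 20 seconds.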

Global TopN

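After the previous step, the sliding window fires every 20 seconds and emits each word's count over the preceding 600 seconds. Collecting one round of these counts into a single window and keeping the five highest yields the global ranking.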

DataStream<Tuple2<String, Integer>> ret = wcount
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
    .process(new TopNAllFunction(5)); // compute Top‑5 in the window

The windowAll operation has a parallelism of 1, so all elements are gathered into a single window for ranking.

private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
    private int topSize = 10;
    public TopNAllFunction(int topSize) { this.topSize = topSize; }
    @Override
    public void process(Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
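        // Descending by count. The comparator never returns 0, so words with equal counts
        // keep separate entries instead of overwriting each other.
        TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<>((y, x) -> (x < y) ? -1 : 1);
        for (Tuple2<String, Integer> element : input) {
            treemap.put(element.f1, element);
            if (treemap.size() > topSize) { treemap.pollLastEntry(); } // drop the smallest count
        }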
        for (Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}
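
The bounded `TreeMap` technique used in `TopNAllFunction` can be tried without a Flink cluster. The sketch below reproduces the same idea in plain Java (Java 9 or later, for `List.of` and `Map.entry`); the class name `BoundedTopN` and the sample words are hypothetical, and `Map.Entry` stands in for Flink's `Tuple2`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BoundedTopN {
    /** Keeps the n largest (word, count) pairs, highest count first. */
    static List<Map.Entry<String, Integer>> topN(List<Map.Entry<String, Integer>> input, int n) {
        // Descending by count. Never returning 0 gives equal counts separate slots,
        // at the cost that lookups by key (get, containsKey) are no longer reliable.
        Comparator<Integer> descendingKeepTies = (a, b) -> (b < a) ? -1 : 1;
        TreeMap<Integer, Map.Entry<String, Integer>> ranked = new TreeMap<>(descendingKeepTies);
        for (Map.Entry<String, Integer> e : input) {
            ranked.put(e.getValue(), e);
            if (ranked.size() > n) {
                ranked.pollLastEntry(); // evict the current smallest count
            }
        }
        return new ArrayList<>(ranked.values());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = List.of(
                Map.entry("flink", 5), Map.entry("spark", 3), Map.entry("kafka", 5),
                Map.entry("hive", 1), Map.entry("storm", 4));
        System.out.println(topN(counts, 3)); // prints [flink=5, kafka=5, storm=4]
    }
}
```

Because the map never holds more than n + 1 entries, memory stays bounded by the ranking size rather than by the number of distinct words in the window.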

Grouped TopN

In some scenarios, users need a ranking per group, such as per initial letter.

wcount.keyBy(new TupleKeySelectorByStart())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
    .process(new TopNFunction(5)); // group‑wise Top‑N
private static class TupleKeySelectorByStart implements KeySelector<Tuple2<String, Integer>, String> {
    @Override
    public String getKey(Tuple2<String, Integer> value) { return value.f0.substring(0, 1); }
}
private static class TopNFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
    private int topSize = 10;
    public TopNFunction(int topSize) { this.topSize = topSize; }
    @Override
    public void process(String key, Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
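        // Descending by count. The comparator never returns 0, so words with equal counts
        // keep separate entries instead of overwriting each other.
        TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<>((y, x) -> (x < y) ? -1 : 1);
        for (Tuple2<String, Integer> element : input) {
            treemap.put(element.f1, element);
            if (treemap.size() > topSize) { treemap.pollLastEntry(); } // drop the smallest count
        }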
        for (Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}

The code above groups words by their first letter and emits the top‑N words per group.
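
To show the shape of the grouped result, here is a small plain Java sketch (Java 9 or later) that groups word counts by first letter and keeps the top N in each group. It is not the Flink job itself: the class name `GroupedTopN` and the sample data are hypothetical, and for brevity it swaps the bounded `TreeMap` for a simple sort-and-truncate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupedTopN {
    /** Groups (word, count) pairs by the word's first letter and keeps the n largest per group. */
    static Map<String, List<Map.Entry<String, Integer>>> topNPerInitial(
            List<Map.Entry<String, Integer>> counts, int n) {
        // TreeMap only so that groups print in alphabetical order.
        Map<String, List<Map.Entry<String, Integer>>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts) {
            String initial = e.getKey().substring(0, 1); // same key as TupleKeySelectorByStart
            groups.computeIfAbsent(initial, k -> new ArrayList<>()).add(e);
        }
        for (List<Map.Entry<String, Integer>> group : groups.values()) {
            group.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
            if (group.size() > n) {
                group.subList(n, group.size()).clear(); // drop everything below rank n
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = List.of(
                Map.entry("apple", 4), Map.entry("avocado", 7), Map.entry("apricot", 2),
                Map.entry("banana", 3), Map.entry("blueberry", 9));
        // prints {a=[avocado=7, apple=4], b=[blueberry=9, banana=3]}
        System.out.println(topNPerInitial(counts, 2));
    }
}
```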

Nested TopN

Using only windowAll for global TopN limits scalability because the operation runs with parallelism 1, creating a hotspot.

A nested (two‑level) TopN approach mitigates this: first compute a grouped TopN to distribute load, then merge the partial results in a second global TopN, allowing the first layer to scale horizontally.
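
The article gives no code for the nested variant, so the following is only a sketch of the idea in plain Java (Java 9 or later), not a Flink job. The class name `NestedTopNSketch` and the sample data are hypothetical, and grouping by first letter is assumed as the first layer because that is the grouping used earlier. In Flink terms, this would roughly correspond to feeding the output of the grouped `TopNFunction` into a `windowAll` stage running `TopNAllFunction`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NestedTopNSketch {
    /** Sorts a copy by count, descending, and keeps the first n entries. */
    static List<Map.Entry<String, Integer>> topN(List<Map.Entry<String, Integer>> counts, int n) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts);
        sorted.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    /** Layer 1: TopN per first-letter group. Layer 2: TopN over the merged partial results. */
    static List<Map.Entry<String, Integer>> nestedTopN(List<Map.Entry<String, Integer>> counts, int n) {
        Map<String, List<Map.Entry<String, Integer>>> groups = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : counts) {
            groups.computeIfAbsent(e.getKey().substring(0, 1), k -> new ArrayList<>()).add(e);
        }
        List<Map.Entry<String, Integer>> partial = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> group : groups.values()) {
            partial.addAll(topN(group, n)); // each group forwards at most n candidates
        }
        return topN(partial, n); // the single-threaded stage sees only the candidates
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = List.of(
                Map.entry("apple", 8), Map.entry("avocado", 7), Map.entry("apricot", 2),
                Map.entry("banana", 3), Map.entry("blueberry", 6), Map.entry("cherry", 5));
        System.out.println(nestedTopN(counts, 2)); // prints [apple=8, avocado=7]
    }
}
```

Each group must keep N entries rather than one, because all global winners may come from the same group, as they do in this sample. The second layer then ranks at most N times the number of groups, instead of every distinct word.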

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.