Implementing Real-Time TopN Rankings with Apache Flink

This article explains how to build a real-time TopN ranking feature with Apache Flink. It covers global and grouped TopN implementations and a nested TopN strategy, with Java code snippets for environment setup, word counting, windowing, and custom TopN functions.

Big Data Technology & Architecture

TopN is a common feature in statistical reports and dashboards, used to compute real‑time ranking lists. In streaming scenarios, TopN can rank items in memory based on a metric such as occurrence count and quickly emit updated rankings.

The example below builds a Flink program that computes the TopN words by frequency from a stream of text lines.

First, set up the Flink execution environment and use a socket as the data source:

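StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use processing time. This call is deprecated since Flink 1.12; processing-time window assigners work without it.
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// hostName and port identify the socket to read from and must be defined by the caller.
DataStream<String> text = env.socketTextStream(hostName, port);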

Split each incoming sentence into words and emit a tuple (word, 1):

DataStream<Tuple2<String, Integer>> ds = text
    .flatMap(new LineSplitter());

private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        String[] tokens = value.toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
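
To see why LineSplitter checks the token length, the splitting logic can be tried without Flink. The following is a minimal plain-Java sketch; the class name TokenizeSketch and the sample sentence are made up for illustration. It shows that `split("\\W+")` yields an empty leading token when the line starts with a non-word character, which the length check filters out.

```java
import java.util.ArrayList;
import java.util.List;

class TokenizeSketch {
    // Mirrors the splitting logic of LineSplitter without any Flink types.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) { // skip the empty token produced by leading separators
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // prints [hello, flink, hello, topn]
        System.out.println(tokenize("  Hello, Flink -- hello TopN!"));
    }
}
```

Because the text is lowercased first, "Hello" and "hello" are counted as the same word downstream.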

Aggregate counts per word in a sliding window (600 s length, slide every 20 s):

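DataStream<Tuple2<String, Integer>> wcount = ds
    .keyBy(value -> value.f0) // key by the word; keyBy(0) with a field index is deprecated in newer Flink releases
    .window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(20)))
    .sum(1);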
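
With a 600 s window sliding every 20 s, each element belongs to 600 / 20 = 30 overlapping windows, and every word emits an updated count each time a window closes. The plain-Java sketch below illustrates that arithmetic only. It is not Flink's window assigner: the class and method names are hypothetical, and it assumes a zero window offset and non-negative timestamps in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // Returns the start timestamps (ms) of every sliding window that contains `timestamp`.
    static List<Long> windowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slideMs); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        List<Long> starts = windowStarts(1_005_000L, 600_000L, 20_000L);
        // prints "30 windows, newest starts at 1000000"
        System.out.println(starts.size() + " windows, newest starts at " + starts.get(0));
    }
}
```

When size and slide are equal, the same function returns a single window, which is the tumbling case.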

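Compute a global TopN every 20 s using a non‑parallel windowAll operation. The 20 s tumbling window matches the slide interval above, so each firing typically sees one round of updated per‑word counts: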

DataStream<Tuple2<String, Integer>> ret = wcount
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
    .process(new TopNAllFunction(5));

The TopNAllFunction keeps the top‑N elements in a TreeSet ordered by descending count, with the word as a tie‑breaker so that entries with equal counts are not dropped:

private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
    private final int topSize;
    public TopNAllFunction(int topSize) { this.topSize = topSize; }
    @Override
    public void process(Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
        // Highest count first; ties are ordered by word, so the comparator returns 0 only for identical entries.
        TreeSet<Tuple2<String, Integer>> top = new TreeSet<>((a, b) -> {
            int byCount = Integer.compare(b.f1, a.f1);
            return byCount != 0 ? byCount : a.f0.compareTo(b.f0);
        });
        for (Tuple2<String, Integer> element : input) {
            top.add(element);
            if (top.size() > topSize) top.pollLast(); // evict the lowest-ranked entry
        }
        for (Tuple2<String, Integer> entry : top) {
            out.collect(entry); // emitted in ranking order
        }
    }
}
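
The bounded-ranking idea inside the window function can be exercised on its own. Below is a minimal plain-Java sketch, assuming word counts are represented as Map.Entry values; the class name TopNSketch and the sample data are hypothetical. It keeps at most n entries in a sorted set and evicts the lowest-ranked one whenever the set grows past n.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

class TopNSketch {
    // Highest count first; ties are broken by word so that equal counts stay distinct.
    static final Comparator<Map.Entry<String, Integer>> RANKING = (a, b) -> {
        int byCount = Integer.compare(b.getValue(), a.getValue());
        return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
    };

    static List<Map.Entry<String, Integer>> topN(Iterable<Map.Entry<String, Integer>> counts, int n) {
        TreeSet<Map.Entry<String, Integer>> top = new TreeSet<>(RANKING);
        for (Map.Entry<String, Integer> e : counts) {
            top.add(e);
            if (top.size() > n) top.pollLast(); // drop the current lowest-ranked entry
        }
        return new ArrayList<>(top);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = List.of(
            Map.entry("flink", 7), Map.entry("spark", 4), Map.entry("kafka", 7),
            Map.entry("storm", 2), Map.entry("hive", 4));
        System.out.println(topN(counts, 3)); // ranks flink, kafka, hive
    }
}
```

Note that "flink" and "kafka" share the count 7. A map keyed on the count alone would let one overwrite the other unless its comparator never reports equality; the tie‑break on the word avoids needing that.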

For scenarios requiring ranking per group (e.g., by first letter), use a keyed window and a custom TopNFunction:

wcount.keyBy(new TupleKeySelectorByStart())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
    .process(new TopNFunction(5));

private static class TupleKeySelectorByStart implements KeySelector<Tuple2<String, Integer>, String> {
    @Override
    public String getKey(Tuple2<String, Integer> value) { return value.f0.substring(0, 1); }
}

private static class TopNFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
    private final int topSize;
    public TopNFunction(int topSize) { this.topSize = topSize; }
    @Override
    public void process(String key, Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
        // Highest count first; ties are ordered by word, so entries with equal counts are all retained.
        TreeSet<Tuple2<String, Integer>> top = new TreeSet<>((a, b) -> {
            int byCount = Integer.compare(b.f1, a.f1);
            return byCount != 0 ? byCount : a.f0.compareTo(b.f0);
        });
        for (Tuple2<String, Integer> element : input) {
            top.add(element);
            if (top.size() > topSize) top.pollLast(); // evict the lowest-ranked entry of this group
        }
        for (Tuple2<String, Integer> entry : top) {
            out.collect(entry); // emitted in ranking order within the group
        }
    }
}
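
The effect of keying by the first letter can be illustrated outside Flink. The sketch below is plain Java under stated assumptions: GroupedTopNSketch, topNPerLetter, and the sample words are hypothetical, and counts are modelled as Map.Entry values. It groups the entries by first letter and keeps the n best per group, which is roughly what the keyed window computes for each key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupedTopNSketch {
    // Highest count first, then alphabetical order for equal counts.
    static final Comparator<Map.Entry<String, Integer>> RANKING = (a, b) -> {
        int byCount = Integer.compare(b.getValue(), a.getValue());
        return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
    };

    // Groups entries by the first letter of the word and keeps the n best per group.
    static Map<String, List<Map.Entry<String, Integer>>> topNPerLetter(
            List<Map.Entry<String, Integer>> counts, int n) {
        Map<String, List<Map.Entry<String, Integer>>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts) {
            groups.computeIfAbsent(e.getKey().substring(0, 1), k -> new ArrayList<>()).add(e);
        }
        for (List<Map.Entry<String, Integer>> group : groups.values()) {
            group.sort(RANKING);
            if (group.size() > n) group.subList(n, group.size()).clear(); // trim to n entries
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = List.of(
            Map.entry("flink", 7), Map.entry("flume", 3), Map.entry("flask", 5),
            Map.entry("spark", 4), Map.entry("storm", 2), Map.entry("kafka", 7));
        System.out.println(topNPerLetter(counts, 2));
    }
}
```

With n = 2, the "f" group keeps "flink" and "flask" and drops "flume", while the "k" and "s" groups are returned whole because they hold at most two words.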

Because windowAll runs with parallelism 1, it can become a bottleneck. A nested (two‑level) TopN approach mitigates this by first computing per‑group TopN and then merging the results in a second global TopN, allowing the first level to scale horizontally.
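
The two-level idea can be sketched in plain Java to show why it gives the same result as a single global ranking. This is an illustration under assumptions, not the article's Flink job: NestedTopNSketch and its methods are hypothetical names, and words are assigned to partitions by hash code, whereas a Flink job would use a key such as the first letter. Each partition ranks only its own words, and the second level ranks at most n candidates per partition.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NestedTopNSketch {
    // Highest count first, then alphabetical order for equal counts.
    static final Comparator<Map.Entry<String, Integer>> RANKING = (a, b) -> {
        int byCount = Integer.compare(b.getValue(), a.getValue());
        return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
    };

    static List<Map.Entry<String, Integer>> topN(List<Map.Entry<String, Integer>> counts, int n) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts);
        sorted.sort(RANKING);
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    static List<Map.Entry<String, Integer>> nestedTopN(
            List<Map.Entry<String, Integer>> counts, int n, int partitions) {
        // Level 1: route each word to exactly one partition (the parallel step).
        Map<Integer, List<Map.Entry<String, Integer>>> byPartition = new HashMap<>();
        for (Map.Entry<String, Integer> e : counts) {
            int p = Math.floorMod(e.getKey().hashCode(), partitions);
            byPartition.computeIfAbsent(p, k -> new ArrayList<>()).add(e);
        }
        // Level 2: merge at most n candidates per partition and rank them once more.
        List<Map.Entry<String, Integer>> candidates = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> part : byPartition.values()) {
            candidates.addAll(topN(part, n));
        }
        return topN(candidates, n);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> counts = List.of(
            Map.entry("flink", 7), Map.entry("spark", 4), Map.entry("kafka", 9),
            Map.entry("storm", 2), Map.entry("hive", 5), Map.entry("hbase", 6));
        System.out.println(nestedTopN(counts, 3, 4).equals(topN(counts, 3))); // prints true
    }
}
```

The merge is safe because any word in the global top n is also in the top n of its own partition: fewer than n words outrank it overall, so fewer than n can outrank it locally. The second level therefore handles at most n × partitions records instead of the full stream.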

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
