Implementing Real-Time TopN Rankings with Apache Flink
This article explains how to build a real-time TopN ranking with Apache Flink. It covers global and grouped TopN, outlines a nested (two-level) TopN strategy, and provides Java code for environment setup, word counting, windowing, and custom TopN functions.
TopN is a common feature in statistical reports and dashboards, used to compute real‑time ranking lists. In streaming scenarios, TopN can rank items in memory based on a metric such as occurrence count and quickly emit updated rankings.
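At its core, TopN selects the N highest counts from a batch of (item, count) pairs. Here is a minimal plain-Java sketch of that step, without Flink, assuming the counts are already aggregated into a `Map`. It uses a bounded min-heap (`PriorityQueue`), a common alternative to the descending `TreeMap` that the Flink functions later in this article use. The names `TopNSketch` and `topN` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical plain-Java sketch: keep the n entries with the highest counts.
class TopNSketch {
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        // Min-heap ordered by count: the root is always the weakest of the current top n.
        PriorityQueue<Map.Entry<String, Integer>> heap =
                new PriorityQueue<>(Map.Entry.<String, Integer>comparingByValue());
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > n) {
                heap.poll(); // evict the smallest count so the heap never exceeds n entries
            }
        }
        List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
        // Present the ranking highest count first.
        result.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("flink", 7, "stream", 4, "window", 9, "state", 1);
        System.out.println(topN(counts, 2)); // prints [window=9, flink=7]
    }
}
```

Each incoming item costs O(log N) heap work. Memory stays bounded by N no matter how many distinct items arrive.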
The example demonstrates how to build a Flink program that calculates TopN for word‑frequency statistics.
First, set up the Flink execution environment and use a socket as the data source:
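StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Processing time. This call is deprecated since Flink 1.12 and can be dropped there,
// because the processing-time window assigners used below do not depend on it.
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// hostName and port are supplied by the caller, e.g. parsed from the program arguments
DataStream<String> text = env.socketTextStream(hostName, port); // one String per incoming line
Split each incoming sentence into words and emit a tuple (word, 1):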
DataStream<Tuple2<String, Integer>> ds = text
.flatMap(new LineSplitter());
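LineSplitter, defined next, does the actual tokenizing. Its rule can be tried in isolation with the following plain-Java sketch. `TokenizeSketch` and `tokenize` are hypothetical names. The logic copies LineSplitter's `toLowerCase().split("\\W+")` and its empty-token check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java copy of LineSplitter's tokenization, runnable without Flink.
class TokenizeSketch {
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        // \W+ matches runs of characters other than [a-zA-Z0-9_].
        for (String token : line.toLowerCase().split("\\W+")) {
            // A line that starts with a delimiter yields an empty first token; skip it.
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("  Flink, flink: real-time TopN!"));
        // prints [flink, flink, real, time, topn]
    }
}
```

Hyphenated words such as "real-time" become two words, and the leading spaces would produce an empty token without the check.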
private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Lowercase, then split on runs of non-word characters (anything but [a-zA-Z0-9_]).
        String[] tokens = value.toLowerCase().split("\\W+");
        for (String token : tokens) {
            // A leading delimiter produces an empty first token; skip it.
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
Aggregate counts per word in a sliding window (600 s long, sliding every 20 s). Every 20 s, each word's count then covers the last 10 minutes:
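DataStream<Tuple2<String, Integer>> wcount = ds
    .keyBy(value -> value.f0) // key by the word; keyBy(0) with a field index is deprecated
    .window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(20)))
    .sum(1); // sum field 1 (the count) per word and window
Every 20 s, compute a global TopN with windowAll, which runs as a single, non-parallel task: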
DataStream<Tuple2<String, Integer>> ret = wcount
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
    .process(new TopNAllFunction(5));
ret.print(); // sink: print each global ranking
TopNAllFunction keeps the N highest counts in a TreeMap sorted in descending order:
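// Needs java.util.TreeMap and java.util.Map.Entry, plus Flink's ProcessAllWindowFunction, TimeWindow, Tuple2, Collector
private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
    private final int topSize;
    public TopNAllFunction(int topSize) { this.topSize = topSize; }
    @Override
    public void process(Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
        // Sort by count, descending. The comparator never returns 0, so words with equal
        // counts are kept as separate entries instead of replacing each other. This breaks
        // the Comparator contract (get()/containsKey() will not find keys), which is
        // acceptable here because the map is only filled, trimmed, and iterated.
        TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<>((a, b) -> (a > b) ? -1 : 1);
        for (Tuple2<String, Integer> element : input) {
            treemap.put(element.f1, element);
            if (treemap.size() > topSize) treemap.pollLastEntry(); // drop the current smallest count
        }
        for (Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
            out.collect(entry.getValue()); // emit from highest to lowest count
        }
    }
}
When a ranking is needed per group (for example, per first letter), key the stream and use a keyed window with a custom TopNFunction: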
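wcount.keyBy(new TupleKeySelectorByStart()) // group words by their first character
    .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
    .process(new TopNFunction(5))
    .print(); // sink: one top-5 list per group and window
env.execute("Flink TopN"); // at the end of main(): nothing runs until execute() is called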
private static class TupleKeySelectorByStart implements KeySelector<Tuple2<String, Integer>, String> {
// Key = first character of the word; safe because LineSplitter never emits empty words.
@Override
public String getKey(Tuple2<String, Integer> value) { return value.f0.substring(0, 1); }
}
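private static class TopNFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
    private final int topSize;
    public TopNFunction(int topSize) { this.topSize = topSize; }
    @Override
    public void process(String key, Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
        // Same descending TreeMap as in TopNAllFunction: the comparator never returns 0,
        // so words with equal counts are kept as separate entries.
        TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<>((a, b) -> (a > b) ? -1 : 1);
        for (Tuple2<String, Integer> element : input) {
            treemap.put(element.f1, element);
            if (treemap.size() > topSize) treemap.pollLastEntry(); // drop the smallest count
        }
        for (Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
            out.collect(entry.getValue()); // this group's ranking, highest count first
        }
    }
}
Because windowAll always runs with parallelism 1, it can become a bottleneck. A nested (two-level) TopN mitigates this: the first level computes a TopN per group in parallel, and a second, global TopN merges those partial results. The result stays exact as long as every word belongs to exactly one group. A word in the global top N has fewer than N words ranked above it overall, and therefore fewer than N within its own group, so it survives the first level. The global step then receives at most N records per group instead of every distinct word, while the first level scales horizontally.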
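The two-level idea can be checked in plain Java before wiring it into Flink. This sketch is a simplified, non-streaming illustration. `NestedTopNSketch` and its methods are hypothetical. Grouping by first character mirrors `TupleKeySelectorByStart`, and each level simply sorts instead of using a TreeMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of nested TopN: a local TopN per group, then a global
// TopN over the partial results.
class NestedTopNSketch {
    // Top n entries by count, highest first (sorting keeps the sketch short).
    static List<Map.Entry<String, Integer>> topN(List<Map.Entry<String, Integer>> in, int n) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(in);
        sorted.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    static List<Map.Entry<String, Integer>> nestedTopN(Map<String, Integer> counts, int n) {
        // Level 1: assign each word to exactly one group (its first character) and rank each group.
        Map<String, List<Map.Entry<String, Integer>>> groups = new HashMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            groups.computeIfAbsent(e.getKey().substring(0, 1), k -> new ArrayList<>()).add(e);
        }
        List<Map.Entry<String, Integer>> partial = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> group : groups.values()) {
            partial.addAll(topN(group, n)); // at most n survivors per group
        }
        // Level 2: global TopN over the union of the per-group winners.
        return topN(partial, n);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of(
                "apache", 12, "api", 9, "app", 2, "flink", 10, "fast", 1, "stream", 7);
        System.out.println(nestedTopN(counts, 3)); // prints [apache=12, flink=10, api=9]
        System.out.println(topN(new ArrayList<>(counts.entrySet()), 3)); // same ranking
    }
}
```

In a Flink job, the first level corresponds to a keyed TopNFunction and the second to a windowAll TopNAllFunction over its output. Any grouping works, provided each word lands in exactly one group.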
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
