Implementing Real-Time TopN Rankings with Apache Flink
This article demonstrates how to develop a real-time TopN ranking feature in Apache Flink, covering stream setup, word count aggregation, global and grouped TopN calculations, and nested TopN strategies to mitigate hotspot issues, complete with Java code examples.
TopN is a common feature in statistical reports and dashboards, used to compute real-time ranking lists by counting occurrences of items.
Using a word‑frequency example, we show how to quickly develop a TopN program with Apache Flink.
Flink supports various streaming sources; this demo uses the built‑in socketTextStream as the data source.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // use processing time
DataStream<String> text = env.socketTextStream(hostName, port); // listen on the socket
Similar to an offline word count, the program splits each incoming sentence into words and aggregates the counts per word.
DataStream<Tuple2<String, Integer>> ds = text
.flatMap(new LineSplitter()); // split lines into words, initialize count=1
private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
DataStream<Tuple2<String, Integer>> wcount = ds
.keyBy(0)
.window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(20)))
.sum(1); // sum counts per word
Global TopN
After the previous step, updated word counts are emitted downstream every 20 seconds, each covering the last 600 seconds. A global TopN ranks them in a 20-second tumbling window.
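The 20-second cadence follows from how a sliding window assigner places each element into several overlapping windows. The plain-Java sketch below has no Flink dependency; the class and method names are hypothetical, and the arithmetic is only modeled on sliding-window assignment, assuming a zero offset and non-negative timestamps. It lists the window start times for one timestamp. With a 600-second size and a 20-second slide, each element falls into 600 / 20 = 30 windows, and one of those windows closes every 20 seconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: illustrates sliding-window assignment.
class SlidingWindowSketch {

    // Returns the start (in ms) of every window [start, start + sizeMs) that contains `timestamp`.
    // Assumes timestamp >= 0 and a window offset of zero.
    static List<Long> windowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - (timestamp % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        List<Long> starts = windowStarts(1_000_000L, 600_000L, 20_000L);
        System.out.println(starts.size());                    // 30
        System.out.println(starts.get(0));                    // 1000000
        System.out.println(starts.get(starts.size() - 1));    // 420000
    }
}
```

Because every element is copied into 30 windows, a smaller slide gives fresher rankings at the cost of proportionally more window state.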
DataStream<Tuple2<String, Integer>> ret = wcount
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.process(new TopNAllFunction(5)); // compute Top‑5 in the window
The windowAll operation has a parallelism of 1, so all elements are gathered into a single window for ranking.
private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
private int topSize = 10;
public TopNAllFunction(int topSize) { this.topSize = topSize; }
@Override
public void process(Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
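// Keyed by count in descending order. The comparator never returns 0, so words with equal counts are kept as separate entries.
TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<>((y, x) -> (x < y) ? -1 : 1);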
for (Tuple2<String, Integer> element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) { treemap.pollLastEntry(); }
}
for (Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
out.collect(entry.getValue());
}
}
}
Grouped TopN
In some scenarios, users need a ranking per group, such as per initial letter.
wcount.keyBy(new TupleKeySelectorByStart())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.process(new TopNFunction(5)); // group‑wise Top‑N
private static class TupleKeySelectorByStart implements KeySelector<Tuple2<String, Integer>, String> {
@Override
public String getKey(Tuple2<String, Integer> value) { return value.f0.substring(0, 1); }
}
private static class TopNFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
private int topSize = 10;
public TopNFunction(int topSize) { this.topSize = topSize; }
@Override
public void process(String key, Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
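// Keyed by count in descending order. The comparator never returns 0, so words with equal counts are kept as separate entries.
TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<>((y, x) -> (x < y) ? -1 : 1);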
for (Tuple2<String, Integer> element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) { treemap.pollLastEntry(); }
}
for (Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
out.collect(entry.getValue());
}
}
}
The code above groups words by their first letter and emits the top‑N words per group.
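Both TopNAllFunction and TopNFunction key a TreeMap by count and use a comparator that never returns 0. The plain-Java sketch below (class and method names are hypothetical, and the sample counts are made up) isolates that trick to show its effect. Entries are ordered by descending count, words with the same count are all kept in insertion order instead of overwriting each other, and pollLastEntry evicts the smallest count once the map grows past N.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that mirrors the TreeMap usage in the TopN functions above.
class TreeMapTopNSketch {

    // Returns the words with the n highest counts, highest first.
    static List<String> topN(Map<String, Integer> counts, int n) {
        // Same comparator as in the article: descending order, and never 0,
        // so a second word with an equal count becomes a new entry.
        TreeMap<Integer, String> treemap = new TreeMap<>((y, x) -> (x < y) ? -1 : 1);
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            treemap.put(e.getValue(), e.getKey());
            if (treemap.size() > n) {
                treemap.pollLastEntry(); // drop the entry with the smallest count
            }
        }
        return new ArrayList<>(treemap.values());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("flink", 3);
        counts.put("spark", 5);
        counts.put("kafka", 3); // same count as "flink", still kept
        counts.put("hive", 1);
        System.out.println(topN(counts, 3)); // [spark, flink, kafka]
    }
}
```

One caveat of this design: because the comparator never reports equality, key lookups such as get or containsKey on this map can never match. The pattern is only safe when the map is used for put, pollLastEntry, and iteration, as it is here.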
Nested TopN
Using only windowAll for global TopN limits scalability because the operation runs with parallelism 1, creating a hotspot.
A nested (two‑level) TopN approach mitigates this: first compute a grouped TopN to distribute load, then merge the partial results in a second global TopN, allowing the first layer to scale horizontally.
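The two-level idea can be checked without a cluster. The following plain-Java sketch rests on stated assumptions: the class, the method names, and the hash-based bucket key are hypothetical, and ties are broken alphabetically so the result is deterministic. Level one stands in for a keyBy on a bucket key followed by a grouped TopN, and level two stands in for the final windowAll TopN. Because each word lands in exactly one bucket, every word in the global top N is also in its bucket's top N, so merging the partial results gives the same ranking as a direct global TopN.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of nested TopN; it does not use Flink.
class NestedTopNSketch {

    // Keeps the n entries with the highest counts; ties are ordered by word.
    static List<Map.Entry<String, Integer>> topN(Collection<Map.Entry<String, Integer>> entries, int n) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(entries);
        sorted.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    static List<Map.Entry<String, Integer>> nestedTopN(Map<String, Integer> counts, int groups, int n) {
        // Level 1: route each word to one bucket, as a keyBy on a bucket key would.
        Map<Integer, List<Map.Entry<String, Integer>>> buckets = new HashMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            int bucket = Math.floorMod(e.getKey().hashCode(), groups);
            buckets.computeIfAbsent(bucket, k -> new ArrayList<>()).add(e);
        }
        // Each bucket computes its own TopN; in a real job these run in parallel.
        List<Map.Entry<String, Integer>> partial = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> bucket : buckets.values()) {
            partial.addAll(topN(bucket, n));
        }
        // Level 2: a single task merges at most groups * n partial results.
        return topN(partial, n);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("flink", 7);
        counts.put("spark", 5);
        counts.put("kafka", 5);
        counts.put("hadoop", 4);
        counts.put("hive", 2);
        counts.put("storm", 1);
        System.out.println(nestedTopN(counts, 4, 3));   // [flink=7, kafka=5, spark=5]
        System.out.println(topN(counts.entrySet(), 3)); // same ranking as the nested result
    }
}
```

The final stage still runs on one task, but it now sees at most groups * N elements per window rather than every distinct word, which is where the hotspot relief comes from.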
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.