Real-time Top‑N Book Ranking with Apache Flink
This tutorial explains how to implement a real‑time top‑N hot‑selling book ranking that outputs the most clicked books every five seconds using Apache Flink, Kafka, sliding processing‑time windows, and a custom TopN aggregation function.
The article presents a practical solution for a book‑store scenario where, during a flash‑sale period, the system must output the top N most‑clicked books every five seconds. It translates the business requirement into a Flink streaming job that uses processing‑time windows.
The requirement breaks down into five steps: use Flink's processing‑time semantics, filter the book click events, apply a sliding window that advances every five seconds (the requirement calls for a one‑hour window, while the sample code below uses 600 seconds), aggregate the click counts per book, and emit the top N items for each window.
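To make the sliding‑window step concrete, the sketch below computes which windows a single event falls into for a given window size and slide. It is plain Java with no Flink dependency. The class and method names (`SlidingWindowSketch`, `windowStarts`) are hypothetical, and aligning window starts to multiples of the slide is an assumption made for illustration rather than a copy of Flink's window assigner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the sliding windows that contain one event.
final class SlidingWindowSketch {

    /** Returns the start times (ms) of every window [start, start + sizeMs) containing timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Assumption: windows start at multiples of the slide, so round down to the latest start.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 600_000L;  // 600-second window, as in the sample job
        long slideMs = 5_000L;   // slides every 5 seconds
        List<Long> starts = windowStarts(1_000_000L, sizeMs, slideMs);
        // prints "120 windows, newest starts at 1000000"
        System.out.println(starts.size() + " windows, newest starts at " + starts.get(0));
    }
}
```

With a 600‑second window and a 5‑second slide, every event is counted in 600 / 5 = 120 overlapping windows. A one‑hour window with the same slide would place each event in 720 windows, which is worth keeping in mind when sizing state.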
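A small Flink job produces the test data by writing simulated book events to the Kafka topic topn:
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single-parallelism source that emits one simulated book event per second.
        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // Write every event as a plain string to the Kafka topic "topn".
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("topn", new SimpleStringSchema(), properties);
        text.addSink(producer);
        env.execute();
    }
}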
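import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MyNoParalleSource implements SourceFunction<String> {
    // volatile so that cancel(), which is called from another thread, is visible to run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        List<String> books = Arrays.asList(
                "Python: From Beginner to Giving Up",
                "Java: From Beginner to Giving Up",
                "Php: From Beginner to Giving Up",
                "C++: From Beginner to Giving Up",
                "Scala: From Beginner to Giving Up");
        Random random = new Random();
        while (isRunning) {
            // Pick one of the five titles uniformly at random and emit it once per second.
            ctx.collect(books.get(random.nextInt(books.size())));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() { isRunning = false; }
}
The main Flink job reads these simulated events from the Kafka topic topn, maps each record to a tuple (book, 1), and applies a sliding processing‑time window of 600 seconds that slides every 5 seconds to sum the counts per book. A 5‑second tumbling window over all keys then collects the per‑book sums and emits the top 3.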
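import java.sql.Timestamp;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class TopN {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Note: setStreamTimeCharacteristic and keyBy(int) are deprecated in newer Flink releases.
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties);
        input.setStartFromEarliest();
        DataStream<String> stream = env.addSource(input);
        DataStream<Tuple2<String, Integer>> ds = stream.flatMap(new LineSplitter());
        // Per-book count over the last 600 seconds, refreshed every 5 seconds.
        DataStream<Tuple2<String, Integer>> wcount = ds
                .keyBy(0)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(5)))
                .sum(1);
        // Gather the per-book counts of all keys every 5 seconds and keep the top 3.
        wcount
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new TopNAllFunction(3))
                .print();
        env.execute();
    }

    // Maps each incoming book title to the tuple (book, 1).
    private static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            out.collect(new Tuple2<>(value, 1));
        }
    }

    // Keeps the topSize entries with the highest counts in each window.
    private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> {
        private final int topSize;

        public TopNAllFunction(int topSize) { this.topSize = topSize; }

        @Override
        public void process(Context ctx, Iterable<Tuple2<String, Integer>> input, Collector<String> out) {
            // Descending order by count. The comparator never returns 0, so books with equal
            // counts are kept as separate entries; as a side effect, lookups by key do not work.
            TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<>((y, x) -> x < y ? -1 : 1);
            for (Tuple2<String, Integer> e : input) {
                treemap.put(e.f1, e);
                if (treemap.size() > topSize) {
                    treemap.pollLastEntry(); // drop the entry with the smallest count
                }
            }
            // Emit the ranking once per window rather than once per entry.
            out.collect("=================\nHot-selling books:\n"
                    + new Timestamp(System.currentTimeMillis()) + treemap.toString()
                    + "\n===============\n");
        }
    }
}
When executed, the job prints a formatted, timestamped list of the hottest books every five seconds. The complete source code can be downloaded by replying with the keyword “Flink” to the associated public account.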
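The TreeMap in TopNAllFunction relies on a comparator that never returns 0 in order to keep books with equal counts. As an alternative, a bounded min‑heap gives the same top‑N result without bending the comparator contract. The sketch below is plain Java with no Flink dependency; the class name `TopNHeapSketch`, the method `topN`, and the sample counts are all hypothetical and are not taken from the original job.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical alternative to the TreeMap approach: top-N selection with a bounded min-heap.
final class TopNHeapSketch {

    /** Returns the n entries with the highest counts, ordered by count descending, then title ascending. */
    static List<Map.Entry<String, Integer>> topN(List<Map.Entry<String, Integer>> counts, int n) {
        // "Worst first": lowest count first; among equal counts, the alphabetically later title first.
        Comparator<Map.Entry<String, Integer>> worstFirst = (a, b) -> {
            int byCount = Integer.compare(a.getValue(), b.getValue());
            return byCount != 0 ? byCount : b.getKey().compareTo(a.getKey());
        };
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(worstFirst);
        for (Map.Entry<String, Integer> entry : counts) {
            heap.offer(entry);
            if (heap.size() > n) {
                heap.poll(); // evict the current worst entry so the heap never exceeds n
            }
        }
        List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
        result.sort(worstFirst.reversed()); // best entry first
        return result;
    }

    public static void main(String[] args) {
        // Made-up counts for illustration only.
        List<Map.Entry<String, Integer>> counts = Arrays.asList(
                new SimpleEntry<>("Python", 10),
                new SimpleEntry<>("Java", 8),
                new SimpleEntry<>("Php", 8),
                new SimpleEntry<>("C++", 3),
                new SimpleEntry<>("Scala", 1));
        // prints "[Python=10, Java=8, Php=8]"
        System.out.println(topN(counts, 3));
    }
}
```

The heap holds at most n + 1 entries at any time, so memory stays bounded by the ranking size rather than by the number of distinct books in the window, and ties are resolved deterministically by title.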
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
