Real-time Top‑N Book Ranking with Apache Flink

This tutorial explains how to implement a real‑time top‑N hot‑selling book ranking that outputs the most clicked books every five seconds using Apache Flink, Kafka, sliding processing‑time windows, and a custom TopN aggregation function.

Big Data Technology & Architecture

The article presents a practical solution for a book‑store scenario where, during a flash‑sale period, the system must output the top N most‑clicked books every five seconds. It translates the business requirement into a Flink streaming job that uses processing‑time windows.

The requirement breaks down into five steps: use Flink's processing-time semantics, filter book click events, apply a one-hour sliding window that slides every five seconds, aggregate click counts per book, and emit the top N items per window. Note that the sample job in this article uses a 600-second (ten-minute) window rather than a full hour.
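To make the sliding-window step more concrete, the following is a minimal plain-Java sketch (no Flink dependency) of how a single event timestamp maps to overlapping windows. The class name `SlidingWindowSketch` and the method `windowStarts` are hypothetical and exist only for illustration; the alignment rule (window starts are multiples of the slide, with zero offset) is an assumption, not a copy of Flink's internals. It uses the 600-second size and 5-second slide from the sample job.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    /** Start times (ms) of every sliding window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= timestamp, aligned to a multiple of the slide.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 600_000L;  // 600 seconds, as in the sample job
        long slide = 5_000L;   // 5 seconds
        List<Long> starts = windowStarts(1_000_000L, size, slide);
        System.out.println("windows per event: " + starts.size());   // prints 120
        System.out.println("latest window start: " + starts.get(0)); // prints 1000000
    }
}
```

Each event therefore contributes to size / slide windows (120 here, and 720 for a true one-hour window), which is why a pre-aggregating function such as `sum` is preferable to buffering raw events per window.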

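import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Flink job that writes simulated book clicks to the Kafka topic "topn".
public class KafkaProducer {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // Diamond operator avoids the raw-type warning of the original listing
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("topn", new SimpleStringSchema(), properties);
        text.addSink(producer);
        env.execute();
    }
}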

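import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Non-parallel source that emits one simulated book click per second.
public class MyNoParalleSource implements SourceFunction<String> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Build the title list once instead of on every loop iteration
        List<String> books = new ArrayList<>();
        books.add("Python: From Getting Started to Giving Up"); //10
        books.add("Java: From Getting Started to Giving Up");   //8
        books.add("Php: From Getting Started to Giving Up");    //5
        books.add("C++: From Getting Started to Giving Up");    //3
        books.add("Scala: From Getting Started to Giving Up");  //0-4
        Random random = new Random();
        while(isRunning){
            // Pick one of the five titles uniformly at random
            int i = random.nextInt(books.size());
            ctx.collect(books.get(i));
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() { isRunning = false; }
}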

The main Flink job reads these simulated click events from the Kafka topic topn, maps each line to a tuple (book, 1), and sums the counts per book over a sliding processing-time window of 600 seconds that slides every 5 seconds. A second, 5-second tumbling window over all keys then collects the per-book counts and passes them to the TopN function.

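import java.sql.Timestamp;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class TopN {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Deprecated in newer Flink releases; kept as in the original listing
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties);
        input.setStartFromEarliest();
        DataStream<String> stream = env.addSource(input);
        DataStream<Tuple2<String, Integer>> ds = stream.flatMap(new LineSplitter());
        // Per-book click count over the last 600 seconds, refreshed every 5 seconds
        DataStream<Tuple2<String, Integer>> wcount = ds
            .keyBy(0)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(5)))
            .sum(1);
        // Gather all per-book counts emitted within 5 seconds and rank them
        wcount
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .process(new TopNAllFunction(3))
            .print();
        env.execute();
    }
    // LineSplitter and TopNAllFunction are nested classes of TopN; they are listed separately after this class
}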

private static class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String,Integer>> out) {
        out.collect(new Tuple2<>(value, 1));
    }
}

private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String,Integer>, String, TimeWindow> {
    private final int topSize;
    public TopNAllFunction(int topSize){ this.topSize = topSize; }
    @Override
    public void process(Context ctx, Iterable<Tuple2<String,Integer>> input, Collector<String> out) {
        // Descending order by count. The comparator never returns 0, so books with
        // equal counts are kept as separate entries instead of overwriting each other.
        TreeMap<Integer, Tuple2<String,Integer>> treemap = new TreeMap<>( (y,x) -> x<y?-1:1 );
        for(Tuple2<String,Integer> e : input){
            treemap.put(e.f1, e);
            // Keep only the topSize largest counts
            if(treemap.size()>topSize) treemap.pollLastEntry();
        }
        // Emit the ranking once per window (the original loop emitted it once per entry)
        out.collect("=================\nHot-selling book list:\n"
            + new Timestamp(System.currentTimeMillis()) + treemap.toString()
            + "\n===============\n");
    }
}
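The ranking step above keeps only the N largest counts while scanning the window contents. As an alternative to the TreeMap with a comparator that never reports equality, the same selection can be sketched with a bounded min-heap in plain Java. This is an illustrative sketch, not the article's implementation: the class `TopNHeapSketch`, the method `topN`, and the sample counts are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class TopNHeapSketch {

    /** Returns the n entries with the highest counts, highest first. */
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        // Min-heap ordered by count: the head is the weakest candidate kept so far.
        PriorityQueue<Map.Entry<String, Integer>> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.getValue(), b.getValue()));
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            heap.offer(e);
            if (heap.size() > n) {
                heap.poll(); // evict the entry with the smallest count
            }
        }
        List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
        // Sort descending by count; ties are broken by title so the order is deterministic.
        result.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample counts, for illustration only
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("Python", 10);
        counts.put("Java", 8);
        counts.put("PHP", 5);
        counts.put("C++", 3);
        counts.put("Scala", 2);
        System.out.println(topN(counts, 3)); // prints [Python=10, Java=8, PHP=5]
    }
}
```

The heap never holds more than n + 1 entries, so memory stays bounded regardless of how many distinct books appear in a window, and equal counts need no special handling in the comparator.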

When executed, the job prints formatted lists of the hottest books with timestamps, as shown in the sample output. The complete source code can be downloaded by replying with the keyword “Flink” to the associated public account.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
