Real‑time Dashboard with Flink: Streaming Order Data, Site Metrics, and Top‑N Merchandise Rankings
This article demonstrates how to build a one‑second‑refresh real‑time dashboard for e‑commerce order data using Apache Flink, Kafka, and Redis, covering JSON message parsing, processing‑time windows, stateful aggregation for site‑level KPIs, and efficient top‑N product ranking via Redis sorted sets.
The article describes a practical solution for a real-time dashboard that visualizes e-commerce order metrics during high-traffic events such as Double-11, relying on Flink's record-at-a-time stream processing rather than micro-batching.
Data format: each sub-order is a JSON object with fields such as userId, orderId, siteId, merchandiseId, price, quantity, and a millisecond epoch timestamp.
{
  "userId": 234567,
  "orderId": 2902306918400,
  "subOrderId": 2902306918401,
  "siteId": 10219,
  "siteName": "site_blabla",
  "cityId": 101,
  "cityName": "北京市",
  "warehouseId": 636,
  "merchandiseId": 187699,
  "price": 299,
  "quantity": 2,
  "orderStatus": 1,
  "isNewOrder": 0,
  "timestamp": 1572963672217
}
Flink environment: the job runs in processing-time mode, takes a checkpoint every minute, and sets a 30-second checkpoint timeout.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
Kafka source: order messages are consumed via FlinkKafkaConsumer011 with Flink's built-in SimpleStringSchema and consumer properties loaded from a resource file.
Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
DataStream<String> sourceStream = env.addSource(
        new FlinkKafkaConsumer011<>(ORDER_EXT_TOPIC_NAME, new SimpleStringSchema(), consumerProps)
    )
    .setParallelism(PARTITION_COUNT)
    .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
    .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);
The JSON strings are parsed into a POJO, SubOrderDetail, using FastJSON; Lombok annotations keep the class definition short.
DataStream<SubOrderDetail> orderStream = sourceStream
    .map(message -> JSON.parseObject(message, SubOrderDetail.class))
    .name("map_sub_order_detail").uid("map_sub_order_detail");
Site-level aggregation: the stream is keyed by siteId and assigned to a one-day tumbling processing-time window, offset by -8 hours so that each window starts at midnight in UTC+8. A ContinuousProcessingTimeTrigger fires the window every second. An AggregateFunction (OrderAndGmvAggregateFunc) computes the order count, sub-order count, total quantity, and GMV.
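To see why the window carries an offset of -8 hours, it helps to work through the window-start arithmetic by hand. The sketch below is Flink-free: the formula follows the one in Flink's TimeWindow.getWindowStartWithOffset as commonly documented, while the class and method names (WindowStartSketch, windowStart) are made up for illustration. The sample message's timestamp is used only as a convenient number, since the job itself windows on processing time.

```java
import java.time.Duration;

/** Flink-free sketch of how a tumbling window's start is derived from a timestamp and an offset. */
final class WindowStartSketch {

    /** Start of the window of size windowSizeMs (shifted by offsetMs) that contains timestampMs. */
    static long windowStart(long timestampMs, long offsetMs, long windowSizeMs) {
        return timestampMs - (timestampMs - offsetMs + windowSizeMs) % windowSizeMs;
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long minusEightHours = Duration.ofHours(-8).toMillis();
        long ts = 1572963672217L; // 2019-11-05 22:21:12 in UTC+8

        // Without an offset the day window is aligned to UTC midnight (08:00 in UTC+8).
        System.out.println(windowStart(ts, 0L, day));              // 1572912000000
        // With -8 hours the window is aligned to midnight in UTC+8.
        System.out.println(windowStart(ts, minusEightHours, day)); // 1572883200000
    }
}
```

Without the offset, the daily totals would reset at 08:00 local time instead of at midnight, which is rarely what a daily dashboard wants.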
WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
    .keyBy("siteId")
    .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));
DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
    .aggregate(new OrderAndGmvAggregateFunc())
    .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
Because the trigger fires every second even when no new order has arrived, a stateful KeyedProcessFunction (OutputOrderGmvProcessFunc) caches the previous aggregation result in a MapState and emits a record only when the sub-order count changes, which avoids redundant writes downstream.
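The article does not show OrderAccumulator or OrderAndGmvAggregateFunc. Judging from the getters called in processElement, the accumulator might look roughly like the following plain-Java sketch. The class name OrderAccumulatorSketch, the add and merge methods, and the assumption that GMV is unit price times quantity are illustrative guesses, not the article's code; in a real job this logic would sit inside an AggregateFunction's add and merge methods.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the article's OrderAccumulator, which the source does not show. */
final class OrderAccumulatorSketch {
    long siteId;
    String siteName;
    // A set, so that several sub-orders belonging to one order are counted once.
    final Set<Long> orderIds = new HashSet<>();
    long subOrderSum;
    long quantitySum;
    long gmv;

    /** Folds one sub-order into the running totals, as AggregateFunction.add would. */
    void add(long siteId, String siteName, long orderId, long price, long quantity) {
        this.siteId = siteId;
        this.siteName = siteName;
        orderIds.add(orderId);
        subOrderSum += 1;
        quantitySum += quantity;
        gmv += price * quantity; // assumption: price is the unit price
    }

    /** Combines two partial accumulators for the same site, as AggregateFunction.merge would. */
    OrderAccumulatorSketch merge(OrderAccumulatorSketch other) {
        if (siteName == null) {
            siteId = other.siteId;
            siteName = other.siteName;
        }
        orderIds.addAll(other.orderIds);
        subOrderSum += other.subOrderSum;
        quantitySum += other.quantitySum;
        gmv += other.gmv;
        return this;
    }

    public static void main(String[] args) {
        OrderAccumulatorSketch acc = new OrderAccumulatorSketch();
        // Two sub-orders that share one orderId.
        acc.add(10219L, "site_blabla", 2902306918400L, 299L, 2L);
        acc.add(10219L, "site_blabla", 2902306918400L, 100L, 1L);
        System.out.println(acc.orderIds.size() + " order, "
                + acc.subOrderSum + " sub-orders, gmv=" + acc.gmv);
    }
}
```

Keeping the order IDs in a set is what makes orderCount differ from subOrderCount, at the cost of state that grows with the number of distinct orders per site per day.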
@Override
public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
    long key = value.getSiteId();
    OrderAccumulator cached = state.get(key); // MapState.get declares a checked Exception
    // Emit only for the first result of a site or when the sub-order count has changed.
    if (cached == null || value.getSubOrderSum() != cached.getSubOrderSum()) {
        JSONObject result = new JSONObject();
        result.put("site_id", value.getSiteId());
        result.put("site_name", value.getSiteName());
        result.put("quantity", value.getQuantitySum());
        result.put("orderCount", value.getOrderIds().size());
        result.put("subOrderCount", value.getSubOrderSum());
        result.put("gmv", value.getGmv());
        out.collect(new Tuple2<>(key, result.toJSONString()));
        state.put(key, value);
    }
}
The final results are written to Redis through a RedisSink that issues HSET commands; each site's metrics are stored in a hash whose key includes the current day.
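How GmvRedisMapper builds the day-scoped hash key is not shown in the article. A minimal sketch of one way to derive it follows; the prefix RT:DASHBOARD:GMV:, the yyyy-MM-dd suffix, and the class and method names are all hypothetical, and Asia/Shanghai is assumed only because the window offset implies UTC+8.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that derives a per-day Redis hash key for the site metrics. */
final class DailyHashKeySketch {
    static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:"; // made-up prefix
    static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");     // assumed from the -8h window offset

    /** Returns the hash key for the local calendar day that contains epochMs. */
    static String hashKeyFor(long epochMs) {
        LocalDate day = Instant.ofEpochMilli(epochMs).atZone(ZONE).toLocalDate();
        return HASH_NAME_PREFIX + day.format(DateTimeFormatter.ISO_LOCAL_DATE);
    }

    public static void main(String[] args) {
        // In the job this would be the current processing time, e.g. System.currentTimeMillis().
        System.out.println(hashKeyFor(1572963672217L)); // RT:DASHBOARD:GMV:2019-11-05
    }
}
```

Each Tuple2 emitted by the process function would then presumably map to one HSET on that key, with the site ID (f0) as the hash field and the JSON string (f1) as the value. A key that rolls over at local midnight lines up with the daily window, so yesterday's hash stays readable after the new day starts.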
FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
siteResultStream.addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
    .name("sink_redis_site_gmv").uid("sink_redis_site_gmv").setParallelism(1);
Merchandise Top-N: a second window (one-second tumbling), keyed by merchandiseId, aggregates the sales quantity. The aggregation result is a Tuple2<Long, Long> of (merchandiseId, sales quantity).
WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
    .keyBy("merchandiseId")
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
    .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
    .name("aggregate_merch_sales").uid("aggregate_merch_sales");
Instead of computing Top-N inside Flink, the article recommends maintaining the rankings in a Redis sorted set, incremented with ZINCRBY through a custom RankingRedisMapper.
public RedisCommandDescription getCommandDescription() {
    return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
}
public String getKeyFromData(Tuple2<Long, Long> data) { return String.valueOf(data.f0); }
public String getValueFromData(Tuple2<Long, Long> data) { return String.valueOf(data.f1); }
Consumers can retrieve the top-N products with ZREVRANGE.
The article concludes that reusing the same source stream enables multiple real-time statistics within a single Flink job, and that the approach works well as long as Redis is available and data volume remains manageable.
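To make the sorted-set semantics concrete, here is an in-memory imitation of the two Redis commands involved. It talks to no Redis server; SortedSetRankingSketch and its methods are illustrative names, and only non-negative indices are handled, whereas the real ZREVRANGE also accepts negative ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** In-memory imitation of a Redis sorted set, limited to ZINCRBY and ZREVRANGE. */
final class SortedSetRankingSketch {
    private final Map<String, Double> scores = new HashMap<>();

    /** Adds increment to the member's score (starting from 0) and returns the new score. */
    double zincrby(String member, double increment) {
        return scores.merge(member, increment, Double::sum);
    }

    /** Members ranked by descending score, inclusive indices; ties in reverse lexicographic order. */
    List<String> zrevrange(int start, int stop) {
        List<String> ordered = scores.entrySet().stream()
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Double>comparingByKey().reversed()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        int from = Math.min(Math.max(start, 0), ordered.size());
        int to = Math.min(stop + 1, ordered.size());
        if (from >= to) {
            return new ArrayList<>();
        }
        return new ArrayList<>(ordered.subList(from, to));
    }

    public static void main(String[] args) {
        SortedSetRankingSketch ranking = new SortedSetRankingSketch();
        // Each call stands for one (merchandiseId, quantity) tuple from a one-second window.
        ranking.zincrby("187699", 2);
        ranking.zincrby("100", 5);
        ranking.zincrby("187699", 4);
        System.out.println(ranking.zrevrange(0, 1)); // [187699, 100]
    }
}
```

Because each one-second window emits only the quantity sold in that second, ZINCRBY accumulates the running total on the Redis side, and a dashboard reads the top N with ZREVRANGE over indices 0 to N-1.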
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
