Big Data 15 min read

Real-time Hot Item, PV, and UV Statistics Using Apache Flink, Kafka, and Bloom Filter

This article demonstrates how to implement real-time hot item ranking, page view counting, and unique visitor estimation using Apache Flink with Kafka sources, sliding windows, custom aggregation functions, and a Bloom filter backed by Redis, providing complete Scala code examples.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Real-time Hot Item, PV, and UV Statistics Using Apache Flink, Kafka, and Bloom Filter

The article provides a step‑by‑step tutorial for three real‑time analytics scenarios built on Apache Flink: hot‑item ranking, page‑view (PV) counting, and unique‑visitor (UV) estimation.

Hot‑Item Ranking reads clickstream data from a Kafka topic, filters for "pv" events, applies a 20‑second sliding window (10‑second slide), aggregates counts per item, and then selects the top‑N items using a custom KeyedProcessFunction. The full Scala implementation is shown below.

package com.ongbo.hotAnalysis

import java.sql.Timestamp
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008")
    properties.setProperty("group.id", "web-consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val dataStream = env.addSource(new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(), properties))
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val processStream = dataStream
      .filter(_.behavior.equals("pv"))
      .keyBy(_.itemId)
      .timeWindow(Time.seconds(20), Time.seconds(10))
      .aggregate(new CountAgg(), new WindowResult())
      .keyBy(_.windowEnd)
      .process(new TopNHotItems(10))

    processStream.print("processStream::")
    env.execute("hot Items Job")
  }
}

class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: UserBehavior, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
}

class WindowResult extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
  }
}

class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
  private var itemState: ListState[ItemViewCount] = _
  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
  }
  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
    itemState.add(value)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    val allItems = new ListBuffer[ItemViewCount]()
    import scala.collection.JavaConversions._
    for (item <- itemState.get()) { allItems += item }
    val sortedItems = allItems.sortBy(_.count)(Ordering[Long].reverse).take(topsize)
    itemState.clear()
    val result = new StringBuilder
    result.append("时间:").append(new Timestamp(timestamp - 1)).append("
")
    for (i <- sortedItems.indices) {
      val cur = sortedItems(i)
      result.append("No").append(i + 1).append(": 商品ID:").append(cur.itemId).append(" 浏览量:").append(cur.count).append("
")
    }
    result.append("============================
")
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}

Page‑View (PV) Counting shows a simpler job that reads a local CSV file, filters for "pv" events, maps each record to a constant key, and uses a one‑hour tumbling window to sum the counts. The Scala code is included below.

package com.ongbo.NetWorkFlow_Analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int, behavior: String, timestamp: Long)

object PageVies {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val resource = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(resource.getPath)
      .map(data => {
        val arr = data.split(",")
        UserBehavior(arr(0).trim.toLong, arr(1).trim.toLong, arr(2).trim.toInt, arr(3).trim, arr(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.behavior.equals("pv"))
      .map(_ => ("pv", 1))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .sum(1)
    dataStream.print("pv count")
    env.execute("PV")
  }
}

Unique‑Visitor (UV) Estimation with Bloom Filter explains how to avoid storing every user ID by using a Bloom filter stored in Redis. A custom trigger fires on each element, and a ProcessWindowFunction updates the bitmap and count in Redis. The full implementation follows.

package com.ongbo.NetWorkFlow_Analysis

import com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

object UvWithBloom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val resource = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile("/Users/ongbo/Maven/bin/UserBehaviorAnalysis/NetWorkFlowAnalysis/src/main/resources/UserBehavior.csv")
      .map(data => {
        val arr = data.split(",")
        UserBehavior(arr(0).trim.toLong, arr(1).trim.toLong, arr(2).trim.toInt, arr(3).trim, arr(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.behavior.equals("pv"))
      .map(data => ("dummyKey", data.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())
    dataStream.print()
    env.execute()
  }
}

class MyTrigger extends Trigger[(String, Long), TimeWindow] {
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}

class UvCountWithBloom extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  lazy val jedis = new Jedis("114.116.219.97", 5000)
  lazy val bloom = new Bloom(1L << 29)
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    val storeKey = context.window.getEnd.toString
    var count = 0L
    if (jedis.hget("count", storeKey) != null) {
      count = jedis.hget("count", storeKey).toLong
    }
    val userId = elements.last._2.toString
    val offset = bloom.hash(userId, 61)
    val isExist = jedis.getbit(storeKey, offset)
    if (!isExist) {
      jedis.setbit(storeKey, offset, true)
      jedis.hset("count", storeKey, (count + 1).toString)
      out.collect(UvCount(storeKey.toLong, count + 1))
    } else {
      out.collect(UvCount(storeKey.toLong, count))
    }
  }
}

class Bloom(size: Long) extends Serializable {
  private val cap = if (size > 0) size else 1L << 27
  def hash(value: String, seed: Int): Long = {
    var result: Long = 0L
    for (i <- 0 until value.length) {
      result = result * seed + value.charAt(i)
    }
    result & (cap - 1)
  }
}

Overall, the article equips readers with ready‑to‑run Scala programs for streaming analytics, illustrating how to combine Flink’s windowing, custom aggregation, and external storage (Redis) to achieve low‑latency hot‑item ranking, PV aggregation, and memory‑efficient UV counting.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Big DataFlinkredisKafkabloom-filterScala
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.