Real-time Hot Item, PV, and UV Statistics Using Apache Flink, Kafka, and Bloom Filter
This article demonstrates how to implement real-time hot-item ranking, page-view (PV) counting, and unique-visitor (UV) estimation with Apache Flink. It uses a Kafka source (for hot items) and a CSV file source (for PV/UV), sliding and tumbling windows, custom aggregation functions, and a Redis-backed Bloom filter, and provides complete Scala code examples.
The article provides a step‑by‑step tutorial for three real‑time analytics scenarios built on Apache Flink: hot‑item ranking, page‑view (PV) counting, and unique‑visitor (UV) estimation.
Hot‑Item Ranking reads clickstream data from a Kafka topic, filters for "pv" events, applies a 20‑second sliding window (10‑second slide), aggregates counts per item, and then selects the top‑N items using a custom KeyedProcessFunction. The full Scala implementation is shown below.
package com.ongbo.hotAnalysis
import java.sql.Timestamp
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
/**
 * Entry point of the hot-item ranking job.
 *
 * Reads comma-separated user-behavior lines from the Kafka topic `weblog`,
 * keeps only "pv" events, counts views per item in 20 s windows sliding every
 * 10 s, and prints the top 10 items of each window.
 */
object HotItems {
  def main(args: Array[String]): Unit = {
    // Single parallel instance with event-time semantics.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer configuration, applied in declaration order.
    val kafkaProps = new Properties()
    Seq(
      "bootstrap.servers" -> "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008",
      "group.id" -> "web-consumer-group",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest"
    ).foreach { case (name, value) => kafkaProps.setProperty(name, value) }

    val consumer = new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(), kafkaProps)

    // Parse "userId,itemId,categoryId,behavior,timestamp"; the timestamp is in seconds.
    val behaviors = env
      .addSource(consumer)
      .map { line =>
        val fields = line.split(",")
        UserBehavior(
          fields(0).trim.toLong,
          fields(1).trim.toLong,
          fields(2).trim.toInt,
          fields(3).trim,
          fields(4).trim.toLong)
      }
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val topItems = behaviors
      .filter(_.behavior == "pv")
      .keyBy(_.itemId)
      .timeWindow(Time.seconds(20), Time.seconds(10))
      .aggregate(new CountAgg(), new WindowResult())
      // Regroup all per-item counts of the same window so they can be ranked together.
      .keyBy(_.windowEnd)
      .process(new TopNHotItems(10))

    topItems.print("processStream::")
    env.execute("hot Items Job")
  }
}
/**
 * Incremental pre-aggregation that counts how many [[UserBehavior]] records
 * reach a window. The accumulator and the result are the same running count.
 */
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {

  /** A fresh count starts at zero. */
  override def createAccumulator(): Long = 0L

  /** Every record adds exactly one; its fields are not inspected. */
  override def add(in: UserBehavior, acc: Long): Long = 1L + acc

  /** The running count is already the final result. */
  override def getResult(acc: Long): Long = acc

  /** Combines two partial counts by summing them. */
  override def merge(acc: Long, acc1: Long): Long = acc1 + acc
}
/**
 * Wraps the count produced by [[CountAgg]] into an [[ItemViewCount]], tagging it
 * with the item id (the window key) and the end timestamp of the window.
 */
class WindowResult extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    // Used after an AggregateFunction, so the iterable carries the pre-aggregated count.
    val viewCount = input.head
    val windowEnd = window.getEnd
    out.collect(ItemViewCount(key, windowEnd, viewCount))
  }
}
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
// Per-key (per window end) buffer of item counts awaiting ranking; initialised in open().
private var itemState: ListState[ItemViewCount] = _

/** Registers the keyed list state named "item-state" that buffers incoming counts. */
override def open(parameters: Configuration): Unit = {
  val descriptor = new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount])
  itemState = getRuntimeContext.getListState(descriptor)
}
/**
 * Buffers one item's count and schedules the ranking for its window.
 * Nothing is emitted here; output happens in onTimer.
 */
override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
  itemState.add(value)
  // Fire one millisecond past the window end, i.e. once the watermark has passed it.
  val fireAt = value.windowEnd + 1
  ctx.timerService().registerEventTimeTimer(fireAt)
}
/**
 * Emits the top-`topsize` ranking for one window and clears the buffered counts.
 *
 * @param timestamp the firing time of the timer registered in processElement (window end + 1 ms)
 * @param ctx       timer context (unused)
 * @param out       receives one multi-line ranking string per window
 */
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
  // JavaConverters replaces the deprecated (and, in 2.13, removed) implicit JavaConversions.
  import scala.collection.JavaConverters._

  // Snapshot the buffered counts (null-safe), rank by descending view count, keep the top N.
  val buffered = Option(itemState.get()).map(_.asScala.toList).getOrElse(Nil)
  val sortedItems = buffered.sortBy(_.count)(Ordering[Long].reverse).take(topsize)
  itemState.clear()

  val result = new StringBuilder
  // The timer was registered at windowEnd + 1, so subtract 1 to report the window end.
  result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
  sortedItems.zipWithIndex.foreach { case (cur, i) =>
    result.append("No").append(i + 1).append(": 商品ID:").append(cur.itemId).append(" 浏览量:").append(cur.count).append("\n")
  }
  result.append("============================\n")
  // No Thread.sleep here: sleeping in a timer callback blocks the task thread,
  // stalling watermarks and checkpoints for every key on this subtask.
  out.collect(result.toString())
}
}Page‑View (PV) Counting shows a simpler job that reads a local CSV file, filters for "pv" events, maps each record to a constant key, and uses a one‑hour tumbling window to sum the counts. The Scala code is included below.
package com.ongbo.NetWorkFlow_Analysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
case class UserBehavior(userId: Long, itemId: Long, cateGoryId: Int, behavior: String, timestamp: Long)
object PageVies {
/**
 * Counts "pv" events from the classpath file UserBehavior.csv in one-hour
 * tumbling event-time windows and prints the total for each hour.
 */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)

  val inputPath = getClass.getResource("/UserBehavior.csv").getPath

  // Parse "userId,itemId,categoryId,behavior,timestamp"; the timestamp is in seconds.
  val behaviors = env.readTextFile(inputPath)
    .map { line =>
      val cols = line.split(",")
      UserBehavior(cols(0).trim.toLong, cols(1).trim.toLong, cols(2).trim.toInt, cols(3).trim, cols(4).trim.toLong)
    }
    .assignAscendingTimestamps(_.timestamp * 1000L)

  // Map every page view to the same key so a single window sums all of them.
  val hourlyPv = behaviors
    .filter(_.behavior == "pv")
    .map(_ => ("pv", 1))
    .keyBy(_._1)
    .timeWindow(Time.hours(1))
    .sum(1)

  hourlyPv.print("pv count")
  env.execute("PV")
}
}Unique‑Visitor (UV) Estimation with Bloom Filter explains how to avoid storing every user ID by using a Bloom filter stored in Redis. A custom trigger fires on each element, and a ProcessWindowFunction updates the bitmap and count in Redis. The full implementation follows.
package com.ongbo.NetWorkFlow_Analysis
import com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
/**
 * Entry point of the unique-visitor job: reads UserBehavior.csv from the
 * classpath, keeps "pv" events, and estimates distinct users per one-hour
 * event-time window with a Redis-backed Bloom filter (see UvCountWithBloom).
 */
object UvWithBloom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // Resolve the input from the classpath instead of a developer-specific absolute
    // path (the original looked the resource up but then ignored it).
    val resource = Option(getClass.getResource("/UserBehavior.csv")).getOrElse(
      throw new IllegalStateException("UserBehavior.csv not found on the classpath"))
    val dataStream = env.readTextFile(resource.getPath)
      .map(data => {
        val arr = data.split(",")
        UserBehavior(arr(0).trim.toLong, arr(1).trim.toLong, arr(2).trim.toInt, arr(3).trim, arr(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.behavior.equals("pv"))
      // A constant key routes every user id into the same window.
      .map(data => ("dummyKey", data.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      // Fire per element so the window never buffers user ids in Flink state.
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())
    dataStream.print()
    env.execute()
  }
}
/**
 * Trigger that evaluates the window function for every arriving element and
 * purges the window contents right afterwards. Processing-time and event-time
 * timers never fire the window, and there is no trigger state to clear.
 */
class MyTrigger extends Trigger[(String, Long), TimeWindow] {

  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    // Evaluate immediately, then drop the element from the window buffer.
    TriggerResult.FIRE_AND_PURGE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
}
/**
 * Estimates distinct users per window using a Bloom bitmap stored in Redis.
 *
 * For each window (identified by its end timestamp) the bitmap lives under the
 * Redis key `<windowEnd>` and the running count under field `<windowEnd>` of the
 * Redis hash "count". Emits one [[UvCount]] per firing with the updated count.
 */
class UvCountWithBloom extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  // NOTE(review): Redis endpoint is hard-coded; consider making it configurable.
  lazy val jedis = new Jedis("114.116.219.97", 5000)
  // Bit offsets are hashed into [0, 2^29).
  lazy val bloom = new Bloom(1L << 29)

  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    val storeKey = context.window.getEnd.toString
    // One round trip; a missing field means nothing has been counted for this window yet.
    val previous = Option(jedis.hget("count", storeKey)).map(_.toLong).getOrElse(0L)

    // Handle every element of the pane (not just the last one), so the count stays
    // correct even if the trigger ever fires with more than one buffered element.
    var count = previous
    for ((_, userId) <- elements) {
      val offset = bloom.hash(userId.toString, 61)
      if (!jedis.getbit(storeKey, offset)) {
        jedis.setbit(storeKey, offset, true)
        count += 1
      }
    }

    if (count != previous) {
      jedis.hset("count", storeKey, count.toString)
    }
    out.collect(UvCount(storeKey.toLong, count))
  }

  /** Releases the Redis connection when the task shuts down. */
  override def close(): Unit = {
    jedis.close()
    super.close()
  }
}
class Bloom(size: Long) extends Serializable {
// Number of addressable bits. `hash` masks with `cap - 1`, which only covers
// [0, cap) uniformly when cap is a power of two, so other sizes are rounded up
// to the next power of two. Non-positive sizes fall back to 2^27 bits.
private val cap: Long =
  if (size <= 0) 1L << 27
  else if (java.lang.Long.bitCount(size) == 1) size
  else java.lang.Long.highestOneBit(size) << 1

/**
 * Maps `value` to a bit offset with a polynomial string hash.
 *
 * @param value string to hash (the UV job passes a user id)
 * @param seed  polynomial multiplier
 * @return a non-negative offset in [0, cap)
 */
def hash(value: String, seed: Int): Long =
  value.foldLeft(0L)((acc, ch) => acc * seed + ch) & (cap - 1)
}Overall, the article equips readers with ready‑to‑run Scala programs for streaming analytics, illustrating how to combine Flink’s windowing, custom aggregation, and external storage (Redis) to achieve low‑latency hot‑item ranking, PV aggregation, and memory‑efficient UV counting.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
