Resetting Daily Page View Counters in Spark Streaming Using mapWithState and Timeout
This article explains how to use Spark Streaming's mapWithState operator to count product page views, and presents two ways to reset the PV count each day: restarting the streaming job via a scheduled script, and configuring a timeout on the StreamingContext. It also describes an alternative that uses Redis for external state management.
In Spark Streaming applications, stateful streams are often used to accumulate metrics such as product page views (PV) with the mapWithState() operator.
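import scala.collection.mutable.ListBuffer
import org.apache.spark.streaming.{State, StateSpec}

// Emit (productId, 1) per record, sum within the batch, then fold into the running state.
val productPvStream = stream.mapPartitions(records => {
  val result = new ListBuffer[(String, Int)]
  for (record <- records) {
    result += Tuple2(record.key(), 1)
  }
  result.iterator
}).reduceByKey(_ + _).mapWithState(
  StateSpec.function((productId: String, pv: Option[Int], state: State[Int]) => {
    // Add this batch's count to the PV accumulated so far for the product.
    val sum = pv.getOrElse(0) + state.getOption().getOrElse(0)
    state.update(sum)
    (productId, sum)
  })).stateSnapshots()

The problem is that this state keeps accumulating indefinitely, whereas PV values need to start again from zero each day, so a mechanism to clear the state at midnight is required.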
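To make the accumulation behavior concrete, the following plain-Scala sketch replays the same update logic over a few batches without Spark. The `PvStateSketch` object, its `update` helper, and the sample batches are hypothetical, introduced only for illustration. The running total keeps growing across the day boundary because nothing clears the state.

```scala
// Plain-Scala stand-in for the mapWithState update logic (no Spark required).
// `update` and the sample batches are hypothetical, for illustration only.
object PvStateSketch {
  // Mirrors the StateSpec function: add the batch count to the stored total.
  def update(state: Map[String, Int], batch: Map[String, Int]): Map[String, Int] =
    batch.foldLeft(state) { case (acc, (productId, pv)) =>
      acc.updated(productId, acc.getOrElse(productId, 0) + pv)
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Map("p1" -> 3, "p2" -> 1), // a batch late on day 1
      Map("p1" -> 2),            // last batch of day 1
      Map("p1" -> 4)             // first batch of day 2
    )
    val finalState = batches.foldLeft(Map.empty[String, Int])(update)
    // p1 ends at 3 + 2 + 4 = 9: the day-2 count is stacked on top of day 1.
    println(finalState)
  }
}
```

A daily PV report would want p1 to read 4 after the first batch of day 2, which is why the state has to be cleared at midnight.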
Two methods are proposed:
1. Restart the Streaming Program with a Script
Schedule a shell script via crontab or similar tools to kill and relaunch the Spark job at 00:00.
#!/bin/bash
# Restart the streaming job; intended to be run from crontab at 00:00.
stream_app_name='com.xyz.streaming.MallForwardStreaming'
cnt=$(ps aux | grep SparkSubmit | grep "${stream_app_name}" | wc -l)
if [ "${cnt}" -eq 1 ]; then
  pid=$(ps aux | grep SparkSubmit | grep "${stream_app_name}" | awk '{print $2}')
  # Hard kill, then give the process time to exit before checking again.
  kill -9 "${pid}"
  sleep 20
  cnt=$(ps aux | grep SparkSubmit | grep "${stream_app_name}" | wc -l)
  if [ "${cnt}" -eq 0 ]; then
    # Relaunch in the background so this script can return.
    nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1 &
  fi
fi

This approach requires no code changes but becomes cumbersome as the number of streaming jobs grows.
2. Set a Timeout on StreamingContext
Before starting the job, compute the milliseconds until the next midnight and use awaitTerminationOrTimeout inside a loop.
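import java.util.Date
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Milliseconds from now until the next local midnight.
// The java.util.Date field accessors used here are deprecated but still work.
def msTillTomorrow: Long = {
  val now = new Date()
  val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
  tomorrow.getTime - now.getTime
}

while (true) {
  // A new StreamingContext is built on every iteration, so the state starts empty.
  val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
  ssc.checkpoint(CHECKPOINT_DIR)
  // ... processing logic ...
  ssc.start()
  // Block until the job terminates or midnight arrives, whichever comes first.
  ssc.awaitTerminationOrTimeout(msTillTomorrow)
  // Stop gracefully, but keep the SparkContext alive for the next iteration.
  ssc.stop(stopSparkContext = false, stopGracefully = true)
  Thread.sleep(BATCH_INTERVAL * 1000)
}

After the calculated timeout, the StreamingContext stops and a new one is created on the next loop iteration. Because the new context is built from scratch, the accumulated state is effectively cleared each day.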
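The timeout calculation above relies on deprecated java.util.Date accessors. As a possible alternative, the same value can be computed with java.time; the sketch below is one way to do it, not part of the original solution. The `MidnightTimeout` object and `msUntilNextMidnight` name are hypothetical, and using the system default time zone is an assumption.

```scala
import java.time.{Duration, ZoneId, ZonedDateTime}

object MidnightTimeout {
  // Milliseconds from `now` until the start of the next calendar day in now's zone.
  def msUntilNextMidnight(now: ZonedDateTime): Long = {
    val nextMidnight = now.toLocalDate.plusDays(1).atStartOfDay(now.getZone)
    Duration.between(now, nextMidnight).toMillis
  }

  def main(args: Array[String]): Unit = {
    // Assumption: the system default zone is the one that defines a "day" for the PV report.
    val now = ZonedDateTime.now(ZoneId.systemDefault())
    println(msUntilNextMidnight(now))
  }
}
```

Passing the current time in as a parameter keeps the function easy to check against fixed timestamps; the result could then be handed to awaitTerminationOrTimeout in place of msTillTomorrow.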
Alternatively, the state can be externalized to a store such as Redis, using keys like product_pv:[product_id]:[date] and the INCRBY command in each batch, eliminating the need for timed resets.
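The key scheme can be sketched in plain Scala as follows. This is a minimal illustration under stated assumptions, not a Redis integration: `DailyPvKeys`, `pvKey`, and `FakeCounterStore` are hypothetical names, the ISO date format in the key is an assumption, and the in-memory store only mimics the INCRBY behavior of treating a missing key as 0 and returning the new value. A real job would issue INCRBY through a Redis client once per product in each batch.

```scala
import java.time.LocalDate
import scala.collection.mutable

object DailyPvKeys {
  // Builds keys of the form product_pv:[product_id]:[date].
  // Assumption: the date is rendered in ISO format (yyyy-MM-dd).
  def pvKey(productId: String, date: LocalDate): String =
    s"product_pv:$productId:$date"

  // Hypothetical in-memory stand-in for Redis. incrBy mimics INCRBY:
  // a missing key counts as 0, and the value after the increment is returned.
  final class FakeCounterStore {
    private val data = mutable.Map.empty[String, Long]
    def incrBy(key: String, delta: Long): Long = {
      val next = data.getOrElse(key, 0L) + delta
      data.update(key, next)
      next
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new FakeCounterStore
    val day1 = LocalDate.of(2024, 1, 1)
    val day2 = day1.plusDays(1)
    store.incrBy(pvKey("p1", day1), 3)
    store.incrBy(pvKey("p1", day1), 2)
    // A new date yields a new key, so day 2 starts from zero without any reset.
    println(store.incrBy(pvKey("p1", day2), 4))
  }
}
```

Because the date is part of the key, each day's counter is independent and the streaming job itself can stay stateless.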
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
