Real-Time Advertising Click Counting with Spark Structured Streaming and Redis Streams
This article presents a complete solution for real‑time advertising click counting with Spark Structured Streaming and Redis Streams, covering the business scenario, the data flow, the input and output formats, and a step‑by‑step implementation: data extraction, processing, storage, and querying with Spark SQL.
Business Scenario Introduction
An advertising company delivers dynamic image ads on web pages, generated from hot‑spot images. To maximize revenue, the company needs to count clicks on each ad in real time, so it can decide which ads to keep running and which to replace quickly.
Business Data Flow
Below is the data pipeline for the use case.
Click data from browsers or mobile devices is sent to a "Data Extraction" component, processed by "Data Processing" to compute real‑time click counts, and finally stored in a database for analysis via a "Data Query" step.
Input
Each click event contains two fields: asset (the ad identifier) and cost (the actual cost). Example records:
asset [asset id] cost [actual cost]
asset aksh1hf98qw7tt9q7 cost 39
asset aksh1hf98qw7tt9q8 cost 19
Output
After the processing step, results are stored in a table that can be queried with SQL, for example:
select asset, count from clicks order by count desc
asset count
----------------- -----
aksh1hf98qw7tt9q7 2392
aksh1hf98qw7tt9q8 2010
aksh1hf98qw7tt9q6 1938
Solution
The chosen solution combines Spark Structured Streaming with Redis Streams. The main components are:
Spark Structured Streaming – a scalable, low‑latency stream processing engine built on Spark SQL.
Redis Streams – a high‑throughput, append‑only data structure introduced in Redis 5.0, offering sub‑millisecond latency.
Spark‑Redis connector – bridges Spark and Redis, allowing Structured Streaming to read from Redis Streams and write results to Redis Hashes.
Data Processing Flow
The flow is: click events are first written to a Redis Stream; Spark Structured Streaming consumes the stream and aggregates the data; the results are written back to Redis; finally, Spark SQL queries Redis for analytics.
Data Extraction
Redis Streams can sustain very high throughput (millions of reads and writes per second on suitable hardware) and assign entries monotonically increasing, time‑based IDs. The Spark‑Redis connector can use a Redis Stream as a streaming data source, making it a good fit for this scenario.
Data Processing
Spark Structured Streaming reads the stream, converts it to a DataFrame, and processes it incrementally in micro‑batches. The aggregated results are written to a Redis Hash via a custom writer.
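The heart of this processing step is a group‑by‑asset count. Stripped of Spark, the same aggregation can be sketched in plain Scala; the record format follows the Input section above, and `parseClick` and `countClicks` are hypothetical helpers for illustration only:

```scala
// Spark-free sketch of the processing step: parse raw
// "asset <id> cost <n>" records and count clicks per asset,
// mirroring what clicks.groupBy("asset").count() computes.
case class Click(asset: String, cost: Long)

def parseClick(line: String): Option[Click] =
  line.trim.split("\\s+") match {
    case Array("asset", id, "cost", c) if c.nonEmpty && c.forall(_.isDigit) =>
      Some(Click(id, c.toLong))
    case _ => None // drop malformed records
  }

def countClicks(lines: Seq[String]): Map[String, Long] =
  lines.flatMap(parseClick)          // keep only well-formed events
       .groupBy(_.asset)             // group by ad identifier
       .map { case (asset, hits) => asset -> hits.size.toLong }

val counts = countClicks(Seq(
  "asset aksh1hf98qw7tt9q7 cost 39",
  "asset aksh1hf98qw7tt9q8 cost 19",
  "asset aksh1hf98qw7tt9q7 cost 29"
))
```

In the real job, Spark performs this same grouping incrementally over the unbounded stream and maintains the running counts as state.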
Data Query
The connector maps Redis Hashes to Spark DataFrames, allowing the creation of a temporary view that can be queried with Spark‑SQL in real time.
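The mapping can also be expressed from Scala. The snippet below is a sketch only: it assumes a live SparkSession (`spark`) with the Spark‑Redis connector on the classpath and hashes already written under the `click` key prefix, so it will not run standalone:

```scala
// Sketch: read the Redis Hashes back as a DataFrame and expose them to SQL.
val clicksDf = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "click")   // maps hashes keyed click:<asset>
  .load()

clicksDf.createOrReplaceTempView("clicks")
spark.sql("select asset, count from clicks order by count desc").show()
```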
Development Steps
The following example demonstrates the implementation steps.
Redis Stream Store Data
Redis Streams are append‑only. After deploying Redis, data can be added with redis-cli:
XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29
Data Processing
The StructuredStreaming job is divided into three sub‑steps:
Read and process data from Redis Stream.
Store the aggregated results back to Redis.
Start the StructuredStreaming query.
Read and Process Data from Redis Stream
First, create a SparkSession with Redis connection parameters:
val spark = SparkSession
.builder()
.appName("StructuredStreaming on Redis")
.config("spark.redis.host", redisHost)
.config("spark.redis.port", redisPort)
.config("spark.redis.auth", redisPassword)
.getOrCreate()
Define the schema for the stream (fields asset and cost) and read it:
val clicks = spark
.readStream
.format("redis")
.option("stream.keys", redisTableName)
.schema(StructType(Array(
StructField("asset", StringType),
StructField("cost", LongType)
)))
.load()
Aggregate click counts per asset:
val bypass = clicks.groupBy("asset").count()
Start the streaming query with a custom writer:
val query = bypass
.writeStream
.outputMode("update")
.foreach(clickWriter)
.start()
Store Data to Redis
The custom ClickForeachWriter extends ForeachWriter and uses Jedis to write aggregated results into a Redis Hash:
class ClickForeachWriter(redisHost: String, redisPort: String, redisPassword: String) extends ForeachWriter[Row] {
var jedis: Jedis = _
// Open a connection to Redis lazily, on the first record of a partition.
def connect() = {
val shardInfo: JedisShardInfo = new JedisShardInfo(redisHost, redisPort.toInt)
shardInfo.setPassword(redisPassword)
jedis = new Jedis(shardInfo)
}
override def open(partitionId: Long, version: Long): Boolean = true
override def process(value: Row): Unit = {
val asset = value.getString(0)
val count = value.getLong(1)
if (jedis == null) { connect() }
// One hash per ad, keyed "click:<asset>", holding the ad id and its count.
jedis.hset("click:" + asset, "asset", asset)
jedis.hset("click:" + asset, "count", count.toString)
// Expire keys after five minutes so stale entries age out.
jedis.expire("click:" + asset, 300)
}
// Release the connection when the partition is done.
override def close(errorOrNull: Throwable): Unit = {
if (jedis != null) { jedis.close() }
}
}
Run the Structured Streaming Program
After packaging, submit the job with spark-submit:
spark-submit \
--class com.aliyun.spark.redis.StructuredStremingWithRedisStream \
--jars /spark_on_redis/ali-spark-redis-2.3.1-SNAPSHOT_2.3.2-1.0-SNAPSHOT.jar,/spark_on_redis/commons-pool2-2.0.jar,/spark_on_redis/jedis-3.0.0-20181113.105826-9.jar \
--driver-memory 1G \
--driver-cores 1 \
--executor-cores 1 \
--executor-memory 2G \
--num-executors 1 \
--name spark_on_polardb \
/spark_on_redis/structuredstreaming-0.0.1-SNAPSHOT.jar \
xxx1 6379 xxx2 clicks
Parameters:
xxx1 – Redis host address.
6379 – Redis port.
xxx2 – Redis password.
clicks – Redis Stream name.
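The hashes this job writes are exactly what the query step reads. As a plain‑Scala sketch (a mutable Map stands in for Redis here, purely for illustration), the key and field layout produced by ClickForeachWriter looks like this:

```scala
// Sketch of the Redis layout: one hash per ad, keyed "click:<asset>",
// with fields "asset" and "count". A Map stands in for Redis.
import scala.collection.mutable

val redis = mutable.Map.empty[String, Map[String, String]]

// Mirrors the two hset calls in ClickForeachWriter.process.
def hsetClick(asset: String, count: Long): Unit =
  redis("click:" + asset) = Map("asset" -> asset, "count" -> count.toString)

hsetClick("aksh1hf98qw7tt9q7", 2392L)
hsetClick("aksh1hf98qw7tt9q8", 2010L)
```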
Data Query
Create a Spark SQL table that maps to the Redis Hash:
CREATE TABLE IF NOT EXISTS clicks(asset STRING, count INT)
USING org.apache.spark.sql.redis
OPTIONS (
'host' 'xxx1',
'port' '6379',
'auth' 'xxx2',
'table' 'click'
)
Run a simple query:
select * from clicks;
Spark SQL, via the Spark‑Redis connector, queries Redis directly and returns the per‑ad click counts.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert dedicated to sharing big data technology.
