
Real-Time Advertising Click Counting with Spark Structured Streaming and Redis Streams

This article presents a complete solution for real‑time advertising click counting using Spark Structured Streaming combined with Redis Streams, detailing the business scenario, data flow, input/output formats, and step‑by‑step implementation including data extraction, processing, storage, and query via Spark‑SQL.

Big Data Technology & Architecture

Business Scenario Introduction

An advertising company delivers dynamic image ads on web pages. The ads are generated from trending ("hot-spot") images. To maximize revenue, the company needs to count the clicks on each ad in real time so it can decide which ads stay up longer and which should be replaced quickly.

Business Data Flow

Below is the data pipeline for the use case.

Click data from browsers or mobile devices is sent to a "Data Extraction" component, processed by "Data Processing" to compute real‑time click counts, and finally stored in a database for analysis via a "Data Query" step.

Input

Each click event contains two fields: asset (the ad identifier) and cost (the actual cost). Example records:

asset [asset id] cost [actual cost]
asset aksh1hf98qw7tt9q7 cost 39
asset aksh1hf98qw7tt9q8 cost 19
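
As a rough illustration of the record shape above, the following sketch parses one whitespace-separated record into a typed value. The names `ClickEvent` and `ClickParser` are hypothetical and do not appear in the original job, which lets the Spark‑Redis connector map the stream fields to columns instead.

```scala
// Hypothetical model of one click event (illustration only).
final case class ClickEvent(asset: String, cost: Long)

object ClickParser {
  // Parses a record of the form "asset <id> cost <number>".
  // Returns None when the record does not have that shape
  // or when the cost is not a valid integer.
  def parse(line: String): Option[ClickEvent] =
    line.trim.split("\\s+") match {
      case Array("asset", id, "cost", cost) =>
        scala.util.Try(cost.toLong).toOption.map(c => ClickEvent(id, c))
      case _ => None
    }
}
```

Returning an `Option` keeps malformed records from failing the caller; whether to drop or log them is left open here.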

Output

After the processing step, results are stored in a table that can be queried with SQL, for example:

select asset, count from clicks order by count desc

asset              count
-----------------  -----
aksh1hf98qw7tt9q7   2392
aksh1hf98qw7tt9q8   2010
aksh1hf98qw7tt9q6   1938

Solution

The chosen solution combines Spark Structured Streaming with Redis Streams. The main components are:

Spark Structured Streaming – a stream processing engine built on Spark SQL. Its default micro-batch mode targets end-to-end latencies on the order of 100 milliseconds; the experimental continuous processing mode targets millisecond-level latency.

Redis Streams – a high-throughput, append-only data structure introduced in Redis 5.0, offering sub-millisecond latency.

Spark‑Redis connector – bridges Spark and Redis, allowing Structured Streaming to read from Redis Streams and Spark SQL to read from and write to Redis Hashes.

Data Processing Flow

The flow is as follows: click events are first written to a Redis Stream; Spark Structured Streaming consumes the stream, aggregates the data, and writes the results back to Redis; finally, Spark SQL queries Redis for analytics.

Data Extraction

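Redis Streams can sustain very high read/write throughput (up to millions of operations per second, depending on hardware and deployment) and order entries by their auto-generated, time-based IDs. The Spark‑Redis connector can use a Redis Stream as a data source, which makes it a good fit for this scenario.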

Data Processing

Spark Structured Streaming reads the stream into a DataFrame, on which it can run either micro-batch (streaming) or batch queries. The processed results are written to a Redis Hash via a custom writer.

Data Query

The connector maps Redis Hashes to Spark DataFrames, allowing the creation of a temporary view that can be queried with Spark‑SQL in real time.

Development Steps

The following example demonstrates the implementation steps.

Store Data in a Redis Stream

Redis Streams are append‑only. After deploying Redis, data can be added with redis-cli:

XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29
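
In the command above, `MAXLEN ~ 1000000` caps the stream at roughly one million entries (the `~` lets Redis trim approximately, which is cheaper than exact trimming), and `*` asks Redis to generate the entry ID. When generating test data from a script, the same argument list can be assembled programmatically. The sketch below is a minimal helper for that; `XAddCommand` is a hypothetical name, and the helper only builds the argument list, it does not talk to Redis.

```scala
object XAddCommand {
  // Builds the arguments of an approximately capped XADD.
  // "~" requests approximate trimming near maxLen;
  // "*" asks Redis to auto-generate the entry ID.
  def build(stream: String, maxLen: Long, fields: Seq[(String, String)]): Seq[String] =
    Seq("XADD", stream, "MAXLEN", "~", maxLen.toString, "*") ++
      fields.flatMap { case (name, value) => Seq(name, value) }

  // Example: the same command as the redis-cli line in the text.
  def example: String =
    build("clicks", 1000000L, Seq("asset" -> "aksh1hf98qw7tt9q7", "cost" -> "29")).mkString(" ")
}
```

The resulting arguments can be passed to `redis-cli` or to any Redis client that accepts raw commands.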

Data Processing

The Structured Streaming job is divided into three sub‑steps:

Read and process data from Redis Stream.

Store the aggregated results back to Redis.

Start the Structured Streaming query.

Read and Process Data from Redis Stream

First, create a SparkSession with Redis connection parameters:

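import org.apache.spark.sql.SparkSession

// redisHost, redisPort and redisPassword are taken from the program arguments
val spark = SparkSession
      .builder()
      .appName("StructuredStreaming on Redis")
      .config("spark.redis.host", redisHost)
      .config("spark.redis.port", redisPort)
      .config("spark.redis.auth", redisPassword)
      .getOrCreate()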

Define the schema for the stream (fields asset and cost) and read it:

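import org.apache.spark.sql.types._

// redisTableName is the name of the Redis Stream to consume (e.g. "clicks")
val clicks = spark
      .readStream
      .format("redis")
      .option("stream.keys", redisTableName)
      .schema(StructType(Array(
        StructField("asset", StringType),
        StructField("cost", LongType)
      )))
      .load()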

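Aggregate click counts per asset:

val bypass = clicks.groupBy("asset").count()

Start the streaming query with a custom writer:

// ClickForeachWriter is defined in the "Store Data to Redis" section
val clickWriter = new ClickForeachWriter(redisHost, redisPort, redisPassword)

val query = bypass
      .writeStream
      .outputMode("update")
      .foreach(clickWriter)
      .start()

// Block the driver until the streaming query stops
query.awaitTermination()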
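
With `outputMode("update")`, each trigger passes to the writer only the rows whose aggregate changed since the previous trigger, carrying the new running total. The following plain-Scala sketch imitates that behavior on in-memory batches to make the semantics concrete. It is a simplified model, not Spark code; `UpdateModeSketch` and `step` are hypothetical names.

```scala
object UpdateModeSketch {
  type Counts = Map[String, Long]

  // Folds one micro-batch of asset ids into the running counts.
  // Returns (new state, rows whose count changed in this batch),
  // where each changed row carries the new running total.
  def step(state: Counts, batch: Seq[String]): (Counts, Counts) = {
    val delta: Counts =
      batch.groupBy(identity).map { case (asset, hits) => asset -> hits.size.toLong }
    val changed: Counts =
      delta.map { case (asset, n) => asset -> (state.getOrElse(asset, 0L) + n) }
    (state ++ changed, changed)
  }
}
```

Because every emitted row holds the full running total for its asset, a writer can simply overwrite the stored value, which is what the custom writer does with `hset`.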

Store Data to Redis

The custom ClickForeachWriter extends ForeachWriter and uses Jedis to write aggregated results into a Redis Hash:

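import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.{Jedis, JedisShardInfo}

class ClickForeachWriter(redisHost: String, redisPort: String, redisPassword: String) extends ForeachWriter[Row] {

  // Created lazily on the executor; a live Jedis connection cannot be serialized with the writer
  var jedis: Jedis = _

  def connect(): Unit = {
    val shardInfo: JedisShardInfo = new JedisShardInfo(redisHost, redisPort.toInt)
    shardInfo.setPassword(redisPassword)
    jedis = new Jedis(shardInfo)
  }

  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
    // Row layout produced by groupBy("asset").count(): (asset, count)
    val asset = value.getString(0)
    val count = value.getLong(1)
    if (jedis == null) { connect() }
    val key = "click:" + asset
    jedis.hset(key, "asset", asset)
    jedis.hset(key, "count", count.toString)
    // The hash expires 300 seconds after its last update
    jedis.expire(key, 300)
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Release the connection instead of leaking it
    if (jedis != null) {
      jedis.close()
      jedis = null
    }
  }
}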
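
To make the storage layout explicit: for each aggregated row the writer produces one Redis Hash named `click:<asset>` with two string fields, `asset` and `count`, and a 300-second expiry. The sketch below restates that layout as pure functions. `ClickHashLayout` is a hypothetical helper that is not part of the original job. The `click:` prefix presumably matters because the Spark‑Redis connector is understood to look up the keys of a table named `click` under that prefix; treat that as an assumption to check against the connector documentation.

```scala
object ClickHashLayout {
  // Same TTL the writer passes to EXPIRE, in seconds.
  val TtlSeconds: Int = 300

  // Redis key of the hash that holds one asset's count.
  def key(asset: String): String = "click:" + asset

  // Hash fields as stored by the writer; Redis stores both as strings.
  def fields(asset: String, count: Long): Map[String, String] =
    Map("asset" -> asset, "count" -> count.toString)
}
```

For example, a count of 2392 for asset `aksh1hf98qw7tt9q7` ends up under the key `click:aksh1hf98qw7tt9q7`.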

Run the Structured Streaming Program

After packaging, submit the job with spark-submit:

spark-submit \
  --class com.aliyun.spark.redis.StructuredStremingWithRedisStream \
  --jars /spark_on_redis/ali-spark-redis-2.3.1-SNAPSHOT_2.3.2-1.0-SNAPSHOT.jar,/spark_on_redis/commons-pool2-2.0.jar,/spark_on_redis/jedis-3.0.0-20181113.105826-9.jar \
  --driver-memory 1G \
  --driver-cores 1 \
  --executor-cores 1 \
  --executor-memory 2G \
  --num-executors 1 \
  --name spark_on_polardb \
  /spark_on_redis/structuredstreaming-0.0.1-SNAPSHOT.jar \
  xxx1 6379 xxx2 clicks

Parameters:

xxx1 – Redis host address.

6379 – Redis port.

xxx2 – Redis password.

clicks – Redis Stream name.
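
Inside the job, the four positional parameters have to be bound to names before the SparkSession is built. One possible way to do that with basic validation is sketched below; `JobConfig` and `JobArgs` are hypothetical names, and the original program may bind its arguments differently.

```scala
// Hypothetical holder for the four positional arguments.
final case class JobConfig(
  redisHost: String,
  redisPort: String,
  redisPassword: String,
  streamName: String
)

object JobArgs {
  // Expects: <redisHost> <redisPort> <redisPassword> <streamName>
  // Returns Left with a message when the arguments are malformed.
  def parse(args: Array[String]): Either[String, JobConfig] = args match {
    case Array(host, port, password, stream) if port.nonEmpty && port.forall(_.isDigit) =>
      Right(JobConfig(host, port, password, stream))
    case Array(_, port, _, _) =>
      Left(s"port must be numeric: $port")
    case _ =>
      Left("usage: <redisHost> <redisPort> <redisPassword> <streamName>")
  }
}
```

The port is kept as a string because it is handed to Spark as a configuration value and converted with `toInt` only where a number is required.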

Data Query

Create a Spark SQL table that maps to the Redis Hash:

CREATE TABLE IF NOT EXISTS clicks(asset STRING, count INT)
USING org.apache.spark.sql.redis
OPTIONS (
  'host' 'xxx1',
  'port' '6379',
  'auth' 'xxx2',
  'table' 'click'
)

Run a simple query:

select * from clicks;

Through the Spark‑Redis connector, Spark‑SQL queries Redis directly and returns the aggregated click count for each ad.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
