Spark + Kudu Advertising Business Project: Data Statistics and Processing Guide
This article shows how to build the statistics pipeline for an advertising business project with Spark and Kudu. It covers the metric requirements, the Scala processing code, the aggregation SQL, the Kudu schema design, and how to write the results back to Kudu and read them back for verification.
The required statistics are computed per province and city: origin (raw) requests, valid requests, ad requests, bid and bid-success counts, ad display and click counts, media display and click counts, and DSP ad consumption and cost.
It then provides Scala code (AreaStatProcessor) that reads data from a Kudu table using Spark, registers a temporary view, and runs the first aggregation SQL (AREA_SQL_STEP1) to compute the basic metrics.
package com.imooc.bigdata.cp08.business

import com.imooc.bigdata.cp08.`trait`.DataProcess
import com.imooc.bigdata.cp08.utils.{KuduUtils, SQLUtils, SchemaUtils}
import org.apache.spark.sql.SparkSession

object AreaStatProcessor extends DataProcess {
  override def process(spark: SparkSession): Unit = {
    val sourceTableName = "ods"        // Kudu table holding the raw (ODS) ad logs
    val masterAddresses = "hadoop000"  // Kudu master host
    // Load the Kudu table as a DataFrame through the kudu-spark data source
    val odsDF = spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.table", sourceTableName)
      .option("kudu.master", masterAddresses)
      .load()
    // Register it under the name used in the FROM clause of AREA_SQL_STEP1
    odsDF.createOrReplaceTempView("ods")
    // Step 1: per-province/city conditional counts and amounts
    val resultTmp = spark.sql(SQLUtils.AREA_SQL_STEP1)
    resultTmp.show()
  }
}
The SQL it runs, AREA_SQL_STEP1 (defined in SQLUtils), groups the ods rows by province and city and computes one conditional sum per metric: origin_request, valid_request, ad_request, bid_cnt, bid_success_cnt, ad_display_cnt, ad_click_cnt, medium_display_cnt and medium_click_cnt count matching rows, while ad_consumption and ad_cost sum winprice/1000 and adpayment/1000.
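The counting rules in AREA_SQL_STEP1 can be exercised without a cluster. The following is a minimal plain-Scala model, not part of the original project: OdsRecord and Step1Model are hypothetical names, only the count columns are modeled, and each predicate is copied from the corresponding CASE WHEN condition in the SQL.

```scala
// Hypothetical record type: only the ods fields that the STEP1 conditions read.
final case class OdsRecord(
    provincename: String,
    cityname: String,
    requestmode: Int,
    processnode: Int,
    iseffective: Int,
    isbilling: Int,
    isbid: Int,
    iswin: Int,
    adorderid: Long,
    adplatformproviderid: Int
)

// Hypothetical model object: one predicate per "sum(case when ... then 1 else 0 end)" column.
object Step1Model {
  type Rule = (String, OdsRecord => Boolean)

  private def rule(name: String)(p: OdsRecord => Boolean): Rule = (name, p)

  val rules: List[Rule] = List(
    rule("origin_request")(r => r.requestmode == 1 && r.processnode >= 1),
    rule("valid_request")(r => r.requestmode == 1 && r.processnode >= 2),
    rule("ad_request")(r => r.requestmode == 1 && r.processnode == 3),
    rule("bid_cnt")(r => r.adplatformproviderid >= 100000 && r.iseffective == 1 &&
      r.isbilling == 1 && r.isbid == 1 && r.adorderid != 0),
    rule("bid_success_cnt")(r => r.adplatformproviderid >= 100000 && r.iseffective == 1 &&
      r.isbilling == 1 && r.iswin == 1),
    rule("ad_display_cnt")(r => r.requestmode == 2 && r.iseffective == 1),
    rule("ad_click_cnt")(r => r.requestmode == 3 && r.processnode == 1),
    rule("medium_display_cnt")(r => r.requestmode == 2 && r.iseffective == 1 && r.isbilling == 1),
    rule("medium_click_cnt")(r => r.requestmode == 3 && r.iseffective == 1 && r.isbilling == 1)
  )

  // GROUP BY provincename, cityname, then count the rows matching each rule.
  def aggregate(rows: Seq[OdsRecord]): Map[(String, String), Map[String, Long]] =
    rows.groupBy(r => (r.provincename, r.cityname)).map { case (key, group) =>
      key -> rules.map { case (name, p) => name -> group.count(p).toLong }.toMap
    }
}
```

Because origin_request, valid_request and ad_request all require requestmode=1 with processnode thresholds of >=1, >=2 and =3, every group should satisfy origin_request >= valid_request >= ad_request, which is a cheap sanity check on real output. The two amount columns follow the same pattern, summing winprice/1000 or adpayment/1000 instead of 1.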
lazy val AREA_SQL_STEP1 = "select provincename,cityname, " +
"sum(case when requestmode=1 and processnode >=1 then 1 else 0 end) origin_request," +
"sum(case when requestmode=1 and processnode >=2 then 1 else 0 end) valid_request," +
"sum(case when requestmode=1 and processnode =3 then 1 else 0 end) ad_request," +
"sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and isbid=1 and adorderid!=0 then 1 else 0 end) bid_cnt," +
"sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 then 1 else 0 end) bid_success_cnt," +
"sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) ad_display_cnt," +
"sum(case when requestmode=3 and processnode=1 then 1 else 0 end) ad_click_cnt," +
"sum(case when requestmode=2 and iseffective=1 and isbilling=1 then 1 else 0 end) medium_display_cnt," +
"sum(case when requestmode=3 and iseffective=1 and isbilling=1 then 1 else 0 end) medium_click_cnt," +
"sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 and adorderid>20000 then 1*winprice/1000 else 0 end) ad_consumption," +
"sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 and adorderid>20000 then 1*adpayment/1000 else 0 end) ad_cost " +
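"from ods group by provincename,cityname"
A second aggregation, AREA_SQL_STEP2, queries the STEP1 output through the temporary view area_tmp, derives bid_success_rate (bid_success_cnt/bid_cnt) and ad_click_rate (ad_click_cnt/ad_display_cnt), and keeps only rows where both denominators are non-zero. The result is then displayed.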
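The zero-denominator filter matters: with Spark SQL's default (non-ANSI) settings, dividing by zero returns NULL, and with ANSI mode enabled it raises an error, while the rate columns in the Kudu schema are declared non-nullable. The sketch below models STEP2 in plain Scala; AreaCounts, AreaRates and Step2Model are hypothetical names, and only the columns involved in the rates are kept.

```scala
// Hypothetical input row: the STEP1 columns that STEP2 divides.
final case class AreaCounts(
    provincename: String,
    cityname: String,
    bidCnt: Long,
    bidSuccessCnt: Long,
    adDisplayCnt: Long,
    adClickCnt: Long
)

// Hypothetical output row: the two derived rates.
final case class AreaRates(
    provincename: String,
    cityname: String,
    bidSuccessRate: Double,
    adClickRate: Double
)

object Step2Model {
  def rates(rows: Seq[AreaCounts]): Seq[AreaRates] =
    rows
      // Same predicate as "where bid_cnt!=0 and ad_display_cnt!=0".
      .filter(r => r.bidCnt != 0 && r.adDisplayCnt != 0)
      // Spark SQL's "/" on integral columns yields a double, so divide as Double here too.
      .map(r => AreaRates(
        r.provincename,
        r.cityname,
        r.bidSuccessCnt.toDouble / r.bidCnt,
        r.adClickCnt.toDouble / r.adDisplayCnt
      ))
}
```

Filtering before dividing, as the SQL does, means no row ever reaches the division with a zero denominator, so the model never produces NaN or Infinity and the SQL never produces NULL.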
lazy val AREA_SQL_STEP2 = "select provincename,cityname, " +
"origin_request," +
"valid_request," +
"ad_request," +
"bid_cnt," +
"bid_success_cnt," +
"bid_success_cnt/bid_cnt bid_success_rate," +
"ad_display_cnt," +
"ad_click_cnt," +
"ad_click_cnt/ad_display_cnt ad_click_rate," +
"ad_consumption," +
"ad_cost from area_tmp " +
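"where bid_cnt!=0 and ad_display_cnt!=0"
The Kudu schema for the result table, AREASchema (in SchemaUtils), declares the same thirteen columns in the same order as the STEP2 select list. provincename and cityname form the primary key; the counts are INT64, because Spark's sum over integer values returns a bigint; and the rates and amounts are DOUBLE.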
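Kudu requires primary-key columns to be listed first in a schema and not to be nullable. A cheap pre-flight check can catch violations, and a column-name mismatch with STEP2, before table creation fails. The sketch below is a hypothetical, Kudu-independent helper (ColumnSpec and SchemaCheck are illustrative names, not Kudu client classes); the column list mirrors AREASchema.

```scala
// Hypothetical, Kudu-independent description of one column.
final case class ColumnSpec(name: String, kuduType: String, key: Boolean, nullable: Boolean)

object SchemaCheck {
  // Returns readable violations of the two key-column rules (empty if none).
  def problems(cols: Seq[ColumnSpec]): List[String] = {
    val firstNonKey = cols.indexWhere(c => !c.key)
    val outOfOrder =
      if (firstNonKey < 0) Nil
      else cols.drop(firstNonKey).filter(_.key)
        .map(c => s"key column ${c.name} follows a non-key column").toList
    val nullableKeys =
      cols.filter(c => c.key && c.nullable).map(c => s"key column ${c.name} is nullable").toList
    outOfOrder ++ nullableKeys
  }

  private def metric(name: String, kuduType: String): ColumnSpec =
    ColumnSpec(name, kuduType, key = false, nullable = false)

  // Mirrors AREASchema: two STRING key columns, then INT64 counts and DOUBLE rates/amounts.
  val areaColumns: List[ColumnSpec] = List(
    ColumnSpec("provincename", "STRING", key = true, nullable = false),
    ColumnSpec("cityname", "STRING", key = true, nullable = false),
    metric("origin_request", "INT64"),
    metric("valid_request", "INT64"),
    metric("ad_request", "INT64"),
    metric("bid_cnt", "INT64"),
    metric("bid_success_cnt", "INT64"),
    metric("bid_success_rate", "DOUBLE"),
    metric("ad_display_cnt", "INT64"),
    metric("ad_click_cnt", "INT64"),
    metric("ad_click_rate", "DOUBLE"),
    metric("ad_consumption", "DOUBLE"),
    metric("ad_cost", "DOUBLE")
  )

  // Column names selected by AREA_SQL_STEP2, in select-list order.
  val step2Columns: List[String] = List(
    "provincename", "cityname", "origin_request", "valid_request", "ad_request",
    "bid_cnt", "bid_success_cnt", "bid_success_rate", "ad_display_cnt",
    "ad_click_cnt", "ad_click_rate", "ad_consumption", "ad_cost"
  )
}
```

For example, SchemaCheck.problems(SchemaCheck.areaColumns) returns an empty list, while moving provincename after a metric column or marking it nullable produces one message per violation.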
import org.apache.kudu.{Schema, Type}
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import scala.collection.JavaConverters._ // on Scala 2.13+: scala.jdk.CollectionConverters._

// Kudu schema for area_stat: (provincename, cityname) is the primary key and comes first;
// counts are INT64, rates and amounts are DOUBLE.
lazy val AREASchema: Schema = {
  val columns = List(
    new ColumnSchemaBuilder("provincename", Type.STRING).nullable(false).key(true).build(),
    new ColumnSchemaBuilder("cityname", Type.STRING).nullable(false).key(true).build(),
    new ColumnSchemaBuilder("origin_request", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("valid_request", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("ad_request", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("bid_cnt", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("bid_success_cnt", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("bid_success_rate", Type.DOUBLE).nullable(false).build(),
    new ColumnSchemaBuilder("ad_display_cnt", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("ad_click_cnt", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("ad_click_rate", Type.DOUBLE).nullable(false).build(),
    new ColumnSchemaBuilder("ad_consumption", Type.DOUBLE).nullable(false).build(),
    new ColumnSchemaBuilder("ad_cost", Type.DOUBLE).nullable(false).build()
  ).asJava
  new Schema(columns) // the block must evaluate to the Schema, not end on the val definition
}
Finally, the STEP2 DataFrame is written to the Kudu table area_stat through the project's KuduUtils.sink helper, which takes the schema above and uses provincename as the partition column. The table is then read back and displayed for verification.
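KuduUtils.sink is the project's own helper and its body is not shown, so how it uses partitionId is an assumption here. If it hash-partitions on provincename (for instance via the Kudu client's CreateTableOptions.addHashPartitions), every city of a province lands in the same bucket. The sketch below models only that grouping effect; PartitionModel and bucketOf are hypothetical, and Scala's String.hashCode stands in for Kudu's own hash function, so the bucket numbers will not match real tablets.

```scala
// Conceptual model only: Kudu uses its own hash function, so these bucket numbers
// do not match real tablets. What carries over is that rows sharing a
// provincename always share a bucket.
object PartitionModel {
  // Hypothetical bucket function; floorMod keeps the result in [0, numBuckets).
  def bucketOf(provincename: String, numBuckets: Int): Int =
    Math.floorMod(provincename.hashCode, numBuckets)

  // Group (provincename, cityname) keys by the bucket their province maps to.
  def assign(keys: Seq[(String, String)], numBuckets: Int): Map[Int, Seq[(String, String)]] =
    keys.groupBy { case (province, _) => bucketOf(province, numBuckets) }
}
```

One consequence of partitioning on the province alone: a province with many cities or heavy traffic concentrates its rows in a single bucket. Hashing on both key columns would spread the data more evenly, at the cost of per-province locality.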
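// Continuing inside process(), after resultTmp has been computed.
// Register the STEP1 output under the view name AREA_SQL_STEP2 reads from.
resultTmp.createOrReplaceTempView("area_tmp")
val result = spark.sql(SQLUtils.AREA_SQL_STEP2)
result.show()
// Write the rates to Kudu with the project's KuduUtils.sink helper.
val sinkTableName = "area_stat"
val partitionId = "provincename"
val schema = SchemaUtils.AREASchema
KuduUtils.sink(result, sinkTableName, masterAddresses, schema, partitionId)
// Read the table back to verify the write.
spark.read.format("org.apache.kudu.spark.kudu")
  .option("kudu.master", masterAddresses)
  .option("kudu.table", sinkTableName)
  .load().show()
To confirm the run, check the printed DataFrames in the IDE console, and check in the Kudu web UI (port 8050 in the original) that the area_stat table was created. Port 8050 is the default for a tablet server's web UI; the master's web UI, whose Tables page lists every table, defaults to port 8051.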
This article has been distilled and summarized from source material, then republished for learning and reference.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
