Spark + Kudu Advertising Business Project: Step-by-Step Implementation
This article walks through the complete implementation of an advertising statistics pipeline using Spark and Kudu, covering requirement analysis, Scala code development, SQL queries, schema definition, and data sinking, with full code snippets and execution results.
This guide demonstrates how to build an advertising statistics module for an app using Apache Spark and Apache Kudu. It starts with the statistical requirements, which involve calculating various request and bidding metrics for each app.
1. Requirement
The goal is to compute fields such as origin_request, valid_request, ad_request, bid_cnt, bid_success_cnt, ad_display_cnt, ad_click_cnt, and related rates from the raw ODS table.
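The SQL shown later in this article derives each count with the pattern sum(case when <condition> then 1 else 0 end). As a plain-Scala illustration of that pattern (not the article's code, and without Spark), the sketch below computes the three request counters for a few in-memory rows. The LogRow and RequestCounts case classes and the RequestCountSketch object are hypothetical names introduced only for this example. The conditions are copied from APP_SQL_STEP1.

```scala
// Hypothetical, trimmed-down log record: only the fields the three request counters use.
final case class LogRow(appid: String, requestmode: Int, processnode: Int)

// The three request counters for one app.
final case class RequestCounts(originRequest: Long, validRequest: Long, adRequest: Long)

object RequestCountSketch {
  // Mirrors sum(case when <cond> then 1 else 0 end): count the rows matching each condition.
  def countRequests(rows: Seq[LogRow]): Map[String, RequestCounts] =
    rows.groupBy(_.appid).map { case (appid, group) =>
      val requests = group.filter(_.requestmode == 1)
      appid -> RequestCounts(
        originRequest = requests.count(_.processnode >= 1).toLong,
        validRequest  = requests.count(_.processnode >= 2).toLong,
        adRequest     = requests.count(_.processnode == 3).toLong
      )
    }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      LogRow("app1", 1, 1),
      LogRow("app1", 1, 2),
      LogRow("app1", 1, 3),
      LogRow("app1", 2, 3), // requestmode = 2, so none of the three counters include it
      LogRow("app2", 1, 3)
    )
    countRequests(sample).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Because the conditions are nested (processnode >= 1, >= 2, = 3), a row with processnode = 3 is counted in all three columns, so origin_request >= valid_request >= ad_request holds for every app.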
2. Code Development
The entry point is a single Scala call:

AppStatProcessor.process(spark)

The processor reads the ods table from Kudu, registers it as a temporary view, and runs the first SQL statement:

package com.imooc.bigdata.cp08.business

import com.imooc.bigdata.cp08.`trait`.DataProcess
import com.imooc.bigdata.cp08.utils.SQLUtils
import org.apache.spark.sql.SparkSession

object AppStatProcessor extends DataProcess {
  override def process(spark: SparkSession): Unit = {
    val sourceTableName = "ods"
    val masterAddresses = "hadoop000"

    // Load the raw ODS table from Kudu as a DataFrame
    val odsDF = spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.table", sourceTableName)
      .option("kudu.master", masterAddresses)
      .load()

    // Expose the DataFrame to Spark SQL under the name used in APP_SQL_STEP1
    odsDF.createOrReplaceTempView("ods")

    // Step 1: aggregate the per-app counts and print them for inspection
    val resultTmp = spark.sql(SQLUtils.APP_SQL_STEP1)
    resultTmp.show()
  }
}

The corresponding SQL (APP_SQL_STEP1) aggregates the required metrics:

lazy val APP_SQL_STEP1 = "select appid,appname, " +
  "sum(case when requestmode=1 and processnode >=1 then 1 else 0 end) origin_request," +
  "sum(case when requestmode=1 and processnode >=2 then 1 else 0 end) valid_request," +
  "sum(case when requestmode=1 and processnode =3 then 1 else 0 end) ad_request," +
  "sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and isbid=1 and adorderid!=0 then 1 else 0 end) bid_cnt," +
  "sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 then 1 else 0 end) bid_success_cnt," +
  "sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) ad_display_cnt," +
  "sum(case when requestmode=3 and processnode=1 then 1 else 0 end) ad_click_cnt," +
  "sum(case when requestmode=2 and iseffective=1 and isbilling=1 then 1 else 0 end) medium_display_cnt," +
  "sum(case when requestmode=3 and iseffective=1 and isbilling=1 then 1 else 0 end) medium_click_cnt," +
  "sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 and adorderid>20000 then 1*winprice/1000 else 0 end) ad_consumption," +
  "sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1 and adorderid>20000 then 1*adpayment/1000 else 0 end) ad_cost " +
  "from ods group by appid,appname"

After verifying the first result, a second SQL statement (APP_SQL_STEP2) calculates the rates and filters out rows that would divide by zero. It selects from a temporary view named app_tmp, so the step 1 result has to be registered under that name before this query runs:

lazy val APP_SQL_STEP2 = "select appid,appname, " +
  "origin_request," +
  "valid_request," +
  "ad_request," +
  "bid_cnt," +
  "bid_success_cnt," +
  "bid_success_cnt/bid_cnt bid_success_rate," +
  "ad_display_cnt," +
  "ad_click_cnt," +
  "ad_click_cnt/ad_display_cnt ad_click_rate," +
  "ad_consumption," +
  "ad_cost from app_tmp " +
  "where bid_cnt!=0 and ad_display_cnt!=0"

The results are displayed and then written back to Kudu.
3. Persisting to Kudu
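// result: the DataFrame produced by running APP_SQL_STEP2 (its definition is not shown in this article)
val sinkTableName = "app_stat"
val partitionId = "appid"
val schema = SchemaUtils.APPSchema

// Write the result to the Kudu table through the project's KuduUtils helper
KuduUtils.sink(result, sinkTableName, masterAddresses, schema, partitionId)

// Read the table back to check what was written
spark.read.format("org.apache.kudu.spark.kudu")
  .option("kudu.master", masterAddresses)
  .option("kudu.table", sinkTableName)
  .load().show()

The schema definition for the target table is: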

// Imports needed at the top of the file that defines APPSchema
import org.apache.kudu.{Schema, Type}
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import scala.collection.JavaConverters._

lazy val APPSchema: Schema = {
  val columns = List(
    // appid and appname together form the primary key
    new ColumnSchemaBuilder("appid", Type.STRING).nullable(false).key(true).build(),
    new ColumnSchemaBuilder("appname", Type.STRING).nullable(false).key(true).build(),
    new ColumnSchemaBuilder("origin_request", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("valid_request", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("ad_request", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("bid_cnt", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("bid_success_cnt", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("bid_success_rate", Type.DOUBLE).nullable(false).build(),
    new ColumnSchemaBuilder("ad_display_cnt", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("ad_click_cnt", Type.INT64).nullable(false).build(),
    new ColumnSchemaBuilder("ad_click_rate", Type.DOUBLE).nullable(false).build(),
    new ColumnSchemaBuilder("ad_consumption", Type.DOUBLE).nullable(false).build(),
    new ColumnSchemaBuilder("ad_cost", Type.DOUBLE).nullable(false).build()
  ).asJava
  new Schema(columns)
}

Execution screenshots show the intermediate and final results, confirming that the statistics are correctly computed and stored in Kudu.
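To make the two rate columns concrete, the filter-then-divide logic of APP_SQL_STEP2 can be mimicked in plain Scala. This is a minimal sketch, not the article's code: the AppCounts and AppRates case classes and the RateSketch object are hypothetical names, and only the four counters that feed the rates are modelled.

```scala
// Hypothetical subset of the step 1 output: only the counters the two rates depend on.
final case class AppCounts(
  appid: String,
  bidCnt: Long,
  bidSuccessCnt: Long,
  adDisplayCnt: Long,
  adClickCnt: Long
)

// The two derived rates for one app.
final case class AppRates(appid: String, bidSuccessRate: Double, adClickRate: Double)

object RateSketch {
  // Same shape as APP_SQL_STEP2: drop rows with a zero denominator, then divide.
  def rates(rows: Seq[AppCounts]): Seq[AppRates] =
    rows
      .filter(r => r.bidCnt != 0 && r.adDisplayCnt != 0)
      .map { r =>
        AppRates(
          appid = r.appid,
          bidSuccessRate = r.bidSuccessCnt.toDouble / r.bidCnt,
          adClickRate = r.adClickCnt.toDouble / r.adDisplayCnt
        )
      }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      AppCounts("app1", bidCnt = 4, bidSuccessCnt = 1, adDisplayCnt = 10, adClickCnt = 5),
      AppCounts("app2", bidCnt = 0, bidSuccessCnt = 0, adDisplayCnt = 10, adClickCnt = 1)
    )
    rates(sample).foreach(println)
  }
}
```

Note that the filter requires both denominators to be non-zero, so an app with displays but no bids (app2 in the sample) is dropped entirely, not just given an empty rate. In Spark SQL's default non-ANSI mode a division by zero yields NULL, which the non-nullable rate columns of APPSchema could not store, so filtering first is consistent with that schema.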
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
