Spark + Kudu Advertising Project: Refactoring, Scala Traits, ETL Processor, and Project Entry
This article walks through a Spark and Kudu advertising project: the refactoring approach, the use of Scala traits, the ETL and province-city statistics processors, and the complete Spark application entry point, all with full code examples.
The chapter focuses on refactoring the first two requirements of an advertising project built with Spark and Kudu, providing a clear development roadmap.
1. Refactoring Idea
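The goal is to restructure the existing code for better maintainability and scalability. Each requirement becomes a processor object that implements a common DataProcess trait, and a single entry point (SparkApp) creates the SparkSession and runs the processors in order. Adding a requirement then means writing one more processor and calling it from the entry point.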
2. Scala Trait
Scala traits are similar to Java interfaces, but they can also contain concrete method implementations and fields. A class can mix in several traits (class A extends B with C), which gives Scala a controlled form of multiple inheritance.
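A minimal, self-contained sketch of both points follows. The names Named, Counted and Job are made up for illustration and are not part of the project: one trait has an abstract member plus a concrete method, the other keeps a private field, and a single class mixes in both.

```scala
// A trait with an abstract member and a concrete method.
trait Named {
  def name: String                              // abstract: implementers must supply it
  def describe: String = s"processor '$name'"   // concrete method defined in the trait
}

// A second trait that carries a field and concrete helpers.
trait Counted {
  private var runs = 0                          // state (a field) kept inside the trait
  def runCount: Int = runs
  def recordRun(): Unit = runs += 1
}

// A class mixes in several traits with `extends ... with ...`.
class Job(val name: String) extends Named with Counted

object TraitDemo {
  def main(args: Array[String]): Unit = {
    val job = new Job("etl")
    job.recordRun()
    job.recordRun()
    println(job.describe)   // processor 'etl'
    println(job.runCount)   // 2
  }
}
```

Because Job declares name as a constructor val, it satisfies the abstract def name from Named without needing the override keyword.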
Below is the definition of a generic data processing trait:
```scala
package com.imooc.bigdata.cp08.`trait`

import org.apache.spark.sql.SparkSession

// Top-level data processing interface: every requirement implements it
trait DataProcess {
  def process(spark: SparkSession): Unit
}
```
3. Processor Implementations
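Because every processor implements the same trait, the entry point only needs to call process on each one in turn. The sketch below shows that idea in plain Scala so it runs without a cluster. Context stands in for SparkSession, and Step, EtlStep, ProvinceCityStep and Pipeline are hypothetical names, not project code.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for SparkSession so the sketch runs without Spark.
final class Context(val appName: String) {
  val log: ListBuffer[String] = ListBuffer.empty[String]
}

// Same shape as DataProcess, but taking the stand-in context.
trait Step {
  def process(ctx: Context): Unit
}

// Hypothetical processors; real ones would read and write Kudu tables.
object EtlStep extends Step {
  override def process(ctx: Context): Unit = ctx.log += "ods written"
}

object ProvinceCityStep extends Step {
  override def process(ctx: Context): Unit = ctx.log += "province_city_stat written"
}

object Pipeline {
  // Order matters: the statistics step reads the table the ETL step wrote.
  val steps: Seq[Step] = Seq(EtlStep, ProvinceCityStep)

  // Runs each step in order and returns what the steps recorded.
  def runAll(ctx: Context): List[String] = {
    steps.foreach(_.process(ctx))
    ctx.log.toList
  }

  def main(args: Array[String]): Unit =
    runAll(new Context("SparkApp")).foreach(println)
}
```

Keeping the steps in one ordered Seq is one way to make the dependency between them explicit; SparkApp in this article simply calls the two processors one after the other, which has the same effect.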
3.1 Requirement 1: ETL Processor
The ETL processor reads raw JSON logs, enriches them with IP location data, runs SQL transformations, and writes the result to a Kudu table.
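The enrichment hinges on two operations: converting a dotted IPv4 address to a number, and finding the rule whose [start_ip, end_ip] range contains that number. In the processor the first is IPUtils.ip2Long and the second happens inside the query in SQLUtils.SQL; neither body is shown in this article. The plain-Scala sketch below illustrates the same idea under stated assumptions: ip2Long uses the usual base-256 conversion, lookup assumes the rules are sorted and non-overlapping, and the two sample rule lines are made up, laid out only so that fields 2, 3, 6, 7 and 9 line up with what the processor reads.

```scala
// One row of the IP rule file, keeping only the fields LogETLProcessor reads.
final case class IpRule(startIp: Long, endIp: Long, province: String, city: String, isp: String)

object IpLookup {
  // Hypothetical ip2Long: "a.b.c.d" -> a*2^24 + b*2^16 + c*2^8 + d.
  def ip2Long(ip: String): Long = {
    val octets = ip.trim.split("\\.")
    require(octets.length == 4, s"not an IPv4 address: $ip")
    octets.foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)
  }

  // Same field positions as the processor: 2, 3 = range bounds; 6 = province; 7 = city; 9 = ISP.
  def parseRule(line: String): IpRule = {
    val splits = line.split("\\|")
    IpRule(splits(2).toLong, splits(3).toLong, splits(6), splits(7), splits(9))
  }

  // Binary search; assumes rules are sorted by startIp and ranges do not overlap.
  def lookup(rules: IndexedSeq[IpRule], ip: Long): Option[IpRule] = {
    var lo = 0
    var hi = rules.length - 1
    var found: Option[IpRule] = None
    while (lo <= hi && found.isEmpty) {
      val mid = (lo + hi) >>> 1
      val rule = rules(mid)
      if (ip < rule.startIp) hi = mid - 1
      else if (ip > rule.endIp) lo = mid + 1
      else found = Some(rule)
    }
    found
  }

  // Made-up sample lines for illustration only; not taken from ip.txt.
  val rules: IndexedSeq[IpRule] = Vector(
    "1.0.1.0|1.0.3.255|16777472|16778239|Asia|China|Fujian|Fuzhou||Telecom",
    "1.0.8.0|1.0.15.255|16779264|16781311|Asia|China|Guangdong|Guangzhou||Telecom"
  ).map(parseRule).sortBy(_.startIp)

  def main(args: Array[String]): Unit = {
    val hit = lookup(rules, ip2Long("1.0.2.17"))
    println(hit.map(r => s"${r.province}/${r.city}/${r.isp}"))   // Some(Fujian/Fuzhou/Telecom)
  }
}
```

In Spark SQL the same range match would typically be written as a join condition such as ip_long BETWEEN start_ip AND end_ip; an in-memory binary search like this one is an alternative when the rule table is small enough to keep on every executor.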
```scala
package com.imooc.bigdata.cp08.business

import com.imooc.bigdata.cp08.`trait`.DataProcess
import com.imooc.bigdata.cp08.utils.{IPUtils, KuduUtils, SQLUtils, SchemaUtils}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object LogETLProcessor extends DataProcess {

  override def process(spark: SparkSession): Unit = {
    // Required for rdd.toDF(...)
    import spark.implicits._

    // Load the raw JSON logs
    val rawDF = spark.read.json("D:\\Hadoop基础与电商行为日志分析\\spark\\coding385\\sparksql-train\\data\\data-test.json")

    // Load the IP rules: one "|"-separated line per IP range
    val ipRowRDD = spark.sparkContext.textFile("D:\\Hadoop基础与电商行为日志分析\\spark\\coding385\\sparksql-train\\data\\ip.txt")
    val ipRuleDF = ipRowRDD.map(x => {
      val splits = x.split("\\|")
      val startIP = splits(2).toLong
      val endIP = splits(3).toLong
      val province = splits(6)
      val city = splits(7)
      val isp = splits(9)
      (startIP, endIP, province, city, isp)
    }).toDF("start_ip", "end_ip", "province", "city", "isp")

    // UDF that converts the dotted IP string to a long, comparable with start_ip/end_ip
    val ip2LongUDF = udf((ip: String) => IPUtils.ip2Long(ip))
    val jsonDF = rawDF.withColumn("ip_long", ip2LongUDF(col("ip")))

    // SQL transformation over the "logs" and "ips" views (query defined in SQLUtils.SQL)
    jsonDF.createOrReplaceTempView("logs")
    ipRuleDF.createOrReplaceTempView("ips")
    val result = spark.sql(SQLUtils.SQL)

    // Write the result to the Kudu table "ods"
    val masterAddresses = "hadoop000"
    val tableName = "ods"
    val partitionId = "ip"
    val schema = SchemaUtils.ODSSchema
    KuduUtils.sink(result, tableName, masterAddresses, schema, partitionId)

    // Read the table back to verify the write
    spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.master", masterAddresses)
      .option("kudu.table", tableName)
      .load().show()
  }
}
```
3.2 Requirement 2: Province-City Statistics Processor
This processor reads the ODS table from Kudu, aggregates data by province and city, and stores the statistics back to Kudu.
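As a plain-Scala illustration of the aggregation step, the sketch below groups rows by province and city and counts them. The query in SQLUtils.PROVINCE_CITY_SQL is not shown in this article, so the count-per-pair semantics and the column names are assumptions (only provincename appears in the source, as the partition key); OdsRow and ProvinceCityStat are hypothetical types.

```scala
// Hypothetical shapes: the real ods and province_city_stat schemas live in SchemaUtils.
final case class OdsRow(provincename: String, cityname: String)
final case class ProvinceCityStat(provincename: String, cityname: String, cnt: Long)

object ProvinceCityStats {
  // In-memory analogue of a query such as
  //   SELECT provincename, cityname, count(1) AS cnt FROM ods GROUP BY provincename, cityname
  // (an assumption about what SQLUtils.PROVINCE_CITY_SQL computes).
  def aggregate(rows: Seq[OdsRow]): Seq[ProvinceCityStat] =
    rows
      .groupBy(r => (r.provincename, r.cityname))
      .map { case ((province, city), group) => ProvinceCityStat(province, city, group.size.toLong) }
      .toSeq
      .sortBy(s => (-s.cnt, s.provincename, s.cityname)) // largest counts first, ties by name

  def main(args: Array[String]): Unit = {
    val rows = Seq(OdsRow("Fujian", "Fuzhou"), OdsRow("Guangdong", "Guangzhou"), OdsRow("Fujian", "Fuzhou"))
    aggregate(rows).foreach(println)
  }
}
```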
```scala
package com.imooc.bigdata.cp08.business

import com.imooc.bigdata.cp08.`trait`.DataProcess
import com.imooc.bigdata.cp08.utils.{KuduUtils, SQLUtils, SchemaUtils}
import org.apache.spark.sql.SparkSession

object ProvinceCityStatProcessor extends DataProcess {

  override def process(spark: SparkSession): Unit = {
    // Read the ODS table written by LogETLProcessor
    val sourceTableName = "ods"
    val masterAddress = "hadoop000"
    val odsDF = spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.table", sourceTableName)
      .option("kudu.master", masterAddress)
      .load()

    // Expose it to Spark SQL and run the province/city aggregation
    odsDF.createOrReplaceTempView("ods")
    val result = spark.sql(SQLUtils.PROVINCE_CITY_SQL)

    // Write the aggregated result to Kudu
    val sinkTableName = "province_city_stat"
    val partitionId = "provincename"
    val schema = SchemaUtils.ProvinceCitySchema
    KuduUtils.sink(result, sinkTableName, masterAddress, schema, partitionId)

    // Read the table back to verify the write
    spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.master", masterAddress)
      .option("kudu.table", sinkTableName)
      .load().show()
  }
}
```
4. Project Entry Point
The main object creates a SparkSession, runs both processors, and stops the session.
```scala
package com.imooc.bigdata.cp08

import com.imooc.bigdata.cp08.business.{LogETLProcessor, ProvinceCityStatProcessor}
import org.apache.spark.sql.SparkSession

object SparkApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SparkApp")
      .getOrCreate()

    // Run ETL
    LogETLProcessor.process(spark)

    // Run province-city statistics
    ProvinceCityStatProcessor.process(spark)

    spark.stop()
  }
}
```
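One possible hardening of the entry point is to stop the session in a finally block, so that a failing processor does not leave it running. The sketch below demonstrates the pattern without Spark; FakeSession, SessionStep and SafeApp are hypothetical stand-ins for SparkSession, DataProcess and SparkApp.

```scala
// Hypothetical stand-in for SparkSession: it only remembers whether stop() was called.
final class FakeSession {
  private var stoppedFlag = false
  def stop(): Unit = stoppedFlag = true
  def isStopped: Boolean = stoppedFlag
}

// Same shape as DataProcess, but taking the stand-in session.
trait SessionStep {
  def process(session: FakeSession): Unit
}

object SafeApp {
  // Runs the steps in order; the finally block stops the session even if a step throws,
  // and the exception still propagates to the caller.
  def run(session: FakeSession, steps: Seq[SessionStep]): Unit =
    try steps.foreach(_.process(session))
    finally session.stop()

  def main(args: Array[String]): Unit = {
    val session = new FakeSession
    val failing = new SessionStep {
      def process(s: FakeSession): Unit = throw new IllegalStateException("step failed")
    }
    val outcome = scala.util.Try(run(session, Seq(failing)))
    println(s"failed=${outcome.isFailure}, stopped=${session.isStopped}") // failed=true, stopped=true
  }
}
```

Applied to SparkApp, this amounts to wrapping the two process calls in try { ... } finally spark.stop().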
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
