Spark + Kudu Advertising Project: Refactoring, Scala Traits, ETL Processor, and Project Entry

This article walks through a Spark and Kudu advertising project. It explains the refactoring approach and the use of Scala traits, implements the ETL and province‑city statistics processors, and shows the complete Spark application entry point with full code examples.

Big Data Technology & Architecture
The chapter focuses on refactoring the first two requirements of an advertising project built with Spark and Kudu, providing a clear development roadmap.

1. Refactoring Idea

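The goal is to restructure the existing code so that each requirement lives in its own processor object behind a shared trait. This keeps the entry point small and makes the code easier to maintain and extend as more requirements are added.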

2. Scala Trait

Scala traits are similar to Java interfaces, but they can also contain concrete method implementations and fields. A class or object can mix in several traits at once, which provides much of the flexibility of multiple inheritance.
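To make these properties concrete, here is a minimal sketch of a trait with an abstract member and a concrete method, a second trait that carries a field, and an object that mixes in both. All names (Named, Retryable, DemoProcessor) are hypothetical and are not part of the project.

```scala
// Hypothetical names throughout; an illustration only, not project code.

// A trait with one abstract member and one concrete method
trait Named {
  def name: String                              // abstract: implementers must define it
  def describe: String = s"processor '$name'"   // concrete: inherited as-is
}

// A second trait that carries a field with a default value
trait Retryable {
  val maxRetries: Int = 3
}

// A single object can mix in several traits with `extends ... with ...`
object DemoProcessor extends Named with Retryable {
  override def name: String = "demo"
}

object TraitDemo {
  def main(args: Array[String]): Unit = {
    println(DemoProcessor.describe)    // uses the concrete method from Named
    println(DemoProcessor.maxRetries)  // uses the field from Retryable
  }
}
```

DemoProcessor only supplies `name`; the `describe` method and the `maxRetries` field come from the traits it mixes in.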

Below is the definition of a generic data processing trait:

package com.imooc.bigdata.cp08.`trait`

import org.apache.spark.sql.SparkSession

// Top‑level data processing interface
trait DataProcess {
  // Explicit Unit return type; procedure syntax without it is deprecated
  def process(spark: SparkSession): Unit
}

3. Processor Implementations

3.1 Requirement 1: ETL Processor

The ETL processor reads raw JSON logs, enriches them with IP location data, runs SQL transformations, and writes the result to a Kudu table.

package com.imooc.bigdata.cp08.business

import com.imooc.bigdata.cp08.`trait`.DataProcess
import com.imooc.bigdata.cp08.utils.{IPUtils, KuduUtils, SQLUtils, SchemaUtils}
import org.apache.spark.sql.SparkSession

object LogETLProcessor extends DataProcess {
  override def process(spark: SparkSession): Unit = {
    // Required for the RDD-to-DataFrame conversion (.toDF) below
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // Load JSON data
    var jsonDF = spark.read.json("D:\\Hadoop基础与电商行为日志分析\\spark\\coding385\\sparksql-train\\data\\data-test.json")
    // Load IP rules: one pipe-delimited IP range per line
    val ipRowRDD = spark.sparkContext.textFile("D:\\Hadoop基础与电商行为日志分析\\spark\\coding385\\sparksql-train\\data\\ip.txt")
    val ipRuleDF = ipRowRDD.map(x => {
      val splits = x.split("\\|")
      val startIP = splits(2).toLong
      val endIP = splits(3).toLong
      val province = splits(6)
      val city = splits(7)
      val isp = splits(9)
      (startIP, endIP, province, city, isp)
    }).toDF("start_ip", "end_ip", "province", "city", "isp")
    // Define a UDF that converts an IP string to a long and add the result as a column
    def getLongIp() = udf((ip: String) => IPUtils.ip2Long(ip))
    jsonDF = jsonDF.withColumn("ip_long", getLongIp()(col("ip")))
    // SQL transformation
    jsonDF.createOrReplaceTempView("logs")
    ipRuleDF.createOrReplaceTempView("ips")
    val sql = SQLUtils.SQL
    val result = spark.sql(sql)
    // Write to Kudu
    val masterAddresses = "hadoop000"
    val tableName = "ods"
    val partitionId = "ip"
    val schema = SchemaUtils.ODSSchema
    KuduUtils.sink(result, tableName, masterAddresses, schema, partitionId)
    // Verify write
    spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.master", masterAddresses)
      .option("kudu.table", tableName)
      .load().show()
  }
}
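The article does not show IPUtils.ip2Long or the SQL in SQLUtils.SQL, so the following is only a sketch of what the enrichment step plausibly does, written in plain Scala so it runs without Spark. It assumes IPv4 addresses in dotted notation and assumes that a log row matches a rule when its numeric IP falls between start_ip and end_ip. The object name IpLookupSketch, the IpRule case class, and the sample rule line are all made up for illustration.

```scala
// Sketch only: IPUtils.ip2Long and SQLUtils.SQL are not shown in the article,
// so the names and logic below are assumptions for illustration.
object IpLookupSketch {

  // One row of the IP rule file, with the same fields the processor extracts
  final case class IpRule(startIp: Long, endIp: Long, province: String, city: String, isp: String)

  // Convert a dotted IPv4 string to a long by shifting in one octet at a time
  def ip2Long(ip: String): Long =
    ip.split("\\.").foldLeft(0L)((acc, part) => (acc << 8) | part.toLong)

  // Parse one pipe-delimited rule line, using the same indices as the processor
  def parseRule(line: String): IpRule = {
    val splits = line.split("\\|")
    IpRule(splits(2).toLong, splits(3).toLong, splits(6), splits(7), splits(9))
  }

  // Find the first rule whose [startIp, endIp] range contains the address
  def lookup(ip: String, rules: Seq[IpRule]): Option[IpRule] = {
    val ipLong = ip2Long(ip)
    rules.find(r => r.startIp <= ipLong && ipLong <= r.endIp)
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample line in the assumed 10-field layout
    val sampleLine = "1.0.1.0|1.0.3.255|16777472|16778239|Asia|China|Fujian|Fuzhou||Telecom"
    val rules = Seq(parseRule(sampleLine))
    println(ip2Long("1.0.2.7"))        // 16777735
    println(lookup("1.0.2.7", rules))  // inside the sample range
    println(lookup("8.8.8.8", rules))  // outside the sample range
  }
}
```

In the Spark job the same range check would be expressed as a join condition between the logs and ips views rather than a linear scan over a collection.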

3.2 Requirement 2: Province‑City Statistics Processor

This processor reads the ODS table from Kudu, aggregates data by province and city, and stores the statistics back to Kudu.

package com.imooc.bigdata.cp08.business

import com.imooc.bigdata.cp08.`trait`.DataProcess
import com.imooc.bigdata.cp08.utils.{KuduUtils, SQLUtils, SchemaUtils}
import org.apache.spark.sql.SparkSession

object ProvinceCityStatProcessor extends DataProcess {
  override def process(spark: SparkSession): Unit = {
    val sourceTableName = "ods"
    val masterAddress = "hadoop000"
    val odsDF = spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.table", sourceTableName)
      .option("kudu.master", masterAddress)
      .load()
    odsDF.createOrReplaceTempView("ods")
    val result = spark.sql(SQLUtils.PROVINCE_CITY_SQL)
    // Write aggregated result to Kudu
    val sinkTableName = "province_city_stat"
    val partitionId = "provincename"
    val schema = SchemaUtils.ProvinceCitySchema
    KuduUtils.sink(result, sinkTableName, masterAddress, schema, partitionId)
    // Verify write
    spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.master", masterAddress)
      .option("kudu.table", sinkTableName)
      .load().show()
  }
}
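SQLUtils.PROVINCE_CITY_SQL is not shown in the article. As a rough illustration of the kind of aggregation involved, the sketch below counts rows per (province, city) pair on an in-memory collection. The column name provincename comes from the partition key above; cityname, the OdsRow case class, and the choice of a simple row count as the metric are assumptions.

```scala
// Sketch only: the real aggregation lives in SQLUtils.PROVINCE_CITY_SQL, which is not shown.
object ProvinceCityCountSketch {

  // Minimal stand-in for an ODS row; `cityname` is an assumed column name
  final case class OdsRow(provincename: String, cityname: String)

  // Group rows by (province, city) and count each group,
  // comparable to a GROUP BY with COUNT in SQL
  def countByProvinceCity(rows: Seq[OdsRow]): Map[(String, String), Int] =
    rows
      .groupBy(r => (r.provincename, r.cityname))
      .map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows
    val rows = Seq(
      OdsRow("Fujian", "Fuzhou"),
      OdsRow("Fujian", "Fuzhou"),
      OdsRow("Fujian", "Xiamen")
    )
    countByProvinceCity(rows).foreach { case ((province, city), cnt) =>
      println(s"$province / $city: $cnt")
    }
  }
}
```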

4. Project Entry Point

The main object creates a SparkSession, runs both processors, and stops the session.

package com.imooc.bigdata.cp08

import com.imooc.bigdata.cp08.business.{LogETLProcessor, ProvinceCityStatProcessor}
import org.apache.spark.sql.SparkSession

object SparkApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SparkApp")
      .getOrCreate()
    // Run ETL
    LogETLProcessor.process(spark)
    // Run province‑city statistics
    ProvinceCityStatProcessor.process(spark)
    spark.stop()
  }
}
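One possible variation on this entry point, not taken from the article, is to hold the processors in a list and stop the session in a finally block so that it is released even when a processor throws. The sketch below uses a stand-in FakeSession class and a Step trait in place of SparkSession and DataProcess so that it runs without Spark on the classpath; all names here are hypothetical.

```scala
// Sketch only: FakeSession and Step stand in for SparkSession and DataProcess.

// Stand-in for SparkSession that only tracks whether stop() was called
final class FakeSession {
  private var stopped = false
  def stop(): Unit = { stopped = true }
  def isStopped: Boolean = stopped
}

// Same shape as the DataProcess trait, but against the stand-in session
trait Step {
  def process(session: FakeSession): Unit
}

object PipelineSketch {
  // Run the steps in order and always stop the session, even if a step throws
  def run(session: FakeSession, steps: Seq[Step]): Unit =
    try steps.foreach(_.process(session))
    finally session.stop()

  def main(args: Array[String]): Unit = {
    val etl = new Step {
      def process(session: FakeSession): Unit = println("running ETL")
    }
    val stat = new Step {
      def process(session: FakeSession): Unit = println("running statistics")
    }
    val session = new FakeSession
    run(session, Seq(etl, stat))
    println(s"session stopped: ${session.isStopped}")
  }
}
```

With this shape, adding a third requirement means adding one more element to the list rather than another call in main.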

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
