
Spark + Kudu Advertising Project: Province‑City Statistics and Data Persistence

This tutorial walks through a Spark‑Kudu advertising project that computes province‑city distribution statistics using SQL, defines the necessary schema, and demonstrates how to write the aggregated results back to a Kudu table for persistent storage, complete with Scala code examples.

Big Data Technology & Architecture

This article demonstrates a Spark‑Kudu advertising project, showing how to compute province‑city distribution statistics and persist the results into Kudu.

First, a Scala application ProvinceCityStatApp creates a SparkSession, reads the ods table from Kudu, registers it as a temporary view, and runs the SQL defined in SQLUtils.PROVINCE_CITY_SQL to count records per province and city.

package com.imooc.bigdata.cp08.business

import com.imooc.bigdata.cp08.utils.SQLUtils
import org.apache.spark.sql.SparkSession

object ProvinceCityStatApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ProvinceCityStatApp")
      .getOrCreate()

    // Read data from the Kudu ods table, then group by province and city
    val sourceTableName = "ods"
    val masterAddress = "hadoop000"

    val odsDF = spark.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.table", sourceTableName)
      .option("kudu.master", masterAddress)
      .load()
    //odsDF.show(false)

    odsDF.createOrReplaceTempView("ods")
    val result = spark.sql(SQLUtils.PROVINCE_CITY_SQL)
    result.show(false)

    spark.stop()
  }
}

The SQL constant, defined in SQLUtils, is:

lazy val PROVINCE_CITY_SQL = "select provincename,cityname,count(1) as cnt from ods group by provincename,cityname"

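Calling result.show(false) prints one row per province and city with its record count, without truncating long column values (the screenshot from the original post is omitted).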
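
With the screenshot unavailable, a small local run can stand in for it. The sketch below expresses the same aggregation with the DataFrame API and applies it to a handful of made-up rows. The object name ProvinceCityDslSketch, the helper provinceCityCounts, and the sample provinces and cities are assumptions for illustration, not part of the original project.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{count, lit}

// Hypothetical helper object; not part of the original project.
object ProvinceCityDslSketch {

  // DataFrame-API counterpart of PROVINCE_CITY_SQL:
  // one output row per (provincename, cityname) pair with its record count.
  def provinceCityCounts(ods: DataFrame): DataFrame =
    ods.groupBy("provincename", "cityname")
      .agg(count(lit(1)).as("cnt"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ProvinceCityDslSketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for the Kudu ods table.
    val sample = Seq(
      ("Guangdong", "Shenzhen"),
      ("Guangdong", "Shenzhen"),
      ("Guangdong", "Guangzhou"),
      ("Zhejiang", "Hangzhou")
    ).toDF("provincename", "cityname")

    provinceCityCounts(sample).show(false)
    spark.stop()
  }
}
```

The cnt column comes back as a 64-bit integer (Spark's LongType), which lines up with the Type.INT64 column used for cnt in the Kudu schema.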

Second, the result is written back to Kudu with KuduUtils.sink, which takes the result DataFrame, the sink table name, the Kudu master address, the table schema, and the partition column. The schema has three columns: provincename, cityname, and cnt.

val sinkTableName = "province_city_stat"
val partitionId = "provincename"
val schema = SchemaUtils.ProvinceCitySchema

KuduUtils.sink(result,sinkTableName,masterAddress,schema,partitionId)
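
The article does not show KuduUtils itself. Below is a minimal sketch of what such a helper might look like, assuming it recreates the sink table on every run, hash-partitions on the given column, and appends the DataFrame through the kudu-spark data source. The replica count (1), the bucket count (3), and the drop-and-recreate behaviour are assumptions, not details confirmed by the source.

```scala
import java.util.Collections

import org.apache.kudu.Schema
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.client.KuduClient.KuduClientBuilder
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical reconstruction of the helper called above.
object KuduUtils {

  def sink(data: DataFrame,
           tableName: String,
           master: String,
           schema: Schema,
           partitionId: String): Unit = {
    val client = new KuduClientBuilder(master).build()
    try {
      val options = new CreateTableOptions()
        .setNumReplicas(1) // assumption: single-node development cluster
        .addHashPartitions(Collections.singletonList(partitionId), 3) // assumption: 3 buckets

      // Assumption: each run replaces the previous statistics.
      if (client.tableExists(tableName)) {
        client.deleteTable(tableName)
      }
      client.createTable(tableName, schema, options)
    } finally {
      client.close()
    }

    // Write the rows through the kudu-spark data source.
    data.write
      .mode(SaveMode.Append)
      .format("org.apache.kudu.spark.kudu")
      .option("kudu.table", tableName)
      .option("kudu.master", master)
      .save()
  }
}
```

Dropping and recreating the table keeps reruns simple for a tutorial. A production job would more likely keep the table and upsert into it.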

The schema definition:

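import org.apache.kudu.{Schema, Type}
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import scala.collection.JavaConverters._

lazy val ProvinceCitySchema: Schema = {
  val columns = List(
    // provincename and cityname together form the primary key
    new ColumnSchemaBuilder("provincename", Type.STRING).nullable(false).key(true).build(),
    new ColumnSchemaBuilder("cityname", Type.STRING).nullable(false).key(true).build(),
    // cnt is the aggregated value, so it is left out of the key
    new ColumnSchemaBuilder("cnt", Type.INT64).nullable(false).build()
  ).asJava
  new Schema(columns)
}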
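
Kudu only allows hash partitioning on primary key columns, so the partition column passed to the sink has to be one of the key columns of the schema. A small sketch of a guard that could run before the table is created follows. The object name SchemaChecks, the helper isKeyColumn, and the two-column demo schema are hypothetical.

```scala
import org.apache.kudu.{Schema, Type}
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
import scala.collection.JavaConverters._

// Hypothetical helper; not part of the original project.
object SchemaChecks {

  // True when `column` is one of the schema's primary key columns.
  def isKeyColumn(schema: Schema, column: String): Boolean =
    schema.getPrimaryKeyColumns.asScala.exists(_.getName == column)

  def main(args: Array[String]): Unit = {
    // Small demo schema: one key column followed by one value column.
    val demo = new Schema(List(
      new ColumnSchemaBuilder("provincename", Type.STRING).nullable(false).key(true).build(),
      new ColumnSchemaBuilder("cnt", Type.INT64).nullable(false).build()
    ).asJava)

    // Fails fast if the chosen partition column is not part of the key.
    require(isKeyColumn(demo, "provincename"), "partition column must be a key column")
    println(isKeyColumn(demo, "cnt"))
  }
}
```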

Finally, the persisted data can be verified with a simple read:

spark.read.format("org.apache.kudu.spark.kudu")
  .option("kudu.master",masterAddress)
  .option("kudu.table",sinkTableName)
  .load().show()

If the output shows rows, the data has been successfully stored in Kudu.
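
Seeing rows in show() confirms that something was written, but not that it matches the aggregation. A stricter check could compare the two DataFrames directly. The sketch below assumes both share the columns provincename, cityname, and cnt. The object name SinkVerification and the helper sameRows are hypothetical, and the demo uses in-memory rows in place of the Kudu table.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper; not part of the original project.
object SinkVerification {
  private val cols = Seq("provincename", "cityname", "cnt")

  // True when both DataFrames hold the same set of distinct rows for `cols`.
  def sameRows(expected: DataFrame, actual: DataFrame): Boolean = {
    // Select by name so that a different column order does not matter.
    val e = expected.select(cols.head, cols.tail: _*)
    val a = actual.select(cols.head, cols.tail: _*)
    e.except(a).count() == 0 && a.except(e).count() == 0
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SinkVerification")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows: `written` plays the role of result, `readBack` the Kudu read.
    val written = Seq(("Guangdong", "Shenzhen", 2L), ("Zhejiang", "Hangzhou", 1L)).toDF(cols: _*)
    val readBack = Seq(("Zhejiang", "Hangzhou", 1L), ("Guangdong", "Shenzhen", 2L)).toDF(cols: _*)

    println(sameRows(written, readBack))
    spark.stop()
  }
}
```

In the actual job, expected would be the result DataFrame and actual the DataFrame loaded from sinkTableName.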

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
