Big Data 17 min read

Big Data ETL Project: Parsing Advertising JSON with Spark, IP Lookup, and Storing into Kudu

This tutorial describes how to place advertising JSON data on HDFS, use Spark for ETL and analysis, enrich logs with IP lookup, and persist the results into Kudu with daily scheduling, including code examples and schema definitions.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data ETL Project: Parsing Advertising JSON with Spark, IP Lookup, and Storing into Kudu

1. Introduction: The project aims to place advertising JSON data on HDFS, perform ETL and analysis with Spark, and store results in Kudu, scheduled daily at 3 am.

2. Requirements: Parse JSON files together with an IP library, count regional distribution of provinces, cities, ad placements, and app distribution.

3. Architecture: (image placeholder) – shows the overall data flow.

4. Log fields: Example JSON log record is shown.

{
  "sessionid": "qld2dU4cfhEa3yhADzgphOf3ySv9vMml",
  "advertisersid": 66,
  "adorderid": 142848,
  "adcreativeid": 212312,
  "adplatformproviderid": 174663,
  "sdkversion": "Android 5.0",
  "adplatformkey": "PLMyYnDKQgOPL55frHhxkUIQtBThHfHq",
  "putinmodeltype": 1,
  "requestmode": 1,
  "adprice": 8410.0,
  "adppprice": 5951.0,
  "requestdate": "2018-10-07",
  "ip": "182.91.190.221",
  "appid": "XRX1000014",
  "appname": "支付宝 - 让生活更简单",
  "uuid": "QtxDH9HUueM2IffUe8z2UqLKuZueZLqq",
  "device": "HUAWEI GX1手机",
  "client": 1,
  "osversion": "",
  "density": "",
  "pw": 1334,
  "ph": 750,
  "lang": "",
  "lat": "",
  "provincename": "",
  "cityname": "",
  "ispid": 46007,
  "ispname": "移动",
  "networkmannerid": 1,
  "networkmannername": "4G",
  "iseffective": 1,
  "isbilling": 1,
  "adspacetype": 3,
  "adspacetypename": "全屏",
  "devicetype": 1,
  "processnode": 3,
  "apptype": 0,
  "district": "district",
  "paymode": 1,
  "isbid": 1,
  "bidprice": 6812.0,
  "winprice": 89934.0,
  "iswin": 0,
  "cur": "rmb",
  "rate": 0.0,
  "cnywinprice": 0.0,
  "imei": "",
  "mac": "52:54:00:41:ba:02",
  "idfa": "",
  "openudid": "FIZHDPIKQYVNHOHOOAWMTQDFTPNWAABZTAFVHTEL",
  "androidid": "",
  "rtbprovince": "",
  "rtbcity": "",
  "rtbdistrict": "",
  "rtbstreet": "",
  "storeurl": "",
  "realip": "182.92.196.236",
  "isqualityapp": 0,
  "bidfloor": 0.0,
  "aw": 0,
  "ah": 0,
  "imeimd5": "",
  "macmd5": "",
  "idfamd5": "",
  "openudidmd5": "",
  "androididmd5": "",
  "imeisha1": "",
  "macsha1": "",
  "idfasha1": "",
  "openudidsha1": "",
  "androididsha1": "",
  "uuidunknow": "",
  "userid": "vtUO8pPXfwdsPnvo6ttNGhAAnHi8NVbA",
  "reqdate": null,
  "reqhour": null,
  "iptype": 1,
  "initbidprice": 0.0,
  "adpayment": 175547.0,
  "agentrate": 0.0,
  "lomarkrate": 0.0,
  "adxrate": 0.0,
  "title": "中信建投首次公开发行股票发行结果 本次发行价格为5.42元/股",
  "keywords": "IPO,中信建投证券,股票,投资,财经",
  "tagid": "rBRbAEQhkcAaeZ6XlTrGXOxyw6w9JQ7x",
  "callbackdate": "2018-10-07",
  "channelid": "123528",
  "mediatype": 2,
  "email": "[email protected]",
  "tel": "13105823726",
  "age": "29",
  "sex": "0"
}

5. IP rule parsing: Example IP rule line and explanation of start and end IP decimal values.

1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302

6. Spark ETL application (LogETLApp.scala) loads JSON, reads IP rules, registers a UDF to convert IP strings to long, joins logs with IP rules, and displays results.

package com.imooc.bigdata.cp08

import com.imooc.bigdata.cp08.utils.IPUtils
import org.apache.spark.sql.SparkSession

object LogETLApp {

  def main(args: Array[String]): Unit = {
    //启动本地模式的spark
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("LogETLApp")
      .getOrCreate()

    //使用DataSourceAPI直接加载json数据
    var jsonDF = spark.read.json("data-test.json")

    //导入隐式转换
    import spark.implicits._
    //加载IP库,建议将RDD转成DF
    val ipRowRDD = spark.sparkContext.textFile("ip.txt")
    val ipRuleDF = ipRowRDD.map(x => {
      val splits = x.split("\\|")
      val startIP = splits(2).toLong
      val endIP = splits(3).toLong
      val province = splits(6)
      val city = splits(7)
      val isp = splits(9)

      (startIP, endIP, province, city, isp)
    }).toDF("start_ip", "end_ip", "province", "city", "isp")

    //利用Spark SQL UDF转换json中的ip
    import org.apache.spark.sql.functions._
    def getLongIp() = udf((ip:String)=>{
      IPUtils.ip2Long(ip)
    })

    //添加字段传入十进制IP
    jsonDF = jsonDF.withColumn("ip_long",
      getLongIp()($"ip"))

    //将日志每一行的ip对应省份、城市、运行商进行解析
    //两个DF进行join,条件是:json中的ip在规则ip中的范围内
    jsonDF.join(ipRuleDF,jsonDF("ip_long")
      .between(ipRuleDF("start_ip"),ipRuleDF("end_ip")))
        .show(false)

    spark.stop()
  }
}

7. Utility class IPUtils.scala provides ip2Long conversion.

package com.imooc.bigdata.cp08.utils

object IPUtils {

  //字符串->十进制
  def ip2Long(ip:String)={
    val splits = ip.split("[.]")
    var ipNum = 0L

    for(i<-0 until(splits.length)){
      //“|”是按位或操作,有1即1,全0则0
      //“<<”是整体左移
      //也就是说每一个数字算完向前移动8位接下一个数字
      ipNum = splits(i).toLong | ipNum << 8L
    }
    ipNum
  }

  def main(args: Array[String]): Unit = {
    println(ip2Long("1.1.1.1"))
  }
}

8. Equivalent SQL solution using SQLUtils with a left join between logs and IP rules.

package com.imooc.bigdata.cp08.utils

//项目相关的SQL工具类
object SQLUtils {

  lazy val SQL = "select " +
    "logs.ip ," +
    "logs.sessionid," +
    "logs.advertisersid," +
    "logs.adorderid," +
    "logs.adcreativeid," +
    "logs.adplatformproviderid" +
    ",logs.sdkversion" +
    ",logs.adplatformkey" +
    ",logs.putinmodeltype" +
    ",logs.requestmode" +
    ",logs.adprice" +
    ",logs.adppprice" +
    ",logs.requestdate" +
    ",logs.appid" +
    ",logs.appname" +
    ",logs.uuid, logs.device, logs.client, logs.osversion, logs.density, logs.pw, logs.ph" +
    ",ips.province as provincename" +
    ",ips.city as cityname" +
    ",ips.isp as isp" +
    ",logs.ispid, logs.ispname" +
    ",logs.networkmannerid, logs.networkmannername, logs.iseffective, logs.isbilling" +
    ",logs.adspacetype, logs.adspacetypename, logs.devicetype, logs.processnode, logs.apptype" +
    ",logs.district, logs.paymode, logs.isbid, logs.bidprice, logs.winprice, logs.iswin, logs.cur" +
    ",logs.rate, logs.cnywinprice, logs.imei, logs.mac, logs.idfa, logs.openudid,logs.androidid" +
    ",logs.rtbprovince,logs.rtbcity,logs.rtbdistrict,logs.rtbstreet,logs.storeurl,logs.realip" +
    ",logs.isqualityapp,logs.bidfloor,logs.aw,logs.ah,logs.imeimd5,logs.macmd5,logs.idfamd5" +
    ",logs.openudidmd5,logs.androididmd5,logs.imeisha1,logs.macsha1,logs.idfasha1,logs.openudidsha1" +
    ",logs.androididsha1,logs.uuidunknow,logs.userid,logs.iptype,logs.initbidprice,logs.adpayment" +
    ",logs.agentrate,logs.lomarkrate,logs.adxrate,logs.title,logs.keywords,logs.tagid,logs.callbackdate" +
    ",logs.channelid,logs.mediatype,logs.email,logs.tel,logs.sex,logs.age " +
    "from logs left join " +
    "ips on logs.ip_long between ips.start_ip and ips.end_ip "
}

9. Storing results into Kudu: commands to start Kudu services, create the table with schema ODSSchema, write the DataFrame to Kudu, and verify data via the Kudu UI and Spark read.

cd /etc/init.d/
ll
sudo ./kudu-master start
sudo ./kudu-tserver start

val result = jsonDF.join(ipRuleDF, jsonDF("ip_long")
  .between(ipRuleDF("start_ip"), ipRuleDF("end_ip")))

val masterAddresses = "hadoop000"
val tableName = "ods"
val client = new KuduClientBuilder(masterAddresses).build()

if(client.tableExists(tableName)){
  client.deleteTable(tableName)
}

val partitionId = "ip"
val schema = SchemaUtils.ODSSchema
val options = new CreateTableOptions()
options.setNumReplicas(1)
val parcols = new util.LinkedList[String]()
parcols.add(partitionId)
options.addHashPartitions(parcols,3)
client.createTable(tableName, schema, options)

result.write.mode(SaveMode.Append)
  .format("org.apache.kudu.spark.kudu")
  .option("kudu.table", tableName)
  .option("kudu.master", masterAddresses)
  .save()

spark.read.format("org.apache.kudu.spark.kudu")
  .option("kudu.master", masterAddresses)
  .option("kudu.table", tableName)
  .load().show()

10. Schema definition (ODSSchema) lists all columns and types required for the Kudu table.

lazy val ODSSchema: Schema = {
  val columns = List(
    new ColumnSchemaBuilder("ip", Type.STRING).nullable(false).key(true).build(),
    new ColumnSchemaBuilder("sessionid", Type.STRING).nullable(false).build(),
    new ColumnSchemaBuilder("advertisersid",Type.INT64).nullable(false).build(),
    // ... (remaining column definitions omitted for brevity) ...
    new ColumnSchemaBuilder("sex", Type.STRING).nullable(false).build(),
    new ColumnSchemaBuilder("age", Type.STRING).nullable(false).build()
  ).asJava
  new Schema(columns)
}

11. Code refactoring: KuduUtils.scala encapsulates table creation and data sinking logic, and is invoked from the main program.

package com.imooc.bigdata.cp08.utils

import java.util
import org.apache.kudu.Schema
import org.apache.kudu.client.{CreateTableOptions, KuduClient}
import org.apache.kudu.client.KuduClient.KuduClientBuilder
import org.apache.spark.sql.{DataFrame, SaveMode}

object KuduUtils {

  /**
   * 将DF数据落地到Kudu
   * @param data DF结果集
   * @param tableName Kudu目标表
   * @param master Kudu的Master地址
   * @param schema Kudu的schema信息
   * @param partitionId Kudu表的分区字段
   */
  def sink(data:DataFrame, tableName:String, master:String, schema:Schema, partitionId:String)={
    val client = new KuduClientBuilder(master).build()
    if(client.tableExists(tableName)){
      client.deleteTable(tableName)
    }
    val options = new CreateTableOptions()
    options.setNumReplicas(1)
    val parcols = new util.LinkedList[String]()
    parcols.add(partitionId)
    options.addHashPartitions(parcols,3)
    client.createTable(tableName,schema,options)
    data.write.mode(SaveMode.Append)
      .format("org.apache.kudu.spark.kudu")
      .option("kudu.table",tableName)
      .option("kudu.master",master)
      .save()
  }
}

12. The article concludes with a call to like, collect, and share the tutorial.

big datadata pipelineETLSparkScalaKuduIP lookup
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.