Understanding Spark SQL: Concepts, Queries, Data Sources, and Practical Examples

This article introduces Spark SQL fundamentals: its architecture, the DataFrame and Dataset abstractions, query methods, interoperability with RDDs, user-defined functions, Hive integration, and data source handling. It also provides step-by-step Scala code examples for loading data, performing aggregations, and solving common analytical tasks.

Big Data Technology & Architecture

Spark SQL is the Apache Spark module for running SQL-style queries on distributed data. It translates those queries into RDD operations, much as Hive translates SQL into MapReduce jobs.

Key features of Spark SQL include seamless integration with Spark Core, a unified SQL interface, Hive compatibility for both embedded and external Hive, and support for JDBC/ODBC via a Thrift server.

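Spark's data abstractions evolved from the RDD (Spark 1.0) to the DataFrame (Spark 1.3) and then to the Dataset (Spark 1.6). A DataFrame is a distributed collection of rows organized into named columns, conceptually an RDD with a schema (a two-dimensional table), and its queries are optimized by Catalyst. A Dataset adds a type-safe, strongly typed API on top of the same engine; since Spark 2.0, DataFrame in Scala is an alias for Dataset[Row].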

The simplest way to try Spark SQL is the Spark shell, where spark is a ready-made SparkSession and sc is the SparkContext. From there you can read a JSON file into a DataFrame, work with it through the DataFrame API, or register it as a temporary view and run standard SQL against it with spark.sql.
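
The shell workflow described above can be sketched as follows. This is a minimal illustration, not the original article's code: the two JSON records are hypothetical stand-ins for a JSON file, and the session is built explicitly so the snippet also runs outside spark-shell (inside the shell, getOrCreate() simply returns the existing session).

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell `spark` already exists; getOrCreate() then returns that session.
val spark = SparkSession.builder().appName("json-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample records standing in for the lines of a JSON file.
val jsonLines = Seq(
  """{"name":"Ann","age":31}""",
  """{"name":"Bob","age":17}"""
).toDS()

// Read JSON into a DataFrame; the schema is inferred from the records.
val people = spark.read.json(jsonLines)
people.printSchema()

// Register a temporary view and query it with standard SQL.
people.createOrReplaceTempView("people")
val adults = spark.sql("SELECT name FROM people WHERE age >= 18")
adults.show()
```

With a real file, spark.read.json("path/to/file.json") would replace the in-memory Dataset; the rest stays the same.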

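DataFrames can be queried in two styles. The DSL style calls DataFrame methods directly and requires import spark.implicits._ for the implicit conversions (for example the $"col" column syntax). The SQL style registers the DataFrame as a view with createOrReplaceTempView or createGlobalTempView and then runs spark.sql. A global temporary view is shared across sessions of the same application and must be referenced through the global_temp database, as in global_temp.viewName.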
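
To make the two styles concrete, here is a small sketch that expresses the same filter and projection both ways. The data, the view names (people, people_global), and the column alias are made up for illustration. createOrReplaceGlobalTempView is used instead of createGlobalTempView only so the snippet can be re-run without a "view already exists" error.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("query-styles").master("local[*]").getOrCreate()
import spark.implicits._ // enables toDF on local collections and the $"col" syntax

// Hypothetical sample data.
val df = Seq(("Ann", 31), ("Bob", 17), ("Cid", 45)).toDF("name", "age")

// DSL style: chain DataFrame methods.
val dslResult = df.filter($"age" > 18).select($"name", ($"age" + 1).as("age_next_year"))

// SQL style: register a view, then query it with spark.sql.
df.createOrReplaceTempView("people")
val sqlResult = spark.sql("SELECT name, age + 1 AS age_next_year FROM people WHERE age > 18")

// A global temporary view is visible from other sessions via the global_temp database.
df.createOrReplaceGlobalTempView("people_global")
val fromOtherSession = spark.newSession().sql("SELECT count(*) AS n FROM global_temp.people_global")

dslResult.show()
sqlResult.show()
fromOtherSession.show()
```

Both styles go through the same optimizer, so the choice is mostly about readability and whether column names should be checked by the Scala compiler or left inside a SQL string.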

Interoperability between RDD, DataFrame, and Dataset is demonstrated with examples such as converting an RDD to a DataFrame using rdd.map(...).toDF, registering it as a temporary view, and querying it with SQL. Conversions in both directions are shown:

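// Needed for toDF/toDS; `spark` is the SparkSession provided by spark-shell
import spark.implicits._

// RDD → DataFrame (simple): name the tuple columns explicitly
// (assumes rdd holds the comma-split lines of people.txt)
val rdd = spark.sparkContext.textFile("people.txt").map(_.split(","))
rdd.map(para => (para(0).trim, para(1).trim.toInt)).toDF("name", "age")

// RDD → DataFrame (reflection): the schema is inferred from the case class fields
case class Person(name: String, age: Int)
val peopleDF = spark.sparkContext.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0).trim, p(1).trim.toInt))
  .toDF
peopleDF.show()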

Other conversion snippets include dataFrame.rdd (DataFrame → RDD[Row]), rdd.map(...).toDS (RDD → Dataset), dataSet.rdd (Dataset → RDD), dataFrame.as[Person] (DataFrame → Dataset), and dataSet.toDF (Dataset → DataFrame).
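
The conversions listed above can be exercised in one short round trip. This is a sketch with hypothetical in-memory data; note that the DataFrame → Dataset step uses as[Person], and that the case class is defined at the top level so Spark can derive an encoder for it.

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

val spark = SparkSession.builder().appName("conversions").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data held in an RDD of tuples.
val rdd = spark.sparkContext.parallelize(Seq(("Ann", 31), ("Bob", 17)))

val df        = rdd.toDF("name", "age")                            // RDD       -> DataFrame
val ds        = df.as[Person]                                      // DataFrame -> Dataset[Person]
val dsFromRdd = rdd.map { case (n, a) => Person(n, a) }.toDS()     // RDD       -> Dataset[Person]
val backToDf  = ds.toDF()                                          // Dataset   -> DataFrame
val rowRdd    = df.rdd                                             // DataFrame -> RDD[Row]
val personRdd = ds.rdd                                             // Dataset   -> RDD[Person]

ds.show()
```

The practical difference shows up at the end: rowRdd yields untyped Row objects that need getString/getInt, while personRdd yields Person instances with compile-time checked fields.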

User‑Defined Functions (UDF) can be registered with spark.udf.register(name, func) and used in SQL queries after the DataFrame or Dataset is registered as a view.
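
As an illustration of the registration step, the sketch below registers a trivial string function and calls it from SQL. The function name addPrefix, the view name, and the data are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("udf-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data, registered as a view so SQL can see it.
val names = Seq("ann", "bob").toDF("name")
names.createOrReplaceTempView("people")

// Register a Scala function under a SQL-visible name.
spark.udf.register("addPrefix", (s: String) => "Name: " + s)

// The UDF can now be used like any built-in SQL function.
val labelled = spark.sql("SELECT addPrefix(name) AS labelled FROM people")
labelled.show()
```

UDFs are opaque to the Catalyst optimizer, so a built-in function (here, concat) is usually preferable when one exists.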

User-Defined Aggregate Functions (UDAF) come in two forms. A weakly typed UDAF extends UserDefinedAggregateFunction and overrides inputSchema, bufferSchema, dataType, deterministic, initialize, update, merge, and evaluate; this class is deprecated as of Spark 3.0 in favor of Aggregator. A strongly typed UDAF extends Aggregator[Input, Buffer, Output] and overrides zero, reduce, merge, finish, bufferEncoder, and outputEncoder.
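
The strongly typed form is the easier of the two to show compactly. Below is a minimal sketch of an averaging Aggregator; the Employee and AvgBuffer case classes and the sample salaries are assumptions made for the example, and returning 0.0 for an empty input is a choice of this sketch rather than anything Spark requires.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class AvgBuffer(sum: Long, count: Long)

object AverageSalary extends Aggregator[Employee, AvgBuffer, Double] {
  // Neutral starting buffer.
  def zero: AvgBuffer = AvgBuffer(0L, 0L)
  // Fold one input row into a buffer.
  def reduce(b: AvgBuffer, e: Employee): AvgBuffer = AvgBuffer(b.sum + e.salary, b.count + 1)
  // Combine buffers computed on different partitions.
  def merge(a: AvgBuffer, b: AvgBuffer): AvgBuffer = AvgBuffer(a.sum + b.sum, a.count + b.count)
  // Turn the final buffer into the result.
  def finish(b: AvgBuffer): Double = if (b.count == 0) 0.0 else b.sum.toDouble / b.count
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val spark = SparkSession.builder().appName("udaf-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val employees = Seq(Employee("Ann", 3000L), Employee("Bob", 4000L), Employee("Cid", 5000L)).toDS()

// Apply the aggregator as a typed column over the whole Dataset.
val avgSalary: Double = employees.select(AverageSalary.toColumn.name("avg_salary")).first()
println(avgSalary) // 4000.0
```

The split between reduce and merge is what lets Spark aggregate each partition independently before combining the partial results.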

Spark can work with Hive in two ways. With Spark's built-in Hive (version 1.2.1 in Spark 2.1.1), copy core-site.xml and hdfs-site.xml into Spark's conf directory and set spark.sql.warehouse.dir so that table data resides in HDFS. With an external Hive, copy hive-site.xml into conf and, if needed (for example when the metastore is backed by MySQL), place the MySQL JDBC driver in Spark's jars directory.
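
In application code, the same setup is usually expressed when the session is built. The fragment below is a configuration sketch under stated assumptions: the warehouse URI is a hypothetical placeholder, the Spark build must include Hive support, and the XML files mentioned above are assumed to be in place on the classpath.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-demo")
  .master("local[*]")
  // Hypothetical warehouse location; replace with the cluster's actual HDFS path.
  .config("spark.sql.warehouse.dir", "hdfs://namenode:8020/user/hive/warehouse")
  // Requires Hive classes on the classpath; otherwise session creation fails.
  .enableHiveSupport()
  .getOrCreate()

// With Hive support enabled, spark.sql can see tables registered in the metastore.
spark.sql("SHOW TABLES").show()
```

In spark-shell from a Hive-enabled distribution, the provided spark session typically has Hive support already, so only the configuration files matter.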

Data sources are read with sparkSession.read (for example .format("json").load(path)) and written with sparkSession.write (for example .format("json").save(path)). Supported formats include Parquet, JSON, CSV, ORC, and JDBC; Parquet is the default when no format is specified.
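
A short round trip shows the read and write entry points together. This sketch writes hypothetical data to a temporary directory (the directory prefix and subfolder names are made up) and reads it back, once relying on the Parquet default and once naming the JSON format explicitly.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("io-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data and a scratch directory for the output.
val df = Seq(("Ann", 31), ("Bob", 17)).toDF("name", "age")
val base = Files.createTempDirectory("spark-io-demo").toString

// No format given: Spark falls back to the default source, Parquet.
df.write.mode("overwrite").save(s"$base/people_parquet")
// Explicit format.
df.write.mode("overwrite").format("json").save(s"$base/people_json")

// Reading mirrors writing: load() alone reads the default format.
val fromParquet = spark.read.load(s"$base/people_parquet")
val fromJson    = spark.read.format("json").load(s"$base/people_json")

fromParquet.show()
fromJson.show()
```

Parquet stores the schema with the data, so the column types survive the round trip; JSON is re-inferred on read, which is why age comes back as a long rather than an int.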

The practical example loads three datasets (tbStock, tbStockDetail, tbDate) as case classes, registers them as temporary views, and runs a series of SQL queries to:

Calculate yearly order count and total sales amount.

Find the maximum order amount per year.

Identify the best‑selling product each year.

// Load datasets (spark.implicits._ is required for toDS)
import spark.implicits._
case class tbStock(ordernumber:String, locationid:String, dateid:String)
val tbStockDS = spark.sparkContext.textFile("tbStock.txt")
  .map(_.split(","))
  .map(a => tbStock(a(0), a(1), a(2)))
  .toDS

case class tbStockDetail(ordernumber:String, rownum:Int, itemid:String, number:Int, price:Double, amount:Double)
val tbStockDetailDS = spark.sparkContext.textFile("tbStockDetail.txt")
  .map(_.split(","))
  .map(a => tbStockDetail(a(0), a(1).trim.toInt, a(2), a(3).trim.toInt, a(4).trim.toDouble, a(5).trim.toDouble))
  .toDS

case class tbDate(dateid:String, years:Int, theyear:Int, month:Int, day:Int, weekday:Int, week:Int, quarter:Int, period:Int, halfmonth:Int)
val tbDateDS = spark.sparkContext.textFile("tbDate.txt")
  .map(_.split(","))
  .map(a => tbDate(a(0), a(1).trim.toInt, a(2).trim.toInt, a(3).trim.toInt, a(4).trim.toInt, a(5).trim.toInt, a(6).trim.toInt, a(7).trim.toInt, a(8).trim.toInt, a(9).trim.toInt))
  .toDS

// Register temporary views
tbStockDS.createOrReplaceTempView("tbStock")
tbStockDetailDS.createOrReplaceTempView("tbStockDetail")
tbDateDS.createOrReplaceTempView("tbDate")

// 1) Yearly order count and total amount
spark.sql("""
  select c.theyear, count(distinct a.ordernumber) as order_cnt, sum(b.amount) as total_amount
  from tbStock a
  join tbStockDetail b on a.ordernumber = b.ordernumber
  join tbDate c on a.dateid = c.dateid
  group by c.theyear
  order by c.theyear
""").show()

// 2) Maximum order amount per year: total each order first, then take the yearly maximum
spark.sql("""
  select d.theyear, max(c.SumOfAmount) as max_amount
  from (
    select a.dateid, a.ordernumber, sum(b.amount) as SumOfAmount
    from tbStock a
    join tbStockDetail b on a.ordernumber = b.ordernumber
    group by a.dateid, a.ordernumber
  ) c
  join tbDate d on c.dateid = d.dateid
  group by d.theyear
  order by d.theyear desc
""").show()

// 3) Best-selling product per year: e = sales per (year, item), f = top sales figure per year
spark.sql("""
  select distinct e.theyear, e.itemid, f.maxofamount
  from (
    select c.theyear, b.itemid, sum(b.amount) as sumofamount
    from tbStock a
    join tbStockDetail b on a.ordernumber = b.ordernumber
    join tbDate c on a.dateid = c.dateid
    group by c.theyear, b.itemid
  ) e
  join (
    select d.theyear, max(d.sumofamount) as maxofamount
    from (
      select c.theyear, b.itemid, sum(b.amount) as sumofamount
      from tbStock a
      join tbStockDetail b on a.ordernumber = b.ordernumber
      join tbDate c on a.dateid = c.dateid
      group by c.theyear, b.itemid
    ) d
    group by d.theyear
  ) f on e.theyear = f.theyear and e.sumofamount = f.maxofamount
  order by e.theyear
""").show()
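
For comparison, the first query (yearly order count and total amount) can also be written in the DSL style. The sketch below is an alternative formulation, not the original article's code: it uses tiny hypothetical tables that contain only the columns the query needs, so it runs without the tbStock.txt, tbStockDetail.txt, and tbDate.txt files.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{countDistinct, sum}

val spark = SparkSession.builder().appName("yearly-sales").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical rows: three orders, two dates, four detail lines.
val stock  = Seq(("o1", "d1"), ("o2", "d1"), ("o3", "d2")).toDF("ordernumber", "dateid")
val detail = Seq(("o1", 10.0), ("o1", 5.0), ("o2", 20.0), ("o3", 7.5)).toDF("ordernumber", "amount")
val dates  = Seq(("d1", 2004), ("d2", 2005)).toDF("dateid", "theyear")

// Same joins, grouping, and ordering as query 1, expressed with DataFrame methods.
val yearly = stock
  .join(detail, "ordernumber")
  .join(dates, "dateid")
  .groupBy("theyear")
  .agg(countDistinct("ordernumber").as("order_cnt"), sum("amount").as("total_amount"))
  .orderBy("theyear")

yearly.show()
// 2004 -> 2 orders, total 35.0; 2005 -> 1 order, total 7.5
```

countDistinct matters here for the same reason count(distinct ...) does in the SQL version: joining orders to their detail lines repeats each order number once per line item.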

The accompanying images in the original article illustrate the results of each query.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
