Using Spark SQL User-Defined Functions, Aggregate Functions, and Window Functions
This article demonstrates how to create and register custom scalar UDFs, untyped and type‑safe aggregate functions (UDAF and Aggregator) in Spark SQL, and how to apply window functions such as ROW_NUMBER, providing complete Scala code examples and execution results.
The article explains the usage of Spark SQL custom functions, starting with scalar UDFs written in Scala. It shows how to define a class extending UDF2, register it with sparkSession.udf.register, generate sample data, create a temporary view, and invoke the UDF in SQL queries.
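SqlUDF types its second parameter as the boxed java.lang.Integer rather than Scala's Int, which leaves room for a SQL NULL age. The sketch below reproduces the body of call in plain Scala, with no Spark dependency, to show what the concatenation yields for an ordinary age and for a null one. The object name SpliceSketch and its splice function are hypothetical, and the sketch assumes that Spark passes a NULL age to this Java-interface UDF as null.

```scala
// Hypothetical plain-Scala stand-in for SqlUDF.call (no Spark dependency).
object SpliceSketch {
  // Same body as SqlUDF.call; the boxed Integer can hold null, unlike Int
  def splice(t1: String, t2: Integer): String = t1 + "_udf_test_" + t2

  def main(args: Array[String]): Unit = {
    println(splice("Tom", 20))   // Tom_udf_test_20 (20 is boxed via Predef.int2Integer)
    println(splice("Ann", null)) // Ann_udf_test_null
  }
}
```

String concatenation renders a null Integer as the text "null" instead of failing, so a NULL age produces a value rather than an error.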
package com.udf

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.types.DataTypes

// Scalar UDF: joins a name and an age into a single string
class SqlUDF extends UDF2[String, Integer, String] {
  override def call(t1: String, t2: Integer): String = t1 + "_udf_test_" + t2
}

object AppUdf {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AppUdf").setMaster("local")
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()
    // Register the UDF under the name used in SQL, declaring its return type
    sparkSession.udf.register("splicing_t1_t2", new SqlUDF, DataTypes.StringType)
    // create sample DataFrame and register temporary view "person"
    val sql = "SELECT name, age, splicing_t1_t2(name, age) name_age FROM person"
    sparkSession.sql(sql).show()
  }
}

Next, it introduces an untyped UserDefinedAggregateFunction (UDAF) that computes the average age. The class AvgAge implements the required members inputSchema, bufferSchema, dataType, deterministic, initialize, update, merge, and evaluate; the function is then registered and used in a SQL GROUP BY query.
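The buffer stores a sum and a count rather than a running average because partial buffers computed on different partitions have to be combinable. The plain-Scala model below walks through that lifecycle (initialize, update per row, merge, evaluate) without Spark. The object AvgBufferModel, its Buf class, and the average helper are hypothetical, Option[Int] stands in for a nullable SQL integer, and None stands in for a SQL NULL result; it sketches the semantics, not Spark's execution engine.

```scala
import java.math.{BigDecimal, RoundingMode}

// Hypothetical plain-Scala model of AvgAge's buffer lifecycle (no Spark):
// initialize -> update per row -> merge partial buffers -> evaluate.
object AvgBufferModel {
  // Buffer slots mirror bufferSchema: (sum, count)
  final case class Buf(sum: Double, count: Int)

  def initialize: Buf = Buf(0.0, 0)

  // update: fold one age into the buffer, skipping missing (NULL) values
  def update(b: Buf, age: Option[Int]): Buf =
    age.fold(b)(a => Buf(b.sum + a, b.count + 1))

  // merge: add two partial buffers slot by slot
  def merge(b1: Buf, b2: Buf): Buf = Buf(b1.sum + b2.sum, b1.count + b2.count)

  // evaluate: average rounded HALF_UP to two decimals; None if nothing counted
  def evaluate(b: Buf): Option[Double] =
    if (b.count == 0) None
    else Some(new BigDecimal(b.sum / b.count).setScale(2, RoundingMode.HALF_UP).doubleValue())

  // Aggregate each partition separately, then merge the partial buffers
  def average(partitions: Seq[Seq[Option[Int]]]): Option[Double] =
    evaluate(partitions.map(_.foldLeft(initialize)(update)).foldLeft(initialize)(merge))

  def main(args: Array[String]): Unit = {
    // Ages 20, 21, 25 split across two "partitions", plus one missing age
    println(average(Seq(Seq(Some(20), None), Seq(Some(21), Some(25)))))
  }
}
```

Splitting the same ages differently across partitions yields the same result, which is the property merge has to preserve.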
import java.math.{BigDecimal, RoundingMode}
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructType}

// Untyped UDAF computing the average of an integer "age" column
class AvgAge extends UserDefinedAggregateFunction {
  // One nullable integer input column
  override def inputSchema: StructType = DataTypes.createStructType(Array(
    DataTypes.createStructField("age", DataTypes.IntegerType, true)))
  // Intermediate buffer: running sum and row count
  override def bufferSchema: StructType = DataTypes.createStructType(Array(
    DataTypes.createStructField("sum", DataTypes.DoubleType, true),
    DataTypes.createStructField("count", DataTypes.IntegerType, true)))
  override def dataType: DataType = DataTypes.DoubleType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.0)
    buffer.update(1, 0)
  }
  // Fold one input row into the buffer; NULL ages are skipped
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer.update(0, buffer.getDouble(0) + input.getInt(0))
      buffer.update(1, buffer.getInt(1) + 1)
    }
  }
  // Combine partial buffers computed on different partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
    buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))
  }
  // Average rounded HALF_UP to two decimals; NULL if no non-null ages were seen
  override def evaluate(buffer: Row): Any = {
    val count = buffer.getInt(1)
    if (count == 0) null
    else new BigDecimal(buffer.getDouble(0) / count).setScale(2, RoundingMode.HALF_UP).doubleValue()
  }
}

Then, a type-safe aggregator is presented by extending Aggregator. The example defines a case class DataBuf to hold the sum and count, implements the zero, reduce, merge, and finish methods, and supplies the required buffer and output encoders.
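Spark's Aggregator documentation allows reduce to modify its buffer argument and return it instead of allocating a new object, and the DataBuf version does this in both reduce and merge. The plain-Scala sketch below mimics that contract without Spark: each partition is folded from its own fresh zero, the partial buffers are merged, and finish rounds the result. AggregatorModel and its members are hypothetical, and returning NaN for an empty input is an assumption of this sketch, chosen because a Double output cannot be null.

```scala
import java.math.{BigDecimal, RoundingMode}

// Hypothetical plain-Scala model of the Aggregator contract (no Spark).
// Buf is mutable like DataBuf, so reduce and merge update it in place.
object AggregatorModel {
  final case class Buf(var sum: Double, var count: Int)

  // A def, not a val: every fold starts from a fresh buffer, because
  // reduce and merge mutate the buffer they are given.
  def zero: Buf = Buf(0.0, 0)

  def reduce(b: Buf, a: Int): Buf = { b.sum += a; b.count += 1; b }

  def merge(b1: Buf, b2: Buf): Buf = { b1.sum += b2.sum; b1.count += b2.count; b1 }

  // Rounded average; NaN (an assumption of this sketch) when nothing was reduced
  def finish(r: Buf): Double =
    if (r.count == 0) Double.NaN
    else new BigDecimal(r.sum / r.count).setScale(2, RoundingMode.HALF_UP).doubleValue()

  // Reduce each partition from its own zero, then merge the partial buffers
  def run(partitions: Seq[Seq[Int]]): Double =
    finish(partitions.map(_.foldLeft(zero)(reduce)).foldLeft(zero)(merge))

  def main(args: Array[String]): Unit = {
    println(run(Seq(Seq(20, 21), Seq(21)))) // 20.67
  }
}
```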
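import java.math.{BigDecimal, RoundingMode}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Mutable buffer holding the running sum and count
case class DataBuf(var sum: Double, var count: Int)

// Type-safe aggregator: Int inputs, DataBuf buffer, Double result
object AvgAgeAggregator extends Aggregator[Int, DataBuf, Double] {
  // Empty buffer; merging it with any buffer b must leave b unchanged
  override def zero: DataBuf = DataBuf(0.0, 0)
  // Fold one input value into the buffer (mutated in place and returned)
  override def reduce(b: DataBuf, a: Int): DataBuf = {
    b.count += 1
    b.sum += a.toDouble
    b
  }
  // Combine two partial buffers
  override def merge(b1: DataBuf, b2: DataBuf): DataBuf = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Average rounded HALF_UP to two decimals; NaN for an empty input
  override def finish(reduction: DataBuf): Double = {
    if (reduction.count == 0) Double.NaN
    else new BigDecimal(reduction.sum / reduction.count).setScale(2, RoundingMode.HALF_UP).doubleValue()
  }
  // Encoders Spark uses to serialize the buffer and the output value
  override def bufferEncoder: Encoder[DataBuf] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

Finally, the article covers Spark window functions, focusing on ROW_NUMBER(). It explains the syntax ROW_NUMBER() OVER (PARTITION BY … ORDER BY …) and gives an example query that adds a rank column to the person view: rows are partitioned by id and numbered 1, 2, 3, … in ascending age order within each partition.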
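To make the window semantics concrete, the sketch below reproduces in plain Scala what the following query computes: rows are grouped by id, sorted by age within each group, numbered from 1, and the result is ordered by id and rank, both descending. RowNumberModel, Person, and the sample rows are hypothetical. One caveat: ROW_NUMBER gives tied ages distinct numbers in no guaranteed order, whereas this model's stable sort keeps the input order.

```scala
// Hypothetical plain-Scala model (no Spark) of
//   row_number() OVER (PARTITION BY id ORDER BY age) ... ORDER BY id DESC, rank DESC
// Ties in age keep input order here because sortBy is stable.
object RowNumberModel {
  final case class Person(id: Int, name: String, age: Int)

  def rowNumber(people: Seq[Person]): List[(Person, Int)] =
    people
      .groupBy(_.id) // PARTITION BY id
      .values
      .flatMap { part =>
        // ORDER BY age within the partition, then number rows from 1
        part.sortBy(_.age).zipWithIndex.map { case (p, i) => (p, i + 1) }
      }
      .toList
      .sortBy { case (p, rank) => (-p.id, -rank) } // ORDER BY id DESC, rank DESC

  def main(args: Array[String]): Unit = {
    val people = List(Person(1, "a", 30), Person(1, "b", 20), Person(2, "c", 25))
    rowNumber(people).foreach { case (p, rank) => println(s"${p.id} ${p.name} ${p.age} $rank") }
  }
}
```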
val sql = "SELECT id, name, age, row_number() OVER (PARTITION BY id ORDER BY age) rank FROM person ORDER BY id DESC, rank DESC"
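sparkSession.sql(sql).show()

The article also highlights the differences between the untyped UserDefinedAggregateFunction and the type-safe Aggregator: the former works on untyped Rows whose fields are read by position and is used by registering it for SQL, while the latter is checked at compile time and is applied to a Dataset (for example through its toColumn method). This describes Spark 2.x; since Spark 3.0, UserDefinedAggregateFunction is deprecated, and an Aggregator can also be registered for SQL with functions.udaf.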
This article has been distilled and summarized from source material, then republished for learning and reference.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.