Using Spark SQL User-Defined Functions, Aggregate Functions, and Window Functions

This article demonstrates how to create and register custom scalar UDFs, untyped and type‑safe aggregate functions (UDAF and Aggregator) in Spark SQL, and how to apply window functions such as ROW_NUMBER, providing complete Scala code examples and execution results.

Big Data Technology & Architecture

The article explains the usage of Spark SQL custom functions, starting with scalar UDFs written in Scala. It shows how to define a class extending UDF2, register it with sparkSession.udf.register, generate sample data, create a temporary view, and invoke the UDF in SQL queries.

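package com.udf

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.types.DataTypes

// Two-argument scalar UDF: joins the name and the age into one string.
class SqlUDF extends UDF2[String, Integer, String] {
  override def call(t1: String, t2: Integer): String = {
    t1 + "_udf_test_" + t2
  }
}

// Driver code (for example, inside a main method):
val conf = new SparkConf().setAppName("AppUdf").setMaster("local")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
// Register the UDF under a SQL-callable name and declare its return type.
sparkSession.udf.register("splicing_t1_t2", new SqlUDF, DataTypes.StringType)
// create sample DataFrame and register temporary view "person"
val sql = "SELECT name, age, splicing_t1_t2(name, age) name_age FROM person"
sparkSession.sql(sql).show()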
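
To make the per-row behaviour of the UDF concrete, the following is a minimal plain-Scala sketch of the same concatenation, runnable without Spark. The object name `SpliceSketch`, the helper `spliceOpt`, and the sample rows are hypothetical and not part of the original article. It also shows a side effect of string concatenation: a missing age ends up as the literal text "null" in the output, which may or may not be what a caller wants.

```scala
// Plain-Scala model of the logic in SqlUDF.call; no Spark dependency.
object SpliceSketch {
  // Same expression as the UDF body.
  def splice(name: String, age: Integer): String = name + "_udf_test_" + age

  // Hypothetical sample rows (name, age); the second row has no age.
  val rows: Seq[(String, Integer)] = Seq(("alice", Int.box(30)), ("bob", null))

  // What the name_age column would contain for these rows.
  val nameAge: Seq[String] = rows.map { case (n, a) => splice(n, a) }

  // Null-safe variant (an assumption of this sketch): yields None
  // instead of embedding the text "null" when either input is missing.
  def spliceOpt(name: String, age: Integer): Option[String] =
    for (n <- Option(name); a <- Option(age)) yield s"${n}_udf_test_$a"
}
```

Here `SpliceSketch.nameAge` evaluates to `Seq("alice_udf_test_30", "bob_udf_test_null")`, while `SpliceSketch.spliceOpt("bob", null)` is `None`.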

Next, it introduces an untyped UserDefinedAggregateFunction (UDAF) for computing average age. The class AvgAge implements required methods such as inputSchema, bufferSchema, initialize, update, merge, and evaluate. The function is registered and used in a SQL GROUP BY query.

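import java.math.{BigDecimal, RoundingMode}
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructType}

class AvgAge extends UserDefinedAggregateFunction {
  // Input: a single nullable integer column.
  override def inputSchema: StructType = DataTypes.createStructType(Array(
    DataTypes.createStructField("age", DataTypes.IntegerType, true)))
  // Intermediate buffer: running sum and row count.
  override def bufferSchema: StructType = DataTypes.createStructType(Array(
    DataTypes.createStructField("sum", DataTypes.DoubleType, true),
    DataTypes.createStructField("count", DataTypes.IntegerType, true)))
  // Type of the final result.
  override def dataType: DataType = DataTypes.DoubleType
  override def deterministic: Boolean = true
  // Reset the buffer before aggregating a group.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.0)
    buffer.update(1, 0)
  }
  // Fold one input row into the buffer.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Skip NULL ages; reading a null cell with getInt would fail.
    if (!input.isNullAt(0)) {
      buffer.update(0, buffer.getDouble(0) + input.getInt(0).toDouble)
      buffer.update(1, buffer.getInt(1) + 1)
    }
  }
  // Combine two partial buffers produced on different partitions.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
    buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))
  }
  // Produce the average, rounded to two decimal places.
  override def evaluate(buffer: Row): Any = {
    // No non-null input: return SQL NULL instead of dividing by zero.
    if (buffer.getInt(1) == 0) null
    else {
      val bd = new BigDecimal(buffer.getDouble(0) / buffer.getInt(1).toDouble)
      bd.setScale(2, RoundingMode.HALF_UP).doubleValue()
    }
  }
}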
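
The order in which Spark calls these methods is easier to follow on a small model. The sketch below mimics the buffer lifecycle on plain Scala collections: one buffer per partition is initialized and updated row by row, the partial buffers are merged, and the merged buffer is evaluated. Everything here is illustrative: the object name `UdafLifecycleSketch`, the two hypothetical partitions, and the choice to skip missing ages are assumptions of the sketch, and rounding is left out to keep the flow visible.

```scala
// Plain-Scala model of the UDAF buffer lifecycle; no Spark dependency.
object UdafLifecycleSketch {
  // Buffer mirrors bufferSchema: (sum, count).
  type Buf = (Double, Int)

  // initialize: the empty buffer.
  val initialize: Buf = (0.0, 0)

  // update: fold one input value into a partition-local buffer.
  def update(b: Buf, age: Option[Int]): Buf = age match {
    case Some(a) => (b._1 + a, b._2 + 1)
    case None    => b // missing ages are skipped (sketch assumption)
  }

  // merge: combine partial buffers coming from different partitions.
  def merge(b1: Buf, b2: Buf): Buf = (b1._1 + b2._1, b1._2 + b2._2)

  // evaluate: final result; None stands in for SQL NULL on an empty group.
  def evaluate(b: Buf): Option[Double] =
    if (b._2 == 0) None else Some(b._1 / b._2)

  // Hypothetical input for one group, split across two partitions.
  val partitions: Seq[Seq[Option[Int]]] =
    Seq(Seq(Some(20), Some(30)), Seq(None, Some(40)))

  // One partial buffer per partition, then a single merged result.
  val partials: Seq[Buf] = partitions.map(_.foldLeft(initialize)(update))
  val result: Option[Double] = evaluate(partials.reduce(merge))
}
```

With this input the partial buffers are `(50.0, 2)` and `(40.0, 1)`, and `result` is `Some(30.0)`. The merge step is what lets the average be computed without moving every row of a group to one place first.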

Then, a type‑safe aggregator is presented by extending Aggregator. The example defines a case class DataBuf to hold sum and count, and implements zero, reduce, merge, and finish methods, along with the required encoders.

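import java.math.{BigDecimal, RoundingMode}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Mutable aggregation buffer: running sum and row count.
case class DataBuf(var sum: Double, var count: Int)

object AvgAgeAggregator extends Aggregator[Int, DataBuf, Double] {
  // Empty buffer; merging it with any other buffer leaves that buffer unchanged.
  override def zero: DataBuf = DataBuf(0.0, 0)
  // Fold one input value into the buffer.
  override def reduce(b: DataBuf, a: Int): DataBuf = {
    b.count += 1
    b.sum += a.toDouble
    b
  }
  // Combine two partial buffers.
  override def merge(b1: DataBuf, b2: DataBuf): DataBuf = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Produce the average, rounded to two decimal places.
  override def finish(reduction: DataBuf): Double = {
    // Empty input has no average; new BigDecimal(NaN) would throw.
    if (reduction.count == 0) Double.NaN
    else {
      val bd = new BigDecimal(reduction.sum / reduction.count.toDouble)
      bd.setScale(2, RoundingMode.HALF_UP).doubleValue()
    }
  }
  // Encoders tell Spark how to serialize the buffer and the result.
  override def bufferEncoder: Encoder[DataBuf] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}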
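
One detail of the rounding step in both aggregate implementations is worth a short check. The `new BigDecimal(double)` constructor keeps the exact binary value of the double, so a result that looks like it sits on a rounding boundary can round down. The sketch below compares it with `BigDecimal.valueOf`, which goes through the decimal string form of the double first. The object name `RoundingSketch` and the chosen sum and count are hypothetical; whether the difference matters depends on the data.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object RoundingSketch {
  // An average that is mathematically 1.005 (sum 201 over 200 rows).
  val avg: Double = 201.0 / 200

  // The constructor sees the stored binary value, which is slightly below 1.005.
  val viaConstructor: Double =
    new JBigDecimal(avg).setScale(2, RoundingMode.HALF_UP).doubleValue()

  // valueOf starts from Double.toString(avg), which is "1.005".
  val viaValueOf: Double =
    JBigDecimal.valueOf(avg).setScale(2, RoundingMode.HALF_UP).doubleValue()
}
```

Here `viaConstructor` is `1.0` and `viaValueOf` is `1.01`. If boundary cases like this should round up, `BigDecimal.valueOf` is one option to consider.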

Finally, the article covers Spark window functions, focusing on ROW_NUMBER(). The syntax is ROW_NUMBER() OVER (PARTITION BY … ORDER BY …): rows are split into partitions, sorted within each partition, and numbered consecutively starting at 1. The example adds a rank column to the person view by partitioning on id and ordering by age.

val sql = "SELECT id, name, age, row_number() OVER (PARTITION BY id ORDER BY age) rank FROM person ORDER BY id DESC, rank DESC"
sparkSession.sql(sql).show()
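
The numbering this query produces can be reproduced on plain Scala collections, which may help when reasoning about the result without a cluster. The sketch below is only a model of the semantics, not how Spark executes the query. The `Person` case class, the object name `RowNumberSketch`, and the sample rows are hypothetical. Ages are distinct within each id on purpose, because ROW_NUMBER breaks ties in an unspecified order.

```scala
// Plain-Scala model of ROW_NUMBER() OVER (PARTITION BY id ORDER BY age).
object RowNumberSketch {
  final case class Person(id: Int, name: String, age: Int)

  // Hypothetical contents of the person view.
  val people: Seq[Person] = Seq(
    Person(1, "a", 30), Person(1, "b", 25),
    Person(2, "c", 40), Person(2, "d", 35), Person(2, "e", 50))

  // Result rows as (id, name, age, rank).
  val ranked: Seq[(Int, String, Int, Int)] =
    people
      .groupBy(_.id)                         // PARTITION BY id
      .toSeq
      .flatMap { case (_, rows) =>
        rows.sortBy(_.age)                   // ORDER BY age within the partition
          .zipWithIndex
          .map { case (p, i) => (p.id, p.name, p.age, i + 1) } // numbering starts at 1
      }
      .sortBy { case (id, _, _, rank) => (-id, -rank) } // ORDER BY id DESC, rank DESC

  // Typical follow-up: keep the first row of each partition (youngest per id).
  val youngestPerId: Seq[(Int, String, Int, Int)] = ranked.filter(_._4 == 1)
}
```

For this input, `ranked` is `(2,e,50,3), (2,c,40,2), (2,d,35,1), (1,a,30,2), (1,b,25,1)`, and `youngestPerId` keeps `(2,d,35,1)` and `(1,b,25,1)`.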

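The article also highlights the differences between the untyped UDAF and the type‑safe Aggregator. The UDAF works on untyped Row buffers, so its column types are checked only at runtime, and it is registered by name for use in SQL. The Aggregator declares its input, buffer, and output types at compile time and is applied to a Dataset through toColumn. Since Spark 3.0, UserDefinedAggregateFunction is deprecated, and an Aggregator can also be registered for SQL by wrapping it with functions.udaf.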
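
The two ways of invoking a type-safe aggregator can be sketched as follows, assuming Spark 3.0 or later is on the classpath. This is an illustration rather than the article's own code: `AvgBuf`, `AvgSketch`, `AggregatorUsage`, the view name `ages`, and the function name `avg_age_typed` are hypothetical, the buffer is immutable for brevity, and rounding is omitted.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Immutable buffer for this sketch (hypothetical; not the article's DataBuf).
case class AvgBuf(sum: Double, count: Long)

object AvgSketch extends Aggregator[Int, AvgBuf, Double] {
  def zero: AvgBuf = AvgBuf(0.0, 0L)
  def reduce(b: AvgBuf, a: Int): AvgBuf = AvgBuf(b.sum + a, b.count + 1)
  def merge(x: AvgBuf, y: AvgBuf): AvgBuf = AvgBuf(x.sum + y.sum, x.count + y.count)
  def finish(r: AvgBuf): Double = if (r.count == 0) Double.NaN else r.sum / r.count
  def bufferEncoder: Encoder[AvgBuf] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object AggregatorUsage {
  // Returns the average computed through the typed API and through SQL.
  def run(): (Double, Double) = {
    val spark = SparkSession.builder()
      .appName("AggregatorSketch").master("local[1]").getOrCreate()
    import spark.implicits._

    val ages = Seq(20, 30, 40).toDS() // hypothetical Dataset[Int]

    // 1) Typed API: turn the aggregator into a TypedColumn on a Dataset.
    val typed: Double = ages.select(AvgSketch.toColumn).head()

    // 2) SQL: wrap the aggregator with functions.udaf and register it by name.
    spark.udf.register("avg_age_typed", udaf(AvgSketch))
    ages.toDF("age").createOrReplaceTempView("ages")
    val viaSql: Double =
      spark.sql("SELECT avg_age_typed(age) FROM ages").head().getDouble(0)

    spark.stop()
    (typed, viaSql)
  }
}
```

Both paths reuse the same `Aggregator` object, so the compile-time checks on the input, buffer, and output types apply whether it is called from the Dataset API or from SQL.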

Written by Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
