Spark SQL UNION Causing driver.maxResultSize Error and Its Resolution
When a Spark SQL query on Spark 3.1.2 contains dozens of UNION sub-queries, each with a JOIN, the job fails because the total size of the serialized task results exceeds the driver's spark.driver.maxResultSize limit. The issue can be resolved by reducing the initial partition number used by Adaptive Query Execution (AQE).
Background
The SQL runs on Spark version 3.1.2 under a Thrift server.
Issue
Running a Spark SQL query that contains more than 50 UNION sub‑queries (each with a JOIN) triggers the following error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 22460 tasks (2.0 GiB) is bigger than spark.driver.maxResultSize (2.0 GiB)
One of the UNION sub‑queries looks like this:
SELECT a1.order_no,
a1.need_column,
a1.join_id
FROM temp.actul_a a1
JOIN temp.actul_a a2 ON a1.join_id = a2.join_id AND a2.need_column = 'we need it'
WHERE a1.need_column = 'others needs it'
Root Cause Analysis
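When the sub-queries are combined with UNION, Spark concatenates the partitions of all of them, so the final stage contains one task per partition of every sub-query, and all of these tasks form a single task set. Each write task returns metadata (file count, size, row count) to the driver. The driver adds up the serialized size of these results across the whole task set, and when the total exceeds spark.driver.maxResultSize, it aborts the job.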
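The accumulation rule can be illustrated with a minimal, self-contained Scala model. It mirrors the check in Spark's TaskSetManager.canFetchMoreResults (result sizes are summed over the whole task set, and the first result that pushes the total past the limit fails the job), but it is only a sketch: `ResultSizeGuard` and `firstFailingTask` are hypothetical names, not Spark APIs, and every task is assumed to return a result of the same size.

```scala
// Hypothetical model of the driver-side result-size check; this is not Spark code.
final class ResultSizeGuard(maxResultSize: Long) {
  private var totalResultSize = 0L
  private var calculatedTasks = 0

  /** Records one finished result task; returns false once the total exceeds the limit. */
  def canFetchMoreResults(size: Long): Boolean = {
    totalResultSize += size
    calculatedTasks += 1
    // A limit of 0 means "unlimited", as with spark.driver.maxResultSize
    !(maxResultSize > 0 && totalResultSize > maxResultSize)
  }
}

object ResultSizeGuard {
  /** Feeds `taskCount` results of `bytesPerTask` bytes each (assumed uniform) and returns
    * the 1-based index of the task that trips the limit, or None if all results fit. */
  def firstFailingTask(taskCount: Int, bytesPerTask: Long, maxResultSize: Long): Option[Int] = {
    val guard = new ResultSizeGuard(maxResultSize)
    var i = 1
    while (i <= taskCount) {
      if (!guard.canFetchMoreResults(bytesPerTask)) return Some(i)
      i += 1
    }
    None
  }
}
```

With a 2 GiB limit and about 95,614 bytes per task (a figure back-calculated from the error message, assuming uniform sizes), `firstFailingTask(50000, 95614L, 2L * 1024 * 1024 * 1024)` returns `Some(22460)`, the same task count reported in the error. The failure depends only on how many result tasks share one task set, not on any single task being large.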
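The large number of tasks comes from the initial partition number used by Adaptive Query Execution (AQE). The configuration spark.sql.adaptive.coalescePartitions.initialPartitionNum was set to 1000, so the shuffle behind each JOIN started with 1000 partitions, and each of those partitions became one result task. Although AQE was enabled (spark.sql.adaptive.enabled=true), these partitions were not coalesced further, so dozens of UNION branches contributed tens of thousands of result tasks to a single task set. The limit was crossed after 22,460 of them had returned their results, by which point the driver had accumulated more than 2 GiB of serialized result metadata.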
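A rough back-of-the-envelope check on these numbers, assuming that maxResultSize is exactly 2 GiB and that every task returns a result of about the same size (neither assumption is stated in the source). The first line estimates the average result size per task from the error message; the second projects the total for 50 branches at 1000 uncoalesced partitions each:

```latex
\bar{s} \approx \frac{\texttt{maxResultSize}}{\text{tasks at abort}}
        = \frac{2 \times 2^{30}\,\text{B}}{22\,460}
        \approx 95\,614\,\text{B} \approx 93.4\,\text{KiB}

50 \times 1000 \times 95\,614\,\text{B} \approx 4.45\,\text{GiB} > 2\,\text{GiB}
```

Under these assumptions, a single task set can hold only about 22,460 such results before hitting the limit, well short of the roughly 50,000 tasks that 1000 partitions per branch would produce.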
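Key excerpts from the Spark source: TaskSetManager.canFetchMoreResults, which enforces the limit, followed by the task-creation step of DAGScheduler.submitMissingTasks, which creates one task per partition to compute: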
def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
  // Running total of serialized result bytes for this task set
  totalResultSize += size
  calculatedTasks += 1
  // Only result tasks are checked; maxResultSize = 0 means unlimited
  if (!isShuffleMapTasks && maxResultSize > 0 && totalResultSize > maxResultSize) {
    val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
      s"(${Utils.bytesToString(totalResultSize)}) is bigger than ${config.MAX_RESULT_SIZE.key} " +
      s"(${Utils.bytesToString(maxResultSize)})"
    logError(msg)
    abort(msg) // aborts the task set, which fails the stage and the job
    false
  } else {
    true
  }
}

val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      // One ShuffleMapTask per partition to compute; these are exempt from maxResultSize
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs,
          properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId),
          sc.applicationAttemptId, stage.rdd.isBarrier())
      }
    case stage: ResultStage =>
      // One ResultTask per partition of the final stage; each returns a result to the driver
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id,
          properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId),
          sc.applicationAttemptId, stage.rdd.isBarrier())
      }
  }
} // catch block (handling of task-creation failures) omitted from this excerpt
Solution
Reduce the initial partition number for AQE, for example:
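spark.sql.adaptive.coalescePartitions.initialPartitionNum=200
This lowers the number of partitions each JOIN shuffle starts with (when this setting is unset, it defaults to spark.sql.shuffle.partitions), so every UNION branch contributes fewer result tasks, and the total serialized result size returned to the driver stays below spark.driver.maxResultSize.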
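Why the smaller value helps can be sketched in a few lines of Scala, under two assumptions the source does not state: AQE coalesces nothing, so each of 50 UNION branches contributes exactly `initialPartitionNum` result tasks, and each task returns roughly the 95,614 bytes back-calculated from the error message. `PartitionSizing` and its methods are hypothetical helpers for illustration only.

```scala
// Hypothetical sizing helper; assumes no coalescing and a uniform result size per task.
object PartitionSizing {
  val GiB: Long = 1024L * 1024 * 1024

  /** Projected result tasks: one per shuffle partition per UNION branch. */
  def projectedTasks(branches: Int, initialPartitionNum: Int): Long =
    branches.toLong * initialPartitionNum

  /** Projected total serialized result size the driver has to accept. */
  def projectedResultBytes(branches: Int, initialPartitionNum: Int, bytesPerTask: Long): Long =
    projectedTasks(branches, initialPartitionNum) * bytesPerTask

  /** A non-positive limit is treated as unlimited, like spark.driver.maxResultSize = 0. */
  def fitsUnderLimit(branches: Int, initialPartitionNum: Int, bytesPerTask: Long,
                     maxResultSize: Long): Boolean =
    maxResultSize <= 0 ||
      projectedResultBytes(branches, initialPartitionNum, bytesPerTask) <= maxResultSize

  def main(args: Array[String]): Unit = {
    val bytesPerTask = 95614L // assumption: 2 GiB / 22460 tasks, rounded up
    for (n <- Seq(1000, 200)) {
      val bytes = projectedResultBytes(50, n, bytesPerTask)
      println(f"initialPartitionNum=$n%d: ${projectedTasks(50, n)}%d tasks, " +
        f"${bytes.toDouble / GiB}%.2f GiB, " +
        s"fits=${fitsUnderLimit(50, n, bytesPerTask, 2 * GiB)}")
    }
  }
}
```

Under these assumptions the projection is about 4.45 GiB for 1000 partitions and about 0.89 GiB for 200, which is consistent with the fix. In practice, AQE coalescing and the actual per-task metadata size will shift these numbers, so the sketch only shows the direction of the effect.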
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
