How Spark SQL Chooses Join Strategies: Broadcast, Shuffle Hash, and Sort Merge
The article explains Spark SQL's Catalyst optimizer rules for selecting among Broadcast hash join, Shuffle hash join, and Sort‑merge join, covering build‑side determination, size thresholds, broadcast hints, local hash‑map construction, and fallback strategies for non‑equi joins.
Introduction
Spark SQL provides three implementations for equi‑joins: Broadcast hash join, Shuffle hash join, and Sort‑merge join. During physical planning, Catalyst decides which one to use by matching the join against an ordered series of cases defined in the JoinSelection object.
Build‑side Determination
The optimizer first decides which table will be the build side (the smaller table that forms a hash table). The following Scala methods illustrate the logic:
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false
}
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}
The left table can be the build side only for inner‑like joins (including cross joins) and right outer joins. The right table can be the build side for inner‑like, left outer, left semi, left anti, and existence joins. A full outer join matches neither method, so neither side qualifies as a build side.
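To make the eligibility rules concrete, here is a minimal, self-contained sketch that reimplements the two methods over simplified stand-in types. The JoinType objects below are hypothetical simplifications of Spark's own classes (ExistenceJoin is omitted), and eligibleSides is a helper invented for this illustration only.

```scala
object BuildSideSketch {
  // Simplified stand-ins for Spark's JoinType hierarchy (assumption:
  // the real classes carry more detail and include ExistenceJoin).
  sealed trait JoinType
  sealed trait InnerLike extends JoinType
  case object Inner extends InnerLike
  case object Cross extends InnerLike
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
    case _ => false
  }

  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case _: InnerLike | RightOuter => true
    case _ => false
  }

  // Hypothetical helper: summarizes which sides may become the build side.
  def eligibleSides(joinType: JoinType): String =
    (canBuildLeft(joinType), canBuildRight(joinType)) match {
      case (true, true)   => "left or right"
      case (true, false)  => "left only"
      case (false, true)  => "right only"
      case (false, false) => "neither"
    }

  def main(args: Array[String]): Unit =
    Seq[JoinType](Inner, Cross, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti)
      .foreach(t => println(s"$t -> ${eligibleSides(t)}"))
}
```

Running main prints one line per join type. FullOuter reports "neither", which is why a hash join with a fixed build side is not available for it under these rules.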
Broadcast Conditions
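A table is broadcast automatically when its estimated size is at most spark.sql.autoBroadcastJoinThreshold (default 10 MB), or when a broadcast hint is supplied for it. In both cases the table must also be a valid build side for the join type. Setting the threshold to -1 disables size‑based broadcasting, because the size check can then never pass.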
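The size check and the hint override can be modeled without a Spark dependency. The sketch below is an illustration under stated assumptions: PlanStats is a hypothetical stand-in for the statistics Spark attaches to a logical plan, the threshold is passed in explicitly instead of being read from the session configuration, and the join-type check is left out for brevity.

```scala
object BroadcastCheckSketch {
  // Hypothetical stand-in for a plan's statistics and hint flag.
  final case class PlanStats(sizeInBytes: BigInt, broadcastHint: Boolean = false)

  // 10 MB, the documented default of spark.sql.autoBroadcastJoinThreshold.
  val defaultThreshold: Long = 10L * 1024 * 1024

  // Size-based check: non-negative and no larger than the threshold.
  // A threshold of -1 can never be satisfied, so it disables this path.
  def canBroadcastBySize(stats: PlanStats, threshold: Long): Boolean =
    stats.sizeInBytes >= 0 && stats.sizeInBytes <= threshold

  // A hint bypasses the size check entirely in this simplified model.
  def canBroadcast(stats: PlanStats, threshold: Long): Boolean =
    stats.broadcastHint || canBroadcastBySize(stats, threshold)

  def main(args: Array[String]): Unit = {
    val small = PlanStats(5L * 1024 * 1024)
    val large = PlanStats(50L * 1024 * 1024)
    println(canBroadcast(small, defaultThreshold))
    println(canBroadcast(large, defaultThreshold))
    println(canBroadcast(large.copy(broadcastHint = true), -1L))
  }
}
```

The last call shows the design choice behind hints: they express the user's intent, so they apply even when size-based broadcasting is switched off.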
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {
  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
  val buildRight = canBuildRight(joinType) && canBroadcast(right)
  buildLeft || buildRight
}
private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {
  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
  val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
  buildLeft || buildRight
}
These two methods only answer whether a broadcast is possible. The companion methods broadcastSideBySizes and broadcastSideByHints, used in the planning cases later in this article, compute the same buildLeft and buildRight flags and pass them to broadcastSide to decide which side to broadcast:
private def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: LogicalPlan, right: LogicalPlan): BuildSide = {
  def smallerSide = if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
  if (canBuildRight && canBuildLeft) {
    // both sides possible – broadcast the smaller one
    smallerSide
  } else if (canBuildRight) {
    BuildRight
  } else if (canBuildLeft) {
    BuildLeft
  } else {
    // fallback for default broadcast nested‑loop join
    smallerSide
  }
}
Local HashMap for Shuffle Hash Join
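When the build side is too large to broadcast but still modest in size, Spark can shuffle both sides and keep each shuffled partition of the build side in a local HashMap. This is allowed if the plan size is less than spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions, that is, if the average shuffled partition is smaller than the broadcast threshold: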
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
Join Strategy Selection Order
JoinSelection tries its cases in order and uses the first one that matches, so strategies are evaluated roughly from most efficient to least efficient.
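The ordering can be sketched as a plain decision cascade for the simplest situation: an inner equi-join with orderable keys, where either side may be the build side. Everything here is a simplified model. Side and Conf are hypothetical types, the defaults mirror the documented Spark defaults (10 MB threshold, 200 shuffle partitions, preferSortMergeJoin enabled), and muchSmaller is assumed to use a factor of 3 as in the Spark 2.x source.

```scala
object JoinSelectionSketch {
  // Hypothetical, simplified inputs for the illustration.
  final case class Side(sizeInBytes: BigInt, broadcastHint: Boolean = false)
  final case class Conf(
      autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024,
      numShufflePartitions: Int = 200,
      preferSortMergeJoin: Boolean = true)

  sealed trait Strategy
  case object BroadcastHashJoin extends Strategy
  case object ShuffledHashJoin extends Strategy
  case object SortMergeJoin extends Strategy

  def canBroadcast(s: Side, conf: Conf): Boolean =
    s.broadcastHint ||
      (s.sizeInBytes >= 0 && s.sizeInBytes <= conf.autoBroadcastJoinThreshold)

  // Average partition of the build side must be below the broadcast threshold.
  def canBuildLocalHashMap(s: Side, conf: Conf): Boolean =
    s.sizeInBytes < BigInt(conf.autoBroadcastJoinThreshold) * conf.numShufflePartitions

  // Assumption: "much smaller" means at least three times smaller.
  def muchSmaller(a: Side, b: Side): Boolean =
    a.sizeInBytes * 3 <= b.sizeInBytes

  // First matching rule wins, mirroring the order of the planning cases.
  def choose(left: Side, right: Side, conf: Conf): Strategy = {
    val shuffleHashOk = !conf.preferSortMergeJoin && (
      (canBuildLocalHashMap(right, conf) && muchSmaller(right, left)) ||
      (canBuildLocalHashMap(left, conf) && muchSmaller(left, right)))
    if (canBroadcast(left, conf) || canBroadcast(right, conf)) BroadcastHashJoin
    else if (shuffleHashOk) ShuffledHashJoin
    else SortMergeJoin
  }
}
```

With the defaults, a 100 MB table joined to a 1000 MB table yields SortMergeJoin. The same pair yields ShuffledHashJoin once preferSortMergeJoin is false, because 100 MB is below 10 MB * 200 and is at least three times smaller than the other side.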
Broadcast Hash Join
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastByHints(joinType, left, right) =>
  val buildSide = broadcastSideByHints(joinType, left, right)
  Seq(joins.BroadcastHashJoinExec(
    leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastBySizes(joinType, left, right) =>
  val buildSide = broadcastSideBySizes(joinType, left, right)
  Seq(joins.BroadcastHashJoinExec(
    leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
Shuffle Hash Join
// muchSmaller(a, b) holds when a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes.
// && binds tighter than ||, so each guard also matches whenever the join keys
// are not orderable, since a sort-merge join is impossible in that case.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
      && muchSmaller(right, left) ||
      !RowOrdering.isOrderable(leftKeys) =>
  Seq(joins.ShuffledHashJoinExec(
    leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
      && muchSmaller(left, right) ||
      !RowOrdering.isOrderable(leftKeys) =>
  Seq(joins.ShuffledHashJoinExec(
    leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
Sort‑Merge Join
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if RowOrdering.isOrderable(leftKeys) =>
  joins.SortMergeJoinExec(
    leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
Non‑Equi Join Fallbacks
// BroadcastNestedLoopJoin when a side can be broadcast
case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastByHints(joinType, left, right) =>
  val buildSide = broadcastSideByHints(joinType, left, right)
  joins.BroadcastNestedLoopJoinExec(
    planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// Cartesian product for inner joins when no broadcast is possible
case logical.Join(left, right, _: InnerLike, condition) =>
  joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
If a join has no equi‑join keys and no side can be broadcast, Spark plans inner‑like joins as a full Cartesian product; for the remaining join types it falls back to a broadcast nested‑loop join that broadcasts the smaller side. Both can be very expensive and should be avoided.
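The fallback order for joins without equi-join keys can be summarized in a small decision function. This is a sketch under assumptions: the two Boolean inputs are hypothetical simplifications of the join type and the broadcast checks, and the final catch-all branch reflects the "fallback for default broadcast nested‑loop join" comment in broadcastSide, not a case shown in the snippet above.

```scala
object NonEquiFallbackSketch {
  sealed trait Plan
  case object BroadcastNestedLoopJoin extends Plan
  case object CartesianProduct extends Plan

  // innerLike: the join is an inner or cross join.
  // canBroadcastEitherSide: some side passed a broadcast check.
  // Returns the chosen plan together with a short reason.
  def choose(innerLike: Boolean, canBroadcastEitherSide: Boolean): (Plan, String) =
    if (canBroadcastEitherSide)
      (BroadcastNestedLoopJoin, "a side can be broadcast")
    else if (innerLike)
      (CartesianProduct, "inner-like join with nothing to broadcast")
    else
      (BroadcastNestedLoopJoin, "last resort: broadcast the smaller side anyway")

  def main(args: Array[String]): Unit =
    for (inner <- Seq(true, false); bc <- Seq(true, false)) {
      val (plan, why) = choose(inner, bc)
      println(s"innerLike=$inner canBroadcast=$bc -> $plan ($why)")
    }
}
```

The last branch is the risky one in practice: the broadcast side was not small enough to pass any check, so the join may be slow or exhaust memory. Adding an equality condition or a broadcast hint on a genuinely small table avoids it.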
Conclusion
The Catalyst optimizer selects join implementations based on broadcast hints, table size thresholds, join‑type compatibility, and key orderability, prioritizing Broadcast hash join, then Shuffle hash join, and finally Sort‑merge join, with non‑equi joins resorting to nested‑loop or Cartesian strategies.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
