
How Spark SQL Chooses Join Strategies: Broadcast, Shuffle Hash, and Sort Merge

The article explains the rules Spark SQL's Catalyst-based planner uses to select among Broadcast hash join, Shuffle hash join, and Sort‑merge join, covering build‑side determination, size thresholds, broadcast hints, local hash‑map construction, and fallback strategies for non‑equi joins.

Big Data Technology & Architecture

Introduction

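For equi-joins, Spark SQL supports three main join implementations: Broadcast hash join, Shuffle hash join, and Sort‑merge join. The choice is made during physical planning by the JoinSelection strategy (defined in SparkStrategies on top of the Catalyst framework), which tries a series of pattern-matching cases against each logical join.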

Build‑side Determination

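The planner first determines which sides are allowed to be the build side: the side whose rows are loaded into a hash table (and, for a broadcast join, shipped to every executor) while the other side is streamed through it. Ideally the build side is the smaller one, but which sides are eligible depends only on the join type: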

private def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false
}

private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}

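For inner-like joins (inner and cross), either side can be the build side. For right outer joins only the left side can be built, and for left outer, left semi, left anti, and existence joins only the right side can be built. These hash joins stream one side and emit output per streamed row, so the side whose rows must be preserved (or filtered, for semi and anti joins) has to be the streamed side. A full outer join therefore has no eligible build side under these rules.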
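To make the join-type rules concrete, here is a minimal, dependency-free sketch that mirrors the two methods above. The JoinType hierarchy and the eligibleBuildSides helper are hypothetical, simplified stand-ins for Catalyst's own types (ExistenceJoin is omitted), not Spark's API.

```scala
// Hypothetical, simplified stand-ins for Catalyst's JoinType hierarchy (no Spark dependency).
object BuildSideRules {
  sealed trait JoinType
  sealed trait InnerLike extends JoinType
  case object Inner extends InnerLike
  case object Cross extends InnerLike
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  // Mirrors canBuildRight: left rows are streamed, so the right side may be hashed.
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
    case _ => false
  }

  // Mirrors canBuildLeft: only inner-like and right outer joins may hash the left side.
  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case _: InnerLike | RightOuter => true
    case _ => false
  }

  // Hypothetical helper: the set of sides eligible to be the build side.
  def eligibleBuildSides(joinType: JoinType): Set[String] =
    (if (canBuildLeft(joinType)) Set("left") else Set.empty[String]) ++
      (if (canBuildRight(joinType)) Set("right") else Set.empty[String])
}
```

For example, eligibleBuildSides(LeftOuter) yields only "right", and FullOuter yields an empty set, which is why a full outer equi-join is never planned as a broadcast hash join in this logic.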

Broadcast Conditions

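A table is broadcast automatically when its estimated size (plan.stats.sizeInBytes) is non-negative and no larger than spark.sql.autoBroadcastJoinThreshold (default 10 MB; setting it to -1 disables size-based broadcasting), or when a broadcast hint is supplied. In both cases the side must also be an eligible build side for the join type: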

private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {
  val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
  val buildRight = canBuildRight(joinType) && canBroadcast(right)
  buildLeft || buildRight
}

private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan): Boolean = {
  val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
  val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
  buildLeft || buildRight
}
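The checks above can be modelled without Spark as follows. This is a sketch under stated assumptions: PlanStats is a hypothetical stand-in for plan.stats (a Long replaces the BigInt Spark uses), and only three join types are modelled.

```scala
// Hypothetical model of canBroadcast / canBroadcastBySizes / canBroadcastByHints.
object BroadcastChecks {
  // Stand-in for plan.stats: estimated size plus a broadcast-hint flag.
  final case class PlanStats(sizeInBytes: Long, broadcastHint: Boolean = false)

  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  def canBuildLeft(jt: JoinType): Boolean = jt == Inner || jt == RightOuter
  def canBuildRight(jt: JoinType): Boolean = jt == Inner || jt == LeftOuter

  // A threshold of -1 makes this false for every non-negative size.
  def canBroadcast(p: PlanStats, threshold: Long): Boolean =
    p.sizeInBytes >= 0 && p.sizeInBytes <= threshold

  // A side counts only if it is both small enough and an eligible build side.
  def canBroadcastBySizes(jt: JoinType, l: PlanStats, r: PlanStats, threshold: Long): Boolean =
    (canBuildLeft(jt) && canBroadcast(l, threshold)) ||
      (canBuildRight(jt) && canBroadcast(r, threshold))

  // Hints ignore size but still respect build-side eligibility.
  def canBroadcastByHints(jt: JoinType, l: PlanStats, r: PlanStats): Boolean =
    (canBuildLeft(jt) && l.broadcastHint) || (canBuildRight(jt) && r.broadcastHint)
}
```

One consequence worth noting: in a LEFT OUTER join, a tiny left table does not enable a broadcast, because only the right side may be built.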

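The companion methods broadcastSideByHints and broadcastSideBySizes, used in the planning cases below, compute the same per-side conditions and pass them to broadcastSide, which decides which side to broadcast: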

private def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean, left: LogicalPlan, right: LogicalPlan): BuildSide = {
  def smallerSide = if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
  if (canBuildRight && canBuildLeft) {
    // both sides possible – broadcast the smaller one
    smallerSide
  } else if (canBuildRight) {
    BuildRight
  } else if (canBuildLeft) {
    BuildLeft
  } else {
    // fallback for default broadcast nested‑loop join
    smallerSide
  }
}
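The tie-breaking in broadcastSide is easy to miss, so here is a small re-implementation that takes the two sizes directly instead of logical plans. The BuildSide types are hypothetical stand-ins for Spark's; the decision logic follows the method above.

```scala
// Hypothetical re-implementation of broadcastSide with sizes passed in directly.
object BroadcastSideChoice {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  def broadcastSide(canBuildLeft: Boolean, canBuildRight: Boolean,
                    leftSize: BigInt, rightSize: BigInt): BuildSide = {
    // The comparison uses <=, so equal sizes favour the right side.
    def smallerSide: BuildSide = if (rightSize <= leftSize) BuildRight else BuildLeft
    if (canBuildRight && canBuildLeft) smallerSide
    else if (canBuildRight) BuildRight
    else if (canBuildLeft) BuildLeft
    else smallerSide // neither side qualifies: default for broadcast nested loop join
  }
}
```

Only eligible sides are considered: when only the right side qualifies, it is broadcast even if it is the larger of the two.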

Local HashMap for Shuffle Hash Join

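When neither side can be broadcast, a shuffle hash join may still be used: after both sides are shuffled by the join keys, each task builds a local HashMap from its partition of the build side. This is only considered when the build side's estimated size is less than spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions, i.e. when its average per-partition size is below the broadcast threshold, so each partition's map is likely to fit in memory: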

private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
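With the defaults (a 10 MB threshold and 200 shuffle partitions), the limit works out to 10,485,760 × 200 = 2,097,152,000 bytes, roughly 1.95 GiB. The sketch below models this check together with muchSmaller, which the shuffle hash join cases below call but the article does not show. We assume muchSmaller requires the build side to be at least three times smaller, which is our reading of the Spark 2.x source; treat the factor as an assumption to verify against your Spark version.

```scala
// Hypothetical sketch of the two size checks used by the shuffle hash join cases.
object LocalHashMapCheck {
  val autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024 // spark.sql.autoBroadcastJoinThreshold default
  val numShufflePartitions: Int = 200                      // spark.sql.shuffle.partitions default

  // Total build-side size must be below threshold * partitions,
  // i.e. the average partition must be below the broadcast threshold.
  def canBuildLocalHashMap(sizeInBytes: BigInt): Boolean =
    sizeInBytes < BigInt(autoBroadcastJoinThreshold) * numShufflePartitions

  // Assumed factor of 3: building a hash map costs more than sorting,
  // so the build side should be much smaller than the streamed side.
  def muchSmaller(a: BigInt, b: BigInt): Boolean = a * 3 <= b
}
```

A 1 GiB build side passes canBuildLocalHashMap under the defaults, while a 3 GiB one does not.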

Join Strategy Selection Order

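JoinSelection tries its cases in order, and the first match wins. For equi-joins (joins from which ExtractEquiJoinKeys can extract key pairs), the order is: broadcast hash join by hint, broadcast hash join by size, shuffle hash join, and sort-merge join.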

Broadcast Hash Join

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
  if canBroadcastByHints(joinType, left, right) =>
    val buildSide = broadcastSideByHints(joinType, left, right)
    Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
  if canBroadcastBySizes(joinType, left, right) =>
    val buildSide = broadcastSideBySizes(joinType, left, right)
    Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

Shuffle Hash Join

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
  if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right) && muchSmaller(right, left) || !RowOrdering.isOrderable(leftKeys) =>
    Seq(joins.ShuffledHashJoinExec(leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
  if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left) && muchSmaller(left, right) || !RowOrdering.isOrderable(leftKeys) =>
    Seq(joins.ShuffledHashJoinExec(leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
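The guards above rely on Scala operator precedence: && binds more tightly than ||, so each guard reads as (all size conditions) || (keys not orderable). Since spark.sql.join.preferSortMergeJoin defaults to true, the size branch is normally off, and a shuffle hash join is then chosen only when the keys cannot be sorted, which also rules out sort-merge join. The hypothetical functions below, with plain Booleans in place of the real checks, make the grouping explicit.

```scala
// Hypothetical illustration of how Scala parses the shuffle hash join guard.
object ShuffleHashGuard {
  // Written exactly like the guard in the planner code.
  def guard(preferSortMergeJoin: Boolean, canBuild: Boolean, canBuildLocalHashMap: Boolean,
            muchSmaller: Boolean, keysOrderable: Boolean): Boolean =
    !preferSortMergeJoin && canBuild && canBuildLocalHashMap && muchSmaller || !keysOrderable

  // The same expression with the implicit grouping spelled out.
  def guardExplicit(preferSortMergeJoin: Boolean, canBuild: Boolean, canBuildLocalHashMap: Boolean,
                    muchSmaller: Boolean, keysOrderable: Boolean): Boolean =
    (!preferSortMergeJoin && canBuild && canBuildLocalHashMap && muchSmaller) || !keysOrderable
}
```

For instance, guard(true, false, false, false, false) is true: unorderable keys select a shuffle hash join even when sort-merge join is preferred.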

Sort‑Merge Join

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
  if RowOrdering.isOrderable(leftKeys) =>
    joins.SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
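Putting the equi-join cases together, the following sketch walks through them in the same order for an inner join, where both sides are always eligible build sides. Side, choose, and the returned strategy labels are hypothetical; the defaults mirror Spark's configuration defaults, and the factor of 3 in muchSmaller is the same assumption as before.

```scala
// Hypothetical end-to-end model of the equi-join cases, tried in planner order.
// Only an inner join is modelled, so both sides are always eligible build sides.
object EquiJoinChoice {
  final case class Side(sizeInBytes: BigInt, broadcastHint: Boolean = false)

  def choose(left: Side, right: Side, threshold: Long = 10L * 1024 * 1024,
             numShufflePartitions: Int = 200, preferSortMergeJoin: Boolean = true,
             keysOrderable: Boolean = true): String = {
    def fitsBroadcast(s: Side) = s.sizeInBytes >= 0 && s.sizeInBytes <= threshold
    def localHashMap(s: Side) = s.sizeInBytes < BigInt(threshold) * numShufflePartitions
    def muchSmaller(a: Side, b: Side) = a.sizeInBytes * 3 <= b.sizeInBytes // assumed factor

    if (left.broadcastHint || right.broadcastHint) "BroadcastHashJoin (hint)"
    else if (fitsBroadcast(left) || fitsBroadcast(right)) "BroadcastHashJoin (size)"
    else if (!preferSortMergeJoin && localHashMap(right) && muchSmaller(right, left) || !keysOrderable)
      "ShuffledHashJoin (build right)"
    else if (!preferSortMergeJoin && localHashMap(left) && muchSmaller(left, right))
      "ShuffledHashJoin (build left)"
    else "SortMergeJoin" // keys are orderable here, so sort-merge join always applies
  }
}
```

With the defaults, two tables of 4 GiB and 1 GiB are sort-merge joined; turning off preferSortMergeJoin switches the same pair to a shuffle hash join that builds the 1 GiB side.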

Non‑Equi Join Fallbacks

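// BroadcastNestedLoopJoin when a side can be broadcast (hints first, then sizes)
case j @ logical.Join(left, right, joinType, condition)
  if canBroadcastByHints(joinType, left, right) =>
    val buildSide = broadcastSideByHints(joinType, left, right)
    joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

case j @ logical.Join(left, right, joinType, condition)
  if canBroadcastBySizes(joinType, left, right) =>
    val buildSide = broadcastSideBySizes(joinType, left, right)
    joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

// Cartesian product for inner joins when no broadcast is possible
case logical.Join(left, right, _: InnerLike, condition) =>
  joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

// Last resort for any other join type: broadcast the hinted or smaller side (may be slow or OOM)
case logical.Join(left, right, joinType, condition) =>
  val buildSide = broadcastSide(left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
  joins.BroadcastNestedLoopJoinExec(planLater(left), planLater(right), buildSide, joinType, condition) :: Nil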

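For joins without equi-join keys, the cases are tried in this order: a broadcast nested loop join chosen by hint, then one chosen by size; a Cartesian product for inner-like joins; and, for any remaining join type, a last-resort broadcast nested loop join that broadcasts the hinted side or, if neither side is hinted, the smaller side. The Cartesian product and the last-resort nested loop join can be very expensive (the latter may even run out of memory) and should be avoided where possible.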
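The fallback order for non-equi joins can be modelled the same way. This is a hypothetical sketch: only Inner, LeftOuter, and FullOuter are modelled, Side and choose are illustrative names, and the returned labels are not Spark output.

```scala
// Hypothetical model of the fallback order for joins without equi-join keys.
object NonEquiFallback {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object FullOuter extends JoinType

  final case class Side(sizeInBytes: BigInt, broadcastHint: Boolean = false)

  // Build-side eligibility restricted to the three modelled join types.
  def canBuildLeft(jt: JoinType): Boolean = jt == Inner
  def canBuildRight(jt: JoinType): Boolean = jt == Inner || jt == LeftOuter

  def choose(jt: JoinType, left: Side, right: Side, threshold: Long = 10L * 1024 * 1024): String = {
    def fits(s: Side) = s.sizeInBytes >= 0 && s.sizeInBytes <= threshold
    if ((canBuildLeft(jt) && left.broadcastHint) || (canBuildRight(jt) && right.broadcastHint))
      "BroadcastNestedLoopJoin (hint)"
    else if ((canBuildLeft(jt) && fits(left)) || (canBuildRight(jt) && fits(right)))
      "BroadcastNestedLoopJoin (size)"
    else if (jt == Inner) "CartesianProduct"
    else "BroadcastNestedLoopJoin (last resort)"
  }
}
```

Note that a full outer non-equi join of two tiny tables still lands in the last-resort branch, because neither side is an eligible build side.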

Conclusion

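The Catalyst-based planner selects join implementations based on broadcast hints, table size thresholds, join‑type compatibility, and key orderability. For equi-joins it tries Broadcast hash join first (hints before sizes), then Shuffle hash join (only when spark.sql.join.preferSortMergeJoin is disabled and the size conditions hold, or when the keys are not orderable), and finally Sort‑merge join. Joins without equi-join keys fall back to broadcast nested‑loop joins or a Cartesian product.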

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
