Overview of SparkSQL Join Execution Process and Implementations
This article explains SparkSQL's overall workflow, introduces the basic elements of joins, and details the physical execution processes for various join types—including sort‑merge, broadcast, and hash joins—along with their implementation conditions and optimization considerations.
SparkSQL Overall Process Overview
Before discussing join implementations, we briefly introduce SparkSQL's overall workflow. SparkSQL can be used either by writing SQL statements (requiring a metastore such as Hive) or by building Dataset/DataFrame applications. SQL statements are parsed into an abstract syntax tree (SQL AST) and then transformed into a query plan, or APIs from Dataset/DataFrame generate a query plan directly. Query plans are divided into logical and physical plans. After syntax analysis (Analyzer) and a series of optimizations (Optimizer), the logical plan is optimized and finally mapped to a physical plan that is executed as RDD operations.
The article does not detail syntax parsing, analysis, or optimization; it focuses on the physical execution of joins.
Basic Elements of a Join
A join consists of three main elements: join type, join condition, and filter condition (the latter can also be expressed with an AND clause inside the join condition).
Spark supports the standard join types, including:
inner join
left outer join
right outer join
full outer join
left semi join
left anti join
The following sections describe the implementation of each join type.
General Join Implementation Flow
In Spark, the two tables participating in a join are abstracted as a streaming side (streamIter) and a build side (buildIter). Typically, streamIter is the larger table and buildIter the smaller one, but Spark automatically decides which side is which based on the join statement.
During execution, Spark iterates over streamIter. For each rowA, it computes keyA from the join condition, looks up matching rowsB in buildIter where keyB == keyA, joins each matching rowB with rowA, and finally applies any filter conditions to produce the result rows.
Because each row from streamIter requires a lookup in buildIter, buildIter must be a data structure with efficient lookup performance. Spark provides three join implementations: sort‑merge join, broadcast join, and hash join.
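The streamIter/buildIter probe loop described above can be sketched in plain Python. The names (`generic_join`, `key_of`, `build_lookup`) are illustrative, not Spark internals; the point is that the build side is wrapped in a lookup structure that is probed once per stream row.

```python
# Illustrative sketch of the generic streamIter/buildIter probe loop;
# names are hypothetical, not Spark's internal API.

def generic_join(stream_iter, build_lookup, row_filter=lambda joined: True):
    """Iterate the stream side, probe the build side, apply the filter."""
    results = []
    for row_a in stream_iter:
        key_a = row_a[0]                      # key computed from the join condition
        for row_b in build_lookup(key_a):     # efficient lookup into buildIter
            joined = row_a + row_b
            if row_filter(joined):
                results.append(joined)
    return results

# Toy usage: the build side is indexed by key for fast lookups.
build_rows = [(1, "x"), (2, "y")]
index = {}
for r in build_rows:
    index.setdefault(r[0], []).append(r)

out = generic_join([(1, "a"), (3, "b")], lambda k: index.get(k, []))
# (3, "b") finds no match and is dropped, as in an inner join
```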
Sort‑Merge Join Implementation
To join records, Spark first shuffles data so that records with the same key end up in the same partition. The map phase determines each record's key based on the join condition and writes shuffled data. In the shuffle‑read phase, both streamIter and buildIter are merge‑sorted. While scanning streamIter, Spark performs a sequential search on the already‑sorted buildIter, resuming from the previous match position, which yields good lookup performance.
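The merge phase on two already-sorted sides can be sketched as follows. The `pos` cursor captures the key property mentioned above: the sequential search over the sorted build side resumes from the previous match position rather than restarting. This is a simplified single-partition sketch, not Spark's actual code.

```python
# Sketch of the merge phase of a sort-merge join over two key-sorted
# lists of (key, value) rows (single partition, inner-join semantics).

def sort_merge_join(stream_sorted, build_sorted):
    results = []
    pos = 0  # resume point into the sorted build side
    for key_a, val_a in stream_sorted:
        # skip build keys smaller than the current stream key
        while pos < len(build_sorted) and build_sorted[pos][0] < key_a:
            pos += 1
        # emit all build rows whose key equals the stream key
        i = pos
        while i < len(build_sorted) and build_sorted[i][0] == key_a:
            results.append((key_a, val_a, build_sorted[i][1]))
            i += 1
    return results

stream = [(1, "a"), (2, "b"), (4, "c")]
build = [(1, "x"), (2, "y"), (2, "z"), (3, "w")]
out = sort_merge_join(stream, build)
# → [(1, 'a', 'x'), (2, 'b', 'y'), (2, 'b', 'z')]
```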
Broadcast Join Implementation
If the build side is very small, Spark can avoid the shuffle entirely by broadcasting the entire buildIter to every executor and storing it in a hash table. This is often called a map join. Spark automatically chooses broadcast join when the estimated size of buildIter is less than the configuration parameter spark.sql.autoBroadcastJoinThreshold (default 10 MB).
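Conceptually, a broadcast join ships the whole small side to every task and probes it as a hash table, so the large side is never shuffled. The sketch below simulates this with plain Python data structures; the partition layout and the "broadcast" are stand-ins, not Spark's mechanism.

```python
# Conceptual sketch of a broadcast (map-side) join: the small build side
# is materialized as a hash table that every partition of the large side
# probes locally. Partitioning and broadcasting are simulated here.

from collections import defaultdict

small_side = [(1, "x"), (2, "y")]        # assumed to fit under the threshold
broadcast_table = defaultdict(list)      # what each executor would receive
for k, v in small_side:
    broadcast_table[k].append(v)

# The large side stays partitioned; each partition probes the same table.
large_partitions = [[(1, "a")], [(2, "b"), (3, "c")]]
out = [
    (k, v, bv)
    for part in large_partitions
    for k, v in part
    for bv in broadcast_table.get(k, [])
]
# → [(1, 'a', 'x'), (2, 'b', 'y')]
```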
Hash Join Implementation
In a hash join, Spark does not sort records during the shuffle‑read phase. Instead, it builds a hash table from the buildIter records within each partition, enabling fast lookups. Hash join is disabled by default and can be used only when several conditions are met:
buildIter size exceeds spark.sql.autoBroadcastJoinThreshold (so broadcast join is not applicable)
The configuration spark.sql.join.preferSortMergeJoin is set to false
The average size per partition does not exceed spark.sql.autoBroadcastJoinThreshold (so the hash table fits in memory)
streamIter is at least three times larger than buildIter
Because these conditions are strict, hash join is rarely used; sort‑merge join usually offers comparable performance.
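The selection logic implied by the conditions above can be sketched as a simple decision function. The function name, parameters, and exact comparisons are hypothetical simplifications for illustration; Spark's planner applies these checks internally on estimated plan sizes.

```python
# Hypothetical sketch of the join-strategy decision described in this
# section; names and exact checks are illustrative, not Spark's code.

def choose_join(build_size, stream_size, avg_partition_size,
                broadcast_threshold=10 * 1024 * 1024,   # default 10 MB
                prefer_sort_merge=True):
    if build_size <= broadcast_threshold:
        return "broadcast"                  # small enough to ship whole
    if (not prefer_sort_merge
            and avg_partition_size <= broadcast_threshold
            and stream_size >= 3 * build_size):
        return "shuffle_hash"               # per-partition hash table fits
    return "sort_merge"                     # the default for large tables

MB = 1024 * 1024
strategy = choose_join(5 * MB, 100 * MB, 1 * MB)
# → "broadcast", since the build side is under the 10 MB threshold
```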
Specific Join Types
Inner Join
Both sides must satisfy the join condition. Spark automatically treats the larger table as streamIter and the smaller as buildIter. Rows that find no match on the other side are omitted from the result.
Left Outer Join
The left table is the primary side; if no matching record exists on the right, a row with all right‑side fields set to NULL is emitted.
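The emit rule for a left outer join can be sketched directly: every left row appears either joined with its matches or once with the right-side fields NULL-padded (modeled here as `None`). This is a semantic sketch, not Spark's implementation.

```python
# Sketch of left-outer-join semantics: unmatched left rows are kept,
# with the right side NULL-padded (None).

def left_outer_join(left_rows, right_index):
    results = []
    for k, v in left_rows:
        matches = right_index.get(k, [])
        if matches:
            results.extend((k, v, m) for m in matches)
        else:
            results.append((k, v, None))    # no match: NULL right side
    return results

out = left_outer_join([(1, "a"), (3, "b")], {1: ["x"]})
# → [(1, 'a', 'x'), (3, 'b', None)]
```

A right outer join is symmetric: the right side is primary, and unmatched right rows are padded with NULL left-side fields.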
Right Outer Join
The right table is the primary side; missing matches on the left produce rows with NULL left‑side fields.
Full Outer Join
Combines the behavior of left and right outer joins. Spark implements it using sort‑merge join, treating each side alternately as streamIter and buildIter. The algorithm compares keys from both sides and emits matched rows, or rows with NULLs when a key exists only on one side.
During the merge, if keyA < keyB, the left row is joined with a null row; if keyA > keyB, the right row is joined with a null row; otherwise the rows are joined together.
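That merge rule can be sketched over two key-sorted sides. This simplified version assumes distinct keys on each side (it does not emit the full cross product for duplicate keys, which a real implementation must handle).

```python
# Sketch of the full-outer merge rule: the side with the smaller key is
# emitted NULL-padded; equal keys are joined. Assumes distinct keys.

def full_outer_merge(left_sorted, right_sorted):
    results, i, j = [], 0, 0
    while i < len(left_sorted) and j < len(right_sorted):
        (ka, va), (kb, vb) = left_sorted[i], right_sorted[j]
        if ka < kb:
            results.append((ka, va, None)); i += 1   # keyA < keyB: pad right
        elif ka > kb:
            results.append((kb, None, vb)); j += 1   # keyA > keyB: pad left
        else:
            results.append((ka, va, vb)); i += 1; j += 1
    results.extend((k, v, None) for k, v in left_sorted[i:])
    results.extend((k, None, v) for k, v in right_sorted[j:])
    return results

out = full_outer_merge([(1, "a"), (2, "b")], [(2, "x"), (3, "y")])
# → [(1, 'a', None), (2, 'b', 'x'), (3, None, 'y')]
```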
Left Semi Join
Returns only the left‑side rows that have at least one matching row on the right; non‑matching left rows are omitted.
Left Anti Join
Returns only the left‑side rows that have no matching rows on the right.
Conclusion
Joins are a fundamental operation in database queries. SparkSQL, as a distributed data‑warehouse system, provides comprehensive join support and performs numerous internal optimizations transparently. Understanding these join implementations helps developers better grasp the execution path of their Spark applications.