Eliminate Shuffle: Deep Dive into Spark’s Storage Partition Join (SPJ)
This article explains how Storage Partition Join (SPJ), available since Spark 3.3, avoids costly shuffle operations when joining Iceberg tables that are partitioned the same way. It outlines the required table properties and Spark configurations, demonstrates the effect with code examples and execution plans, and explores several realistic join scenarios.
Requirements for Storage Partitioned Join (SPJ)
Both tables in the join must be Apache Iceberg tables that share at least one identical partition column (the same column with the same partition transform, e.g. identity or bucket), and the join condition must include that column. The environment must run Spark ≥ 3.3 with Iceberg ≥ 1.2.0.
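The prerequisites above can be turned into a small pre-flight check. This is a minimal sketch under stated assumptions: `check_spj_requirements` and `parse_version` are hypothetical helpers (not part of Spark or Iceberg), the table-format requirement is left out, and partition columns are compared by name only, assuming both tables use the same partition transform.

```python
import re


def parse_version(version: str) -> tuple:
    """Turn '3.5' or '4.0.0-preview2' into a tuple of ints such as (3, 5) or (4, 0, 0)."""
    return tuple(int(re.match(r"\d+", part).group()) for part in version.split(".")[:3])


def check_spj_requirements(spark_version, iceberg_version,
                           left_partition_cols, right_partition_cols, join_keys):
    """Return the list of unmet prerequisites; an empty list means every check passed.

    Hypothetical helper: only column names are compared, so both tables are assumed
    to use the same partition transform (identity, bucket, ...) on shared columns."""
    problems = []
    if parse_version(spark_version) < (3, 3):
        problems.append("Spark must be >= 3.3")
    if parse_version(iceberg_version) < (1, 2, 0):
        problems.append("Iceberg must be >= 1.2.0")
    shared = set(left_partition_cols) & set(right_partition_cols)
    if not shared:
        problems.append("the tables share no partition column")
    elif not shared & set(join_keys):
        problems.append("the join condition uses no shared partition column")
    return problems
```

In a live session it could be called as `check_spj_requirements(spark.version, "1.5.0", ["region"], ["region"], ["region"])`; the Iceberg version has to be supplied by hand because `spark.version` only reports Spark's own version.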
Configuration
spark.sql.sources.v2.bucketing.enabled = true
spark.sql.sources.v2.bucketing.pushPartValues.enabled = true
spark.sql.iceberg.planning.preserve-data-grouping = true
spark.sql.requireAllClusterKeysForCoPartition = false
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled = true
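The five settings can be kept in one place and applied in a loop. A minimal sketch: `SPJ_CONFS` and `apply_confs` are illustrative names, and the helper accepts any setter with a `(key, value)` signature, such as `spark.conf.set` on a running session.

```python
# The five SPJ-related settings listed above, as string key/value pairs.
SPJ_CONFS = {
    "spark.sql.sources.v2.bucketing.enabled": "true",
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled": "true",
    "spark.sql.iceberg.planning.preserve-data-grouping": "true",
    "spark.sql.requireAllClusterKeysForCoPartition": "false",
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled": "true",
}


def apply_confs(set_conf, settings=SPJ_CONFS):
    """Call set_conf(key, value) for every setting, e.g. apply_confs(spark.conf.set)."""
    for key, value in settings.items():
        set_conf(key, value)
```

`apply_confs(spark.conf.set)` applies them at runtime; the same dict could instead be passed through `SparkSession.builder.config(key, value)` calls before the session is created.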
Initialize SparkSession
from pyspark.sql import SparkSession, Row
SPARK_VERSION = "3.5"
ICEBERG_VERSION = "1.5.0"
CATALOG_NAME = "local"
DW_PATH = "/path/to/local/warehouse"
# Broadcast joins are disabled (threshold -1) so the small demo tables are joined
# with a sort-merge join, whose shuffle is what SPJ can remove.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("spj-iceberg")
    .config("spark.sql.adaptive.enabled", "true")
    .config('spark.jars.packages', f'org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},org.apache.spark:spark-avro_2.12:3.5.0')
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config(f'spark.sql.catalog.{CATALOG_NAME}', 'org.apache.iceberg.spark.SparkCatalog')
    .config(f'spark.sql.catalog.{CATALOG_NAME}.type', 'hadoop')
    .config(f'spark.sql.catalog.{CATALOG_NAME}.warehouse', DW_PATH)
    .config('spark.sql.autoBroadcastJoinThreshold', '-1')
    .enableHiveSupport()
    .getOrCreate()
)
Prepare Mock Data and Write to Iceberg
Two Iceberg tables (customers and orders) are created with Faker-generated data. Both tables use identity partitioning on region, so each region value becomes its own partition.
# Generate customer data
from faker import Faker
import random
fake = Faker()
Faker.seed(42)
random.seed(42)  # Faker.seed() does not seed Python's random module, which is used below
def generate_customer_data(num_customers=1000):
    regions = ['North', 'South', 'East', 'West']
    customers = []
    for _ in range(num_customers):
        signup_date = fake.date_time_between(start_date='-3y', end_date='now')
        customers.append(Row(
            customer_id=fake.unique.random_number(digits=6),
            customer_name=fake.name(),
            region=random.choice(regions),
            signup_date=signup_date,
            signup_year=signup_date.year
        ))
    return spark.createDataFrame(customers)
# Generate order data
def generate_order_data(customer_df, num_orders=5000):
    customers = [row.customer_id for row in customer_df.select('customer_id').collect()]
    orders = []
    for _ in range(num_orders):
        order_date = fake.date_time_between(start_date='-3y', end_date='now')
        orders.append(Row(
            order_id=fake.unique.random_number(digits=8),
            customer_id=random.choice(customers),
            order_date=order_date,
            amount=round(random.uniform(10, 1000), 2),
            region=random.choice(['North', 'South', 'East', 'West']),
            order_year=order_date.year
        ))
    return spark.createDataFrame(orders)
print("Generating sample data...")
customer_df = generate_customer_data(1000)
order_df = generate_order_data(customer_df, 5000)
customer_df.show(5, truncate=False)
order_df.show(5, truncate=False)
# Write to Iceberg; both tables use identity partitioning on region
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {CATALOG_NAME}.db")
customer_df.writeTo("local.db.customers") \
    .tableProperty("format-version", "2") \
    .partitionedBy("region") \
    .create()
order_df.writeTo("local.db.orders") \
    .tableProperty("format-version", "2") \
    .partitionedBy("region") \
    .create()
Join without SPJ (Shuffle Occurs)
CUSTOMERS_TABLE = 'local.db.customers'
ORDERS_TABLE = 'local.db.orders'
cust_df = spark.table(CUSTOMERS_TABLE)
order_df = spark.table(ORDERS_TABLE)
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
joined_df.show(1)
The formatted physical plan contains Exchange nodes on both sides of the join, indicating a shuffle stage.
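Rather than reading the plan by eye, the check can be scripted. The sketch below assumes the "formatted" explain layout, in which each operator is repeated in a details section on a line such as `(5) Exchange`. `shuffle_exchange_ids` and `formatted_plan` are hypothetical helpers, and `formatted_plan` further assumes that PySpark's `explain()` writes the plan through Python's `print()`.

```python
import contextlib
import io
import re

# Details lines look like "(5) Exchange". Requiring "Exchange" right after ") "
# means a "(5) BroadcastExchange" line is not counted as a shuffle.
_EXCHANGE_LINE = re.compile(r"^\((\d+)\) Exchange\b", re.MULTILINE)


def shuffle_exchange_ids(plan_text):
    """Return the operator ids of the shuffle Exchange nodes in a formatted plan string."""
    return [int(node_id) for node_id in _EXCHANGE_LINE.findall(plan_text)]


def formatted_plan(df):
    """Capture df.explain("FORMATTED") as a string.

    Assumption: explain() prints the plan via Python's print(), so redirecting
    stdout captures it."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain("FORMATTED")
    return buffer.getvalue()
```

Once SPJ is enabled, `shuffle_exchange_ids(formatted_plan(joined_df))` would be expected to return an empty list.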
Enable SPJ and Re‑run Join
spark.conf.set('spark.sql.sources.v2.bucketing.enabled', 'true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping', 'true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition', 'false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled', 'true')
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
The new plan no longer contains Exchange nodes, confirming that the join avoided the shuffle.
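Conceptually, SPJ works because each scan reports its data already grouped by partition value, so the planner can pair the `region=North` partition of one table with the `region=North` partition of the other and join them in the same task. The pure-Python model below illustrates that idea only; it is not Spark's implementation, and `group_by_partition` and `partition_wise_left_join` are hypothetical names.

```python
from collections import defaultdict


def group_by_partition(rows, partition_col):
    """Model a scan that keeps data grouped by partition value (one group per region)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_col]].append(row)
    return dict(groups)


def partition_wise_left_join(left_groups, right_groups):
    """Left-join two grouped inputs whose join key is the partition column.

    Each partition value is processed on its own, as one task would be, so no row
    has to move between groups; that movement is the work a shuffle would do."""
    if set(left_groups) != set(right_groups):
        # Without pushPartValues, mismatched partition values make Spark fall back to a shuffle.
        raise ValueError("partition values differ on the two sides")
    output = []
    for value, left_rows in left_groups.items():
        right_rows = right_groups[value]
        for left_row in left_rows:
            # Same partition value means same join key, so every pair in the group matches.
            if right_rows:
                output.extend({**left_row, **right_row} for right_row in right_rows)
            else:
                output.append(dict(left_row))  # left join keeps rows with no match
    return output
```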
Scenario 1 – Join Key Equals Partition Key
When the join key is exactly the partition column, enabling just these two settings (starting from a fresh session) is sufficient.
spark.conf.set('spark.sql.sources.v2.bucketing.enabled', 'true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping', 'true')
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
The plan shows no Exchange nodes.
Scenario 2 – Mismatched Partitions
After deleting all rows for the West region from orders, the two tables no longer report the same partition values, so the join reintroduces a shuffle. Enabling spark.sql.sources.v2.bucketing.pushPartValues.enabled makes Spark create empty partitions for the values missing on one side, which eliminates the shuffle.
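The effect of `pushPartValues` can be modelled as aligning both sides on the union of their partition values, padding whichever side lacks a value with an empty partition. This is a conceptual sketch, not Spark code; `push_part_values` is a hypothetical helper and partitions are plain Python lists.

```python
def push_part_values(left_groups, right_groups):
    """Align two grouped inputs on the union of their partition values.

    Returns two lists of (value, rows) pairs in the same order, so partition i on
    the left always pairs with partition i on the right; a value present on only
    one side gets an empty partition on the other."""
    all_values = sorted(set(left_groups) | set(right_groups))
    left = [(value, left_groups.get(value, [])) for value in all_values]
    right = [(value, right_groups.get(value, [])) for value in all_values]
    return left, right
```

With West deleted from orders, the orders side receives an empty West partition, so both sides still line up region by region and the join can proceed without repartitioning.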
# Delete a partition
spark.sql(f"DELETE FROM {ORDERS_TABLE} WHERE region = 'West'")
# Re-read the table so the DataFrame reflects the delete
order_df = spark.table(ORDERS_TABLE)
order_df.groupBy('region').count().show()
# Join with pushPartValues disabled (shuffle appears)
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'false')
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
# Enable pushPartValues to avoid the shuffle
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'true')
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
Scenario 3.1 – Join Key Is a Superset of Partition Key
When the join keys include columns beyond the partition column (e.g., customer_id), Spark by default requires the join keys to match the partition keys exactly, so it shuffles. Setting spark.sql.requireAllClusterKeysForCoPartition = false relaxes this requirement: the partition columns only need to be a subset of the join keys, and the plan shows no shuffle.
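The rule that `spark.sql.requireAllClusterKeysForCoPartition` relaxes can be summarised as a set comparison between partition columns and join keys. This is a simplified model written for illustration (`can_skip_shuffle` is a hypothetical name); Spark's actual planner check looks at partitioning expressions, not just column names.

```python
def can_skip_shuffle(partition_cols, join_keys, require_all_cluster_keys=True):
    """Rough model of the co-partitioning check for the equal and superset cases.

    require_all_cluster_keys mirrors spark.sql.requireAllClusterKeysForCoPartition."""
    partition_cols, join_keys = set(partition_cols), set(join_keys)
    if not partition_cols <= join_keys:
        return False  # a partition column is missing from the join keys (the subset case)
    if require_all_cluster_keys:
        return partition_cols == join_keys  # default: keys must match exactly
    return True  # relaxed: partition columns may be a subset of the join keys
```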
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition', 'false')
joined_df = cust_df.join(order_df, on=['region', 'customer_id'], how='left')
joined_df.explain("FORMATTED")
Scenario 3.2 – Join Key Is a Subset of Partition Key (Spark 4.0+)
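Spark versions before 4.0 cannot avoid the shuffle when the join keys cover only a subset of the partition columns. Spark 4.0 introduces spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled; when it is set to true, Spark can skip the shuffle even if the join condition does not contain all partition columns. The customers_buck and orders_buck tables used in the Scala example are not defined in this article; the example assumes they are partitioned on region plus at least one further column, while the join uses region only.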
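As we understand it, with this setting Spark groups input partitions by only the partition columns that appear in the join keys, so several physical partitions sharing the same region are treated as one group. The sketch below models that regrouping under this assumption; `regroup_by_join_keys` is a hypothetical helper, and partition keys are represented as tuples ordered like the partition columns (for example `(region, bucket_id)`).

```python
from collections import defaultdict


def regroup_by_join_keys(partitions, partition_cols, join_cols):
    """Merge partitions whose values agree on the join columns.

    partitions maps a tuple of partition values, ordered like partition_cols,
    to that partition's rows. The result maps a tuple of join-column values to
    the rows of every partition sharing those values: one group per task."""
    positions = [partition_cols.index(col) for col in join_cols]
    merged = defaultdict(list)
    for values, rows in sorted(partitions.items()):
        key = tuple(values[i] for i in positions)
        merged[key].extend(rows)
    return dict(merged)
```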
// Scala example for Spark 4.0 preview
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SPJTest {
  def main(args: Array[String]): Unit = {
    // Catalog settings (spark.sql.catalog.local.*) are assumed to be passed via spark-submit --conf
    val spark = SparkSession.builder().getOrCreate()
    spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
    spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
    spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
    spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
    spark.conf.set("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled", "true")
    val custDf = spark.table("local.db.customers_buck")
    val ordersDf = spark.table("local.db.orders_buck")
    // Join on region only, a subset of the tables' partition columns
    val joinedDf = custDf.alias("cust").join(
      ordersDf.alias("ord"),
      col("cust.region") === col("ord.region"),
      "left"
    )
    // explain() prints the plan itself and returns Unit, so it is not wrapped in println
    joinedDf.explain("FORMATTED")
  }
}
Scenario 4 – Skewed Partitions
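If one partition (e.g., region=East) holds far more data than the others, the task that processes it becomes a straggler. Enabling spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled lets Spark split the heavy partition's input splits across several tasks and replicate the matching partition from the other side to each of them, so the join stays parallel without reintroducing a shuffle. This setting requires spark.sql.sources.v2.bucketing.enabled and spark.sql.sources.v2.bucketing.pushPartValues.enabled to be true.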
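The difference between fully and partially clustered input can be sketched as a task-planning function. In this conceptual model (`plan_tasks` is a hypothetical name, and each partition value maps to a list of input splits), a fully clustered plan puts all splits of a value into one task, while a partially clustered plan gives each split of the larger side its own task and pairs it with a copy of the smaller side's splits for that value.

```python
def plan_tasks(big_side, small_side, partially_clustered=True):
    """big_side / small_side map a partition value to its list of input splits.

    Returns (value, big_splits, small_splits) tuples, one per task."""
    tasks = []
    for value in sorted(set(big_side) | set(small_side)):
        big_splits = big_side.get(value, [])
        small_splits = small_side.get(value, [])
        if partially_clustered and len(big_splits) > 1:
            # One task per split of the large side; the small side is replicated to each.
            tasks.extend((value, [split], list(small_splits)) for split in big_splits)
        else:
            # Fully clustered: every split for this value goes to a single task.
            tasks.append((value, list(big_splits), list(small_splits)))
    return tasks
```

Replication means the smaller side's data for a skewed value is read once per task. Spark also restricts which side may be replicated depending on the join type (replicating the preserved side of an outer join would duplicate its unmatched rows); this model ignores that.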
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled', 'true')
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
References
Storage Partitioned Join Design Doc: https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit
Spark PR for SPJ: https://github.com/apache/spark/pull/32875
Spark PR for Partially Clustered Distribution: https://github.com/apache/spark/pull/32875
Spark 4.0.0 preview2 documentation: https://spark.apache.org/docs/4.0.0-preview2/sql-performance-tuning.html#converting-sort-merge-join-to-shuffled-hash-join
Original article: https://www.guptaakashdeep.com/storage-partition-join-in-apache-spark-why-how-and-where/
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Past Memory Big Data
A popular big-data architecture channel with over 100,000 developers. Publishes articles on Spark, Hadoop, Flink, Kafka and more. Visit the Past Memory Big Data blog at https://www.iteblog.com. Search "Past Memory" on Google or Baidu.
