Eliminating Shuffle in Spark Joins with Storage Partitioned Join (SPJ) for Iceberg Tables
This article explains how the Storage Partitioned Join (SPJ) optimization in Spark ≥ 3.3 avoids costly shuffle operations when joining partitioned V2 source tables such as Apache Iceberg. It covers the required conditions, the relevant configuration settings, practical code examples, and several join scenarios, including mismatched partitions and data skew.
With Spark ≥ 3.3 (and more mature in 3.4), the Storage Partitioned Join (SPJ) optimization allows joins on partitioned V2 source tables to run without triggering a shuffle, provided certain requirements are met. Apache Iceberg is the main V2 source with SPJ support today; other V2 sources can qualify if they report their partitioning to Spark.
Why shuffle is expensive: it moves data across the network (with CPU-intensive serialization) and writes shuffle files to local disks (disk-I/O intensive).
SPJ prerequisites:
Both target and source tables must be Iceberg tables.
Tables should share at least one identical partition column.
The join condition must include the partition column.
Relevant Spark configurations must be enabled.
Iceberg version ≥ 1.2.0 and Spark version ≥ 3.3.0.
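Before enabling SPJ, it can help to confirm that both tables really share a partition column. A minimal diagnostic sketch (assuming the `local.db.customers` and `local.db.orders` tables created later in this article, and an already-configured `spark` session with the Iceberg catalog):

```python
# Sketch: verify both join sides are partitioned the same way before
# relying on SPJ. Table names are the ones used later in this article.
for table in ("local.db.customers", "local.db.orders"):
    # Iceberg exposes a `partitions` metadata table per data table,
    # listing each partition value and its record count.
    spark.sql(f"SELECT partition, record_count FROM {table}.partitions") \
         .show(truncate=False)

    # SHOW CREATE TABLE also prints the PARTITIONED BY clause for a quick check.
    spark.sql(f"SHOW CREATE TABLE {table}").show(truncate=False)
```

If the two `partition` columns do not line up, SPJ will not engage without the partition-value push-down setting discussed below.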
Essential configurations:
spark.sql.sources.v2.bucketing.enabled = true
spark.sql.sources.v2.bucketing.pushPartValues.enabled = true
spark.sql.iceberg.planning.preserve-data-grouping = true
spark.sql.requireAllClusterKeysForCoPartition = false
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled = true
Initialize SparkSession (no SPJ configs yet):
from pyspark.sql import SparkSession, Row
SPARK_VERSION = "3.5"
ICEBERG_VERSION = "1.5.0"
CATALOG_NAME = "local"
DW_PATH = "/path/to/local/warehouse"
spark = SparkSession.builder \
.master("local[4]") \
.appName("spj-iceberg") \
.config("spark.sql.adaptive.enabled", "true") \
.config('spark.jars.packages', f'org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},org.apache.spark:spark-avro_2.12:3.5.0') \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.config(f'spark.sql.catalog.{CATALOG_NAME}', 'org.apache.iceberg.spark.SparkCatalog') \
.config(f'spark.sql.catalog.{CATALOG_NAME}.type', 'hadoop') \
.config(f'spark.sql.catalog.{CATALOG_NAME}.warehouse', DW_PATH) \
.config('spark.sql.autoBroadcastJoinThreshold', '-1') \
.enableHiveSupport() \
.getOrCreate()
Create mock data for customers and orders tables using Faker (code omitted for brevity).
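The article omits the generator code; a stdlib-only sketch that produces equivalent rows is shown below (the original uses Faker; here `random` and `uuid` stand in, and all column names and region values are illustrative assumptions):

```python
import random
import uuid

REGIONS = ["East", "West", "North", "South"]  # hypothetical partition values


def make_customers(n: int, seed: int = 42) -> list[dict]:
    """Generate mock customer rows; the original article uses Faker for names."""
    rng = random.Random(seed)
    return [
        {
            "customer_id": i,
            "name": f"customer_{i}",
            "region": rng.choice(REGIONS),
        }
        for i in range(n)
    ]


def make_orders(n: int, num_customers: int, seed: int = 7) -> list[dict]:
    """Generate mock order rows that can join to customers on region."""
    rng = random.Random(seed)
    return [
        {
            "order_id": str(uuid.uuid4()),
            "customer_id": rng.randrange(num_customers),
            "amount": round(rng.uniform(10, 500), 2),
            "region": rng.choice(REGIONS),
        }
        for _ in range(n)
    ]


customers = make_customers(1_000)
orders = make_orders(5_000, num_customers=1_000)
# With a live session: customer_df = spark.createDataFrame(customers)
#                      order_df = spark.createDataFrame(orders)
```

Fixed seeds keep the data reproducible across runs, which makes plan comparisons (with and without SPJ) easier to repeat.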
Write data to Iceberg tables :
customer_df.writeTo("local.db.customers") \
.tableProperty("format-version", "2") \
.partitionedBy("region") \
.create()
order_df.writeTo("local.db.orders") \
.tableProperty("format-version", "2") \
.partitionedBy("region") \
.create()
Join without SPJ (shuffle occurs):
CUSTOMERS_TABLE = 'local.db.customers'
ORDERS_TABLE = 'local.db.orders'
cust_df = spark.table(CUSTOMERS_TABLE)
order_df = spark.table(ORDERS_TABLE)
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
The physical plan contains Exchange nodes, indicating a shuffle.
Enable SPJ configurations and repeat the join:
spark.conf.set('spark.sql.sources.v2.bucketing.enabled', 'true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping', 'true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition', 'false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled', 'true')
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
The new plan no longer shows Exchange nodes, confirming that the shuffle has been eliminated.
Scenario 1 – Join key equals partition key: minimal configuration works; no shuffle occurs.
Scenario 2 – Mismatched partitions: after deleting a partition from the orders table, the shuffle reappears. Enabling spark.sql.sources.v2.bucketing.pushPartValues.enabled lets Spark plan empty partitions on the missing side, removing the shuffle.
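Scenario 2 can be reproduced along these lines (a sketch assuming the tables above; the deleted region value is illustrative):

```python
# Sketch for Scenario 2: drop one partition from orders so the two tables
# no longer have matching partition sets (the region value is illustrative).
spark.sql("DELETE FROM local.db.orders WHERE region = 'West'")

# Without partition-value push-down, the planner falls back to a shuffle:
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'false')
cust_df.join(order_df, on='region', how='left').explain("FORMATTED")

# With it enabled, Spark fills in empty partitions on the missing side
# and the Exchange nodes disappear again:
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'true')
cust_df.join(order_df, on='region', how='left').explain("FORMATTED")
```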
Scenario 3 – Join key is a superset of partition keys:
Set spark.sql.requireAllClusterKeysForCoPartition = false to relax the requirement that the join keys exactly match the partition columns.
The plan shows no shuffle after the setting.
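A sketch of this scenario, joining on region plus an extra column (customer_id is an assumed column present on both sides):

```python
# Sketch for Scenario 3: join keys (region, customer_id) are a superset of
# the partition key (region). `customer_id` is an assumed column name.
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition', 'false')

joined_df = cust_df.join(order_df, on=['region', 'customer_id'], how='left')
joined_df.explain("FORMATTED")  # with the SPJ configs enabled, no Exchange nodes
```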
Scenario 3.2 – Join key is a subset of partition keys (Spark 4.0):
Enable spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled to allow joins whose keys do not contain all partition columns.
Example Scala code demonstrates the configuration and shows a shuffle‑free plan.
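The original shows Scala; an equivalent PySpark sketch (requires Spark 4.0, and assumes tables partitioned by more than one column, e.g. region plus a bucketed key):

```python
# Sketch for Scenario 3.2 (Spark 4.0): the join key is a subset of the
# partition keys, e.g. tables partitioned by (region, bucket(4, customer_id)).
spark.conf.set(
    'spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled',
    'true')

# Joining only on region (a subset of the partition columns) can now
# stay shuffle-free.
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
```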
Scenario 4 – Data skew in partitions:
When a partition (e.g., region='East') is heavily skewed, enable spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled to split the skewed partition across multiple tasks and replicate its counterpart from the other side, avoiding both the shuffle and the skew.
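A sketch of the skew scenario (assuming the orders table is heavily concentrated in region='East'):

```python
# Sketch for Scenario 4: one partition (e.g. region='East') dominates the
# orders table. Partially clustered distribution splits the skewed partition
# into multiple tasks and replicates the matching customers partition.
spark.conf.set(
    'spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled',
    'true')

joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")  # still no Exchange; the skewed partition is split
```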
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
