
Eliminating Shuffle in Spark Joins with Storage Partitioned Join (SPJ) for Iceberg Tables

This article explains how Spark ≥ 3.3 introduces Storage Partitioned Join (SPJ) to avoid costly shuffle operations when joining partitioned V2 data-source tables such as Apache Iceberg. It details the required conditions, the configuration settings, practical code examples, and several join scenarios, including mismatched partitions and data skew.

Big Data Technology & Architecture

With Spark ≥ 3.3 (and more mature in 3.4), the Storage Partitioned Join (SPJ) optimization allows joins on partitioned V2 source tables (e.g., Apache Iceberg, Hudi, Delta Lake) without triggering a shuffle, provided certain requirements are met.

Why shuffle is expensive: it must serialize and transfer data across the network (CPU and network intensive) and write shuffle files to local disks (disk I/O intensive).

SPJ prerequisites:

Both target and source tables must be Iceberg tables.

Tables should share at least one identical partition column.

The join condition must include the partition column.

Relevant Spark configurations must be enabled.

Iceberg version ≥ 1.2.0 and Spark version ≥ 3.3.0.
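The prerequisites above boil down to a simple structural check. The helper below is purely illustrative (not a Spark API): it captures the idea that the two tables must share a partition column and that the join condition must reference it.

```python
def spj_eligible(left_partition_cols, right_partition_cols, join_cols):
    """Toy check of the SPJ prerequisites (illustrative only, not a Spark API):
    both tables must share at least one partition column, and the join
    condition must include a shared partition column."""
    shared = set(left_partition_cols) & set(right_partition_cols)
    return bool(shared & set(join_cols))

# Eligible: both tables partitioned by 'region', join includes 'region'
spj_eligible(["region"], ["region"], ["region", "customer_id"])   # True
# Not eligible: join does not touch the shared partition column
spj_eligible(["region"], ["region"], ["customer_id"])             # False
```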

Essential configurations (set to true unless noted otherwise):

spark.sql.sources.v2.bucketing.enabled
spark.sql.sources.v2.bucketing.pushPartValues.enabled
spark.sql.iceberg.planning.preserve-data-grouping
spark.sql.requireAllClusterKeysForCoPartition = false
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled

Initialize the SparkSession (no SPJ configs yet):

from pyspark.sql import SparkSession, Row
SPARK_VERSION = "3.5"
ICEBERG_VERSION = "1.5.0"
CATALOG_NAME = "local"
DW_PATH = "/path/to/local/warehouse"

# Broadcast joins are disabled (autoBroadcastJoinThreshold = -1) so the
# shuffle behavior of sort-merge joins is visible in the plans below.
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("spj-iceberg") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.jars.packages",
            f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},"
            "org.apache.spark:spark-avro_2.12:3.5.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config(f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop") \
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", DW_PATH) \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .enableHiveSupport() \
    .getOrCreate()

Create mock data for customers and orders tables using Faker (code omitted for brevity).
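The original Faker-based generation is omitted; a minimal dependency-free sketch using the stdlib random module in its place (with an assumed schema: a shared region column plus a few id/amount fields) might look like:

```python
import random

# Stand-in for Faker: deterministic mock rows built with the stdlib.
# Schema and partition values are assumptions for illustration.
REGIONS = ["East", "West", "North", "South"]
random.seed(42)

customers = [
    {"customer_id": i, "name": f"customer_{i}", "region": random.choice(REGIONS)}
    for i in range(1000)
]
orders = [
    {"order_id": i,
     "customer_id": random.randrange(1000),
     "amount": round(random.uniform(5, 500), 2),
     "region": random.choice(REGIONS)}
    for i in range(5000)
]

# In the article's session these would become DataFrames:
# customer_df = spark.createDataFrame(customers)
# order_df = spark.createDataFrame(orders)
```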

Write data to Iceberg tables:

customer_df.writeTo("local.db.customers") \
    .tableProperty("format-version", "2") \
    .partitionedBy("region") \
    .create()

order_df.writeTo("local.db.orders") \
    .tableProperty("format-version", "2") \
    .partitionedBy("region") \
    .create()

Join without SPJ (shuffle occurs):

CUSTOMERS_TABLE = 'local.db.customers'
ORDERS_TABLE = 'local.db.orders'

cust_df = spark.table(CUSTOMERS_TABLE)
order_df = spark.table(ORDERS_TABLE)
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")

The physical plan contains Exchange nodes, indicating shuffle.

Enable SPJ configurations and repeat the join:

spark.conf.set('spark.sql.sources.v2.bucketing.enabled', 'true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping', 'true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition', 'false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled', 'true')

joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")

The new plan no longer shows Exchange nodes, confirming that shuffle has been eliminated.
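This check can also be scripted by scanning the plan text for Exchange nodes. The helper below is a hypothetical convenience, not a Spark API; in PySpark the plan string can be obtained via the internal interface df._jdf.queryExecution().executedPlan().toString(), which may change between versions.

```python
def plan_has_shuffle(plan_text: str) -> bool:
    """Return True if the physical-plan text contains an Exchange (shuffle) node."""
    return "Exchange" in plan_text

# Abbreviated plan fragments for illustration (not verbatim Spark output):
with_shuffle = "SortMergeJoin\n+- Exchange hashpartitioning(region#1, 200)"
without_shuffle = "SortMergeJoin\n+- Sort\n   +- BatchScan local.db.customers"
```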

Scenario 1 – Join key equals partition key: minimal configuration works; no shuffle.

Scenario 2 – Mismatched partitions : After deleting a partition from the orders table, shuffle reappears. Enabling spark.sql.sources.v2.bucketing.pushPartValues.enabled creates empty partitions on the missing side, removing the shuffle.
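Conceptually, pushPartValues aligns the two sides' partition layouts before the partition-wise join. The function below is a toy model of that idea, not Spark's implementation:

```python
def align_partitions(left, right):
    """Toy model of pushPartValues.enabled: pad missing partition values with
    empty partitions so both sides share one partition layout and can be
    joined partition-by-partition without a shuffle. Illustrative only."""
    keys = sorted(set(left) | set(right))
    return ({k: left.get(k, []) for k in keys},
            {k: right.get(k, []) for k in keys})

customers_parts = {"East": ["c1"], "West": ["c2"]}
orders_parts = {"East": ["o1", "o2"]}          # the 'West' partition was deleted
left, right = align_partitions(customers_parts, orders_parts)
# both sides now expose the same partition keys; 'West' is empty on the right
```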

Scenario 3 – Join key is a superset of partition keys:

Set spark.sql.requireAllClusterKeysForCoPartition = false to relax the requirement that all partition columns appear in the join.

The plan shows no shuffle after the setting.

Scenario 3.2 – Join key is a subset of partition keys (Spark 4.0):

Enable spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled to allow joins that do not contain all partition columns.

Example Scala code demonstrates the configuration and shows a shuffle‑free plan.
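The Scala example is not reproduced here; in PySpark the configuration step would be the same flag, set on a Spark 4.0 session (a sketch, assuming the session is named spark as above):

```python
# Spark 4.0: allow SPJ when the join keys cover only a subset of the
# partition keys (per the article; not available in earlier versions).
spark.conf.set(
    "spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled",
    "true",
)
```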

Scenario 4 – Data skew in partitions:

When a partition (e.g., region='East') is heavily skewed, enable spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled to split the skewed partition and replicate the counterpart partition, avoiding shuffle.
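The idea behind partially clustered distribution can be illustrated with a toy model (not Spark's implementation): the skewed partition is read as several input splits, and the matching partition from the other side is replicated to each split, so neither side is shuffled.

```python
def partially_clustered_tasks(skewed_splits, other_partition):
    """Toy model of partiallyClusteredDistribution.enabled: each input split
    of the skewed partition becomes its own join task, paired with a full
    copy of the matching partition from the other side. Illustrative only."""
    return [(split, list(other_partition)) for split in skewed_splits]

# region='East' is skewed on the orders side: its files form three splits
east_order_splits = [["o1", "o2"], ["o3", "o4"], ["o5"]]
east_customers = ["c1", "c2"]

tasks = partially_clustered_tasks(east_order_splits, east_customers)
# three join tasks, each seeing the full 'East' customer partition
```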



Tags: SQL, Data Skew, Spark, Shuffle Optimization, Bucketing, Storage Partitioned Join
Written by Wang Zhiwu (Big Data Technology & Architecture), a big data expert dedicated to sharing big data technology.