
Eliminate Shuffle: Deep Dive into Spark’s Storage Partition Join (SPJ)

This article explains how Spark ≥ 3.3’s Storage Partition Join (SPJ) can avoid costly shuffle operations by using Iceberg tables, outlines the required table properties and Spark configurations, demonstrates the effect with code examples and execution plans, and explores several realistic join scenarios.

Past Memory Big Data

Requirements for Storage Partitioned Join (SPJ)

SPJ applies only when all of the following hold: both tables in the join are Apache Iceberg tables; they share at least one identical partition column; the join condition includes that partition column; and the environment runs Spark ≥ 3.3 with Iceberg ≥ 1.2.0.
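The version requirement can be checked up front. The following is a minimal sketch, not part of any Spark or Iceberg API: `parse_version` and `meets_spj_requirements` are hypothetical helper names, and the minimums (3.3 and 1.2.0) are the ones stated above.

```python
def parse_version(version: str) -> tuple:
    """Turn '3.5', '1.5.0' or '4.0.0-preview2' into a 3-part tuple of ints."""
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break  # stop at suffixes such as '-preview2'
            digits += ch
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)  # pad '3.5' to (3, 5, 0) so comparisons are uniform
    return tuple(parts)


def meets_spj_requirements(spark_version: str, iceberg_version: str) -> bool:
    """True when both versions satisfy the minimums listed in this section."""
    return (parse_version(spark_version) >= (3, 3, 0)
            and parse_version(iceberg_version) >= (1, 2, 0))
```

With a live session, this could be called as `meets_spj_requirements(spark.version, ICEBERG_VERSION)`, where the Iceberg version is whatever was used to build the runtime package coordinate.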

Configuration

spark.sql.sources.v2.bucketing.enabled = true

spark.sql.sources.v2.bucketing.pushPartValues.enabled = true

spark.sql.iceberg.planning.preserve-data-grouping = true

spark.sql.requireAllClusterKeysForCoPartition = false

spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled = true

Initialize SparkSession

from pyspark.sql import SparkSession, Row

SPARK_VERSION = "3.5"
ICEBERG_VERSION = "1.5.0"
CATALOG_NAME = "local"
DW_PATH = "/path/to/local/warehouse"

spark = SparkSession.builder \
    .master("local[4]") \
    .appName("spj-iceberg") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config('spark.jars.packages', f'org.apache.iceberg:iceberg-spark-runtime-{SPARK_VERSION}_2.12:{ICEBERG_VERSION},org.apache.spark:spark-avro_2.12:3.5.0') \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config(f'spark.sql.catalog.{CATALOG_NAME}', 'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{CATALOG_NAME}.type', 'hadoop') \
    .config(f'spark.sql.catalog.{CATALOG_NAME}.warehouse', DW_PATH) \
    .config('spark.sql.autoBroadcastJoinThreshold', '-1') \
    .enableHiveSupport() \
    .getOrCreate()

Prepare Mock Data and Write to Iceberg

Two Iceberg tables (customers and orders) are created with Faker‑generated data. Both tables are partitioned by region.

# Generate customer data
from faker import Faker
import random

fake = Faker()
Faker.seed(42)

def generate_customer_data(num_customers=1000):
    regions = ['North', 'South', 'East', 'West']
    customers = []
    for _ in range(num_customers):
        signup_date = fake.date_time_between(start_date='-3y', end_date='now')
        customers.append(Row(
            customer_id=fake.unique.random_number(digits=6),
            customer_name=fake.name(),
            region=random.choice(regions),
            signup_date=signup_date,
            signup_year=signup_date.year
        ))
    return spark.createDataFrame(customers)

# Generate order data
def generate_order_data(customer_df, num_orders=5000):
    customers = [row.customer_id for row in customer_df.select('customer_id').collect()]
    orders = []
    for _ in range(num_orders):
        order_date = fake.date_time_between(start_date='-3y', end_date='now')
        orders.append(Row(
            order_id=fake.unique.random_number(digits=8),
            customer_id=random.choice(customers),
            order_date=order_date,
            amount=round(random.uniform(10, 1000), 2),
            region=random.choice(['North', 'South', 'East', 'West']),
            order_year=order_date.year
        ))
    return spark.createDataFrame(orders)

print("Generating sample data...")
customer_df = generate_customer_data(1000)
order_df = generate_order_data(customer_df, 5000)
customer_df.show(5, truncate=False)
order_df.show(5, truncate=False)

# Write to Iceberg
customer_df.writeTo("local.db.customers") \
    .tableProperty("format-version", "2") \
    .partitionedBy("region") \
    .create()

order_df.writeTo("local.db.orders") \
    .tableProperty("format-version", "2") \
    .partitionedBy("region") \
    .create()

Join without SPJ (Shuffle Occurs)

CUSTOMERS_TABLE = 'local.db.customers'
ORDERS_TABLE = 'local.db.orders'

cust_df = spark.table(CUSTOMERS_TABLE)
order_df = spark.table(ORDERS_TABLE)

joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
joined_df.show(1)

The formatted physical plan contains Exchange nodes, indicating a shuffle stage.

Enable SPJ and Re‑run Join

spark.conf.set('spark.sql.sources.v2.bucketing.enabled', 'true')
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping', 'true')
spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition', 'false')
spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled', 'true')

joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")

The new plan no longer contains Exchange nodes, confirming that the join avoided shuffle.
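Reading a long formatted plan by eye is error-prone, so the check can be automated. The sketch below assumes that `DataFrame.explain` writes the plan to Python's standard output, as PySpark does; `plan_text` and `has_shuffle` are hypothetical helper names, not Spark APIs.

```python
import io
import re
from contextlib import redirect_stdout


def plan_text(df, mode: str = "formatted") -> str:
    """Capture what df.explain(mode=...) prints and return it as a string."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def has_shuffle(df) -> bool:
    """True if the physical plan text contains a shuffle Exchange node.

    The leading word boundary keeps BroadcastExchange and ReusedExchange
    from matching, because no boundary sits between two letters.
    """
    return re.search(r"\bExchange\b", plan_text(df)) is not None
```

Calling `has_shuffle(joined_df)` before and after setting the SPJ options should return True and then False for the join above, provided the plans look as described in this section.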

Scenario 1 – Join Key Equals Partition Key

When the join column is exactly the partition column, the minimal configuration is sufficient.

spark.conf.set('spark.sql.sources.v2.bucketing.enabled', 'true')
spark.conf.set('spark.sql.iceberg.planning.preserve-data-grouping', 'true')

joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")

The plan shows no Exchange nodes.
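One caveat when reproducing this scenario: the session used in this walkthrough already has all five SPJ options set from the previous section, so the two `spark.conf.set` calls above do not by themselves isolate the minimal configuration. A possible way to test a configuration in isolation is a small context manager that applies settings temporarily and restores the previous state afterwards. This is a sketch; `temp_conf` is a hypothetical helper, and it relies only on `spark.conf.get(key, default)`, `spark.conf.set` and `spark.conf.unset`.

```python
from contextlib import contextmanager


@contextmanager
def temp_conf(spark, settings: dict):
    """Apply settings for the duration of a with-block, then restore them."""
    # None marks keys that were not explicitly set before entering the block.
    previous = {key: spark.conf.get(key, None) for key in settings}
    try:
        for key, value in settings.items():
            spark.conf.set(key, value)
        yield spark
    finally:
        for key, old_value in previous.items():
            if old_value is None:
                spark.conf.unset(key)
            else:
                spark.conf.set(key, old_value)
```

For this scenario, the block could enable the two minimal options and explicitly set the other three to the opposite of the values used earlier, then build the join and call `explain` inside the `with` statement.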

Scenario 2 – Mismatched Partitions

After deleting all rows for the West region from orders, the join re‑introduces a shuffle. Enabling spark.sql.sources.v2.bucketing.pushPartValues.enabled creates empty partitions on the missing side, eliminating the shuffle.

# Delete a partition (the f prefix is required to interpolate the table name)
spark.sql(f"DELETE FROM {ORDERS_TABLE} WHERE region = 'West'")
order_df.groupBy('region').count().show()

# Join without extra config (shuffle appears)
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")

# Enable pushPartValues to avoid shuffle
spark.conf.set('spark.sql.sources.v2.bucketing.pushPartValues.enabled', 'true')
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
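The idea behind pushPartValues can be pictured with plain Python dictionaries. This is a conceptual model only, not Spark's implementation: `align_partitions` is a hypothetical function, and the row values are made up for illustration. Each side maps a partition value to its rows; the union of partition values is computed, and the side that lacks a value gets an empty partition so both sides line up one to one.

```python
def align_partitions(left: dict, right: dict) -> list:
    """Pair partitions by value, padding a missing side with an empty list.

    left and right map a partition value (here a region) to that side's rows.
    """
    all_values = sorted(set(left) | set(right))
    return [(value, left.get(value, []), right.get(value, []))
            for value in all_values]


customers = {"East": ["c1"], "North": ["c2"], "South": ["c3"], "West": ["c4"]}
orders = {"East": ["o1"], "North": ["o2"], "South": ["o3"]}  # West was deleted

for value, cust_rows, order_rows in align_partitions(customers, orders):
    print(value, cust_rows, order_rows)
# last line printed: West ['c4'] []
```

Because every customers partition still has a partner, even if that partner is empty, the join can proceed partition by partition without redistributing data.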

Scenario 3.1 – Join Key Is a Superset of Partition Key

When the join includes additional columns (e.g., customer_id) beyond the partition column, Spark by default requires every join key to also be a partition key before it will skip the shuffle. Setting spark.sql.requireAllClusterKeysForCoPartition = false relaxes this requirement, and the plan shows no shuffle.

spark.conf.set('spark.sql.requireAllClusterKeysForCoPartition', 'false')

joined_df = cust_df.join(order_df, on=['region', 'customer_id'], how='left')
joined_df.explain("FORMATTED")
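Why is it safe to skip the shuffle here? Any two rows that match on (region, customer_id) necessarily share the same region, so they already sit in the same storage partition. The toy model below illustrates this with plain Python lists; it is a sketch with made-up rows, uses an inner join for brevity, and the helper names are hypothetical.

```python
from collections import defaultdict


def by_region(rows):
    """Group rows by their 'region' field, mimicking storage partitions."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["region"]].append(row)
    return groups


def inner_join(left, right, keys):
    """Naive nested-loop inner join on the given key columns."""
    return [(l, r) for l in left for r in right
            if all(l[k] == r[k] for k in keys)]


def pair_ids(pairs):
    """Reduce joined pairs to comparable (customer_id, order_id) tuples."""
    return {(l["customer_id"], r["order_id"]) for l, r in pairs}


customers = [
    {"customer_id": 1, "region": "East"},
    {"customer_id": 2, "region": "West"},
]
orders = [
    {"order_id": 10, "customer_id": 1, "region": "East"},
    {"order_id": 11, "customer_id": 1, "region": "West"},  # region differs: no match
    {"order_id": 12, "customer_id": 2, "region": "West"},
]
keys = ["region", "customer_id"]

# Join over all rows at once (what a shuffle-based join computes).
global_result = pair_ids(inner_join(customers, orders, keys))

# Join each region's partitions independently (what SPJ computes).
cust_parts, order_parts = by_region(customers), by_region(orders)
partition_wise = set()
for region, cust_rows in cust_parts.items():
    partition_wise |= pair_ids(
        inner_join(cust_rows, order_parts.get(region, []), keys))

print(global_result == partition_wise)
```

The two results agree because no match can cross a region boundary once region is part of the join condition.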

Scenario 3.2 – Join Key Is a Subset of Partition Key (Spark 4.0+)

In Spark < 4.0 this case is not supported. Spark 4.0 introduces the config spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled. When set to true, Spark can avoid shuffle even if the join does not contain all partition columns.

// Scala example for Spark 4.0 preview
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SPJTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
    spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
    spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
    spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
    spark.conf.set("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled", "true")

    val cust_df = spark.table("local.db.customers_buck")
    val orders_df = spark.table("local.db.orders_buck")

    val joined_df = cust_df.alias("cust").join(
      orders_df.alias("ord"),
      col("cust.region") === col("ord.region"),
      "left"
    )
    // explain() prints the plan itself and returns Unit, so no println is needed
    joined_df.explain("formatted")
  }
}
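One way to picture this case: when a table is partitioned on more columns than the join uses, partitions that agree on the join key can be merged into a single group per join-key value. The Python sketch below is a conceptual model only, not Spark's implementation. The layout of (region, bucket number) is an assumption made for illustration; the source does not state how the customers_buck and orders_buck tables are partitioned, and `group_by_join_keys` is a hypothetical helper.

```python
from collections import defaultdict


def group_by_join_keys(partitions: dict, key_positions: tuple) -> dict:
    """Merge storage partitions whose partition values agree on the join keys.

    partitions maps a tuple of partition values to that partition's rows;
    key_positions selects which positions of the tuple are join keys.
    """
    grouped = defaultdict(list)
    for values, rows in partitions.items():
        projected = tuple(values[i] for i in key_positions)
        grouped[projected].extend(rows)
    return dict(grouped)


# Assumed layout: partitioned by (region, bucket number); join uses region only.
orders = {("East", 0): ["o1"], ("East", 1): ["o2"], ("West", 0): ["o3"]}
print(group_by_join_keys(orders, key_positions=(0,)))
# {('East',): ['o1', 'o2'], ('West',): ['o3']}
```

After this grouping, both sides are keyed by region alone and can be paired partition by partition as in the earlier scenarios.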

Scenario 4 – Skewed Partitions

If a partition (e.g., region=East) contains a disproportionately large amount of data, enabling spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled splits the heavy partition into smaller pieces and replicates the matching partition from the other side for each piece, so the skewed data is spread over several tasks while still avoiding shuffle.

spark.conf.set('spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled', 'true')
joined_df = cust_df.join(order_df, on='region', how='left')
joined_df.explain("FORMATTED")
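The split-and-replicate idea can be sketched in a few lines of plain Python. This is a conceptual model under stated assumptions, not Spark's implementation: `plan_tasks` and the row-count threshold `max_rows` are hypothetical, and the rows are made up for illustration.

```python
def plan_tasks(big_side: dict, small_side: dict, max_rows: int) -> list:
    """Split oversized partitions of big_side and copy small_side's partner.

    Both inputs map a partition value to a list of rows. Returns a list of
    (partition_value, big_chunk, small_rows) tuples, one per join task.
    """
    tasks = []
    for value, rows in big_side.items():
        partner = small_side.get(value, [])
        # An empty partition still yields one (empty) chunk.
        chunks = [rows[i:i + max_rows]
                  for i in range(0, len(rows), max_rows)] or [[]]
        for chunk in chunks:
            tasks.append((value, chunk, partner))
    return tasks


orders = {"East": ["o1", "o2", "o3", "o4", "o5"], "West": ["o6"]}  # East is skewed
customers = {"East": ["c1"], "West": ["c2"]}

for task in plan_tasks(orders, customers, max_rows=2):
    print(task)
```

Here the East partition becomes three tasks, each paired with a copy of the East customers, while West stays a single task. Replication is only safe for a side whose unmatched rows are not preserved by the join: in a left outer join, copying the left side would emit its unmatched rows once per copy.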

References

Storage Partitioned Join Design Doc: https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit

Spark PR for SPJ: https://github.com/apache/spark/pull/32875

Spark PR for Partially Clustered Distribution: https://github.com/apache/spark/pull/32875

Spark 4.0.0 preview2 documentation: https://spark.apache.org/docs/4.0.0-preview2/sql-performance-tuning.html#converting-sort-merge-join-to-shuffled-hash-join

Original article (Chinese): https://www.guptaakashdeep.com/storage-partition-join-in-apache-spark-why-how-and-where/
