Runtime Filter Join Optimization in JD Spark Using Bloom Filters

This article details JD Spark's Runtime Filter Join optimization, which uses Bloom filters to prune large‑table data before the shuffle, reducing I/O and execution time across batch and real‑time workloads. It covers the architecture, implementation challenges, code examples, and performance gains in both benchmark and production environments.

JD Retail Technology

JD Spark, the unified analytics engine used across JD.com’s retail, logistics, and digital businesses, faces growing task volumes and challenges such as stability, efficiency, cost reduction, and meeting diverse business needs.

Most Spark SQL jobs are dominated by joins where a relatively small table is joined with a much larger one, leading to costly SortMergeJoins that shuffle unnecessary data, increasing disk and network I/O.

The Runtime Filter Join mechanism introduces a Bloom‑filter‑based optimization: during join planning, a Bloom filter is built from the small table and inserted as a filter on the large table, eliminating rows that cannot satisfy the join condition before shuffle.
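The pruning idea can be sketched in plain Scala without Spark. This is a minimal illustration, not JD Spark's operator: the class name `BloomFilter`, the helper `prune`, the bit count, and the number of hash functions are all assumptions made for the example, and the hash mixer is a generic 64-bit finalizer rather than whatever JD Spark uses internally.

```scala
object BloomPruneSketch {
  // A tiny Bloom filter over Long join keys. numBits and numHashes are
  // illustrative parameters, not values taken from JD Spark.
  final class BloomFilter(numBits: Int, numHashes: Int) {
    private val bits = new java.util.BitSet(numBits)

    // Generic 64-bit mixing function (an assumption for this sketch).
    private def mix(x: Long): Long = {
      var z = x + 0x9E3779B97F4A7C15L
      z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L
      z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL
      z ^ (z >>> 31)
    }

    // Derive the i-th bit position from two halves of one 64-bit hash.
    private def position(key: Long, i: Int): Int = {
      val h = mix(key)
      val h1 = h.toInt
      val h2 = (h >>> 32).toInt
      java.lang.Math.floorMod(h1 + i * h2, numBits)
    }

    def put(key: Long): Unit =
      (0 until numHashes).foreach(i => bits.set(position(key, i)))

    // May return true for a key that was never added (false positive),
    // but never returns false for a key that was added.
    def mightContain(key: Long): Boolean =
      (0 until numHashes).forall(i => bits.get(position(key, i)))
  }

  // Build the filter from the small side's join keys, then drop large-side
  // rows whose key cannot match. Surviving rows still go through the real join.
  def prune(smallKeys: Seq[Long], largeRows: Seq[(Long, String)]): Seq[(Long, String)] = {
    val bf = new BloomFilter(numBits = 1 << 16, numHashes = 3)
    smallKeys.foreach(bf.put)
    largeRows.filter { case (k, _) => bf.mightContain(k) }
  }
}
```

Because a Bloom filter has no false negatives, every large-table row that could join is kept; the occasional false positive only means a few extra rows reach the shuffle and are discarded by the join itself.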

The implementation includes logical‑plan and physical‑plan rules: if a small table can be broadcast, the native BroadcastHashJoin is used; otherwise, for SortMergeJoins, a RuntimeBloomFilter is inserted before the large‑table shuffle, and an AQE‑based ShuffleBloomFilter can be applied after shuffle when needed.
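The rule described above can be read as a small decision function. The sketch below is only one possible reading: the names `Strategy`, `choose`, and `broadcastThresholdBytes` are hypothetical, and the source does not say when the AQE path is "needed". Here it is assumed, purely for illustration, that the AQE‑based ShuffleBloomFilter is used when no plan‑time size estimate for the small side is available.

```scala
object JoinStrategySketch {
  sealed trait Strategy
  case object BroadcastHashJoin extends Strategy
  case object SortMergeJoinWithRuntimeBloomFilter extends Strategy
  case object SortMergeJoinWithAqeShuffleBloomFilter extends Strategy

  // smallSideBytes: estimated size of the small table, if the planner has one.
  // broadcastThresholdBytes: size limit under which the small side is broadcast.
  def choose(smallSideBytes: Option[Long], broadcastThresholdBytes: Long): Strategy =
    smallSideBytes match {
      // Small enough to broadcast: the native broadcast join already avoids the shuffle.
      case Some(size) if size <= broadcastThresholdBytes => BroadcastHashJoin
      // Too big to broadcast: filter the large side before its shuffle.
      case Some(_) => SortMergeJoinWithRuntimeBloomFilter
      // Assumption: with no estimate at plan time, defer to AQE after the shuffle.
      case None => SortMergeJoinWithAqeShuffleBloomFilter
    }
}
```

For example, with a 10 MB threshold, `choose(Some(1024L), 10L * 1024 * 1024)` selects `BroadcastHashJoin`, while a 1 GB small side selects the RuntimeBloomFilter path.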

Key engineering challenges addressed are:

Supporting code‑generated Bloom filter operators for high performance.

Handling multiple join keys with a single Bloom filter by hashing keys with XxHash64.

Introducing a timeout fallback to avoid long‑running Bloom filter construction.

Implementing predicate push‑down rules to reuse a single Bloom filter across multiple joins.
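Two of the challenges above, multi‑key hashing and the timeout fallback, can be sketched together in plain Scala. Several things here are assumptions made for the example: `MurmurHash3` from the Scala standard library stands in for XxHash64, a plain `Set[Int]` of key hashes stands in for the Bloom filter, and the names `compositeHash`, `buildWithTimeout`, and `probe` are hypothetical.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.hashing.MurmurHash3

object BloomBuildSketch {
  // Collapse all join-key columns of a row into one hash, so a single
  // filter covers a multi-column join condition.
  // MurmurHash3 is a stand-in for XxHash64 in this sketch.
  def compositeHash(keys: Seq[Any]): Int = MurmurHash3.orderedHash(keys)

  // Build the filter from the small side, but give up after `timeout`.
  // None tells the caller to run the join without a filter.
  // Note: on timeout the background build is abandoned, not cancelled.
  def buildWithTimeout(rows: Iterator[Seq[Any]], timeout: FiniteDuration)
                      (implicit ec: ExecutionContext): Option[Set[Int]] = {
    val build = Future(rows.map(compositeHash).toSet)
    try Some(Await.result(build, timeout))
    catch { case _: TimeoutException => None }
  }

  // With no filter (fallback case) every row passes and the join decides.
  def probe(filter: Option[Set[Int]], row: Seq[Any]): Boolean =
    filter.forall(_.contains(compositeHash(row)))
}
```

The fallback keeps results correct either way: skipping the filter only forfeits the pruning benefit, so a slow build never blocks the query longer than the configured timeout.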

Example code demonstrates disabling broadcast joins and executing a multi‑join query that benefits from the Runtime Filter Join.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

spark.range(100000)
  .select(col("id").as("a"), col("id").as("b"), col("id").as("c"))
  .write.format(tableFormat)
  .mode(SaveMode.Overwrite)
  .saveAsTable("tb1")
... (additional table creation and query) ...
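The statement that disables broadcast joins is not shown in the excerpt above. In stock Apache Spark this is usually done by setting `spark.sql.autoBroadcastJoinThreshold` to `-1`, as in the fragment below, which assumes a spark-shell session where `spark` is already defined. The switch that enables JD Spark's Runtime Filter Join is not given in the source, so it is not shown here.

```scala
// Assumes a spark-shell session; `spark` is the active SparkSession.
// A threshold of -1 disables automatic broadcast joins in Apache Spark,
// so the planner uses a shuffle-based join such as SortMergeJoin instead.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```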

Performance evaluation shows significant reductions in shuffle volume (1.5% to 73.7%) and in execution time, in both TPC‑DS 10 TB benchmarks and real JD production cases. In one case a task dropped from 4.4 hours to 4 minutes, cutting run time by more than 95%.

Overall, the Bloom‑filter‑based Runtime Filter Join has been fully deployed (default off) in JD Spark, achieving an average 72% reduction in shuffle data and a 53% performance improvement, with future work focusing on more accurate row‑count estimation, leveraging storage‑level Bloom filters, and extending support to multi‑table joins.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
