How JD’s Custom Spark Engine Tackles Data Skew for Massive Offline Jobs
This article explains JD’s self‑developed data‑skew mitigation solution for Spark, detailing the problem of uneven key distribution, the limitations of the open‑source AQE implementation, and JD’s OptimizeSkewedJoinV2 algorithm that dramatically reduces stage latency in large‑scale join workloads.
What Is Data Skew?
Data skew occurs when the partitioning of a distributed dataset is highly imbalanced, causing a small number of executors to process a disproportionate amount of data. This leads to long‑running tasks, possible out‑of‑memory failures, and overall slowdown of the job. Skew is most common in shuffle‑intensive operators such as Join, Aggregate and Window, where a “hot key” concentrates many rows onto a single partition.
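The hot-key effect can be illustrated with a minimal Python sketch. It is not Spark code: `partition_of` is a hypothetical stand-in for a hash partitioner (CRC32 is used only so the result is deterministic), and the key names and row counts are invented for illustration.

```python
import zlib
from collections import Counter

def partition_of(key: str, num_partitions: int) -> int:
    """Deterministic stand-in for a hash partitioner (not Spark's actual hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def rows_per_partition(keys, num_partitions):
    """Count how many rows land in each shuffle partition."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

# Hypothetical dataset: one hot key with 9,000 rows plus 1,000 distinct keys.
keys = ["hot_user"] * 9000 + [f"user_{i}" for i in range(1000)]
sizes = rows_per_partition(keys, num_partitions=8)
hot_partition = partition_of("hot_user", 8)
print(sizes, "hot partition:", hot_partition)
```

Every row with the hot key hashes to the same partition, so that one partition holds at least 90% of the rows no matter how many partitions are configured.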
Adaptive Query Execution (AQE) and Community Implementation
Since Spark 3.0, Adaptive Query Execution (AQE) can adjust later stages based on statistics collected from completed stages. The built‑in rule OptimizeSkewedJoin implements a split‑and‑duplicate strategy for skewed joins:
Identify the join type (Inner, Left, Right, LeftSemi, etc.).
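Collect partition size statistics; by default a partition is considered skewed when its size exceeds both 5 × the median partition size (spark.sql.adaptive.skewJoin.skewedPartitionFactor) and 256 MB (spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes).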
Split the skewed partition (e.g., A0 → A0‑0, A0‑1) and duplicate the matching partition on the other side, turning one heavy task into several lighter tasks.
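The split‑and‑duplicate steps above can be sketched in plain Python. This is a simplified model rather than Spark's implementation: the function names are hypothetical, the `min_bytes` default mirrors Spark's 256 MB `skewedPartitionThresholdInBytes` setting, and the split count is derived from a target size, whereas Spark splits along map‑output boundaries.

```python
from statistics import median

MB = 1024 * 1024

def skewed_partitions(sizes, factor=5.0, min_bytes=256 * MB):
    """Indices of partitions larger than both factor * median and min_bytes."""
    med = median(sizes)
    return [i for i, s in enumerate(sizes) if s > factor * med and s > min_bytes]

def split_and_duplicate(left_sizes, target_bytes, factor=5.0, min_bytes=256 * MB):
    """Return (left_part, right_part) task pairs for a join skewed on the left.

    Each skewed left partition A<i> becomes several sub-partitions, and every
    sub-partition is paired with a full copy of the matching right partition B<i>.
    """
    skewed = set(skewed_partitions(left_sizes, factor, min_bytes))
    tasks = []
    for pid, size in enumerate(left_sizes):
        n = -(-size // target_bytes) if pid in skewed else 1  # ceiling division
        for part in range(n):
            left = f"A{pid}-{part}" if n > 1 else f"A{pid}"
            tasks.append((left, f"B{pid}"))
    return tasks

# Hypothetical partition sizes: A0 is far above the median of 110 MB.
left_sizes = [2000 * MB, 100 * MB, 120 * MB, 90 * MB]
tasks = split_and_duplicate(left_sizes, target_bytes=500 * MB)
for t in tasks:
    print(t)
```

In this example A0 is split into four sub‑partitions, each joined against the same B0, so one heavy task becomes four lighter ones while the other three partitions are left untouched.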
Limitations of the Community Skew Handler
The open‑source implementation only supports two‑table joins that match strict patterns (SortMergeJoin or ShuffledHashJoin) and cannot handle stages that contain aggregates, maps, or other operators. Additional constraints include:
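Join‑type constraints: A side can be split only if duplicating the opposite side preserves correctness. For example, in a RightOuter join only the right side can be split, and in a FullOuter join neither side can.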
Processing overhead: The partition paired with a split skewed partition is read once per split, so data on the non‑skewed side may be read multiple times.
Combinatorial explosion: If both sides of a join are skewed and are split into M and N parts, the resulting task count becomes M × N, which can degrade performance when the product is large.
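The M × N growth is easy to see in a small sketch. The function below is hypothetical and only enumerates the sub‑partition pairs that would each become a task when both sides of one partition are split.

```python
from itertools import product

def join_tasks(left_splits: int, right_splits: int):
    """Every left sub-partition must be joined with every right sub-partition."""
    return list(product(range(left_splits), range(right_splits)))

small = join_tasks(3, 4)      # 12 tasks
large = join_tasks(100, 100)  # 10,000 tasks for a single skewed partition
print(len(small), len(large))
```

Because each pair re‑reads its sub‑partitions, the scheduling and I/O cost grows with the product of the split counts, not their sum.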
JD’s OptimizeSkewedJoinV2 Algorithm
To overcome the above restrictions, JD designed a more generic optimizer called OptimizeSkewedJoinV2. It does not rely on rigid pattern matching; instead it analyses the logical plan to locate splittable leaf shuffles and applies a controlled split‑duplicate process.
Algorithm Workflow
Operator whitelist check: All descendant nodes of the candidate stage must belong to a whitelist (e.g., Sort, Project, BroadcastHashJoin, non‑skewed SortMergeJoin/ShuffledHashJoin, Aggregate, Window, Filter, etc.). If any node falls outside the whitelist, the stage is excluded.
Semantic leaf‑node search: Traverse the plan according to join‑type rules and stop conditions:
For Inner/Cross SortMergeJoin or ShuffledHashJoin, explore both children.
For LeftOuter, LeftSemi, LeftAnti, explore only the left child.
For RightOuter, explore only the right child.
For FullOuter, stop the current path.
Encountering Aggregate or Window stops the path.
All other nodes are traversed recursively.
Every Shuffle node reached at the end of a path is marked as splittable.
Skew detection and split: Using partition size statistics, determine whether each splittable Shuffle is skewed (default threshold: size > 5 × median). If skewed, split the heavy partitions into multiple sub‑partitions and duplicate the corresponding partitions on the opposite side.
Combinatorial‑explosion guard: Compute the total number of generated tasks. If it exceeds a configurable threshold (default 1000), compress the split factor (e.g., reduce a 100 × 100 × 100 split to 10 × 10 × 10) to keep the task count manageable.
Task generation and marking: Enumerate the Cartesian product of split dimensions to create the final task set. All SortMergeJoin, ShuffledHashJoin, Aggregate and Window nodes involved are marked as skewed so that subsequent optimizer passes do not introduce additional shuffles.
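The leaf‑node search and the combinatorial‑explosion guard can be sketched as follows. This is an illustrative model under stated assumptions, not JD's code: the `Node` class, the operator names as plain strings, and both function names are hypothetical, and the whitelist check is omitted. The article does not say how split factors are compressed, so the guard here simply decrements the largest factor until the product fits, which is one possible choice.

```python
from dataclasses import dataclass, field
from math import prod
from typing import List, Optional

JOINS = {"SortMergeJoin", "ShuffledHashJoin"}
STOP_OPS = {"Aggregate", "Window"}

@dataclass
class Node:
    op: str
    name: str = ""
    join_type: Optional[str] = None
    children: List["Node"] = field(default_factory=list)

def splittable_shuffles(node: Node) -> List[str]:
    """Collect the Shuffle leaves reachable under the join-type rules."""
    if node.op == "Shuffle":
        return [node.name]
    if node.op in STOP_OPS:
        return []  # Aggregate and Window stop the path
    if node.op in JOINS:
        left, right = node.children
        if node.join_type in ("Inner", "Cross"):
            explore = [left, right]
        elif node.join_type in ("LeftOuter", "LeftSemi", "LeftAnti"):
            explore = [left]
        elif node.join_type == "RightOuter":
            explore = [right]
        else:  # FullOuter, or any unrecognised join type, stops the path
            explore = []
    else:
        explore = node.children  # all other nodes are traversed recursively
    return [s for child in explore for s in splittable_shuffles(child)]

def compress_splits(factors, max_tasks=1000):
    """Shrink split factors until their product is at most max_tasks."""
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    factors = list(factors)
    while prod(factors) > max_tasks:
        factors[factors.index(max(factors))] -= 1  # reduce the largest factor
    return factors

# Hypothetical plan: (A LEFT OUTER JOIN B) INNER JOIN C
plan = Node("Project", children=[
    Node("SortMergeJoin", join_type="Inner", children=[
        Node("SortMergeJoin", join_type="LeftOuter", children=[
            Node("Shuffle", "A"), Node("Shuffle", "B")]),
        Node("Sort", children=[Node("Shuffle", "C")]),
    ]),
])
found = splittable_shuffles(plan)
compressed = compress_splits([100, 100, 100])
print(found, compressed)
```

In the example plan, shuffle B is not marked because it sits on the right side of a LeftOuter join, while A and C are both reachable. The guard turns a 100 × 100 × 100 split into 10 × 10 × 10, matching the default limit of 1000 tasks.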
Online Case Study
A production workload consisting of a four‑table join (two SortMergeJoins and one BroadcastHashJoin) exhibited severe skew on the leftmost shuffle. Before optimization, Stage 20 contained roughly 3,000 tasks and ran for 1–2 hours. After applying OptimizeSkewedJoinV2, the stage ran 3,051 tasks and completed in about 11 minutes, a speedup of roughly 5× to 11×.
Open Issues and Future Work
Support for leaf nodes that are BucketJoin or generic DataScan types, which are currently excluded.
Relaxation of join‑type constraints to handle left‑outer skew patterns such as A LEFT JOIN B (skewed) LEFT JOIN C.
Extension of the algorithm to address skew introduced by Window and Aggregate operators, which are presently only marked to avoid further shuffles.
Continuous enrichment of the whitelist and pattern library based on observed production workloads.
JD Retail Technology
Official platform of JD Retail Technology, delivering insightful R&D news and a deep look into the lives and work of technologists.
