How to Detect and Resolve Data Skew in Spark and Hadoop
This article explains what data skew is in distributed big‑data systems like Spark and Hadoop, why it hurts performance, how to spot it using the Web UI or key statistics, and presents eight practical mitigation techniques ranging from filtering and shuffle parallelism to custom partitioners and broadcast joins.
What Is Data Skew?
In distributed systems such as Spark or Hadoop, the ideal situation is that adding more nodes reduces total execution time linearly. Data skew occurs when the workload is unevenly distributed, causing a few tasks to process a disproportionate amount of data while most tasks finish quickly.
Harms of Data Skew
A stage finishes only when its slowest task does, so a single overloaded task dominates the job's duration while the remaining executors sit idle.
Excessive data on a single task can trigger out‑of‑memory (OOM) errors, leading to job failure.
Typical Symptoms
Most tasks finish quickly, but a handful run extremely slowly, preventing the stage from completing.
Spark jobs that previously ran fine suddenly throw OOM exceptions.
Root Causes
During a shuffle, all records with the same key are sent to the same reducer task (with Spark's default hash partitioning, by hash(key) modulo the number of partitions). If one key has an exceptionally large number of records (e.g., 1 million versus a typical 10), the reducer that receives it becomes the bottleneck.
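The sketch below imitates that routing in plain Python so the imbalance is easy to see. It is a local simulation, not Spark code. `zlib.crc32` stands in for the key hash (an assumption: Spark's HashPartitioner uses the key's `hashCode`). The key names, the reducer count and the frequencies are hypothetical, chosen to match the 1 million versus 10 example.

```python
import zlib

def reducer_for(key: str, num_reducers: int) -> int:
    """Deterministic stand-in for a hash partitioner: hash(key) mod N."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def reducer_loads(key_counts: dict, num_reducers: int) -> list:
    """Records received by each reducer; every record of a key goes to one reducer."""
    loads = [0] * num_reducers
    for key, count in key_counts.items():
        loads[reducer_for(key, num_reducers)] += count
    return loads

# Hypothetical key frequencies: one hot key, 100 ordinary keys with 10 records each.
key_counts = {"hot": 1_000_000, **{f"k{i}": 10 for i in range(100)}}
loads = reducer_loads(key_counts, 4)
print(loads)  # one reducer holds at least 1,000,000 records; the rest share 1,000
```

However the ordinary keys are spread, the reducer that owns "hot" ends up with at least 1,000,000 records. Adding reducers does not change this.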
Detection & Diagnosis
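1. Spark Web UI – On a stage's detail page, compare the "Shuffle Read Size / Records" column across tasks, along with the min, median and max in the summary metrics. A few tasks reading far more data than the median point to skew.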
2. Key Statistics – Sample the key column and count occurrences to identify heavy keys:
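df.select("key").rdd
  .sample(false, 0.1)                  // data sampling: 10% without replacement
  .map(row => (row.get(0), 1L))        // (key, 1) for each sampled row
  .reduceByKey(_ + _)                  // occurrences per key in the sample (x10 estimates the full count)
  .map { case (k, cnt) => (cnt, k) }   // swap so the count becomes the sort key
  .sortByKey(false)                    // descending by count
  .take(10)                            // top 10 heavy keys
Mitigation Strategies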
Basic Idea
Business logic: separate the skewed keys (e.g., specific cities), compute them independently and then merge the results.
Implementation: replace groupByKey with reduceByKey, which combines values on the map side before the shuffle, or pre-aggregate the data in Hive.
Parameter tuning: adjust Spark/Hadoop configuration to increase parallelism.
1. Filter Abnormal Data
Identify the skewed keys first. If they turn out to be invalid data (for example null, empty or placeholder values), discard or clean them before the shuffle.
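One way to apply this filter is sketched below in plain Python, under two assumptions. A key counts as "heavy" when it holds more than a hypothetical 20% share of the records. `INVALID_KEYS` is a hypothetical list of placeholder values. Only keys that are both heavy and known-invalid are dropped, so legitimate hot keys remain for the other techniques.

```python
from collections import Counter

# Hypothetical placeholder values that upstream systems might emit for "unknown".
INVALID_KEYS = {None, "", "NULL", "-1"}

def find_heavy_keys(keys, share=0.2):
    """Keys holding more than `share` of all records (the threshold is an assumption)."""
    counts = Counter(keys)
    total = sum(counts.values())
    return {k for k, c in counts.items() if c > share * total}

def drop_invalid_heavy_keys(records, share=0.2):
    """Drop records whose key is both heavy and a known-invalid placeholder."""
    heavy = find_heavy_keys((k for k, _ in records), share)
    bad = heavy & INVALID_KEYS
    kept = [(k, v) for k, v in records if k not in bad]
    return kept, bad

records = [(None, 1)] * 80 + [("beijing", 1)] * 15 + [("shanghai", 1)] * 5
kept, dropped = drop_invalid_heavy_keys(records)
print(len(kept), dropped)  # 20 {None}
```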
2. Increase Shuffle Parallelism
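Set a higher number of shuffle partitions so that keys are spread across more tasks. This helps when many distinct keys happen to hash to the same task. It cannot help when a single key dominates, because every record with that key still goes to one partition.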
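A quick local simulation shows both effects. It models partitioning as `key % N`, which for non-negative integer keys matches the index Spark's HashPartitioner computes from `hashCode`. The data sizes are hypothetical.

```python
from collections import Counter

def max_partition_load(keys, num_partitions):
    """Largest number of records any partition receives when partition = key % N."""
    loads = Counter(key % num_partitions for key in keys)
    return max(loads.values())

uniform = [k for k in range(1000) for _ in range(100)]  # 1,000 keys x 100 records each
skewed = uniform + [0] * 10_000                          # key 0 gets 10,000 extra records

for n in (10, 200):
    print(n, max_partition_load(uniform, n), max_partition_load(skewed, n))
# 10 10000 20000
# 200 500 10500
```

Going from 10 to 200 partitions shrinks the largest task 20-fold in the uniform case. In the skewed case it only falls from 20,000 to 10,500. No partition count can push it below the 10,000 records of key 0 itself.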
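# Spark RDD API: pass the partition count to the shuffle operator
rdd.reduceByKey(_ + _, 200)  // or set spark.default.parallelism=200 (spark.sql.shuffle.partitions does not affect RDDs)
# Spark SQL
SET spark.sql.shuffle.partitions=200;
3. Custom Partitioner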
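Implement a custom Partitioner to control which task receives each key. In this example, the eight hot ids 9500000, 9500012, ..., 9500084 would all map to partition 8 under plain id % 12, so the partitioner gives each of them its own partition: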
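// pairs: a hypothetical JavaPairRDD whose keys are numeric id strings.
// Partitioner is org.apache.spark.Partitioner. Create it in a static context so the
// anonymous class does not capture a non-serializable enclosing object.
pairs.groupByKey(new Partitioner() {
  @Override
  public int numPartitions() { return 12; }

  @Override
  public int getPartition(Object key) {
    int id = Integer.parseInt(key.toString());
    // Hot ids 9500000, 9500012, ..., 9500084 would all land in partition 8
    // under id % 12; send each of the eight to its own partition 0..7 instead.
    if (id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
      return (id - 9500000) / 12;
    }
    // All other ids: floorMod keeps the index non-negative even for negative ids.
    return Math.floorMod(id, 12);
  }
})
4. Broadcast (Map-Side) Join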
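When one side of the join is small enough to fit in memory, broadcast it to every executor. Each executor then joins its partitions of the large table locally, with no shuffle. For a left outer join only the right-hand table can be broadcast, so the small table goes on the right (here B is the large table and A the small one):
from pyspark.sql.functions import broadcast
result = B.join(broadcast(A), ["join_col"], "left")
To let the planner broadcast automatically, raise the size threshold (in bytes; 104857600 = 100 MB, default 10 MB):
SET spark.sql.autoBroadcastJoinThreshold=104857600;
5. Split-Join-Union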
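Move the skewed keys into a dedicated RDD and add a random prefix in [0, n) to each of their records. On the other side, take the records that match those keys and expand each one n times, once per prefix, so every salted key still finds its match. Join these two salted datasets, strip the prefixes, and union the result with the ordinary join of the remaining, non-skewed keys.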
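The steps can be checked with a small plain-Python sketch. This is a simulation, not Spark code. A `(prefix, key)` tuple plays the role of the prefixed key, and `hash_join` is a hypothetical helper. The prefix count `n` and the random seed are arbitrary assumptions. The result must equal the ordinary join.

```python
import random
from collections import defaultdict

def hash_join(left, right):
    """Inner join of two lists of (key, value) pairs -> (key, (left_value, right_value))."""
    index = defaultdict(list)
    for k, v in right:
        index[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in index.get(k, [])]

def split_join_union(left, right, skewed_keys, n=8, seed=0):
    """Inner join: keys in `skewed_keys` are salted, all other keys join normally."""
    rng = random.Random(seed)
    # 1. Split both sides into skewed and normal parts.
    left_skew = [(k, v) for k, v in left if k in skewed_keys]
    left_norm = [(k, v) for k, v in left if k not in skewed_keys]
    right_skew = [(k, v) for k, v in right if k in skewed_keys]
    right_norm = [(k, v) for k, v in right if k not in skewed_keys]
    # 2. Salt each skewed left record with a random prefix in [0, n).
    salted_left = [((rng.randrange(n), k), v) for k, v in left_skew]
    # 3. Replicate each matching right record once per prefix.
    expanded_right = [((p, k), v) for k, v in right_skew for p in range(n)]
    # 4. Join the salted parts, strip the prefix, and union with the normal join.
    skew_joined = [(sk[1], pair) for sk, pair in hash_join(salted_left, expanded_right)]
    return skew_joined + hash_join(left_norm, right_norm)

left = [("hot", i) for i in range(100)] + [("a", 1), ("b", 2)]
right = [("hot", "H"), ("a", "A"), ("c", "C")]
print(len(split_join_union(left, right, {"hot"})))  # 101
```

In Spark, step 2 is what spreads the hot key across up to n tasks. Step 3 costs an n-fold copy of only the matching records from the other side.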
6. Salted Large‑Table Join
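Add a random prefix in [0, n) to every record of the large, skewed table. Expand every record of the smaller table n times, once per prefix (a Cartesian-like expansion), then join on the prefixed key, effectively "salting" the join. Because the expanded table grows n-fold, this suits cases where many keys are skewed and the other table is only moderately large.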
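Below is a compact plain-Python sketch of this salting. The names (`salted_join`, `n`) and the data are hypothetical. The function also reports the size of the expanded table, so the n-fold cost is visible.

```python
import random
from collections import defaultdict

def salted_join(large, other, n=4, seed=0):
    """Inner join where every `large` record is salted and `other` is expanded n-fold."""
    rng = random.Random(seed)
    # Salt every record of the large (skewed) table with a random prefix in [0, n).
    salted_large = [((rng.randrange(n), k), v) for k, v in large]
    # Expand the other table: one copy of each record per prefix 0..n-1.
    expanded = defaultdict(list)
    for k, v in other:
        for p in range(n):
            expanded[(p, k)].append(v)
    # Join on the salted key, then drop the prefix from the output.
    joined = [(sk[1], (lv, rv)) for sk, lv in salted_large for rv in expanded.get(sk, [])]
    expanded_size = sum(len(vs) for vs in expanded.values())
    return joined, expanded_size

large = [("x", i) for i in range(50)] + [("y", i) for i in range(5)]
other = [("x", "X"), ("y", "Y"), ("z", "Z")]
joined, expanded_size = salted_join(large, other, n=4)
print(len(joined), expanded_size)  # 55 12
```

With n = 4, the three-row table becomes 12 rows. This multiplication is the price paid for spreading every key of the large table.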
7. Map‑Side Local Aggregation
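Aggregate locally on the map side before the shuffle so fewer records cross the network. Use operators with a built-in map-side combine, such as reduceByKey or aggregateByKey, instead of groupByKey (or a Combiner in Hadoop MapReduce).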
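The saving can be estimated with a plain-Python simulation of a per-key sum. The simulation uses four hypothetical map partitions and compares the records shuffled with and without a map-side combine.

```python
from collections import defaultdict

def combine_locally(partition):
    """Map-side combine: sum the values of each key within one partition."""
    partial = defaultdict(int)
    for k, v in partition:
        partial[k] += v
    return list(partial.items())

def sum_by_key(records):
    """Reduce-side aggregation after the shuffle."""
    totals = defaultdict(int)
    for k, v in records:
        totals[k] += v
    return dict(totals)

# Hypothetical input: 4 map partitions, each with 1,000 records of "hot" and 10 other keys.
partitions = [[("hot", 1)] * 1000 + [(f"k{i}", 1) for i in range(10)] for _ in range(4)]

shuffled_plain = [rec for p in partitions for rec in p]                      # groupByKey-style
shuffled_combined = [rec for p in partitions for rec in combine_locally(p)]  # reduceByKey-style

print(len(shuffled_plain), len(shuffled_combined))  # 4040 44
```

Both paths produce the same totals. With the combine, the hot key reaches its reducer as 4 partial sums instead of 4,000 raw records.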
8. Two‑Phase Salted Aggregation
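First, prepend a random bucket number (for example 0 to 9) to each key and aggregate by this salted key, which spreads a hot key's records over up to that many tasks. Then strip the bucket prefix and run a second, global aggregation over the partial results. This only works for aggregations that can be computed in pieces and combined, such as sum, count or max.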
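Before looking at the Spark version, the two phases can be simulated in plain Python. The data is hypothetical, and `two_phase_sum` is an illustrative name. The key detail is that a fresh salt is drawn for every record. Otherwise all records of a key share one bucket and nothing is spread.

```python
import random
from collections import defaultdict

def two_phase_sum(records, buckets=10, seed=0):
    """Sum values per key via a salted first phase and a global second phase."""
    rng = random.Random(seed)
    # Phase 1: draw a fresh salt for EVERY record and aggregate by (salt, key),
    # so one hot key is split into up to `buckets` partial groups.
    partial = defaultdict(int)
    for k, v in records:
        partial[(rng.randrange(buckets), k)] += v
    # Phase 2: drop the salt and combine the partial sums per original key.
    final = defaultdict(int)
    for (_, k), v in partial.items():
        final[k] += v
    return dict(final), partial

# Hypothetical data: one hot key and one cold key.
records = [("hot", 1)] * 10_000 + [("cold", 1)] * 10
final, partial = two_phase_sum(records)
hot_groups = sum(1 for (_, k) in partial if k == "hot")
print(final, hot_groups)  # {'hot': 10000, 'cold': 10} 10
```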
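import scala.util.Random
import org.apache.spark.rdd.RDD

// Two-phase (salted) aggregation: counts the records of each key in `pairs`.
def antiSkew[V](pairs: RDD[(String, V)], buckets: Int = 10): RDD[(String, Int)] = {
  val SPLIT = "-"
  pairs
    .map { case (k, _) => (s"${Random.nextInt(buckets)}$SPLIT$k", 1) } // fresh salt per record
    .reduceByKey(_ + _)                                               // phase 1: partial counts
    .map { case (salted, n) => (salted.split(SPLIT, 2)(1), n) }       // strip only the salt
    .reduceByKey(_ + _)                                               // phase 2: global counts
}
Data Skew in Hadoop MapReduce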
Skew usually shows up in the reduce phase. The job's progress stalls just short of 100% because a few reducers are still processing heavy keys while the rest have finished, and those reducers may fail with OOM errors or be killed.
Typical remedies include:
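Map-side join: load the small table into every mapper's memory so the join completes without a reduce phase.
Rewrite a global count(distinct col), which Hive evaluates in a single reducer, as a GROUP BY col subquery followed by count.
Enable hive.map.aggr=true (map-side partial aggregation) and hive.groupby.skewindata=true. The latter splits a GROUP BY into two jobs: the first distributes rows randomly and pre-aggregates, and the second produces the final result.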
Compress map‑side output and intermediate data.
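A map-side join can be sketched in plain Python as follows. This is a simulation, and `map_side_join` and the sample data are hypothetical. Each mapper loads the small table into memory and joins its own input split locally, so rows for a hot key never converge on a single reducer. Unmatched rows keep `None`, mirroring a left join.

```python
def map_side_join(small_table, large_splits):
    """Left-join each split of the large table against an in-memory copy of the small table.

    small_table: (key, value) pairs, assumed unique per key and small enough
    to load into every mapper's memory.
    large_splits: input splits, each a list of (key, value) pairs.
    """
    lookup = dict(small_table)        # what each mapper would load at startup
    output = []
    for split in large_splits:        # each split is handled by one mapper
        for key, value in split:
            # No shuffle and no reducer: the match is found inside the mapper.
            output.append((key, value, lookup.get(key)))
    return output

small = [("bj", "Beijing"), ("sh", "Shanghai")]
splits = [[("bj", 1), ("bj", 2)], [("sh", 3), ("gz", 4)]]
print(map_side_join(small, splits))
```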
These techniques collectively help balance workload, reduce shuffle volume, and mitigate data skew across Spark and Hadoop ecosystems.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
dbaplus Community
Enterprise-level professional community for Database, BigData, and AIOps. Daily original articles, weekly online tech talks, monthly offline salons, and quarterly XCOPS&DAMS conferences—delivered by industry experts.
