Big Data 19 min read

How to Detect and Resolve Data Skew in Spark and Hadoop

This article explains what data skew is in distributed big‑data systems like Spark and Hadoop, why it hurts performance, how to spot it using the Web UI or key statistics, and presents eight practical mitigation techniques ranging from filtering and shuffle parallelism to custom partitioners and broadcast joins.

dbaplus Community
dbaplus Community
dbaplus Community
How to Detect and Resolve Data Skew in Spark and Hadoop

What Is Data Skew?

In distributed systems such as Spark or Hadoop, the ideal situation is that adding more nodes reduces total execution time linearly. Data skew occurs when the workload is unevenly distributed, causing a few tasks to process a disproportionate amount of data while most tasks finish quickly.

Harms of Data Skew

Overall job duration is dominated by the slowest task, wasting parallelism.

Excessive data on a single task can trigger out‑of‑memory (OOM) errors, leading to job failure.

Typical Symptoms

Most tasks finish quickly, but a handful run extremely slowly, preventing the stage from completing.

Spark jobs that previously ran fine suddenly throw OOM exceptions.

Root Causes

During shuffle, all records with the same key are sent to a single reducer. If a key has an exceptionally large number of records (e.g., 1 million vs. typical 10), the reducer handling that key becomes a bottleneck.

Detection & Diagnosis

1. Spark Web UI – Inspect the "Shuffle Read Size/Records" column for each task to see uneven data distribution.

2. Key Statistics – Sample the key column and count occurrences to identify heavy keys:

df.select("key").sample(false, 0.1) // data sampling
  .map(k => (k, 1))
  .reduceByKey(_ + _)
  .map{ case (k, cnt) => (cnt, k) }
  .sortByKey(false)
  .take(10) // top 10 heavy keys

Mitigation Strategies

Basic Idea

Business Logic : Separate skewed keys (e.g., specific cities) and compute them independently before merging results.

Implementation : Replace expensive groupByKey with reduceByKey, or pre‑aggregate in Hive.

Parameter Tuning : Adjust Spark/Hadoop configuration to increase parallelism.

1. Filter Abnormal Data

Identify skewed keys and simply discard or clean them if they are erroneous.

2. Increase Shuffle Parallelism

Set a higher number of shuffle partitions to spread keys across more tasks.

# Spark RDD API
rdd.repartition(200) // or set spark.sql.shuffle.partitions=200

# Spark SQL
SET spark.sql.shuffle.partitions=200;

3. Custom Partitioner

Implement a custom Partitioner to control key‑to‑task mapping:

.groupByKey(new Partitioner() {
  @Override
  public int numPartitions() { return 12; }
  @Override
  public int getPartition(Object key) {
    int id = Integer.parseInt(key.toString());
    if (id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
      return (id - 9500000) / 12;
    } else {
      return id % 12;
    }
  }
})

4. Broadcast (Map‑Side) Join

When one side of the join is small enough to fit in executor memory, broadcast it to avoid shuffle entirely:

from pyspark.sql.functions import broadcast
result = broadcast(A).join(B, ["join_col"], "left")

Set a large broadcast threshold if needed:

SET spark.sql.autoBroadcastJoinThreshold=104857600;

5. Split‑Join‑Union

Separate skewed keys into a dedicated RDD, add a random prefix to disperse them, join with the other side (also prefixed), then union the skewed join result with the normal join result.

6. Salted Large‑Table Join

Add a random prefix to every record of the large table, then perform a Cartesian‑like expansion on the small table before joining, effectively “salting” the join.

7. Map‑Side Local Aggregation

Use a combiner (e.g., reduceByKey) to aggregate locally before shuffle, reducing data volume.

8. Two‑Phase Salted Aggregation

First, prepend a random bucket number to each key and aggregate locally; then remove the bucket prefix and perform a second global aggregation.

def antiSkew(): RDD[(String, Int)] = {
  val SPLIT = "-"
  val prefix = new Random().nextInt(10)
  pairs.map(t => (prefix + SPLIT + t._1, 1))
       .reduceByKey(_ + _)
       .map(t => (t._1.split(SPLIT)(1), t._2))
       .reduceByKey(_ + _)
}

Data Skew in Hadoop MapReduce

Skew often appears in the reduce phase, causing a few reducers to stall at 99.99% while others finish, leading to OOM and task kills.

Typical remedies include:

Map‑side join.

Convert count(distinct) to groupBy then count.

Enable hive.map.aggr=true and hive.groupby.skewindata=true for automatic load balancing.

Compress map‑side output and intermediate data.

These techniques collectively help balance workload, reduce shuffle volume, and mitigate data skew across Spark and Hadoop ecosystems.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

optimizationData SkewSparkHadoopShuffleBroadcast JoinPartitioner
dbaplus Community
Written by

dbaplus Community

Enterprise-level professional community for Database, BigData, and AIOps. Daily original articles, weekly online tech talks, monthly offline salons, and quarterly XCOPS&DAMS conferences—delivered by industry experts.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.