Why Spark Shuffle Often Runs Out of Memory and How to Fix It
This article examines Spark's memory management and the shuffle process, identifies the components that consume the most memory during shuffle write and read, analyzes common OOM scenarios such as task concurrency and data skew, and offers configuration tips to prevent out‑of‑memory failures.
Spark Memory Management and Consumption Model
Each Spark task runs on an executor and has a TaskMemoryManager that forwards allocation requests to the executor‑wide MemoryManager via a MemoryAllocator. Memory consumers extend the abstract class MemoryConsumer (e.g., ExternalAppendOnlyMap, ExternalSorter) and provide acquire, release and spill methods. When n tasks execute concurrently, each task can hold between 1/(2*n) and 1/n of the execution memory pool, so the per‑task budget shrinks as concurrency rises. If a MemoryConsumer cannot obtain enough memory, it spills to disk. However, Spark estimates on‑heap usage by sampling, and an inaccurate estimate can delay the spill long enough to cause OOM.
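The effect of concurrency on the per‑task budget can be illustrated with a little arithmetic. The following is a minimal sketch in Python, not Spark code: the function name per_task_bounds and the 4 GiB pool size are assumptions made for illustration only.

```python
from typing import Tuple

GIB = 1024 ** 3


def per_task_bounds(pool_bytes: int, active_tasks: int) -> Tuple[int, int]:
    """Return (lower, upper) bytes one task may hold, i.e. 1/(2n) and 1/n of the pool."""
    if active_tasks < 1:
        raise ValueError("active_tasks must be at least 1")
    lower = pool_bytes // (2 * active_tasks)
    upper = pool_bytes // active_tasks
    return lower, upper


# Hypothetical 4 GiB pool: watch the per-task range shrink as concurrency rises.
for n in (1, 2, 4, 8):
    lower, upper = per_task_bounds(4 * GIB, n)
    print(f"{n} tasks: {lower / GIB:.2f} GiB to {upper / GIB:.2f} GiB per task")
```

Doubling the number of concurrent tasks halves both ends of the range, which is why lowering the cores per executor is a common first response to shuffle OOM.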
Spark Shuffle Process
Shuffle Write Phase
BypassMergeSortShuffleWriter
This writer creates a temporary file for each partition, writes records directly, and finally merges all temporary files into a single data file with an accompanying index file. Because it does not perform sorting or combine operations, its memory footprint is minimal.
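The write path described above can be sketched as follows. This is an illustrative Python model, not Spark's implementation: the helper partition_of, the text record format, and the file names are hypothetical, and the index is returned as a list of offsets rather than written to an index file.

```python
import os
import shutil
import tempfile
import zlib
from typing import Iterable, List, Tuple


def partition_of(key: str, num_partitions: int) -> int:
    # Deterministic stand-in for a hash partitioner (hypothetical helper).
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def bypass_merge_write(records: Iterable[Tuple[str, str]],
                       num_partitions: int,
                       out_dir: str) -> Tuple[str, List[int]]:
    """Write one temp file per partition, then concatenate them into one data file.

    Returns the data file path and an offsets list with num_partitions + 1
    entries; partition p occupies bytes offsets[p] .. offsets[p + 1].
    """
    tmp_paths = [os.path.join(out_dir, f"part-{p}.tmp") for p in range(num_partitions)]
    files = [open(path, "wb") for path in tmp_paths]
    try:
        # No sorting and no combining: each record goes straight to its file.
        for key, value in records:
            files[partition_of(key, num_partitions)].write(f"{key}\t{value}\n".encode("utf-8"))
    finally:
        for f in files:
            f.close()

    data_path = os.path.join(out_dir, "shuffle.data")
    offsets = [0]
    with open(data_path, "wb") as out:
        for path in tmp_paths:
            with open(path, "rb") as src:
                shutil.copyfileobj(src, out)  # streamed copy, small fixed buffer
            offsets.append(out.tell())
            os.remove(path)
    return data_path, offsets


with tempfile.TemporaryDirectory() as tmp:
    path, offsets = bypass_merge_write([("a", "1"), ("b", "2"), ("a", "3")], 2, tmp)
    print(offsets)
```

Memory use in this model is limited to the file buffers, one per partition, which is consistent with the small footprint noted above.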
SortShuffleWriter
SortShuffleWriter is the default implementation. It delegates to ExternalSorter, which implements MemoryConsumer. Incoming records are inserted into either PartitionedAppendOnlyMap (a hash map) or PartitionedPairBuffer (an array). When memory allocation fails or when the number of elements exceeds spark.shuffle.spill.numElementsForceSpillThreshold (default Long.MAX_VALUE, effectively disabled), the data is spilled to disk. The map/array structures are the primary memory consumers.
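The insert‑then‑spill behavior can be modeled with a toy buffer. The sketch below is Python and is not Spark's ExternalSorter: the class SpillingBuffer, the fixed bytes_per_record estimate (standing in for Spark's sampled size estimate), and the use of in‑memory lists in place of spill files are all assumptions made for illustration.

```python
from typing import List, Tuple

Record = Tuple[int, str, str]  # (partition id, key, value)


class SpillingBuffer:
    """Toy model of buffering records and spilling sorted runs under memory pressure."""

    def __init__(self, memory_budget_bytes: int,
                 force_spill_threshold: int,
                 bytes_per_record: int = 64) -> None:
        self.memory_budget_bytes = memory_budget_bytes
        self.force_spill_threshold = force_spill_threshold
        self.bytes_per_record = bytes_per_record  # crude fixed estimate
        self.buffer: List[Record] = []
        self.spills: List[List[Record]] = []  # each entry stands in for one spill file

    def insert(self, partition: int, key: str, value: str) -> None:
        self.buffer.append((partition, key, value))
        estimated = len(self.buffer) * self.bytes_per_record
        over_budget = estimated > self.memory_budget_bytes
        too_many = len(self.buffer) > self.force_spill_threshold
        if over_budget or too_many:
            self.spill()

    def spill(self) -> None:
        # Sort by (partition, key) and hand the run off, freeing the buffer.
        if self.buffer:
            self.spills.append(sorted(self.buffer))
            self.buffer = []


buf = SpillingBuffer(memory_budget_bytes=256, force_spill_threshold=10 ** 9)
for i in range(10):
    buf.insert(i % 3, f"k{i}", "v")
print(len(buf.spills), "spills,", len(buf.buffer), "records still buffered")
```

If the size estimate is too low, the over_budget check fires late and the buffer grows past the real budget, which is the failure mode the article attributes to inaccurate sampling.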
The sorting algorithm is TimSort; its worst‑case auxiliary space can be up to n/2 elements, where n is the number of items to sort. When spilled files are merged, Spark performs an external merge sort, which adds little extra memory. Additional space is needed only in the rare case where many distinct keys share the same hash value.
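Why the merge step needs little memory can be seen in a k‑way merge over already sorted runs. This Python sketch uses the standard library's heapq.merge and in‑memory lists in place of spill files; the function name merge_spills is hypothetical and the code is not Spark's merge implementation.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

Record = Tuple[int, str]  # (partition id, key)


def merge_spills(spills: Iterable[List[Record]]) -> Iterator[Record]:
    """Lazily merge sorted runs; only one pending record per run is held at a time."""
    return heapq.merge(*spills)


run_a = [(0, "a"), (1, "b")]
run_b = [(0, "c"), (1, "a")]
for record in merge_spills([run_a, run_b]):
    print(record)
```

Because the merge is lazy, the working set grows with the number of runs rather than with the number of records.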
Write‑side buffering is controlled by spark.shuffle.file.buffer and spark.shuffle.spill.batchSize, and its overhead is modest.
UnsafeShuffleWriter
UnsafeShuffleWriter optimizes SortShuffleWriter by storing serialized records in off‑heap pages, dramatically reducing the size of in‑memory Java objects. It adds a LongArray to keep partition offsets and page pointers, which introduces a small extra memory overhead compared with SortShuffleWriter.
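The idea of keeping a partition id and a page pointer in a single long can be sketched as bit packing. The Python code below is an illustration only; the 24/13/27 bit split is an assumption about the layout and should be checked against the Spark version in use, and the function names pack and unpack are hypothetical.

```python
from typing import Tuple

# Assumed field widths (64 bits total); verify against your Spark version.
PARTITION_BITS = 24
PAGE_BITS = 13
OFFSET_BITS = 27


def pack(partition_id: int, page_number: int, offset_in_page: int) -> int:
    """Pack the three fields into one integer, partition id in the high bits."""
    if not 0 <= partition_id < (1 << PARTITION_BITS):
        raise ValueError("partition_id out of range")
    if not 0 <= page_number < (1 << PAGE_BITS):
        raise ValueError("page_number out of range")
    if not 0 <= offset_in_page < (1 << OFFSET_BITS):
        raise ValueError("offset_in_page out of range")
    return (partition_id << (PAGE_BITS + OFFSET_BITS)) | (page_number << OFFSET_BITS) | offset_in_page


def unpack(packed: int) -> Tuple[int, int, int]:
    """Recover (partition_id, page_number, offset_in_page) from a packed value."""
    offset = packed & ((1 << OFFSET_BITS) - 1)
    page = (packed >> OFFSET_BITS) & ((1 << PAGE_BITS) - 1)
    partition = packed >> (PAGE_BITS + OFFSET_BITS)
    return partition, page, offset


pointers = [pack(2, 0, 5), pack(1, 3, 9), pack(1, 0, 7)]
# Sorting the packed values groups records by partition without touching the records.
print([unpack(p) for p in sorted(pointers)])
```

Since the partition id occupies the high bits, sorting the array of packed values orders records by partition while the serialized records themselves stay in place.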
Shuffle Read Phase
During shuffle read, each reducer fetches its partition's data from remote or local block managers. Remote fetches are limited by spark.reducer.maxSizeInFlight (default 48 MB), spark.reducer.maxReqsInFlight, spark.reducer.maxBlocksInFlightPerAddress, and spark.maxRemoteBlockSizeFetchToMem. If a single map block is very large (data skew), the default behavior loads the whole block into reducer memory, which can cause OOM. Setting spark.maxRemoteBlockSizeFetchToMem to a reasonable limit makes Spark fetch blocks above that size to disk instead of holding them in memory.
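The effect of the threshold on a skewed fetch can be estimated with a small model. This Python sketch is a simplification, not Spark's fetch logic: the function plan_fetch and the block sizes are hypothetical, and the model ignores the in‑flight throttling applied by spark.reducer.maxSizeInFlight.

```python
from typing import Dict, List

MB = 1024 * 1024


def plan_fetch(block_sizes: List[int], fetch_to_mem_threshold: int) -> Dict[str, int]:
    """Sum the bytes that would land in memory versus on disk for a given threshold."""
    plan = {"memory": 0, "disk": 0}
    for size in block_sizes:
        target = "disk" if size > fetch_to_mem_threshold else "memory"
        plan[target] += size
    return plan


# One skewed 900 MB block among small ones (made-up sizes).
blocks = [4 * MB, 8 * MB, 900 * MB]
print(plan_fetch(blocks, fetch_to_mem_threshold=200 * MB))
print(plan_fetch(blocks, fetch_to_mem_threshold=2 ** 63 - 1))
```

With an effectively unlimited threshold, the skewed block counts entirely against reducer memory; with a finite one, only the small blocks do.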
After fetching, Spark may verify data integrity (controlled by spark.shuffle.detectCorrupt); verification is performed in‑memory and therefore also consumes memory.
Aggregation and sorting on the read side use ExternalAppendOnlyMap similarly to the write side, with spilling to mitigate memory pressure.
Analysis of Spark Shuffle OOM Causes
Executor‑level task concurrency reduces the memory available to each task because the MemoryManager divides the executor's execution memory among the tasks running concurrently.
Both write and read phases allocate memory for maps/arrays, sorting, and merging. Although Spark can spill to disk, inaccurate JVM heap sampling may delay spills, leading to out‑of‑memory errors.
Data skew on the reducer side can cause a single map block to be fetched entirely into memory. Configuring spark.maxRemoteBlockSizeFetchToMem to a lower threshold forces large blocks to spill, preventing OOM.
Integrity verification during read (spark.shuffle.detectCorrupt) does not spill to disk and can also trigger OOM in extreme cases.
Conclusion
The detailed walkthrough shows that memory pressure in Spark shuffle originates from task concurrency, the in‑memory structures used by SortShuffleWriter and UnsafeShuffleWriter, and the handling of large remote blocks and integrity checks during shuffle read. Adjusting configuration parameters such as spark.shuffle.file.buffer, spark.shuffle.spill.numElementsForceSpillThreshold, spark.maxRemoteBlockSizeFetchToMem, and spark.shuffle.detectCorrupt, as well as limiting the number of concurrent tasks per executor, can substantially reduce the risk of OOM in production workloads.
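As a starting point, the settings named above can be collected in one place and rendered as spark-submit flags. The Python sketch below is illustrative: the values are placeholders rather than recommendations, the helper to_submit_args is hypothetical, and spark.executor.cores is used here as the knob for per‑executor task concurrency.

```python
from typing import Dict, List

# Placeholder values for illustration; tune against your own workload.
EXAMPLE_CONF: Dict[str, str] = {
    "spark.executor.cores": "2",                       # fewer concurrent tasks per executor
    "spark.shuffle.file.buffer": "64k",                # write-side buffer per output stream
    "spark.shuffle.spill.numElementsForceSpillThreshold": "5000000",
    "spark.maxRemoteBlockSizeFetchToMem": "200m",      # larger remote blocks go to disk
    "spark.shuffle.detectCorrupt": "true",             # disabling trades safety for memory
}


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render a config dict as a flat list of spark-submit --conf arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


print(" ".join(to_submit_args(EXAMPLE_CONF)))
```

Changing one setting at a time and watching spill and fetch metrics makes it easier to tell which adjustment actually relieved the memory pressure.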
Youzan Coder
Official Youzan tech channel, delivering technical insights and occasional daily updates from the Youzan tech team.
