Optimizing Spark Shuffle: Can Fetch, Efficient Fetch, and Reliable Fetch
This article analyzes three Spark shuffle bottlenecks—oversized partitions that exceed Netty's 2 GB limit, excessive retry latency caused by dead executors, and insufficient data‑corruption checks—and presents concrete configuration changes, new block identifiers, executor‑liveness checks, and CRC‑32 verification to improve fetchability, efficiency, and reliability at scale.
Background
Spark is a leading big-data processing engine, and its shuffle phase is often a performance bottleneck. This article shares three optimizations applied to Spark shuffle at NetEase over the past year.
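Can Fetch
A map task produced a partition larger than 2 GB, and the fetch failed because Spark's Netty-based transport rejects any single frame larger than about 2 GB (Integer.MAX_VALUE bytes), as the "Too large frame" error below shows. The problem was reproduced from a NetEase Cloud Music job that failed intermittently. Possible remedies are lowering spark.maxRemoteBlockSizeFetchToMem below 2 GB, so that oversized remote blocks are fetched to disk instead of into memory as one frame; fixing the underlying data skew; or handling the problem at the platform level.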
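The remedy rests on one decision: a block above the fetch-to-memory threshold is fetched to disk rather than requested as a single in-memory frame. The sketch below models only that decision. `RemoteFetchPlanner`, `plan` and `exceedsFrameLimit` are hypothetical names, the frame limit is approximated as `Int.MaxValue` bytes, and the threshold parameter merely plays the role of spark.maxRemoteBlockSizeFetchToMem; this is not Spark's fetch code.

```scala
// Hypothetical model of the "can fetch" decision; names are illustrative, not Spark APIs.
object RemoteFetchPlanner {
  // Assumed transport limit: one frame cannot carry more than Int.MaxValue bytes (~2 GiB).
  val MaxFrameBytes: Long = Int.MaxValue.toLong

  sealed trait FetchMode
  case object InMemoryChunk extends FetchMode // whole block requested as one frame
  case object StreamToDisk extends FetchMode  // block streamed into a temporary file

  /** Choose how to fetch a remote block of `blockBytes` bytes; `fetchToMemThreshold`
    * stands in for spark.maxRemoteBlockSizeFetchToMem. */
  def plan(blockBytes: Long, fetchToMemThreshold: Long): FetchMode =
    if (blockBytes > fetchToMemThreshold) StreamToDisk else InMemoryChunk

  /** True if the chosen mode would ask the transport for a frame it must reject. */
  def exceedsFrameLimit(blockBytes: Long, mode: FetchMode): Boolean =
    mode == InMemoryChunk && blockBytes > MaxFrameBytes
}
```

With an effectively unlimited threshold, a 2,991,947,178-byte block (the size reported in the job's error log) would be requested as one frame and rejected; with an example threshold of 200 MB, the same block is streamed to disk instead.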
WARN [shuffle-client-6-2:TransportChannelHandler@78] - Exception in connection from hostName/hostIp:7337
java.lang.IllegalArgumentException: Too large frame: 2991947178
    at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
    at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)

Fetch Efficiently
Another case, from a Kaola job, showed frequent shuffle-client timeouts and retries that inflated task latency. The root cause was executor dynamic allocation: executors that had already been removed were still the target of fetch retries, and every retry against a dead executor had to wait for its own timeout. The fix introduces an IsExecutorAlive message: after a fetch fails, the fetching side asks the driver whether the remote executor is still alive and skips meaningless retries if it is not.
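A self-contained model of the retry decision may help before looking at the actual change. It is a sketch under assumptions: `LivenessAwareRetry`, `fetchWithRetries` and `DeadExecutorException` are hypothetical names, the liveness check is a plain function standing in for the driver's reply to IsExecutorAlive, and real Spark fetchers also wait between attempts (spark.shuffle.io.retryWait) and cap attempts with spark.shuffle.io.maxRetries, which this model reduces to a simple counter.

```scala
import java.io.IOException

// Hypothetical stand-in; not Spark's own ExecutorDeadException.
final class DeadExecutorException(msg: String) extends RuntimeException(msg)

object LivenessAwareRetry {
  /** Runs `fetch` up to 1 + maxRetries times. After each IOException it consults
    * `isExecutorAlive` (simulating the driver-side check); if the executor is
    * reported dead it fails fast instead of retrying.
    * Returns the fetched value and the number of attempts made. */
  def fetchWithRetries[T](execId: String, maxRetries: Int)(fetch: () => T)(
      isExecutorAlive: String => Boolean): (T, Int) = {
    var attempt = 0
    var result: Option[T] = None
    while (result.isEmpty) {
      attempt += 1
      try {
        result = Some(fetch())
      } catch {
        case e: IOException =>
          // Ask the (simulated) driver before spending another attempt.
          if (!isExecutorAlive(execId))
            throw new DeadExecutorException(s"Executor $execId is dead; not retrying")
          // Executor is alive: keep retrying until the attempt budget is exhausted.
          if (attempt > maxRetries) throw e
      }
    }
    (result.get, attempt)
  }
}
```

Against a dead executor the model gives up after one attempt, whereas a liveness-unaware loop would spend every retry (each with its own timeout) on a host that can never answer. Transient failures on a live executor are still retried as before.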
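import java.io.IOException
import scala.util.{Success, Try}

try {
  // Start fetching the requested blocks from the remote executor.
  new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
    transportConf, tempFileManager).start()
} catch {
  case e: IOException =>
    // Before the failure is retried, ask the driver whether the executor
    // that holds the blocks is still alive.
    Try {
      driverEndPointRef.askSync[Boolean](IsExecutorAlive(execId))
    } match {
      // The driver reports the executor as dead: fail fast instead of retrying.
      case Success(false) =>
        throw new ExecutorDeadException(s"The relative remote executor(Id: $execId) which maintains the block data to fetch is dead.")
      // Executor alive, or the driver could not be asked: rethrow the original error.
      case _ => throw e
    }
}

Reliable Fetch
Data corruption was observed during shuffle reads, surfacing as errors such as java.io.IOException: FAILED_TO_UNCOMPRESS. Spark's existing corruption detection only covered compressed data, and only blocks below a size limit, so corruption in other blocks could go unnoticed. The proposed improvement computes a CRC-32 checksum for every partition, stores the checksums in the shuffle index file, and verifies them on the fetch side, covering all data types and block sizes.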
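The checksum scheme can be sketched with the JDK's `java.util.zip.CRC32`. This is a minimal model, not the actual patch: `MapOutput` and `PartitionChecksums` are hypothetical names, the map output is an in-memory byte array, and the "index file" is modeled as an array of partition offsets with a parallel array of checksums; the real on-disk layout of the index file is not described here.

```scala
import java.util.zip.CRC32

// Hypothetical model: concatenated partition data plus index offsets and per-partition CRCs.
final case class MapOutput(data: Array[Byte], offsets: Array[Long], checksums: Array[Long])

object PartitionChecksums {
  private def crc(bytes: Array[Byte], from: Int, until: Int): Long = {
    val c = new CRC32()
    c.update(bytes, from, until - from)
    c.getValue
  }

  /** Map side: concatenate partitions, record their start offsets (n + 1 entries,
    * like an index file) and a CRC-32 checksum for each partition. */
  def write(partitions: Seq[Array[Byte]]): MapOutput = {
    val data = partitions.foldLeft(Array.emptyByteArray)(_ ++ _)
    val offsets = partitions.scanLeft(0L)(_ + _.length).toArray
    val checksums = partitions.map(p => crc(p, 0, p.length)).toArray
    MapOutput(data, offsets, checksums)
  }

  /** Serve partition `reduceId` by slicing between consecutive offsets. */
  def read(out: MapOutput, reduceId: Int): Array[Byte] =
    out.data.slice(out.offsets(reduceId).toInt, out.offsets(reduceId + 1).toInt)

  /** Fetch side: recompute the CRC-32 of the received bytes and compare. */
  def verify(received: Array[Byte], expected: Long): Boolean =
    crc(received, 0, received.length) == expected
}
```

Because the checksum is taken over the raw partition bytes, the check does not depend on whether the data is compressed or how large the block is, which is the gap the improvement targets. CRC-32 detects every single-bit error, so a flipped bit in transit is always caught.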
Implementation Highlights
New configuration parameters (SHUFFLE_FETCH_THRESHOLD, spark.shuffle.fetch.split) control split fetching; a new block identifier, ShuffleBlockSegmentId, enables fetching an oversized block in segments; priority queues preserve block order; and driver-side executor liveness checks prevent futile retries. Performance tests with TPC-DS on 1 TB and 10 TB datasets showed negligible overhead and slight gains over the previous implementation.
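Split fetching and ordered delivery can be illustrated together: cut a large block into segments no larger than a threshold, fetch them independently, and use a priority queue to hand them to the reader in their original order. In this sketch `SegmentId`, `SplitFetch` and `OrderedReassembler` are hypothetical; the fields of `SegmentId` are illustrative and are not the real definition of ShuffleBlockSegmentId, and `threshold` only plays the role of SHUFFLE_FETCH_THRESHOLD.

```scala
import scala.collection.mutable

// Hypothetical segment identifier; not the actual ShuffleBlockSegmentId fields.
final case class SegmentId(shuffleId: Int, mapId: Long, reduceId: Int,
                           segmentIndex: Int, offset: Long, length: Long)

object SplitFetch {
  /** Split a block of `blockSize` bytes into segments of at most `threshold` bytes. */
  def split(shuffleId: Int, mapId: Long, reduceId: Int,
            blockSize: Long, threshold: Long): Seq[SegmentId] = {
    require(threshold > 0, "threshold must be positive")
    val count = ((blockSize + threshold - 1) / threshold).toInt
    (0 until count).map { i =>
      val offset = i * threshold
      SegmentId(shuffleId, mapId, reduceId, i, offset, math.min(threshold, blockSize - offset))
    }
  }

  /** Segments may complete in any order. A min-heap keyed on segmentIndex releases
    * them only when the next expected index is at the head, preserving byte order. */
  final class OrderedReassembler {
    private val pending = mutable.PriorityQueue.empty[(Int, Array[Byte])](
      Ordering.by[(Int, Array[Byte]), Int](_._1).reverse)
    private var next = 0

    /** Accept one fetched segment; return all segments now deliverable, in order. */
    def offer(segmentIndex: Int, bytes: Array[Byte]): Seq[Array[Byte]] = {
      pending.enqueue((segmentIndex, bytes))
      val ready = mutable.ArrayBuffer.empty[Array[Byte]]
      while (pending.nonEmpty && pending.head._1 == next) {
        ready += pending.dequeue()._2
        next += 1
      }
      ready.toSeq
    }
  }
}
```

`PriorityQueue` in Scala is a max-heap, so the ordering is reversed to put the lowest segment index at the head. A segment that arrives early simply waits in the queue until every segment before it has been delivered.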
For further details, see the related JIRA tickets and their pull requests (e.g., SPARK-27665, SPARK-27637, SPARK-27562).
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
