Big Data 11 min read

Dynamic Resource Allocation in Spark Streaming: Problems, Mechanisms, and Practical Guidelines

The article explains Spark's default static resource allocation, analyzes the limitations of its Dynamic Resource Allocation (DRA) for streaming workloads, describes the internal Spark components and code paths involved, and proposes concrete design and configuration recommendations for implementing more responsive executor scaling.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Dynamic Resource Allocation in Spark Streaming: Problems, Mechanisms, and Practical Guidelines

Spark traditionally uses static resource pre‑allocation, which leads to significant waste during low‑traffic periods and can cause resource contention when workloads peak; the article examines why the built‑in Dynamic Resource Allocation (DRA) mechanism is insufficient for Spark Streaming scenarios.

Entry point : When the configuration spark.dynamicAllocation.enabled is set to true, SparkContext creates an ExecutorAllocationManager instance.

_executorAllocationManager = if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
    None
}

The author criticises this direct instantiation for lacking extensibility, noting that many Spark components follow a similar pattern.

Challenges for dynamic adjustment include handling cached RDDs, shuffle data, and keeping the DAG scheduler informed after executors are added or removed. Cached executors are protected by a large idle timeout, which can be overridden via spark.dynamicAllocation.cachedExecutorIdleTimeout.

private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
    "spark.dynamicAllocation.cachedExecutorIdleTimeout",
    s"${Integer.MAX_VALUE}s"
)

Shuffle cleanup requires configuring yarn.nodemanager.aux-services so that executors do not retain shuffle state.

Trigger conditions :

Add worker when a stage is running and the estimated required executors exceed the current count.

Remove worker when an executor has been idle (no tasks) for a configurable period (default 60 s).

The default idle detection interval is 100 ms, implemented in a periodic scheduler. private val intervalMillis: Long = 100 Container add/remove implementation relies on the ApplicationMaster communicating with YARN via the ExecutorAllocationManager. The manager holds an RPC endpoint ( amEndpoint) to issue container requests and kills. private var amEndpoint: Option[RpcEndpointRef] Executor removal is performed by the KillExecutors case in the AM endpoint:

case KillExecutors(executorIds) =>
    logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(",")}.")
    Option(allocator) match {
        case Some(a) => executorIds.foreach(a.killExecutor)
        case None    => logWarning("Container allocator is not ready to kill executors yet.")
    }
    context.reply(true)

Scheduling information is gathered through ExecutorAllocationListener (a SparkListener) which tracks task counts per stage, idle executors, and pending/running tasks using mutable maps such as stageIdToNumTasks, stageIdToTaskIndices, and executorIdToTaskIds.

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]

The core scheduling loop runs every 100 ms, updating the target number of executors and removing those whose idle timeout has expired.

private def schedule(): Unit = synchronized {
    val now = clock.getTimeMillis
    updateAndSyncNumExecutorsTarget(now)
    removeTimes.retain { case (executorId, expireTime) =>
        val expired = now >= expireTime
        if (expired) {
            initializing = false
            removeExecutor(executorId)
        }
        !expired
    }
}

Executor addition follows a back‑off strategy: the required number of executors is computed as

(runningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

, then compared with the current target; if more are needed, containers are requested in an exponential‑increase pattern (1, 2, 4, 8 …) respecting sustainedSchedulerBacklogTimeoutS to avoid overly aggressive scaling.

private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}

DRA evaluation highlights the importance of setting sensible minExecutors / maxExecutors, limiting executor CPU cores (≤ 3), and configuring parallelism for shuffle‑heavy stages to prevent excessive executor termination.

For Spark Streaming, the author suggests a dedicated scaling mechanism based on the processing time of each micro‑batch: if preProcessingTime > 0.8 × duration scale up to maxExecutors; otherwise compute a reduction factor and gradually release executors over several batches using a new parameter spark.streaming.release.num.duration.

Overall, the article provides a detailed walkthrough of Spark's DRA internals, identifies practical shortcomings for streaming workloads, and offers concrete code‑level and configuration‑level recommendations to build a more responsive dynamic scaling solution.

Big DataStreamingYaRNSparkDynamic Resource AllocationExecutor Management
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.