Understanding Spark Streaming Backpressure Mechanism and Source Code Analysis
This article explains why Spark Streaming introduced backpressure and how its dynamic rate control works. It then walks through the relevant source code: the RateController class, how it is registered, and the execution flow that adjusts ingestion rates to match processing capacity.
1. Why Introduce Backpressure
By default, Spark Streaming Receivers ingest data as fast as the producer sends it. If that rate exceeds what the cluster can process, each batch takes longer than the configured batch interval, and batches start to queue up. Received data then accumulates in executor memory, which can make executors run out of memory (OOM), and latency grows further once blocks spill to disk. Before Spark 1.5, the only remedy was a static cap, spark.streaming.receiver.maxRate (records per second per receiver). A fixed cap has to be chosen conservatively, so it wastes resources whenever the cluster could sustain a higher throughput.
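The accumulation follows from simple arithmetic. When a batch takes longer to process than the batch interval, the next batch has to wait, and that wait grows by the difference at every interval. The hypothetical model below is not Spark code. It assumes a constant ingest rate, a constant processing throughput, and one batch running at a time; BacklogSketch and its parameters are illustrative names.

```scala
object BacklogSketch {
  /**
   * Scheduling delay (seconds) of each of the first `numBatches` batches, assuming a constant
   * ingest rate and processing throughput (records per second) and one batch running at a time.
   */
  def schedulingDelays(
      ingestRatePerSec: Double,
      throughputPerSec: Double,
      batchIntervalSec: Double,
      numBatches: Int): Vector[Double] = {
    // Each batch holds one interval's worth of records.
    val processingTimeSec = ingestRatePerSec * batchIntervalSec / throughputPerSec
    val (_, delays) = (0 until numBatches).foldLeft((0.0, Vector.empty[Double])) {
      case ((previousEnd, acc), i) =>
        val submitted = i * batchIntervalSec
        // A batch starts when it is submitted or when the previous batch finishes, whichever is later.
        val start = math.max(submitted, previousEnd)
        (start + processingTimeSec, acc :+ (start - submitted))
    }
    delays
  }
}
```

Consider 12,000 records/s arriving, 10,000 records/s of processing capacity, and a 1 s batch interval. Each batch takes 1.2 s, so the scheduling delay grows by about 0.2 s per batch. Capping ingestion at 10,000 records/s keeps the delay at zero.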
2. Backpressure
Starting with Spark 1.5, Spark Streaming includes a backpressure mechanism. It adjusts the Receivers' ingestion rate dynamically, based on the batch-completion statistics reported by the JobScheduler. The feature is controlled by spark.streaming.backpressure.enabled, which defaults to false.
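Here is a rough illustration of what the flag controls. In Spark, a receiver-based input stream (ReceiverInputDStream) creates a RateController only when backpressure is enabled. Otherwise its rateController is None and nothing is registered. The sketch below models that gating with a plain Map standing in for SparkConf. ConfSketch, ControllerStub, and rateControllerFor are hypothetical names, not Spark APIs. In a real application the flag is set like any other Spark property, for example on the SparkConf or with --conf on spark-submit.

```scala
// ConfSketch is a hypothetical stand-in for SparkConf: a plain map of string settings.
final case class ConfSketch(settings: Map[String, String]) {
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.trim.toBoolean).getOrElse(default)
}

object BackpressureFlagSketch {
  val EnabledKey = "spark.streaming.backpressure.enabled"

  // Backpressure stays off unless the key is explicitly set to true.
  def isBackPressureEnabled(conf: ConfSketch): Boolean =
    conf.getBoolean(EnabledKey, default = false)

  // Hypothetical placeholder for a rate controller attached to one input stream.
  final case class ControllerStub(streamId: Int)

  // A stream gets a controller only when the flag is on; otherwise None.
  def rateControllerFor(streamId: Int, conf: ConfSketch): Option[ControllerStub] =
    if (isBackPressureEnabled(conf)) Some(ControllerStub(streamId)) else None
}
```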
3. Backpressure Source Code Analysis
3.1 RateController Class Hierarchy
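RateController extends StreamingListener so that it is notified of BatchCompleted events. It is also Serializable, so it can be serialized together with its input stream (for example, when the DStream graph is checkpointed). Its executor and AtomicLong fields are @transient, which is why init() is called both from the constructor and from readObject after deserialization. Core implementation: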
/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {
  init()
  protected def publish(rate: Long): Unit
  @transient
  implicit private var executionContext: ExecutionContext = _
  @transient
  private var rateLimit: AtomicLong = _
  /**
   * An initialization method called both from the constructor and Serialization code.
   */
  private def init() {
    executionContext = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
    rateLimit = new AtomicLong(-1L)
  }
  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    ois.defaultReadObject()
    init()
  }
  /**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }
  def getLatestRate(): Long = rateLimit.get()
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo
    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}
3.2 Registration of RateController
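When the JobScheduler starts, it iterates over all InputDStreams in the DStreamGraph and registers each attached RateController as a StreamingListener via ssc.addStreamingListener. Because rateController is an Option, streams without a controller (for example, when backpressure is disabled) are simply skipped: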
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)
  listenerBus.start()
  // ... other initialization omitted for brevity ...
  logInfo("Started JobScheduler")
}
3.3 Execution Flow of Backpressure
The backpressure process consists of two stages: triggering a BatchCompleted event and handling that event.
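The toy model below shows how the registration loop and the two stages fit together. It is not Spark code: ToyListenerBus, RecordingController, ToyInputStream, and ToyRegistration are hypothetical names. Delivery here is synchronous, whereas Spark's streaming listener bus delivers events asynchronously on its own thread. The for-comprehension in attach has the same shape as the one in JobScheduler.start, so a stream whose rateController is None is never registered.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified types; none of these are Spark classes.
final case class BatchCompletedEvent(batchTime: Long, recordsByStream: Map[Int, Long])

trait BatchListener {
  def onBatchCompleted(event: BatchCompletedEvent): Unit
}

final class ToyListenerBus {
  private val listeners = ArrayBuffer.empty[BatchListener]
  def addListener(listener: BatchListener): Unit = listeners += listener
  // Stage 1: the scheduler posts an event when a batch finishes.
  def post(event: BatchCompletedEvent): Unit = listeners.foreach(_.onBatchCompleted(event))
}

// Stage 2: a controller reacts only to the record count of its own stream.
final class RecordingController(val streamId: Int) extends BatchListener {
  val seen = ArrayBuffer.empty[Long]
  def onBatchCompleted(event: BatchCompletedEvent): Unit =
    event.recordsByStream.get(streamId).foreach(seen += _)
}

final case class ToyInputStream(id: Int, rateController: Option[RecordingController])

object ToyRegistration {
  // Same shape as JobScheduler.start: streams whose Option is None are skipped.
  def attach(streams: Seq[ToyInputStream], bus: ToyListenerBus): Unit =
    for {
      stream <- streams
      controller <- stream.rateController
    } bus.addListener(controller)
}
```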
3.3.1 Triggering BatchCompleted
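At every batch interval, the JobGenerator's timer triggers generateJobs. This builds a JobSet for that batch time, together with the per-stream input info that includes record counts, and submits it via JobScheduler.submitJobSet. Each Job is then run by a JobHandler on the jobExecutor thread pool. The JobHandler posts JobStarted and JobCompleted events to the JobScheduler's event loop. Once every job in the JobSet has completed, JobScheduler.handleJobCompletion posts a StreamingListenerBatchCompleted event, carrying the batch's BatchInfo, to the listener bus.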
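The completion side of this flow is essentially bookkeeping. A batch is reported as completed only after its last job finishes, and the delays that RateController later reads are derived from three timestamps. The sketch below models this with hypothetical names (JobSetSketch, BatchInfoSketch) and explicit timestamps in milliseconds. Scheduling delay is processing start minus submission; processing delay is processing end minus processing start. These definitions are intended to mirror how Spark's BatchInfo derives them, but the classes themselves are illustrative only.

```scala
// Illustrative only: times are passed in explicitly, in milliseconds.
final case class BatchInfoSketch(
    batchTime: Long,
    submissionTime: Long,
    processingStartTime: Long,
    processingEndTime: Long) {
  // Time the batch waited before its first job started.
  def schedulingDelay: Long = processingStartTime - submissionTime
  // Time from the first job starting to the last job finishing.
  def processingDelay: Long = processingEndTime - processingStartTime
}

final class JobSetSketch(val batchTime: Long, jobIds: Set[Int], val submissionTime: Long) {
  private var incomplete = jobIds
  private var processingStart: Option[Long] = None

  // Only the first job start marks the start of processing for the batch.
  def jobStarted(jobId: Int, at: Long): Unit =
    if (processingStart.isEmpty) processingStart = Some(at)

  // Returns the batch info only when the last outstanding job completes
  // (the point where a BatchCompleted event would be posted); otherwise None.
  def jobCompleted(jobId: Int, at: Long): Option[BatchInfoSketch] = {
    incomplete -= jobId
    if (incomplete.isEmpty)
      processingStart.map(start => BatchInfoSketch(batchTime, submissionTime, start, at))
    else None
  }
}
```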
private def generateJobs(time: Time) {
  // ... omitted ...
  jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  // ... omitted ...
}
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
3.3.2 Handling BatchCompleted
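The listener bus delivers each BatchCompleted event to every registered RateController. Its onBatchCompleted method (shown in 3.1) reads four values and passes them to computeAndPublish:
- the batch's processing end time
- the processing delay
- the scheduling delay
- the number of records this stream contributed

computeAndPublish runs asynchronously on the single "stream-rate-update" thread. It asks the RateEstimator for a new rate (Spark's built-in estimator is PIDRateEstimator, a PID controller), stores the result in rateLimit, and calls publish. For receiver-based streams, publish forwards the rate to the ReceiverTracker.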
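What computeAndPublish does can be sketched without Spark. PidEstimatorSketch below is modeled on Spark's PIDRateEstimator as we understand it. Its terms work as follows:
- The proportional term compares the last published rate with the rate at which the batch was actually processed.
- The integral term converts the scheduling delay into a backlog rate.
- The derivative term tracks the change in error.
- The result never drops below a minimum rate.

The first batch only seeds the estimator, so nothing is published until the second completed batch. The constructor defaults (1.0, 0.2, 0.0, and 100 records/s) are assumed to match the defaults of spark.streaming.backpressure.pid.proportional, .integral, .derived, and .minRate; verify them against the configuration docs for your Spark version. ControllerSketch reproduces the RateController pattern from 3.1: the estimate runs on a single background thread, the latest value is kept in an AtomicLong, and then publish is called. All class names are hypothetical. Spark's computeAndPublish returns Unit, whereas this version returns the Future so a caller can wait for it.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ExecutionContext, Future}

// Standalone estimator modeled on Spark's PIDRateEstimator (not the Spark class itself).
final class PidEstimatorSketch(
    batchIntervalMillis: Long,
    proportional: Double = 1.0,
    integral: Double = 0.2,
    derivative: Double = 0.0,
    minRate: Double = 100.0) {
  private var firstRun = true
  private var latestTime = -1L
  private var latestRate = -1.0
  private var latestError = -1.0

  /** New rate in records/second, or None if this batch yields no estimate. */
  def compute(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Option[Double] =
    synchronized {
      if (time > latestTime && elems > 0 && workDelay > 0) {
        val secondsSinceUpdate = (time - latestTime).toDouble / 1000
        val processingRate = elems.toDouble / workDelay * 1000 // records/s actually processed
        val error = latestRate - processingRate // proportional term
        val historicalError = waitDelay.toDouble * processingRate / batchIntervalMillis // backlog
        val dError = (error - latestError) / secondsSinceUpdate // derivative term
        val newRate = math.max(
          latestRate - proportional * error - integral * historicalError - derivative * dError,
          minRate)
        latestTime = time
        if (firstRun) { // the first batch only seeds the state
          latestRate = processingRate
          latestError = 0.0
          firstRun = false
          None
        } else {
          latestRate = newRate
          latestError = error
          Some(newRate)
        }
      } else None
    }
}

// Spark-free version of the RateController pattern: estimate on one background thread,
// keep the latest value in an AtomicLong, then publish it.
abstract class ControllerSketch(estimator: PidEstimatorSketch) {
  protected def publish(rate: Long): Unit

  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "stream-rate-update-sketch")
        t.setDaemon(true)
        t
      }
    }))
  private val rateLimit = new AtomicLong(-1L)

  def latestRate: Long = rateLimit.get()

  def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Future[Unit] =
    Future {
      estimator.compute(time, elems, workDelay, waitDelay).foreach { r =>
        rateLimit.set(r.toLong)
        publish(latestRate)
      }
    }
}
```

Consider a 1 s batch interval where a first batch of 5,000 records is processed in 1 s. This seeds the estimator at 5,000 records/s. Suppose the next batch of 5,000 records takes 2 s (2,500 records/s) and waited 0.5 s to be scheduled. The new rate is then 5000 - 1.0 × 2500 - 0.2 × 1250 = 2,250 records/s.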
Finally, ReceiverTracker.sendRateUpdate sends an UpdateReceiverRateLimit message to the tracker's RPC endpoint. The endpoint forwards the new rate to the ReceiverSupervisorImpl of the affected receiver, which calls updateRate on each of its registered BlockGenerators. BlockGenerator extends Spark's RateLimiter class, which wraps a Guava RateLimiter. updateRate calls setRate on it, capped by spark.streaming.receiver.maxRate when that is set, which completes the backpressure adjustment.
// ReceiverTracker: forward the new rate to the tracker's RPC endpoint
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}
// RateLimiter (parent class of BlockGenerator): apply the new rate, capped by maxRateLimit
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
Through this chain, Spark Streaming keeps the ingestion rate in line with the processing capacity observed in recent batches. This limits memory pressure and keeps the scheduling delay from growing without bound.
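The last hop, where the receiver applies the new rate, can be made concrete with a small sketch that separates the two pieces involved in updateRate:
- effectiveRate reproduces its clamping rules: non-positive rates are ignored, and otherwise the rate is capped at maxRateLimit when that value is positive.
- TokenBucketSketch is a minimal token bucket standing in for the limiter.

Both names are hypothetical. Spark delegates the actual throttling to Guava's RateLimiter, whose smoothing algorithm differs in detail, so treat the bucket as an illustration of the idea rather than of Spark's exact behavior.

```scala
object RateUpdateSketch {
  // Same rules as updateRate: ignore non-positive rates; cap at maxRateLimit when it is positive.
  def effectiveRate(newRate: Long, maxRateLimit: Long): Option[Long] =
    if (newRate <= 0) None
    else if (maxRateLimit > 0) Some(newRate.min(maxRateLimit))
    else Some(newRate)
}

/**
 * Minimal token bucket (hypothetical, not Guava's algorithm): tokens refill at `ratePerSec`,
 * up to one second's worth. Time is passed in as nanoseconds so behavior is deterministic.
 */
final class TokenBucketSketch(initialRatePerSec: Double, startNanos: Long) {
  private var ratePerSec = initialRatePerSec
  private var tokens = initialRatePerSec
  private var lastRefillNanos = startNanos

  private def refill(nowNanos: Long): Unit = {
    val elapsedSec = (nowNanos - lastRefillNanos) / 1e9
    tokens = math.min(ratePerSec, tokens + elapsedSec * ratePerSec)
    lastRefillNanos = nowNanos
  }

  /** Change the rate; tokens earned so far are settled at the old rate first. */
  def setRate(newRatePerSec: Double, nowNanos: Long): Unit = {
    refill(nowNanos)
    ratePerSec = newRatePerSec
    tokens = math.min(tokens, ratePerSec)
  }

  /** Takes one token if available. A receiver would block here instead of returning false. */
  def tryAcquire(nowNanos: Long): Boolean = {
    refill(nowNanos)
    if (tokens >= 1.0) {
      tokens -= 1.0
      true
    } else false
  }

  def currentRate: Double = ratePerSec
}
```

A typical use chains the two: compute effectiveRate from the published rate and, if it is defined, pass it to setRate.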
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.