Understanding the Backpressure Mechanism in Spark Streaming
This article explains Spark Streaming's backpressure mechanism, detailing how batch intervals can cause data accumulation, the role of Receivers versus DirectKafkaInputDStream, configuration to enable backpressure, and the internal workings of RateController, ReceiverRateController, ReceiverSupervisor, BlockGenerator, and rate calculations for Kafka streams.
Spark Streaming processes continuously arriving data by grouping the data received within a configurable batch interval into a batch, which is then submitted as an RDD job. When the processing time of a batch exceeds the batch interval, data accumulates at the Receiver side; using MEMORY_ONLY storage can cause OOM, while MEMORY_AND_DISK may increase read latency.
Backpressure is a mechanism that dynamically adjusts the data ingestion rate based on the current processing capacity of the system.
During a stream job, one or more Receivers run on Executors to receive data and split the stream into batches. When the data source is Kafka, Spark can use the built‑in DirectKafkaInputDStream (created via KafkaUtils.createDirectStream(...)) which does not involve a Receiver. For InputDStream implementations that have a Receiver, the source RDD contains data received by the Receiver; for DirectKafkaInputDStream, the RDD initially holds only topic and offset information, and the actual data is fetched from Kafka when the corresponding task runs.
To enable backpressure, set the configuration spark.streaming.backpressure.enabled to true.
When a Receiver is present, backpressure works by controlling the Receiver's data ingestion rate. After all jobs in a stream finish, the JobScheduler receives feedback (including start/end times and record counts) and forwards it to the RateController. The RateController computes the appropriate receiving rate and notifies the Receiver through the ReceiverTracker.
The RateController is instantiated in ReceiverInputDStream when backpressure is enabled:
abstract class ReceiverInputDStream {
override protected[streaming] val rateController: Option[RateController] = {
// Create ReceiverRateController if backpressure is enabled
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
} else {
None
}
}
...
}The RateController implements StreamingListener and reacts only to StreamingListenerBatchCompleted events. Its onBatchCompleted method extracts processing times, delays, and record counts, then calls computeAndPublish:
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
processingEnd <- batchCompleted.batchInfo.processingEndTime
workDelay <- batchCompleted.batchInfo.processingDelay
waitDelay <- batchCompleted.batchInfo.schedulingDelay
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}The computeAndPublish method uses a RateEstimator to calculate the new rate and then publishes it:
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = {
Future[Unit] {
// Compute the next receiving rate using the estimator
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
rateLimit.set(s.toLong)
// Publish the calculated rate to the Receiver side
publish(getLatestRate())
}
}
}The publish implementation forwards the new rate to all Receivers via the JobScheduler's receiverTracker:
override def publish(rate: Long): Unit = {
// Notify all Receivers of the new rate limit
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}The ReceiverTracker runs on the JobScheduler and sends a UpdateReceiverRateLimit RPC to each Receiver, which then updates its BlockGenerator:
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
if (isTrackerStarted) {
endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
}
}On the Receiver side, the ReceiverSupervisorImpl receives the UpdateRateLimit message and forwards it to all registered BlockGenerator instances:
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}The BlockGenerator implements a token‑bucket algorithm (via Guava's RateLimiter) to control the ingestion speed. Its waitToPush method acquires a token before allowing a record to be stored, effectively throttling the Receiver when the bucket is empty:
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
def waitToPush() {
rateLimiter.acquire()
}
private def updateRate(newRate: Long): Unit = {
if (newRate > 0) {
if (maxRateLimit > 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
} else {
rateLimiter.setRate(newRate)
}
}
}When using DirectKafkaInputDStream, there is no Receiver. Backpressure is achieved by estimating the overall receiving rate R with IDRateEstimator and limiting each partition with the configuration spark.streaming.kafka.maxRatePerPartition. The per‑partition record count for the next batch is calculated as:
(min((lagN1 / totalLag) * R, maxR)) * batchDurationInSecondsDuring job execution, the generated KafkaRDD reads at most the computed number of records from each partition, thereby enforcing the backpressure limits.
Overall, Spark Streaming's backpressure mechanism combines configuration flags, runtime feedback via RateController, RPC communication to Receivers, and token‑bucket rate limiting in BlockGenerator (or partition‑level limits for DirectKafka streams) to adaptively match data ingestion speed with processing capacity.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
