Understanding Spark Streaming Backpressure Mechanism and Source Code Analysis

This article explains why Spark Streaming introduced backpressure, how the dynamic rate‑control mechanism works, and provides a detailed walkthrough of the relevant source code, including the RateController class, its registration, and the execution flow that adjusts ingestion rates to match processing capacity.

Big Data Technology & Architecture

1. Why Introduce Backpressure

By default, a receiver-based Spark Streaming job ingests data at whatever rate the producer sends it. If the batch processing time then exceeds the configured batch interval, unprocessed data accumulates in executor memory. This can cause executor OOM errors, or the data spills to disk and latency increases. Before Spark 1.5, the only remedy was to cap the receive rate with the static parameter spark.streaming.receiver.maxRate, but a fixed cap leaves resources idle whenever the cluster could handle a higher throughput.
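The accumulation described above can be made concrete with a little arithmetic. The following is a minimal sketch, not Spark code: the helper name backlogPerBatch and all rates are hypothetical, and it assumes constant input and processing rates.

```scala
// Hypothetical helper: backlog (in records) after each batch when ingestion
// outpaces processing. All numbers used with it are illustrative assumptions.
object BacklogSketch {
  def backlogPerBatch(
      inputRatePerSec: Long,   // records arriving per second
      capacityPerSec: Long,    // records the job can process per second
      batchIntervalSec: Long,
      batches: Int): Seq[Long] = {
    val received = inputRatePerSec * batchIntervalSec
    val processed = capacityPerSec * batchIntervalSec
    // scanLeft carries the unprocessed remainder forward from batch to batch;
    // tail drops the initial zero so the result has one entry per batch.
    (1 to batches).scanLeft(0L) { (backlog, _) =>
      math.max(0L, backlog + received - processed)
    }.tail
  }
}
```

With 10,000 records/s arriving, capacity for 8,000 records/s, and a 2-second batch interval, the backlog grows by 4,000 records per batch and never recovers, which is the situation backpressure is meant to prevent.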

2. Backpressure

Starting with Spark 1.5, Spark Streaming adds a backpressure mechanism that dynamically adjusts the Receiver's ingestion rate based on feedback from the JobScheduler. The feature is toggled with spark.streaming.backpressure.enabled (default false).
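As a configuration sketch, the switch can be set on a SparkConf before the StreamingContext is created. The application name and the maxRate value below are illustrative assumptions; the block needs the Spark core library on the classpath.

```scala
import org.apache.spark.SparkConf

object BackpressureConfSketch {
  val conf: SparkConf = new SparkConf()
    .setAppName("backpressure-demo") // hypothetical application name
    // Turn on the dynamic rate control described in this article.
    .set("spark.streaming.backpressure.enabled", "true")
    // The static cap can remain as an upper bound; 10000 is an illustrative value.
    .set("spark.streaming.receiver.maxRate", "10000")
}
```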

3. Backpressure Source Code Analysis

3.1 RateController Class Hierarchy

RateController extends StreamingListener to process BatchCompleted events. Core implementation:

/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {

  init()

  protected def publish(rate: Long): Unit

  @transient
  implicit private var executionContext: ExecutionContext = _

  @transient
  private var rateLimit: AtomicLong = _

  /**
   * An initialization method called both from the constructor and Serialization code.
   */
  private def init() {
    executionContext = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
    rateLimit = new AtomicLong(-1L)
  }

  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    ois.defaultReadObject()
    init()
  }

  /**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }

  def getLatestRate(): Long = rateLimit.get()

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay    <- batchCompleted.batchInfo.processingDelay
      waitDelay    <- batchCompleted.batchInfo.schedulingDelay
      elems        <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}
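To isolate the compute-then-publish pattern from the Spark plumbing, here is a simplified, self-contained model. SimpleEstimator and SimpleRateController are hypothetical names, not Spark classes; serialization and the listener bus are left out, and onBatchCompleted returns its Future only so that callers can wait for the update.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for Spark's RateEstimator.
trait SimpleEstimator {
  def compute(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Option[Double]
}

// Simplified model of RateController: no serialization, no listener bus.
abstract class SimpleRateController(estimator: SimpleEstimator)(implicit ec: ExecutionContext) {
  private val rateLimit = new AtomicLong(-1L) // -1 means "no estimate yet"

  protected def publish(rate: Long): Unit

  def getLatestRate(): Long = rateLimit.get()

  // Runs the estimator off the calling thread; publishes only when it yields a rate.
  def onBatchCompleted(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Future[Unit] =
    Future {
      estimator.compute(time, elems, workDelay, waitDelay).foreach { r =>
        rateLimit.set(r.toLong)
        publish(getLatestRate())
      }
    }
}
```

The estimator returns an Option so that a batch with no usable measurements leaves the previous rate untouched, which matches the newRate.foreach call in the listing above.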

3.2 Registration of RateController

When the JobScheduler starts, it scans all InputDStreams in the DStreamGraph, extracts any attached RateController, and registers them with the ListenerBus:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  // ... other initialization omitted for brevity ...
  logInfo("Started JobScheduler")
}
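The for-comprehension in start() iterates over a collection and an Option at once, so input streams without a rate controller are skipped silently. A minimal sketch of that behaviour, using hypothetical stand-in types (ToyController, ToyInputStream) instead of the Spark classes:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for RateController and InputDStream.
final case class ToyController(streamUID: Int)
final case class ToyInputStream(id: Int, rateController: Option[ToyController])

object RegistrationSketch {
  def register(streams: Seq[ToyInputStream]): Seq[ToyController] = {
    val listeners = ArrayBuffer.empty[ToyController] // stands in for the listener bus
    // Desugars to streams.foreach(s => s.rateController.foreach(rc => listeners += rc)),
    // so a stream whose rateController is None contributes nothing.
    for {
      s  <- streams
      rc <- s.rateController
    } listeners += rc
    listeners.toSeq
  }
}
```

Only streams that actually carry a controller end up registered, so no separate isDefined check is needed.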

3.3 Execution Flow of Backpressure

The backpressure process consists of two stages: triggering a BatchCompleted event and handling that event.

3.3.1 Triggering BatchCompleted

At every batch interval, the JobGenerator creates a JobSet for that interval and submits it via JobScheduler.submitJobSet; each Job in the set is then executed by a JobHandler. When a job finishes, its JobHandler posts a JobCompleted event, and once every job in the JobSet has completed, the JobScheduler posts a StreamingListenerBatchCompleted event to the listener bus.

// JobGenerator: build the JobSet for this batch time and hand it to the scheduler
private def generateJobs(time: Time) {
  // ... omitted ...
  jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  // ... omitted ...
}

// JobScheduler: record the JobSet and run each job on the job executor
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
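The step from individual job completions to a single batch-level event is not shown in the listings above. The following toy model sketches the idea under stated assumptions: ToyJobSet and ToyScheduler are hypothetical names, jobs are identified by plain integers, and a string buffer stands in for the listener bus.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a JobSet that tracks its unfinished jobs.
final class ToyJobSet(val time: Long, jobIds: Seq[Int]) {
  private val incomplete = mutable.HashSet.empty[Int] ++= jobIds
  def handleJobCompletion(jobId: Int): Unit = incomplete -= jobId
  def hasCompleted: Boolean = incomplete.isEmpty
}

final class ToyScheduler {
  // Stands in for the listener bus; records the events that would be posted.
  val events = mutable.ArrayBuffer.empty[String]

  def handleJobCompletion(jobSet: ToyJobSet, jobId: Int): Unit = {
    jobSet.handleJobCompletion(jobId)
    // The batch-level event fires once, after the last job of the set finishes.
    if (jobSet.hasCompleted) {
      events += s"BatchCompleted(${jobSet.time})"
    }
  }
}
```

In this model a JobSet with two jobs produces no event after the first completion and exactly one after the second, so a rate controller would be notified once per batch rather than once per job.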

3.3.2 Handling BatchCompleted

The ListenerBus delivers the BatchCompleted event to every registered RateController. Each controller extracts the processing end time, the processing delay, the scheduling delay, and the record count for its own stream, computes a new rate with its RateEstimator (the built-in implementation is a PID controller), and publishes that rate to the ReceiverTracker.

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo
  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay    <- batchCompleted.batchInfo.processingDelay
    waitDelay    <- batchCompleted.batchInfo.schedulingDelay
    elems        <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
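To show what a PID-style estimator does with those four inputs, here is a simplified sketch. ToyPidEstimator is a hypothetical class loosely modeled on the idea of a PID rate estimator, not a copy of Spark's implementation, and the default gains and minimum rate are illustrative assumptions.

```scala
// Simplified PID-style estimator; a sketch, not Spark's actual class.
final class ToyPidEstimator(
    batchIntervalMillis: Long,
    proportional: Double = 1.0, // weight of the current error (illustrative)
    integral: Double = 0.2,     // weight of the accumulated backlog (illustrative)
    derivative: Double = 0.0,   // weight of the error trend (illustrative)
    minRate: Double = 100.0) {

  private var firstRun = true
  private var latestTime = -1L
  private var latestRate = -1.0
  private var latestError = -1.0

  def compute(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Option[Double] =
    synchronized {
      if (time > latestTime && elems > 0 && workDelay > 0) {
        val delaySinceUpdate = (time - latestTime).toDouble / 1000 // seconds
        val processingRate = elems.toDouble / workDelay * 1000     // records/sec
        // Proportional term: how far the last rate was from what was processed.
        val error = latestRate - processingRate
        // Integral term: records implied by the time the batch waited in the queue.
        val historicalError = waitDelay.toDouble * processingRate / batchIntervalMillis
        // Derivative term: how quickly the error is changing.
        val dError = (error - latestError) / delaySinceUpdate
        val newRate = (latestRate - proportional * error -
          integral * historicalError - derivative * dError).max(minRate)
        latestTime = time
        if (firstRun) {
          // The first batch only seeds the state; no rate is published yet.
          latestRate = processingRate
          latestError = 0.0
          firstRun = false
          None
        } else {
          latestRate = newRate
          latestError = error
          Some(newRate)
        }
      } else None
    }
}
```

With a 1-second batch interval, a first batch that seeds the rate at 2,000 records/s, and a second batch of 2,000 records that takes 2 s to process after waiting 1 s, the sketch lowers the rate to about 800 records/s: 2,000 minus the error of 1,000 minus 0.2 times the backlog of 1,000.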

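Finally, ReceiverTracker.sendRateUpdate sends the new rate to the tracker endpoint, which forwards it to the ReceiverSupervisorImpl of the matching receiver. The supervisor passes the rate to its BlockGenerator; BlockGenerator extends RateLimiter, whose updateRate method applies the new rate via setRate, capped at maxRateLimit when a maximum is configured. This completes the backpressure adjustment.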

// ReceiverTracker: forward the new rate to the tracker endpoint
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}

// RateLimiter: ignore non-positive rates and never exceed the configured maximum
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
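The effect of updateRate on ingestion can be sketched with a toy token bucket. ToyRateLimiter is a hypothetical stand-in for the limiter that updateRate delegates to, not the library class; it takes explicit timestamps so that its behaviour is deterministic.

```scala
// Toy token-bucket limiter; a sketch under stated assumptions, not Spark's limiter.
final class ToyRateLimiter(initialRate: Long, maxRateLimit: Long) {
  private var rate: Long = initialRate // permits per second
  private var tokens: Double = 0.0
  private var lastNanos: Long = 0L

  def currentRate: Long = rate

  // Same rule as the updateRate listing: ignore non-positive rates,
  // cap at maxRateLimit when a positive maximum is configured.
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      rate = if (maxRateLimit > 0) newRate.min(maxRateLimit) else newRate
    }

  // Refill tokens for the elapsed time, then try to take one permit.
  def tryAcquire(nowNanos: Long): Boolean = {
    val elapsedSec = (nowNanos - lastNanos) / 1e9
    // Allow at most one second's worth of permits to accumulate.
    tokens = math.min(rate.toDouble, tokens + elapsedSec * rate)
    lastNanos = nowNanos
    if (tokens >= 1.0) { tokens -= 1.0; true } else false
  }
}
```

A published rate of 500 with a maximum of 300 therefore takes effect as 300, and a non-positive rate leaves the limiter unchanged, so a bad estimate cannot stop ingestion entirely.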

Through this chain, Spark Streaming dynamically aligns data ingestion with processing capacity, preventing memory pressure and improving overall stability.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
