
Mastering Spark Streaming Rate Control: A Deep Dive into Backpressure

This article explains Spark Streaming's rate control mechanisms, covering static limits, the dynamic back‑pressure feature introduced in Spark 1.5, the PID‑based estimator, RPC communication, and how Guava's token‑bucket RateLimiter enforces the calculated thresholds to ensure stability and optimal throughput.


Introduction to Rate Control

In stream processing systems, rate control (rate limiting) is crucial for both stability and throughput: it prevents sudden traffic spikes from overloading or crashing the system.

Spark Streaming is a micro‑batch framework, so ideally each batch's processing time stays below the batch interval. If processing time exceeds the interval, unprocessed data accumulates and can eventually cause executor OOM; if it is far below the interval, cluster resources sit idle.
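As a back‑of‑the‑envelope illustration (plain arithmetic, not Spark code):

```scala
// How far behind the scheduler falls when each batch takes longer to
// process than the batch interval; returns accumulated delay in ms.
def backlogAfter(batches: Int, batchIntervalMs: Long, processingTimeMs: Long): Long =
  math.max(0L, batches.toLong * (processingTimeMs - batchIntervalMs))
```

With a 1 s interval and 1.2 s of processing per batch, the pipeline falls 20 s behind after only 100 batches; this steady accumulation is exactly what rate control is meant to prevent.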

Spark Streaming Rate Control Basic Settings

Spark Streaming receives data through a Receiver component and stores it as blocks. To cap a Receiver's ingest rate, set spark.streaming.receiver.maxRate (maximum records per second per receiver). For Direct Kafka streams, which have no Receiver, set spark.streaming.kafka.maxRatePerPartition (maximum records per second read from each Kafka partition).

These static settings are simple but require manual tuning and restart when workload changes.

Since Spark 1.5, a dynamic back‑pressure mechanism can automatically adjust the rate. Enable it by setting spark.streaming.backpressure.enabled to true.
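Put together, these keys might appear in spark-defaults.conf as follows (the numeric values are illustrative placeholders, not recommendations):

```properties
# Static caps (records per second); changing them requires a restart
spark.streaming.receiver.maxRate              10000
spark.streaming.kafka.maxRatePerPartition     1000

# Dynamic back-pressure (Spark 1.5+); adjusts the rate automatically
spark.streaming.backpressure.enabled          true
```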

Spark Streaming Backpressure Mechanism Details

Dynamic Rate Controller

The abstract class org.apache.spark.streaming.scheduler.RateController implements StreamingListener and updates the rate based on batch metrics.

private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) extends StreamingListener with Serializable { ... }

It listens for StreamingListenerBatchCompleted events and, on each completed batch:

Extracts processingEndTime, processingDelay, schedulingDelay, and numRecords from the batch info.

Feeds these four values to the RateEstimator to compute a new rate.

Publishes the new threshold to the receivers.
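The flow above can be sketched in plain Scala. The BatchInfo case class, RateEstimator trait, and SimpleController below are simplified stand-ins for Spark's internal types, not the actual API:

```scala
// Hypothetical, simplified stand-in for Spark's batch metrics.
case class BatchInfo(
    processingEndTime: Long, // ms timestamp when processing finished
    processingDelay: Long,   // ms spent processing the batch
    schedulingDelay: Long,   // ms the batch waited before processing
    numRecords: Long)        // records processed in the batch

// Simplified estimator contract: return a new rate, or None to keep the old one.
trait RateEstimator {
  def compute(time: Long, elements: Long, procDelay: Long, schedDelay: Long): Option[Double]
}

// On batch completion, feed the four metrics to the estimator and
// publish whatever rate it produces.
class SimpleController(estimator: RateEstimator, publish: Long => Unit) {
  def onBatchCompleted(info: BatchInfo): Unit =
    estimator
      .compute(info.processingEndTime, info.numRecords,
               info.processingDelay, info.schedulingDelay)
      .foreach(rate => publish(rate.toLong))
}
```

In Spark itself the compute-and-publish step runs asynchronously on an execution context rather than inline in the listener callback.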

The concrete ReceiverRateController publishes the rate to ReceiverTracker:

private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator) extends RateController(id, estimator) {
  override def publish(rate: Long): Unit = ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}

PID‑based Rate Estimator

The only built‑in estimator is PIDRateEstimator, created when spark.streaming.backpressure.rateEstimator is set to “pid” (the default and only supported value). It implements a classic proportional‑integral‑derivative (PID) controller.

object RateEstimator {
  def create(conf: SparkConf, batchInterval: Duration): RateEstimator = {
    conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
      case "pid" => new PIDRateEstimator(...parameters...)
      case estimator => throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
    }
  }
}

The PID controller reads the configuration keys spark.streaming.backpressure.pid.proportional (default 1.0), spark.streaming.backpressure.pid.integral (default 0.2), spark.streaming.backpressure.pid.derived (default 0.0; note the key is spelled “derived”, not “derivative”), and spark.streaming.backpressure.pid.minRate (default 100 records/s).

private[streaming] class PIDRateEstimator(
    batchIntervalMillis: Long,
    proportional: Double,
    integral: Double,
    derivative: Double,
    minRate: Double) extends RateEstimator with Logging { ... }

From each batch's metrics it derives the processing rate (records processed per second of processing time), the error (the difference between the previously chosen rate and the actual processing rate), the historical error (the scheduling delay converted into a backlog‑draining rate), and the derivative of the error, then combines these terms into a new rate, clamped to be no lower than minRate.
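The update rule can be written out as a standalone Scala function. The names mirror those in PIDRateEstimator, but this is an illustrative transcription of the formula, not Spark's source:

```scala
// One PID update step: combine proportional, integral (historical), and
// derivative terms to move from the last published rate to a new one.
def pidUpdate(
    batchIntervalMillis: Long,
    proportional: Double, integral: Double, derivative: Double, minRate: Double,
    latestRate: Double, latestError: Double, latestTime: Long,
    time: Long, numElements: Long, processingDelay: Long, schedulingDelay: Long): Double = {
  val delaySinceUpdate = (time - latestTime).toDouble / 1000           // seconds since last update
  val processingRate   = numElements.toDouble / processingDelay * 1000 // records actually processed per second
  val error            = latestRate - processingRate                   // proportional term
  // A scheduling delay implies a backlog; convert it into the extra rate
  // reduction needed to drain that backlog within one batch interval.
  val historicalError  = schedulingDelay.toDouble * processingRate / batchIntervalMillis
  val dError           = (error - latestError) / delaySinceUpdate      // derivative term
  math.max(
    latestRate - proportional * error - integral * historicalError - derivative * dError,
    minRate)
}
```

With the default gains, a batch that processed 500 records in a full 1 s interval while the published rate was 1000 records/s yields a new rate of 500 records/s: the controller converges toward what the cluster actually sustained.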

Publishing the Rate via RPC

The new threshold is sent to ReceiverTracker, which forwards it through its ReceiverTrackerEndpoint to the corresponding Receiver's BlockGenerator:

def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}

The BlockGenerator extends Spark's RateLimiter class, which wraps Guava's RateLimiter; Guava's acquire() implements a token‑bucket style algorithm.

Token Bucket Implementation with Guava

Before adding data to the buffer, waitToPush() calls RateLimiter.acquire(), blocking until a permit is available. The limiter’s rate is set by the back‑pressure controller.

// Guava RateLimiter (Java): acquire a single permit, blocking if necessary
@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

// Spark's RateLimiter (Scala): apply a newly published rate
private[receiver] def updateRate(newRate: Long): Unit = {
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
}

This completes the back‑pressure flow: Spark computes a rate, publishes it via RPC, and the Receiver’s token bucket enforces the limit.
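The enforcement side can be illustrated with a minimal token bucket in plain Scala. Guava's actual implementation (SmoothRateLimiter) adds warm‑up periods and stored‑permit accounting beyond this sketch, and the injectable clock here exists only for testability:

```scala
// Tokens refill continuously at ratePerSec, up to a burst capacity;
// each record pushed must first take one token.
class TokenBucket(private var ratePerSec: Double,
                  capacity: Double,
                  clock: () => Long = () => System.nanoTime()) {
  private var tokens = capacity // start with a full bucket
  private var lastRefill = clock()

  // Counterpart of updateRate: change the refill rate on the fly.
  def setRate(newRate: Double): Unit = synchronized { ratePerSec = newRate }

  // Counterpart of waitToPush/acquire: block until a token is available.
  def acquire(): Unit = {
    while (!tryAcquire()) Thread.sleep(1)
  }

  // Take a token if one is available; otherwise report failure.
  def tryAcquire(): Boolean = synchronized {
    refill()
    if (tokens >= 1.0) { tokens -= 1.0; true } else false
  }

  // Credit tokens for the time elapsed since the last refill.
  private def refill(): Unit = {
    val now = clock()
    tokens = math.min(capacity, tokens + (now - lastRefill) / 1e9 * ratePerSec)
    lastRefill = now
  }
}
```

Because setRate can be called at any time, a rate published over RPC takes effect on the very next acquire, which is what makes the back‑pressure loop responsive.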

Written by

Architect's Must-Have
