
Understanding DStream Construction and Execution in Spark Streaming

This article explains how Spark Streaming's DStream abstraction is built from InputDStream through successive transform operators, details the internal ForEachDStream implementation, describes the job generation and scheduling workflow, and outlines how Beike's real‑time platform leverages these mechanisms for large‑scale streaming tasks.

Beike Product & Technology

Beike's real-time computing platform runs more than 100 streaming tasks daily on Spark Streaming and Flink. To improve speed and stability, the team studied the internal mechanics of Spark Streaming, focusing on the DStream abstraction.

DStream construction starts with an InputDStream as the data source. Each transformation operator, such as map or filter, creates a new DStream, and the output operation foreachRDD appends a ForEachDStream, forming a logical execution chain. For example:

stream.map(...).filter(...).foreachRDD(...)
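The chaining can be illustrated with a deliberately simplified model (these MiniDStream classes are hypothetical, not Spark's real ones): each operator records its parent, so walking the parent links recovers the full operator lineage.

```scala
// Simplified, hypothetical model of DStream chaining: each operator
// wraps its parent, so the chain can be traversed back to the source.
abstract class MiniDStream[T] {
  def parent: Option[MiniDStream[_]]
  def name: String
  // Walk back to the source node, listing operator names in order.
  def lineage: List[String] = parent.map(_.lineage).getOrElse(Nil) :+ name
}

case class MiniInput[T](name: String = "input") extends MiniDStream[T] {
  val parent = None
}
case class MiniMap[T, U](p: MiniDStream[T], f: T => U) extends MiniDStream[U] {
  val parent = Some(p); val name = "map"
}
case class MiniFilter[T](p: MiniDStream[T], pred: T => Boolean) extends MiniDStream[T] {
  val parent = Some(p); val name = "filter"
}

val chain = MiniFilter(MiniMap(MiniInput[Int](), (x: Int) => x * 2), (x: Int) => x > 2)
```

Here `chain.lineage` yields `List("input", "map", "filter")`, mirroring how each Spark operator registers its parent via `dependencies`.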

The ForEachDStream class implements the final action stage. Its generateJob method calls the parent DStream's getOrCompute to obtain an RDD, then runs the user‑provided foreachFunc closure. The relevant source code is:

private[streaming] class ForEachDStream[T: ClassTag](
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
) extends DStream[Unit](parent.ssc) {
  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[Unit]] = None
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

When the last DStream in the chain invokes compute, it recursively triggers upstream compute calls until the initial InputDStream generates an RDD that pulls in external data.
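The recursion can be sketched with a toy two-class model (hypothetical names, with a Seq standing in for the RDD): getOrCompute memoizes the result per batch time and otherwise calls compute, which asks the parent's getOrCompute, bottoming out at the source.

```scala
import scala.collection.mutable

// Hypothetical sketch of the getOrCompute recursion (not Spark's API).
abstract class Node[T] {
  private val generated = mutable.Map[Long, Seq[T]]()  // batchTime -> "RDD"
  def compute(time: Long): Seq[T]
  // Cache the batch's result; otherwise recurse via compute().
  final def getOrCompute(time: Long): Seq[T] =
    generated.getOrElseUpdate(time, compute(time))
}

// Source node: pulls "external" data for the batch, like an InputDStream.
class Source(data: Long => Seq[Int]) extends Node[Int] {
  def compute(time: Long): Seq[Int] = data(time)
}

// Derived node: recursion walks up to the parent, then transforms.
class Mapped[T, U](parent: Node[T], f: T => U) extends Node[U] {
  def compute(time: Long): Seq[U] = parent.getOrCompute(time).map(f)
}

val src = new Source(_ => Seq(1, 2, 3))
val doubled = new Mapped(src, (x: Int) => x * 2)
```

Calling `doubled.getOrCompute(0L)` first recurses into `src`, matching the article's description of compute calls propagating up to the InputDStream.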

The platform also notes that a series of non-window operations on a single stream is equivalent to calling foreachRDD directly on the InputDStream and applying the same transformations to each batch RDD inside the closure.
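That equivalence can be checked per batch with a plain Seq standing in for the batch RDD (a hypothetical simplification; both function names are made up):

```scala
// stream.map(...).filter(...).foreachRDD(g) — operators chained on the DStream.
def chained(batch: Seq[Int]): Seq[Int] =
  batch.map(_ * 2).filter(_ > 4)

// stream.foreachRDD { rdd => g(rdd.map(...).filter(...)) } — same ops inside the closure.
def inlined(batch: Seq[Int]): Seq[Int] = {
  val rdd = batch
  rdd.map(_ * 2).filter(_ > 4)
}
```

For any batch the two produce identical results, since non-window operators only transform the current batch's RDD.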

Key Spark Streaming components include InputDStream, JobGenerator, JobScheduler, and DStreamGraph. Their interaction is driven by a recurring timer and an event loop:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

The processEvent method dispatches events to appropriate handlers:

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) => doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
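The timer-plus-event-loop pattern can be sketched in miniature (hypothetical names; a ScheduledExecutorService plays the role of RecurringTimer, and a blocking queue plays the role of the EventLoop's mailbox):

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Events modeled as a sealed trait, dispatched by pattern match as in processEvent.
sealed trait JobGeneratorEvent
case class GenerateJobs(time: Long) extends JobGeneratorEvent
case class DoCheckpoint(time: Long) extends JobGeneratorEvent

val eventQueue = new LinkedBlockingQueue[JobGeneratorEvent]()
val timer = Executors.newSingleThreadScheduledExecutor()

// Fire every batch interval, posting a GenerateJobs event, like RecurringTimer.
def startTimer(batchMs: Long): Unit =
  timer.scheduleAtFixedRate(
    () => eventQueue.put(GenerateJobs(System.currentTimeMillis())),
    0, batchMs, TimeUnit.MILLISECONDS)

// Dispatch an event to its handler; returns a label here for illustration.
def processEvent(event: JobGeneratorEvent): String = event match {
  case GenerateJobs(t) => s"generate jobs for $t"
  case DoCheckpoint(t) => s"checkpoint at $t"
}
```

A consumer thread would normally block on `eventQueue.take()` and feed each event to `processEvent`, which is what Spark's EventLoop thread does.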

The generateJobs method sets checkpoint properties, allocates received blocks, creates jobs via DStreamGraph.generateJobs, and submits them to the scheduler:

private def generateJobs(time: Time) {
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
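The control flow of that method, stripped of Spark internals, is a Try around the job-building work with the two outcomes handled explicitly. A hedged sketch, with all collaborators passed in as plain functions (the parameter names here are invented):

```scala
import scala.util.{Try, Success, Failure}

case class Job(time: Long, run: () => Unit)

// Sketch of generateJobs' control flow: wrap block allocation and job
// creation in Try; on Success submit the batch, on Failure report it.
def generateJobs(time: Long,
                 allocateBlocks: Long => Unit,
                 buildJobs: Long => Seq[Job],
                 submit: Seq[Job] => Unit,
                 reportError: (String, Throwable) => Unit): Boolean =
  Try {
    allocateBlocks(time)   // bind received blocks to this batch
    buildJobs(time)        // DStreamGraph.generateJobs analogue
  } match {
    case Success(jobs) => submit(jobs); true
    case Failure(e)    => reportError(s"Error generating jobs for time $time", e); false
  }
  // in Spark, a DoCheckpoint event is then posted in either case
```

Note that in the real code the DoCheckpoint event is posted unconditionally after the match, so checkpointing proceeds even when job generation fails.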

Job submission is handled by JobScheduler.submitJobSet, which enqueues jobs to a thread-pool executor (jobExecutor) whose concurrency is controlled by spark.streaming.concurrentJobs:

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
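The execution model behind jobExecutor can be sketched with a fixed-size pool whose size plays the role of spark.streaming.concurrentJobs (a hypothetical simplification; with the default of 1, batches execute serially):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

val concurrentJobs = 1  // spark.streaming.concurrentJobs analogue (default 1)
val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)
val completed = new AtomicInteger(0)

// Sketch of submitJobSet: hand every job in the batch to the pool,
// which runs at most `concurrentJobs` of them at a time.
def submitJobSet(jobs: Seq[() => Unit]): Unit =
  if (jobs.isEmpty) println("No jobs added")
  else jobs.foreach(job => jobExecutor.execute(() => { job(); completed.incrementAndGet(); () }))
```

Raising the pool size allows jobs from different batches to overlap, which is exactly the trade-off the spark.streaming.concurrentJobs setting exposes.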

In Beike's production environment, 143 real‑time tasks run, including 62 Spark‑Streaming jobs and 59 Structured‑Streaming jobs built on the Chronus framework, which abstracts source, sink, and pipeline layers and provides SQL‑based templates for rapid development.

Author: Gu Yuanli (enterprise codename); Producers: Giant Crab, Mei Changsu.

Tags: Real-time Processing, Spark Streaming, Scala, DStream