How Spark Streaming Submits Tasks: Internal Mechanics and Code Walkthrough
This article explains the internal workflow of Spark Streaming task submission, detailing how StreamingContext, DStream, receivers, and output operators are transformed into RDD jobs, and includes annotated Scala code examples that illustrate each step of the process.
Spark Streaming is a real‑time processing solution built on top of Spark Core. The article walks through how a streaming job is turned into Spark RDD tasks, assuming the reader already knows RDDs and SparkContext submission.
Example Program
The following Scala snippet shows a minimal Spark Streaming word‑count application (original code trimmed for clarity):
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

StreamingContext and Core Components
The entry point of Spark Streaming is StreamingContext. Internally it holds a DStreamGraph that tracks input streams (DStream, InputDStream, Receiver) and output streams. A JobScheduler coordinates a ReceiverTracker and a JobGenerator to turn streaming logic into batch RDD jobs.
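The relationships between these components can be sketched with a toy model. This is plain Scala, not the actual Spark classes; the names mirror the real ones, but the bodies are illustrative stand-ins (in the real JobScheduler, the tracker is started before the generator, which the sketch reproduces):

```scala
import scala.collection.mutable.ArrayBuffer

// Event log so the start-up order is observable.
val started = ArrayBuffer[String]()

// Simplified stand-ins for the real Spark Streaming classes.
class DStreamGraph {
  val inputStreams  = ArrayBuffer[String]()  // real element type: InputDStream[_]
  val outputStreams = ArrayBuffer[String]()  // real element type: ForEachDStream[_]
}

class ReceiverTracker { def start(): Unit = started += "ReceiverTracker" }
class JobGenerator    { def start(): Unit = started += "JobGenerator" }

// The JobScheduler coordinates both: receiver machinery first, then batch generation.
class JobScheduler {
  val receiverTracker = new ReceiverTracker
  val jobGenerator    = new JobGenerator
  def start(): Unit = { receiverTracker.start(); jobGenerator.start() }
}

// The StreamingContext model just owns the graph and the scheduler.
class StreamingContextModel {
  val graph     = new DStreamGraph
  val scheduler = new JobScheduler
  def start(): Unit = scheduler.start()
}
```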
DStream
DStream is Spark’s temporal wrapper around RDDs, representing a continuous series of RDDs. Its dependencies describe how streams are linked, similar to RDD DAG dependencies.
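The "series of RDDs keyed by batch time" idea can be made concrete with a small sketch. Plain Seq values stand in for RDDs, and ToyDStream/ToySource/ToyMapped are hypothetical names, not Spark API; the memoizing getOrCompute mirrors the role of the real DStream's generatedRDDs map:

```scala
import scala.collection.mutable

// Toy model: a DStream is a lazily materialized map from batch time to an
// "RDD" (a plain Seq here). The parent link plays the role of a dependency.
abstract class ToyDStream[T] {
  val generatedRDDs = mutable.Map[Long, Seq[T]]()
  def compute(time: Long): Option[Seq[T]]
  // Memoize one batch per time, computing it on first access.
  def getOrCompute(time: Long): Option[Seq[T]] =
    generatedRDDs.get(time).orElse {
      val computed = compute(time)
      computed.foreach(rdd => generatedRDDs(time) = rdd)
      computed
    }
}

// A source stream: batches simply appear keyed by time.
class ToySource(data: Map[Long, Seq[String]]) extends ToyDStream[String] {
  def compute(time: Long): Option[Seq[String]] = data.get(time)
}

// A derived stream: computes its batch from the parent's, mirroring RDD lineage.
class ToyMapped[T, U](parent: ToyDStream[T], f: T => U) extends ToyDStream[U] {
  def compute(time: Long): Option[Seq[U]] = parent.getOrCompute(time).map(_.map(f))
}
```

Asking the derived stream for batch 1000 pulls the parent's batch through the dependency chain, just as a DStream's compute walks its dependencies.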
Output Operator
Output operations such as print or saveAs… are implemented via foreachRDD. The call registers a ForEachDStream into the graph's outputStreams, which later generates a Job for each batch.
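The registration path can be sketched as follows. All Toy* names are hypothetical stand-ins (a Seq plays the RDD, a thunk plays the Job); the shape mirrors how a registered output stream is later asked for one job per batch:

```scala
import scala.collection.mutable.ArrayBuffer

// A batch "job" is just a time plus a zero-argument function to run later.
case class ToyJob(time: Long, run: () => Unit)

// Toy output stream: holds the user's per-batch function, as ForEachDStream
// holds foreachFunc, and can turn one batch into one job.
class ToyForEachDStream[T](batches: Map[Long, Seq[T]],
                           foreachFunc: (Seq[T], Long) => Unit) {
  def generateJob(time: Long): Option[ToyJob] =
    batches.get(time).map(rdd => ToyJob(time, () => foreachFunc(rdd, time)))
}

// Toy graph: foreachRDD-style registration appends to outputStreams; at each
// batch interval the graph asks every output stream for a job.
class ToyGraph {
  val outputStreams = ArrayBuffer[ToyForEachDStream[_]]()
  def generateJobs(time: Long): Seq[ToyJob] =
    outputStreams.flatMap(_.generateJob(time)).toSeq
}
```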
InputDStream Hierarchy
All data sources extend InputDStream. Sources whose data is consumed directly on the driver inherit InputDStream, while receiver-based sources inherit ReceiverInputDStream. The receivers themselves are wrapped into a Spark job and run as long-lived tasks on executors.
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) extends DStream[T](_ssc) {
  ssc.graph.addInputStream(this)
  val id = ssc.getNewInputStreamId()
}

ReceiverTracker.start()
The tracker registers an RPC endpoint, launches receivers, and allocates blocks to batches. It creates a dummy Spark job to ensure receivers are spread across workers, then sends a StartAllReceivers message.
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map { nis =>
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  }
  runDummySparkJob()
  logInfo(s"Starting ${receivers.length} receivers")
  endpoint.send(StartAllReceivers(receivers))
}

JobGenerator.start()
The generator creates a recurring timer that posts GenerateJobs messages at each batch interval. When a GenerateJobs arrives, it allocates receiver blocks, asks the graph to generate jobs for each output stream, and submits the resulting JobSet to the JobScheduler.
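A minimal analogue of that recurring timer, in plain Scala rather than Spark's actual RecurringTimer implementation, looks like this:

```scala
// Toy analogue of Spark's RecurringTimer: a background thread that fires a
// callback at a fixed period, as the JobGenerator does to post GenerateJobs
// messages at every batch interval. The callback receives the target time.
class ToyRecurringTimer(periodMs: Long, callback: Long => Unit) {
  @volatile private var stopped = false
  private val thread = new Thread(() => {
    var nextTime = System.currentTimeMillis()
    while (!stopped) {
      nextTime += periodMs                         // next batch boundary
      val wait = nextTime - System.currentTimeMillis()
      if (wait > 0) Thread.sleep(wait)             // sleep until the boundary
      if (!stopped) callback(nextTime)             // "post GenerateJobs(nextTime)"
    }
  })
  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.join(periodMs * 2) }
}
```

In the real code the callback does not run the batch itself; it only posts a message to the generator's event loop, keeping the timer thread free to fire on schedule.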
Job Submission Flow
Each Job is executed by a thread pool whose size is set by spark.streaming.concurrentJobs (default 1). The JobHandler posts JobStarted and JobCompleted events, disables output-spec validation (so jobs recovered from a checkpoint can overwrite existing output), and finally runs the user-provided job function.
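This flow can be sketched with a plain executor. ToyJobScheduler is a hypothetical stand-in, not the Spark class; with a pool of size 1, as in the default configuration, batches run strictly in submission order:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Toy analogue of the JobScheduler's executor: pool size mirrors the role of
// spark.streaming.concurrentJobs. Each submitted "JobHandler" reports a start
// and a completion event around the user's job body.
class ToyJobScheduler(concurrentJobs: Int) {
  private val pool = Executors.newFixedThreadPool(concurrentJobs)
  val events = ArrayBuffer[String]()
  def submit(name: String)(body: => Unit): Unit =
    pool.submit(new Runnable {
      def run(): Unit = {
        events.synchronized { events += s"JobStarted($name)" }
        body
        events.synchronized { events += s"JobCompleted($name)" }
      }
    })
  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Raising the pool size lets jobs from different batches overlap, which is why increasing spark.streaming.concurrentJobs trades strict batch ordering for throughput.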
ForEachDStream.generateJob()
When an output stream is a ForEachDStream, its generateJob method obtains the parent RDD for the batch, wraps the user's foreachFunc into a zero-argument job function, and returns a Job instance.
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

print() Implementation
The print operator is a specific foreachFunc: it fetches the first num + 1 elements of each batch's RDD with the take action, prints the first num under a timestamp header, and uses the extra element to decide whether to append an ellipsis.
def print(num: Int): Unit = {
  foreachRDD { (rdd, time) =>
    val firstNum = rdd.take(num + 1)
    println("-------------------------------------------")
    println(s"Time: $time")
    println("-------------------------------------------")
    firstNum.take(num).foreach(println)
    if (firstNum.length > num) println("...")
    println()
  }
}

Conclusion
The key takeaways are that receivers are scheduled as regular RDD jobs, and every DStream operation ultimately becomes an RDD job submitted to the Spark cluster. This explains why Spark Streaming can reuse Spark’s fault‑tolerance and resource management mechanisms.
Qunar Tech Salon
Qunar Tech Salon is a learning and exchange platform for Qunar engineers and industry peers. We share cutting-edge technology trends and topics, providing a free platform for mid-to-senior technical professionals to exchange and learn.