Spark Job Generation and Execution: From Logical DAG to Physical Stages and Tasks

This article explains how Spark turns a logical execution graph into a physical job. It divides the graph into stages, pipelines computation within each stage, and generates tasks. The process is illustrated with a code example and a walk through the internal job submission workflow.

Big Data Technology & Architecture

Logical to Physical Execution Graph

The article starts by revisiting the logical DAG of a complex Spark job. It then asks how that DAG can be converted into a physical execution graph made up of stages and tasks.

Stage Partitioning Algorithm

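It proposes a backward-propagation algorithm. Starting from the final RDD, Spark walks the dependency graph backward. Each NarrowDependency pulls its parent RDD into the current stage, so chains of narrow dependencies stay in one stage. A ShuffleDependency ends the stage at that boundary, and the shuffled parent becomes the last RDD of a new parent stage. The number of tasks in a stage equals the number of partitions of the stage's last RDD.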
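The backward walk can be sketched in plain Scala, without Spark. Node, NarrowDep, ShuffleDep, Stage, StageSplitter and ExampleGraph are hypothetical stand-ins rather than Spark classes; the real logic lives in DAGScheduler. ExampleGraph is a simplified version of the ComplexJob lineage shown below. The join's internal RDDs are collapsed into one cogroup node, and the sketch assumes the join reuses hashPairs1's HashPartitioner(3), so that side stays narrow.

```scala
import scala.collection.mutable

// Hypothetical, simplified lineage model (not Spark's real RDD/Dependency classes).
sealed trait Dep { def parent: Node }
final case class NarrowDep(parent: Node) extends Dep
final case class ShuffleDep(parent: Node) extends Dep
final case class Node(name: String, numPartitions: Int, deps: List[Dep] = Nil)

// A stage: the RDDs pipelined together, its last RDD, and the stages it waits on.
final case class Stage(id: Int, last: Node, rdds: List[String], parents: List[Stage]) {
  def numTasks: Int = last.numPartitions // one task per partition of the last RDD
}

object StageSplitter {
  def split(finalRdd: Node): List[Stage] = {
    var nextId = 0
    val created = mutable.ListBuffer.empty[Stage]
    val shuffleStages = mutable.Map.empty[String, Stage] // one stage per shuffled parent

    def build(last: Node): Stage = {
      val members = mutable.LinkedHashSet.empty[String]
      val parents = mutable.ListBuffer.empty[Stage]
      // Walk backward: narrow parents join this stage, shuffle parents start a new one.
      def visit(n: Node): Unit =
        if (members.add(n.name)) n.deps.foreach {
          case NarrowDep(p) => visit(p)
          case ShuffleDep(p) =>
            val s = shuffleStages.get(p.name) match {
              case Some(existing) => existing
              case None =>
                val fresh = build(p)
                shuffleStages(p.name) = fresh
                fresh
            }
            parents += s
        }
      visit(last)
      val stage = Stage(nextId, last, members.toList, parents.toList)
      nextId += 1
      created += stage
      stage
    }

    build(finalRdd)
    created.toList // parent stages are always created before their children
  }
}

// Hypothetical graph loosely shaped like ComplexJob (join internals collapsed into "cogroup").
object ExampleGraph {
  val rangePairs1 = Node("rangePairs1", 3)
  val hashPairs1  = Node("hashPairs1", 3, List(ShuffleDep(rangePairs1)))
  val pairs2      = Node("pairs2", 2)
  val rangePairs2 = Node("rangePairs2", 2, List(NarrowDep(pairs2)))
  val rangePairs3 = Node("rangePairs3", 2)
  val union       = Node("union", 4, List(NarrowDep(rangePairs2), NarrowDep(rangePairs3)))
  val cogroup     = Node("cogroup", 3, List(NarrowDep(hashPairs1), ShuffleDep(union)))
  val result      = Node("result", 3, List(NarrowDep(cogroup)))
}
```

For this graph the sketch produces three stages with 3, 4 and 3 tasks. There is one shuffle map stage per shuffled input, plus a final stage that pipelines hashPairs1, cogroup and result. Caching stages by shuffled parent is a simplified version of how Spark reuses one stage per shuffle dependency.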

Pipeline Idea

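Because stages are cut only at shuffle points, everything between two shuffles can be pipelined. Within a stage, a task pulls each record through the whole chain of narrow transformations. A record is computed only when the next operator asks for it and is passed straight on, so intermediate RDDs are not materialized unless they are explicitly cached. Data is written out only at the end of a shuffle map stage (as shuffle output) or handed to the action at the end of the final stage.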
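Inside a task, each narrow RDD computes its partition by wrapping the iterator returned by its parent (RDD.compute returns an Iterator). The pipelining described above therefore behaves much like chained Scala iterators. The sketch below uses only the standard library. PipelineDemo and its log are hypothetical and exist only to make the evaluation order visible.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical demo: two narrow "operators" chained over one partition's iterator,
// the way narrow RDDs wrap their parent's iterator inside a single task.
object PipelineDemo {
  def pipeline(partition: Iterator[Int], log: ListBuffer[String]): Iterator[Int] =
    partition
      .map { x => log += s"map $x"; x * 10 }          // runs only when a record is pulled
      .filter { x => log += s"filter $x"; x > 10 }    // sees each record right after map
}
```

Nothing runs until the final consumer pulls records. The log then interleaves map and filter per record (map 1, filter 10, map 2, ...) instead of finishing one operator before starting the next, so no intermediate collection is built.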

Code Example – ComplexJob

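package internals

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits; only required before Spark 1.3

object ComplexJob {
  def main(args: Array[String]): Unit = {
    // Local mode with a single worker thread
    val sc = new SparkContext("local", "ComplexJob test")

    // 8 records in 3 partitions, then repartitioned by key hash (ShuffleDependency)
    val data1 = Array[(Int, Char)](
      (1, 'a'), (2, 'b'),
      (3, 'c'), (4, 'd'),
      (5, 'e'), (3, 'f'),
      (2, 'g'), (1, 'h'))
    val rangePairs1 = sc.parallelize(data1, 3)
    val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))

    // 4 records in 2 partitions; map is a narrow (one-to-one) dependency
    val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
      (3, "C"), (4, "D"))
    val pairs2 = sc.parallelize(data2, 2)
    val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))

    // 2 records in 2 partitions
    val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
    val rangePairs3 = sc.parallelize(data3, 2)

    // union is narrow (2 + 2 = 4 partitions); the join shuffles rangePairs
    // but can reuse hashPairs1's existing HashPartitioner(3)
    val rangePairs = rangePairs2.union(rangePairs3)
    val result = hashPairs1.join(rangePairs)

    // Print every record tagged with its partition index
    // (replaces foreachWith, which was deprecated and later removed)
    result
      .mapPartitionsWithIndex((i, iter) => iter.map(x => "[result " + i + "] " + x))
      .foreach(println)
    println(result.toDebugString)
    sc.stop()
  }
}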
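The job's data flow can be modeled in plain Scala to predict what it should print. hashPartition mirrors the rule in HashPartitioner.getPartition: a non-negative key.hashCode modulo the partition count. Spark also sends null keys to partition 0, which this sketch omits. ComplexJobModel and its fields are hypothetical. The join is computed with a nested loop rather than Spark's cogroup-based implementation, so it yields the same records but not necessarily in the same order.

```scala
// Hypothetical plain-Scala model (no Spark) of the records ComplexJob produces.
object ComplexJobModel {
  // Same rule as HashPartitioner for non-null keys: non-negative hashCode mod n
  def hashPartition(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  val data1 = Seq((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
  val data2 = Seq((1, "A"), (2, "B"), (3, "C"), (4, "D"))
  val data3 = Seq((1, 'X'), (2, 'Y'))

  // partitionBy(new HashPartitioner(3)): target partition -> records
  val hashPairs1: Map[Int, Seq[(Int, Char)]] =
    data1.groupBy { case (k, _) => hashPartition(k, 3) }

  // map + union are narrow; the join keeps every pair of records sharing a key
  val rangePairs: Seq[(Int, Char)] = data2.map { case (k, v) => (k, v.charAt(0)) } ++ data3
  val joined: Seq[(Int, (Char, Char))] =
    for {
      (k1, c1) <- data1
      (k2, c2) <- rangePairs
      if k1 == k2
    } yield (k1, (c1, c2))
}
```

The model predicts that partitionBy places key 3 in partition 0, keys 1 and 4 in partition 1, and keys 2 and 5 in partition 2. It also predicts that the join emits 11 records, since key 5 has no match. A real run should print the same records, although their order may differ.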

Job Generation Flow

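When an action such as foreach() is called, it invokes SparkContext.runJob, which hands the job to DAGScheduler.runJob. The DAGScheduler determines which partitions must be computed and builds the final stage with newStage(). Inside newStage(), getParentStages() walks the logical graph backward from the final RDD and creates a new parent stage at every shuffle dependency.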
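Actions in Spark are thin wrappers around runJob. RDD.count(), for example, runs sc.runJob(this, Utils.getIteratorSize _) and sums the per-partition sizes. Similarly, foreach() runs a function that iterates over each partition. The toy model below keeps only that shape. RunJobModel and Partitioned are hypothetical, and an in-memory sequence of partitions stands in for an RDD.

```scala
// Hypothetical toy model (no Spark): a job applies one function to each requested
// partition of the final "RDD" and returns one result per partition.
object RunJobModel {
  type Partitioned[T] = IndexedSeq[Seq[T]] // partition index -> records

  def runJob[T, U](rdd: Partitioned[T], func: Iterator[T] => U,
                   partitions: Seq[Int]): IndexedSeq[U] =
    partitions.map(p => func(rdd(p).iterator)).toIndexedSeq

  // count(): per-partition sizes, summed on the "driver"
  def count[T](rdd: Partitioned[T]): Long =
    runJob(rdd, (it: Iterator[T]) => it.size.toLong, rdd.indices).sum

  // foreach(): run the side effect inside every partition, return nothing
  def foreach[T](rdd: Partitioned[T])(f: T => Unit): Unit =
    runJob(rdd, (it: Iterator[T]) => it.foreach(f), rdd.indices)
}
```

The explicit partition list reflects the fact that a job does not have to cover every partition. take(), for instance, may run jobs over only a few partitions.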

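Stages are then submitted with submitStage(finalStage). If a stage still has missing parent stages, those parents are submitted first (recursively), and the stage waits in a waiting set until they finish. Once no parents are missing, submitMissingTasks() creates one task per partition that still needs computing: a ShuffleMapTask for a shuffle map stage, or a ResultTask for the final stage. It wraps these tasks in a TaskSet and passes it to TaskSchedulerImpl.submitTasks(), which then asks the scheduler backend for resources.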
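The recursion in submitStage can be modeled with a few mutable sets. SimStage, SubmitModel and its task classes are hypothetical simplifications, although the task classes are named after Spark's. The model assumes every TaskSet finishes as soon as it is submitted, so a waiting stage is re-checked immediately rather than after task-completion events. It also treats all partitions as missing.

```scala
import scala.collection.mutable

// Hypothetical, simplified stage (not Spark's Stage class).
final case class SimStage(id: Int, numTasks: Int, parents: List[SimStage], isShuffleMap: Boolean)

object SubmitModel {
  sealed trait Task
  final case class ShuffleMapTask(stageId: Int, partition: Int) extends Task
  final case class ResultTask(stageId: Int, partition: Int) extends Task
  final case class TaskSet(stageId: Int, tasks: Seq[Task])

  // Returns TaskSets in submission order, assuming each one completes immediately.
  def runAll(finalStage: SimStage): List[TaskSet] = {
    val finished  = mutable.Set.empty[Int]
    val waiting   = mutable.LinkedHashSet.empty[SimStage]
    val submitted = mutable.ListBuffer.empty[TaskSet]

    def submitStage(s: SimStage): Unit =
      if (!finished(s.id) && !waiting.contains(s)) { // never submit a stage twice
        val missing = s.parents.filterNot(p => finished(p.id))
        if (missing.isEmpty) submitMissingTasks(s)
        else {
          waiting += s // parked before recursing because this model completes synchronously
          missing.foreach(submitStage)
        }
      }

    def submitMissingTasks(s: SimStage): Unit = {
      // One task per partition: ShuffleMapTask for map stages, ResultTask for the final stage
      val tasks: Seq[Task] = (0 until s.numTasks).map { p =>
        if (s.isShuffleMap) ShuffleMapTask(s.id, p) else ResultTask(s.id, p)
      }
      submitted += TaskSet(s.id, tasks)
      finished += s.id
      // A finished stage may unblock waiting children
      val ready = waiting.filter(_.parents.forall(p => finished(p.id))).toList
      ready.foreach { c => waiting -= c; submitStage(c) }
    }

    submitStage(finalStage)
    submitted.toList
  }
}
```

Take a final stage with two shuffle map parents of 3 and 4 partitions. The model emits TaskSets for stages 0, 1 and 2 in that order, and the last one consists of ResultTasks. The finished/waiting guard keeps a shared parent from being submitted twice, which is the role of Spark's waiting and running stage sets.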

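The backend (for example SparkDeploySchedulerBackend in standalone mode) responds by turning the executors' free cores into resource offers. TaskSchedulerImpl.resourceOffers() assigns pending tasks to those offers. The backend then sends each serialized task to its executor in a LaunchTask message, and the executor deserializes and runs it.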
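The offer loop can be approximated as follows. WorkerOffer is named after Spark's internal class, but here it and LaunchTask are local stand-ins, and OfferModel is hypothetical. The model spreads tasks round-robin over executors, one task per free core. It ignores locality preferences, task-set scheduling order and offer shuffling, all of which the real TaskSchedulerImpl.resourceOffers() takes into account.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for a resource offer and a launch message.
final case class WorkerOffer(executorId: String, host: String, cores: Int)
final case class LaunchTask(taskId: Int, executorId: String)

object OfferModel {
  // Hypothetical round-robin assignment: one core per task, at most one task per
  // offer per round, until tasks or free cores run out.
  def resourceOffers(offers: Seq[WorkerOffer], pending: Seq[Int]): List[LaunchTask] = {
    val free = offers.map(_.cores).toArray
    val launched = mutable.ListBuffer.empty[LaunchTask]
    var next = 0 // index of the next pending task
    var progress = true
    while (next < pending.size && progress) {
      progress = false
      for (i <- offers.indices)
        if (next < pending.size && free(i) > 0) {
          launched += LaunchTask(pending(next), offers(i).executorId)
          free(i) -= 1
          next += 1
          progress = true
        }
    }
    launched.toList
  }
}
```

Suppose there are offers of 2 cores and 1 core and five pending tasks. Three tasks launch, on exec-1, exec-2 and exec-1 in that order. The other two stay pending until cores free up and the backend makes new offers.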

Discussion

The article concludes by summarizing the topics covered: driver-initiated job creation, logical-to-physical graph conversion, pipeline implementation, and the concrete code path for job submission. It notes that shuffle mechanics and task execution details are explored in the next chapter.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Wang Zhiwu, a big data expert dedicated to sharing big data technology.
