Spark Job Execution Architecture: From Submission to Shuffle and Task Processing
This article explains how Spark coordinates master, worker, driver, and executor components to generate, submit, and run jobs, detailing the creation of logical and physical execution graphs, task allocation, result handling, and the shuffle read process with code examples and diagrams.
After a brief introduction, the article revisits the overall Spark deployment diagram and then examines each stage of job execution from an architectural perspective.
Deployment Diagram
The original overview diagram is shown, illustrating the relationships among master, worker, driver, and executor nodes.
Job Submission
A diagram demonstrates how a driver program (assumed to run on the master node) generates a job and submits it to worker nodes.
finalRDD.action()
=> sc.runJob()
// generate job, stages and tasks
=> dagScheduler.runJob()
=> dagScheduler.submitJob()
=> dagSchedulerEventProcessActor ! JobSubmitted
=> dagSchedulerEventProcessActor.JobSubmitted()
=> dagScheduler.handleJobSubmitted()
=> finalStage = newStage()
=> mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size)
=> dagScheduler.submitStage()
=> missingStages = dagScheduler.getMissingParentStages()
=> dagScheduler.submitMissingTasks(readyStage)
// add tasks to the taskScheduler
=> taskScheduler.submitTasks(new TaskSet(tasks))
=> fifoSchedulableBuilder.addTaskSetManager(taskSet)
// send tasks
=> sparkDeploySchedulerBackend.reviveOffers()
=> driverActor ! ReviveOffers
=> sparkDeploySchedulerBackend.makeOffers()
=> sparkDeploySchedulerBackend.launchTasks()
=> foreach task
       CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask)
The driver creates a SparkContext, which sets up the communication objects, threads, and actors used above; creating it is what gives the program its driver role.
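The `!` in the trace is an asynchronous actor send: submitJob() only posts a JobSubmitted message, and the event-processing actor later calls handleJobSubmitted() on its own thread. The following is a minimal Scala sketch of that pattern, assuming a plain LinkedBlockingQueue and a single thread instead of an Akka actor; EventLoop, SchedulerEvent, and Stop are hypothetical names, and only the JobSubmitted message name comes from the trace.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical event types standing in for the scheduler's actor messages.
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int, rddName: String) extends SchedulerEvent
case object Stop extends SchedulerEvent

// Callers enqueue events (like `actor ! msg`); only the loop thread touches state.
class EventLoop {
  private val queue = new LinkedBlockingQueue[SchedulerEvent]()
  val handled = ArrayBuffer.empty[String] // log of handled jobs, for inspection

  def post(event: SchedulerEvent): Unit = queue.put(event) // returns immediately

  private def handleJobSubmitted(jobId: Int, rddName: String): Unit =
    handled += s"job $jobId on $rddName"

  private val thread = new Thread(() => {
    var running = true
    while (running) queue.take() match {
      case JobSubmitted(id, name) => handleJobSubmitted(id, name)
      case Stop                   => running = false
    }
  })
  thread.start()

  def stopAndJoin(): Unit = { post(Stop); thread.join() }
}

val loop = new EventLoop
loop.post(JobSubmitted(0, "finalRDD")) // like submitJob(): does not wait for handling
loop.post(JobSubmitted(1, "finalRDD"))
loop.stopAndJoin()
println(loop.handled.mkString("; ")) // job 0 on finalRDD; job 1 on finalRDD
```

Funnelling every event through one thread is what lets the scheduler keep its bookkeeping free of locks.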
Generating the Logical Execution Graph
Transformations in the driver program build a computing chain of RDDs; each RDD defines compute() for partition computation and getDependencies() for data dependencies.
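To make the compute()/getDependencies() split concrete, here is a toy lineage in Scala. ToyRDD, ParallelCollection, MappedRDD, and FilteredRDD are hypothetical simplifications, not Spark's classes: transformations only create new nodes, and the collect() action pulls each partition through the chain of compute() calls.

```scala
// Each node knows how to compute one partition and which parents it depends on.
abstract class ToyRDD[T] {
  def numPartitions: Int
  def compute(partition: Int): Iterator[T]
  def getDependencies: Seq[ToyRDD[_]]

  // Transformations only build a new node; nothing runs yet.
  def map[U](f: T => U): ToyRDD[U] = new MappedRDD(this, f)
  def filter(p: T => Boolean): ToyRDD[T] = new FilteredRDD(this, p)

  // An "action": evaluates every partition by pulling through compute().
  def collect(): Seq[T] = (0 until numPartitions).flatMap(compute)
}

class ParallelCollection[T](data: Seq[T], val numPartitions: Int) extends ToyRDD[T] {
  def compute(p: Int): Iterator[T] =
    data.zipWithIndex.collect { case (x, i) if i % numPartitions == p => x }.iterator
  def getDependencies: Seq[ToyRDD[_]] = Nil
}

class MappedRDD[T, U](parent: ToyRDD[T], f: T => U) extends ToyRDD[U] {
  def numPartitions: Int = parent.numPartitions
  def compute(p: Int): Iterator[U] = parent.compute(p).map(f) // same partition of the parent
  def getDependencies: Seq[ToyRDD[_]] = Seq(parent)
}

class FilteredRDD[T](parent: ToyRDD[T], pred: T => Boolean) extends ToyRDD[T] {
  def numPartitions: Int = parent.numPartitions
  def compute(p: Int): Iterator[T] = parent.compute(p).filter(pred)
  def getDependencies: Seq[ToyRDD[_]] = Seq(parent)
}

val source = new ParallelCollection(1 to 10, numPartitions = 3)
val chain = source.map(_ * 2).filter(_ % 3 == 0)
println(chain.getDependencies.head.getDependencies.head eq source) // true
println(chain.collect().sorted.mkString(", "))                    // 6, 12, 18
```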
Generating the Physical Execution Graph
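Each action triggers a job. Inside dagScheduler.runJob(), handleJobSubmitted() creates the stages, splitting the RDD chain at shuffle dependencies; submitStage() recursively submits any missing parent stages first, and submitMissingTasks() creates a ShuffleMapTask or ResultTask for each missing partition and packages them into a TaskSet for the task scheduler.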
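The stage-splitting rule can be sketched as a backward walk over the lineage: narrow dependencies stay in the current stage, and each shuffle dependency becomes a parent stage. This is a simplified model under our own assumptions (Node, Dep, Stage, buildStages, and submissionOrder are hypothetical); it ignores partitions, task creation, and failure handling.

```scala
import scala.collection.mutable

// Hypothetical lineage model: a dependency is either narrow or a shuffle.
sealed trait Dep { def rdd: Node }
case class NarrowDep(rdd: Node) extends Dep
case class ShuffleDep(rdd: Node, shuffleId: Int) extends Dep
case class Node(name: String, deps: List[Dep] = Nil)

case class Stage(id: Int, lastRdd: Node, parents: List[Stage], isShuffleMap: Boolean)

// Narrow deps are pipelined into the current stage; every shuffle dep becomes a
// parent stage (ShuffleMapTasks). The final stage would run ResultTasks.
def buildStages(finalRdd: Node): Stage = {
  var nextId = 0
  val stageByShuffle = mutable.Map.empty[Int, Stage] // one stage per shuffle

  def newStage(rdd: Node, isShuffleMap: Boolean): Stage = {
    val parents = parentStages(rdd) // parents get created (and numbered) first
    val stage = Stage(nextId, rdd, parents, isShuffleMap)
    nextId += 1
    stage
  }

  def parentStages(rdd: Node): List[Stage] = {
    val found = mutable.ListBuffer.empty[Stage]
    val visited = mutable.Set.empty[String]
    def visit(n: Node): Unit =
      if (visited.add(n.name)) n.deps.foreach {
        case NarrowDep(parent) => visit(parent)
        case ShuffleDep(parent, shuffleId) =>
          val stage = stageByShuffle.get(shuffleId) match {
            case Some(existing) => existing
            case None =>
              val created = newStage(parent, isShuffleMap = true)
              stageByShuffle(shuffleId) = created
              created
          }
          if (!found.contains(stage)) found += stage
      }
    visit(rdd)
    found.toList
  }

  newStage(finalRdd, isShuffleMap = false)
}

// Like submitStage(): submit missing parent stages first, then the stage itself.
def submissionOrder(stage: Stage, done: mutable.Set[Int] = mutable.Set.empty[Int]): List[Int] =
  if (done.contains(stage.id)) Nil
  else {
    val parentsFirst = stage.parents.flatMap(p => submissionOrder(p, done))
    done += stage.id
    parentsFirst :+ stage.id
  }

val a = Node("a")
val b = Node("b")
val mappedA = Node("mappedA", List(NarrowDep(a)))
val joined = Node("joined", List(ShuffleDep(mappedA, 0), ShuffleDep(b, 1)))
val result = Node("result", List(NarrowDep(joined)))

val finalStage = buildStages(result)
println(finalStage.parents.map(_.lastRdd.name)) // List(mappedA, b)
println(submissionOrder(finalStage))            // List(0, 1, 2)
```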
Task Allocation
The sparkDeploySchedulerBackend receives the TaskSet and, via the driver actor, sends serialized tasks to the CoarseGrainedExecutorBackend on the designated worker node.
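The offer/launch step amounts to matching pending tasks to free executor cores. The sketch below assumes one core per task and a plain round-robin pass over executors; WorkerOffer and TaskDescription echo Spark's naming, but this code and its fields are hypothetical, and the real scheduler also shuffles the offers and takes data locality into account.

```scala
import scala.collection.mutable

// Hypothetical offer cycle: executors advertise free cores, and queued tasks
// are handed out one core per task, round-robin across executors.
case class WorkerOffer(executorId: String, freeCores: Int)
case class TaskDescription(taskId: Long, executorId: String)

def resourceOffers(offers: Seq[WorkerOffer], pending: Seq[Long]): List[TaskDescription] = {
  val freeCores = offers.map(_.freeCores).toArray
  val queue = mutable.Queue.empty[Long] ++= pending
  val launched = mutable.ListBuffer.empty[TaskDescription]
  var assignedInPass = true
  while (queue.nonEmpty && assignedInPass) {
    assignedInPass = false
    // One pass gives each executor that still has a free core at most one task.
    for (i <- offers.indices if queue.nonEmpty && freeCores(i) > 0) {
      launched += TaskDescription(queue.dequeue(), offers(i).executorId)
      freeCores(i) -= 1
      assignedInPass = true
    }
  }
  launched.toList // anything left in the queue waits for the next reviveOffers()
}

val offers = Seq(WorkerOffer("exec-1", 2), WorkerOffer("exec-2", 1))
val assigned = resourceOffers(offers, pending = Seq(0L, 1L, 2L, 3L))
assigned.foreach(t => println(s"LaunchTask(task ${t.taskId}) -> ${t.executorId}"))
```

Each resulting TaskDescription corresponds to one LaunchTask message sent to that executor's backend; task 3 stays queued because all three cores are taken.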
Job Reception
Workers execute received tasks as follows:
coarseGrainedExecutorBackend ! LaunchTask(serializedTask)
=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))
The executor wraps the task in a TaskRunner and runs it on a free thread from its thread pool; each CoarseGrainedExecutorBackend process hosts exactly one Executor object.
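A minimal sketch of the executor side, assuming a cached thread pool from java.util.concurrent and a plain function in place of a deserialized task. MiniExecutor and its launchTask() are hypothetical; the inner TaskRunner only mirrors the name used in the trace.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

class MiniExecutor {
  private val threadPool = Executors.newCachedThreadPool()
  val finished = new ConcurrentLinkedQueue[String]() // thread-safe log of results

  // Wraps one task; run() executes on whichever pool thread is free.
  class TaskRunner(taskId: Long, body: () => Int) extends Runnable {
    override def run(): Unit = {
      val value = body() // Spark would deserialize the task and call task.run(taskId)
      finished.add(s"task $taskId -> $value")
    }
  }

  def launchTask(taskId: Long, body: () => Int): Unit =
    threadPool.execute(new TaskRunner(taskId, body))

  def shutdownAndWait(): Boolean = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}

val executor = new MiniExecutor
for (id <- 0L until 4L) executor.launchTask(id, () => (id * 10).toInt)
println(executor.shutdownAndWait()) // true
println(executor.finished.size())   // 4
```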
Task Execution
After task assignment, the executor deserializes the task, runs it, and produces a directResult. If the result exceeds spark.akka.frameSize (default 10 MB), it is stored locally by the BlockManager and only a reference (an indirectResult) is sent to the driver.
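The size check can be sketched as follows. DirectResult, IndirectResult, localBlockStore, resolve, and the taskresult_ block naming are hypothetical; a mutable map stands in for the BlockManager, and the threshold is the 10 MB figure quoted above rather than an exact reproduction of Spark's check, which may reserve some bytes for message overhead.

```scala
import scala.collection.mutable

sealed trait TaskResult
case class DirectResult(bytes: Array[Byte]) extends TaskResult
case class IndirectResult(blockId: String, size: Int) extends TaskResult

val frameSizeBytes: Int = 10 * 1024 * 1024 // the 10 MB default quoted above
val localBlockStore = mutable.Map.empty[String, Array[Byte]] // stands in for the BlockManager

// Executor side: small results travel inline; large ones are parked locally.
def packageResult(taskId: Long, serialized: Array[Byte]): TaskResult =
  if (serialized.length > frameSizeBytes) {
    val blockId = s"taskresult_$taskId" // hypothetical block naming
    localBlockStore(blockId) = serialized
    IndirectResult(blockId, serialized.length)
  } else DirectResult(serialized)

// Driver side: an IndirectResult is resolved by fetching the stored bytes.
def resolve(result: TaskResult): Array[Byte] = result match {
  case DirectResult(bytes)        => bytes
  case IndirectResult(blockId, _) => localBlockStore(blockId) // a remote fetch in Spark
}

val small = packageResult(1L, new Array[Byte](1024))
val large = packageResult(2L, new Array[Byte](frameSizeBytes + 1))
println(small.isInstanceOf[DirectResult]) // true
println(large)                            // IndirectResult(taskresult_2,10485761)
```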
In TaskRunner.run()
// deserialize the task, run it, and then send the result to the driver via coarseGrainedExecutorBackend.statusUpdate()
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)
=> directResult = new DirectTaskResult(ser.serialize(value))
=> if (directResult.size() > akkaFrameSize())
       indirectResult = blockManager.putBytes(taskId, directResult, StorageLevel.MEMORY_AND_DISK_SER)
   else
       return directResult
=> coarseGrainedExecutorBackend.statusUpdate(result)
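=> driver ! StatusUpdate(executorId, taskId, result)
A ShuffleMapTask returns a MapStatus (the BlockManagerId of the executor holding the output, plus the size of each FileSegment it wrote), while a ResultTask returns the result of applying the action's function to its partition (e.g., the record count for count()).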
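The ShuffleMapTask side can be sketched as hash partitioning plus per-bucket size accounting. In this hypothetical Scala version, in-memory byte streams stand in for the bucket files, DataOutputStream.writeUTF/writeInt stand in for Spark's serializer, the location is a plain string instead of a BlockManagerId, and sizes are reported uncompressed.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

case class MapStatus(location: String, bucketSizes: Array[Long])

// Keeps bucket ids in [0, mod) even for negative hash codes.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

def runShuffleMapTask(records: Iterator[(String, Int)], numOutputSplits: Int,
                      location: String): (MapStatus, Array[Array[Byte]]) = {
  val streams = Array.fill(numOutputSplits)(new ByteArrayOutputStream())
  val writers = streams.map(new DataOutputStream(_))
  for ((key, value) <- records) {
    val bucketId = nonNegativeMod(key.hashCode, numOutputSplits) // one bucket per reducer
    writers(bucketId).writeUTF(key)
    writers(bucketId).writeInt(value)
  }
  writers.foreach(_.flush())
  val segments = streams.map(_.toByteArray) // one "FileSegment" per reducer
  (MapStatus(location, segments.map(_.length.toLong)), segments)
}

val (status, segments) =
  runShuffleMapTask(Iterator("a" -> 1, "b" -> 2, "a" -> 3), numOutputSplits = 2, location = "exec-1")
println(status.bucketSizes.mkString(", ")) // 7, 14
```

Only the small MapStatus goes back to the driver; the bucket bytes stay on the executor until reducers fetch them.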
In task.run(taskId)
// if the task is ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits)
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context))
=> return MapStatus(blockManager.blockManagerId, compressedSizes)  // one compressed size per bucket FileSegment
// If the task is ResultTask
=> return func(context, rdd.iterator(split, context))
When the driver receives a task result, it updates the task scheduler, fetches the result through the BlockManager if only a reference was sent, and then processes it: for a ResultTask, the ResultHandler aggregates the output; for a ShuffleMapTask, the MapStatus is stored in mapOutputTrackerMaster so that later shuffle reads can locate the map output.
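The driver-side bookkeeping for MapStatus objects can be sketched as a registry keyed by shuffle ID. MapOutputRegistry and its methods are hypothetical (getServerStatuses merely borrows the name from the shuffle-read trace); the real tracker also serves these statuses to executors over the network and handles lost outputs.

```scala
import scala.collection.mutable

case class MapStatus(location: String, bucketSizes: Array[Long])

class MapOutputRegistry {
  private val statuses = mutable.Map.empty[Int, Array[MapStatus]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    statuses(shuffleId) = new Array[MapStatus](numMaps)

  // Called as each ShuffleMapTask's StatusUpdate is processed.
  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus): Unit =
    statuses(shuffleId)(mapId) = status

  // What one reducer needs: where its slice of every map output lives, and its size.
  def getServerStatuses(shuffleId: Int, reduceId: Int): Seq[(String, Long)] = {
    val all = statuses(shuffleId)
    require(all.forall(_ != null), s"shuffle $shuffleId still has missing map outputs")
    all.toSeq.map(s => (s.location, s.bucketSizes(reduceId)))
  }
}

val registry = new MapOutputRegistry
registry.registerShuffle(shuffleId = 0, numMaps = 2)
registry.registerMapOutput(0, 0, MapStatus("exec-1", Array(7L, 14L)))
registry.registerMapOutput(0, 1, MapStatus("exec-2", Array(0L, 21L)))
registry.getServerStatuses(0, reduceId = 1).foreach(println) // (exec-1,14) then (exec-2,21)
```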
After driver receives StatusUpdate(result)
=> taskScheduler.statusUpdate(taskId, state, result.value)
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
=> if result is IndirectResult
serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId)
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result)
=> dagScheduler.taskEnded(result.value, result.accumUpdates)
=> ... (accumulator updates, job completion handling)
Shuffle Read
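Reducers look up the locations of the FileSegments produced by the parent ShuffleMapTasks through the map output tracker (MapOutputTrackerWorker on the executor, backed by mapOutputTrackerMaster on the driver). The BasicBlockFetcherIterator then builds fetch requests and sends them to the ConnectionManagers of other nodes. Each request may retrieve several file segments; requests are sized at roughly one fifth of spark.reducer.maxMbInFlight (48 MB by default) so that data can be fetched from up to five nodes in parallel, and the total size of requests in flight is capped at that limit.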
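The request-planning rule can be sketched as below, modeled on our reading of Spark 1.x's BasicBlockFetcherIterator: empty segments are skipped, a request to a remote address is closed once it reaches maxBytesInFlight / 5, and requests are issued only while the bytes in flight fit within the budget. FetchRequest, splitIntoRequests, and initialBatch are hypothetical names, and Spark also randomizes the request order, which this sketch omits for determinism.

```scala
import scala.collection.mutable

case class FetchRequest(address: String, blocks: Seq[(String, Long)]) {
  val size: Long = blocks.map(_._2).sum
}

def splitIntoRequests(blocksByAddress: Seq[(String, Seq[(String, Long)])],
                      maxBytesInFlight: Long): List[FetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = mutable.ListBuffer.empty[FetchRequest]
  for ((address, blocks) <- blocksByAddress) {
    var current = mutable.ArrayBuffer.empty[(String, Long)]
    var currentSize = 0L
    for ((blockId, size) <- blocks if size > 0) { // empty segments are skipped
      current += ((blockId, size))
      currentSize += size
      if (currentSize >= targetRequestSize) { // close this request
        requests += FetchRequest(address, current.toList)
        current = mutable.ArrayBuffer.empty[(String, Long)]
        currentSize = 0L
      }
    }
    if (current.nonEmpty) requests += FetchRequest(address, current.toList)
  }
  requests.toList
}

// Issue requests while they fit the budget; the first one is always sent.
def initialBatch(requests: List[FetchRequest], maxBytesInFlight: Long): List[FetchRequest] = {
  val sent = mutable.ListBuffer.empty[FetchRequest]
  var inFlight = 0L
  var pending = requests
  while (pending.nonEmpty && (inFlight == 0 || inFlight + pending.head.size <= maxBytesInFlight)) {
    sent += pending.head
    inFlight += pending.head.size
    pending = pending.tail
  }
  sent.toList
}

val MB = 1024L * 1024
val requests = splitIntoRequests(
  Seq("exec-1" -> Seq("b0" -> 6 * MB, "b1" -> 6 * MB, "b2" -> 0L, "b3" -> 3 * MB),
      "exec-2" -> Seq("b4" -> 40 * MB)),
  maxBytesInFlight = 48 * MB)
requests.foreach(r => println(s"${r.address}: ${r.blocks.map(_._1).mkString(",")}"))
println(initialBatch(requests, 48 * MB).map(_.size / MB)) // List(12, 3)
```

The 40 MB request waits until earlier responses arrive and free up budget.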
rdd.iterator()
=> rdd(ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)
=> blocksByAddress = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)
The fetcher sends requests over NIO connections. Each receiving worker reads the requested file segments from local disk, copying small segments into an in-memory buffer and memory-mapping large ones, and returns them. Received segments are placed in a queue, deserialized, and fed to the reducer's iterator.
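The disk-side read can be sketched with standard Java NIO calls from Scala. FileSegment and readSegment are hypothetical, and the 8 KB threshold is an assumed value for minMemoryMapBytes. Small segments are copied into a heap ByteBuffer, while larger ones are mapped with FileChannel.map, whose mapping stays valid after the channel is closed.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.nio.file.Files

// A byte range of a local shuffle file.
case class FileSegment(file: File, offset: Long, length: Long)

val minMemoryMapBytes: Long = 8 * 1024 // assumed threshold for this sketch

def readSegment(segment: FileSegment): ByteBuffer = {
  val channel = new RandomAccessFile(segment.file, "r").getChannel
  try {
    if (segment.length < minMemoryMapBytes) {
      // Small segment: copy into a heap buffer. read() may return fewer bytes
      // than requested, so keep reading until the buffer is full or EOF.
      val buf = ByteBuffer.allocate(segment.length.toInt)
      while (buf.hasRemaining && channel.read(buf, segment.offset + buf.position()) >= 0) {}
      buf.flip()
      buf
    } else {
      // Large segment: memory-map it instead of copying.
      channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
    }
  } finally channel.close()
}

val shuffleFile = File.createTempFile("shuffle_0_0", ".data")
shuffleFile.deleteOnExit()
Files.write(shuffleFile.toPath, Array.tabulate[Byte](20000)(i => (i % 128).toByte))
val small = readSegment(FileSegment(shuffleFile, offset = 100, length = 50))
val large = readSegment(FileSegment(shuffleFile, offset = 1000, length = 10000))
println(s"${small.isDirect} ${large.isDirect}") // false true
```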
After the blockManager receives the fetch request
=> connectionManager.receiveMessage(bufferMessage)
=> blockManagerWorker.onBlockMessageReceive()
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> segment = diskManager.getBlockLocation(blockId)
=> if (segment.length < minMemoryMapBytes)
       buffer = ByteBuffer.allocate(segment.length); channel.read(buffer, segment.offset)
   else
       buffer = channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
Discussion
The architecture emphasizes modularity and loose coupling; the BlockManager handles many responsibilities (memory, disk, network). The article provides extensive code snippets and diagrams to illustrate how Spark orchestrates job generation, submission, execution, result collection, and shuffle, encouraging readers to consult the source code for deeper understanding.
This article has been distilled and summarized from source material, then republished for learning and reference.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
