
Spark Job Execution Architecture: From Submission to Shuffle and Task Processing

This article explains how Spark coordinates master, worker, driver, and executor components to generate, submit, and run jobs, detailing the creation of logical and physical execution graphs, task allocation, result handling, and the shuffle read process with code examples and diagrams.

Big Data Technology & Architecture

After a brief introduction, the article revisits the overall Spark deployment diagram and then examines each stage of job execution from an architectural perspective.

Deployment Diagram

The original overview diagram is shown, illustrating the relationships among master, worker, driver, and executor nodes.

Job Submission

A diagram demonstrates how a driver program (assumed to run on the master node) generates a job and submits it to worker nodes.

finalRDD.action()
=> sc.runJob()

// generate job, stages and tasks
=> dagScheduler.runJob()
=> dagScheduler.submitJob()
=> dagSchedulerEventProcessActor ! JobSubmitted
=> dagSchedulerEventProcessActor.JobSubmitted()
=> dagScheduler.handleJobSubmitted()
=> finalStage = newStage()
=>   mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size)
=> dagScheduler.submitStage()
=>   missingStages = dagScheduler.getMissingParentStages()
=> dagScheduler.submitMissingTasks(readyStage)

// add tasks to the taskScheduler
=> taskScheduler.submitTasks(new TaskSet(tasks))
=> fifoSchedulableBuilder.addTaskSetManager(taskSet)

// send tasks
=> sparkDeploySchedulerBackend.reviveOffers()
=> driverActor ! ReviveOffers
=> sparkDeploySchedulerBackend.makeOffers()
=> sparkDeploySchedulerBackend.launchTasks()
=> foreach task
      CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask)

The driver creates a SparkContext which establishes communication objects, threads, and actors, thereby defining the program's driver role.

Generating the Logical Execution Graph

Transformations in the driver program build a computing chain of RDDs; each RDD defines compute() for partition computation and getDependencies() for data dependencies.
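
The chain of RDDs described above can be sketched with a toy model. This is a minimal illustration, not Spark's implementation: ToyRDD, Dependency, ParallelCollection, Mapped, and lineage are hypothetical names that only imitate the shape of compute() and getDependencies().

```scala
// Toy model only: these classes imitate the shape of Spark's RDD API, they are not Spark code.
object LogicalGraphSketch {
  // A dependency points at a parent dataset; `shuffle` marks a wide dependency.
  final case class Dependency(parent: ToyRDD[_], shuffle: Boolean)

  abstract class ToyRDD[T](val name: String, val numPartitions: Int) {
    def getDependencies: Seq[Dependency]      // data dependencies on parent datasets
    def compute(partition: Int): Iterator[T]  // how one partition is produced
  }

  // Source dataset: no parents, elements are dealt round-robin into partitions.
  class ParallelCollection[T](data: Seq[T], parts: Int)
      extends ToyRDD[T]("ParallelCollection", parts) {
    def getDependencies: Seq[Dependency] = Nil
    def compute(partition: Int): Iterator[T] =
      data.zipWithIndex.collect { case (x, i) if i % parts == partition => x }.iterator
  }

  // Narrow dependency: partition i of the child reads only partition i of the parent.
  class Mapped[A, B](parent: ToyRDD[A], f: A => B)
      extends ToyRDD[B]("Mapped", parent.numPartitions) {
    def getDependencies: Seq[Dependency] = Seq(Dependency(parent, shuffle = false))
    def compute(partition: Int): Iterator[B] = parent.compute(partition).map(f)
  }

  // Walk the chain from the final dataset back to its sources.
  def lineage(rdd: ToyRDD[_]): List[String] =
    rdd.name :: rdd.getDependencies.toList.flatMap(d => lineage(d.parent))

  val source  = new ParallelCollection(Seq(1, 2, 3, 4, 5, 6), parts = 2)
  val doubled = new Mapped[Int, Int](source, _ * 2)
}
```

Nothing is computed while the chain is built; data flows only when compute() is called on the final dataset, for example LogicalGraphSketch.doubled.compute(0).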

Generating the Physical Execution Graph

Each action triggers a job; during dagScheduler.runJob() stages are created, and submitStage() generates ShuffleMapTask or ResultTask objects, which are packaged into a TaskSet for the task scheduler.
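
How stages fall out of the dependency chain can be sketched as follows. This is a simplified illustration under stated assumptions: Node, stageOf, and stages are hypothetical helpers, a stage boundary is placed at every shuffle dependency, and shared parents are not deduplicated as a real scheduler would do.

```scala
object StageSketch {
  // Hypothetical lineage node: a name plus (parent, isShuffleDependency) edges.
  final case class Node(name: String, deps: List[(Node, Boolean)] = Nil)

  // Datasets reachable through narrow dependencies belong to the same stage;
  // parents behind a shuffle dependency are returned separately.
  def stageOf(node: Node): (List[String], List[Node]) = {
    val parts = node.deps.map {
      case (parent, true)  => (List.empty[String], List(parent)) // stage boundary
      case (parent, false) => stageOf(parent)                    // same stage
    }
    (node.name :: parts.flatMap(_._1), parts.flatMap(_._2))
  }

  // Stages in submission order: parent stages first, the final stage last.
  def stages(finalNode: Node): List[List[String]] = {
    val (members, shuffleParents) = stageOf(finalNode)
    shuffleParents.flatMap(stages) :+ members
  }

  val textFile = Node("textFile")
  val pairs    = Node("map", List((textFile, false)))
  val reduced  = Node("reduceByKey", List((pairs, true)))
  val result   = Node("mapValues", List((reduced, false)))
}
```

In this sketch the last stage in the list would be run as ResultTasks and every earlier stage as ShuffleMapTasks, one task per partition.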

Task Allocation

The sparkDeploySchedulerBackend receives the TaskSet and, via the driver actor, sends serialized tasks to the CoarseGrainedExecutorBackend on the designated worker node.

Job Reception

Workers execute received tasks as follows:

coarseGrainedExecutorBackend ! LaunchTask(serializedTask)
=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))

The executor wraps the task in a TaskRunner and runs it on a free thread from its thread pool; each CoarseGrainedExecutorBackend process hosts exactly one executor object.
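
The wrap-and-run step can be sketched with the JDK's thread pool. This is a minimal sketch, assuming the serialized task is replaced by a plain closure; ToyTaskRunner, runAll, and the results map are hypothetical stand-ins, and the cached pool is chosen here only for simplicity.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.concurrent.TrieMap

object ExecutorSketch {
  // Results keyed by task id; stands in for the status updates sent to the driver.
  val results = TrieMap.empty[Long, Int]

  // Hypothetical stand-in for TaskRunner: the "serialized task" is just a closure.
  class ToyTaskRunner(taskId: Long, body: () => Int) extends Runnable {
    override def run(): Unit = results.update(taskId, body())
  }

  // Hand every task to the pool, then wait until all of them have finished.
  def runAll(tasks: Seq[(Long, () => Int)]): Unit = {
    val threadPool = Executors.newCachedThreadPool()
    tasks.foreach { case (id, body) => threadPool.execute(new ToyTaskRunner(id, body)) }
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
    ()
  }
}
```

Each call to execute() returns immediately, so the thread that receives LaunchTask messages is never blocked by a long-running task.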

Task Execution

After task assignment, the executor deserializes the task, runs it, and produces a directResult. If the result exceeds spark.akka.frameSize (default 10 MB), it is stored locally by the BlockManager and only a reference (indirectResult) is sent to the driver.
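
The size check can be sketched as below. This is an illustration under assumptions, not Spark's code: ToyTaskResult, wrapResult, resolve, and the in-memory blockStore are hypothetical, the block id format is made up for the example, and the frame size is passed in as a parameter.

```scala
object TaskResultSketch {
  sealed trait ToyTaskResult
  final case class Direct(bytes: Array[Byte]) extends ToyTaskResult
  final case class Indirect(blockId: String) extends ToyTaskResult

  // Stand-in for the executor-local BlockManager.
  val blockStore = scala.collection.mutable.Map.empty[String, Array[Byte]]

  // Executor side: ship small results inline, large ones by reference.
  def wrapResult(taskId: Long, serialized: Array[Byte], frameSizeBytes: Int): ToyTaskResult =
    if (serialized.length > frameSizeBytes) {
      val blockId = s"taskresult_$taskId"  // illustrative id format
      blockStore(blockId) = serialized     // keep the payload on the executor
      Indirect(blockId)                    // send only the reference
    } else Direct(serialized)

  // Driver side: turn a reference back into bytes when necessary.
  def resolve(result: ToyTaskResult): Array[Byte] = result match {
    case Direct(bytes)     => bytes
    case Indirect(blockId) => blockStore(blockId)  // a remote fetch in a real cluster
  }
}
```

The driver handles both cases through the same resolve step, so the rest of the result-processing path does not need to know how the bytes arrived.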

In TaskRunner.run()
// deserialize the task, run it, and then send the result to the driver
=> coarseGrainedExecutorBackend.statusUpdate()
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)
=> directResult = new DirectTaskResult(ser.serialize(value))
=> if( directResult.size() > akkaFrameSize() )
       indirectResult = blockManager.putBytes(taskId, directResult, MEMORY+DISK+SER)
   else
       return directResult
=> coarseGrainedExecutorBackend.statusUpdate(result)
=> driver ! StatusUpdate(executorId, taskId, result)

ShuffleMapTask returns a MapStatus (executor ID and file segment sizes), while ResultTask returns the computed function result (e.g., count of records).

In task.run(taskId)
// if the task is ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits)
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context))
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)])

// If the task is ResultTask
=> return func(context, rdd.iterator(split, context))
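
The ShuffleMapTask branch above can be sketched as a function that routes records into one bucket per reducer and reports the bucket sizes. This is a simplified sketch: ToyMapStatus, bucketFor, and runMapTask are hypothetical, buckets are kept in memory instead of being written to files, hash partitioning is assumed, and sizes are counted in records rather than compressed bytes.

```scala
object ShuffleWriteSketch {
  type Record = (String, Int)

  // Hypothetical simplification of MapStatus: where the output lives and how big each bucket is.
  final case class ToyMapStatus(executorId: String, bucketSizes: Vector[Int])

  // Hash partitioning: pick the bucket of the reducer that will read this key.
  def bucketFor(key: Any, numReducers: Int): Int = {
    val mod = key.hashCode % numReducers
    if (mod < 0) mod + numReducers else mod  // keep the index non-negative
  }

  // Route every record to its bucket, then summarize the output for the driver.
  def runMapTask(executorId: String, records: Iterator[Record], numReducers: Int): (Vector[Vector[Record]], ToyMapStatus) = {
    val buckets = Vector.fill(numReducers)(Vector.newBuilder[Record])
    records.foreach { case kv @ (key, _) => buckets(bucketFor(key, numReducers)) += kv }
    val written = buckets.map(_.result())
    (written, ToyMapStatus(executorId, written.map(_.size)))
  }
}
```

Only the small status object travels to the driver; the bucket contents stay on the node that produced them until a reducer asks for them.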

Upon receiving task results, the driver updates the scheduler, fetches large results via the BlockManager if needed, and processes them: ResultHandler aggregates ResultTask outputs, while MapStatus objects are stored in mapOutputTrackerMaster for subsequent shuffle reads.

After driver receives StatusUpdate(result)
=> taskScheduler.statusUpdate(taskId, state, result.value)
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
=> if result is IndirectResult
       serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId)
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result)
=> dagScheduler.taskEnded(result.value, result.accumUpdates)
=> ... (accumulator updates, job completion handling)
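
The bookkeeping that makes MapStatus objects available to later shuffle reads can be sketched as a small registry. This is a toy model, not the real tracker: the method names mirror the calls that appear in the traces in this article, but their signatures, ToyMapStatus, and the Option-based return value are assumptions made for illustration.

```scala
object MapOutputTrackerSketch {
  // Hypothetical MapStatus: output location plus one segment size per reducer.
  final case class ToyMapStatus(location: String, sizes: Vector[Long])

  // shuffleId -> one slot per map task, filled in as ShuffleMapTasks finish.
  private val statuses =
    scala.collection.mutable.Map.empty[Int, Array[Option[ToyMapStatus]]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    statuses(shuffleId) = Array.fill[Option[ToyMapStatus]](numMaps)(None)

  def registerMapOutput(shuffleId: Int, mapId: Int, status: ToyMapStatus): Unit =
    statuses(shuffleId)(mapId) = Some(status)

  // For one reducer: where each map output lives and how large its segment is.
  // Returns None while any parent map task has not reported yet.
  def getServerStatuses(shuffleId: Int, reduceId: Int): Option[Vector[(String, Long)]] = {
    val slots = statuses(shuffleId).toVector
    if (slots.forall(_.isDefined)) Some(slots.flatten.map(s => (s.location, s.sizes(reduceId))))
    else None
  }
}
```

A reducer can only start fetching once every slot for its shuffle is filled, which is why the child stage waits for all parent ShuffleMapTasks to finish.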

Shuffle Read

Reducers determine the locations of the FileSegments produced by parent ShuffleMapTasks via the mapOutputTrackerMaster. The BasicBlockFetcherIterator creates fetch requests, which are sent to the ConnectionManagers of other nodes. Each request may retrieve multiple file segments. The amount of data in flight is bounded by spark.reducer.maxMbInFlight (48 MB by default), a budget shared among five parallel fetch threads.

rdd.iterator()
=> rdd(ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)
=> blocksByAddress = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)
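
One way to keep several fetches inside a shared in-flight budget is to cap each request at about one fifth of it. The sketch below illustrates that packing idea under assumptions: Block, FetchRequest, and buildRequests are hypothetical names, and the greedy grouping is a simplification rather than Spark's exact logic.

```scala
object FetchRequestSketch {
  final case class Block(id: String, sizeBytes: Long)
  final case class FetchRequest(address: String, blocks: List[Block]) {
    def sizeBytes: Long = blocks.map(_.sizeBytes).sum
  }

  // Cut the blocks stored at each address into requests of roughly
  // maxBytesInFlight / 5 bytes, so about five requests fit in the budget at once.
  def buildRequests(blocksByAddress: Seq[(String, Seq[Block])], maxBytesInFlight: Long): List[FetchRequest] = {
    val targetSize = math.max(maxBytesInFlight / 5, 1L)
    blocksByAddress.toList.flatMap { case (address, blocks) =>
      val requests = List.newBuilder[FetchRequest]
      var current = List.empty[Block]
      var currentSize = 0L
      blocks.foreach { block =>
        current = block :: current
        currentSize += block.sizeBytes
        if (currentSize >= targetSize) {  // request is full: close it
          requests += FetchRequest(address, current.reverse)
          current = Nil
          currentSize = 0L
        }
      }
      // Whatever is left over for this address becomes a final, smaller request.
      if (current.nonEmpty) requests += FetchRequest(address, current.reverse)
      requests.result()
    }
  }
}
```

Smaller requests let a reducer pull from several nodes at the same time instead of spending its whole budget on one large transfer.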

The fetcher sends requests via NIO connections; receiving workers read the requested file segments from local disk (using either in‑memory buffers or memory‑mapped files) and return them. Received segments are placed in a queue, deserialized, and fed to the reducer iterator.

After the blockManager receives the fetch request
=> connectionManager.receiveMessage(bufferMessage)
=> blockManagerWorker.onBlockMessageReceive()
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> fileSegment = diskManager.getBlockLocation(blockId)
=> if(fileSegment.length < minMemoryMapBytes)
       buffer = ByteBuffer.allocate(fileSegment.length)
   else
       buffer = channel.map(MapMode.READ_ONLY, fileSegment.offset, fileSegment.length)
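
The two read paths can be sketched with java.nio. This is a minimal sketch, assuming the threshold is passed in as a parameter; readSegment and its signature are hypothetical and only illustrate the choice between copying a small segment onto the heap and memory-mapping a large one.

```scala
import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.nio.file.Path

object SegmentReadSketch {
  // Read `length` bytes starting at `offset`: small segments are copied into a
  // heap buffer, larger ones are memory-mapped read-only.
  def readSegment(file: Path, offset: Long, length: Int, minMemoryMapBytes: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file.toFile, "r").getChannel
    try {
      if (length < minMemoryMapBytes) {
        val buffer = ByteBuffer.allocate(length)
        channel.position(offset)
        // Keep reading until the buffer is full or the file ends.
        while (buffer.hasRemaining && channel.read(buffer) != -1) {}
        buffer.flip()
        buffer
      } else {
        // The mapping stays valid after the channel is closed.
        channel.map(MapMode.READ_ONLY, offset, length.toLong)
      }
    } finally channel.close()
  }
}
```

Mapping avoids copying large segments through the heap, while small segments are cheaper to read directly because every mapping carries a fixed setup cost.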

Discussion

The architecture emphasizes modularity and loose coupling; the BlockManager handles many responsibilities (memory, disk, network). The article provides extensive code snippets and diagrams to illustrate how Spark orchestrates job generation, submission, execution, result collection, and shuffle, encouraging readers to consult the source code for deeper understanding.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
