Analysis of Flink Scheduling Components and Slot Allocation
The article explains Flink’s post‑submission scheduling pipeline—from Dispatcher creating SchedulerNG and building the ExecutionGraph, through pipelined region construction and the PipelinedRegionSchedulingStrategy, to slot sharing allocation—identifying why slot and TaskManager overloads occur and proposing randomization or fine‑grained resource strategies to balance load.
In daily Flink usage, heavy load on certain slots or TaskManagers often causes resource allocation and cost‑reduction issues. This article reviews the Flink task deployment mechanism, focusing on the post‑submission construction of the ExecutionGraph and the subsequent scheduling process.
2. Flink Scheduling Components
2.1 SchedulerNG – After the Dispatcher receives a submit request, it starts the JobManagerRunner and JobMaster, which then creates the SchedulerNG as the first step of job scheduling.
```java
this.schedulerNG =
        createScheduler(
                slotPoolServiceSchedulerFactory,
                executionDeploymentTracker,
                jobManagerJobMetricGroup,
                jobStatusListener);
```

The SchedulerNG initiates scheduling, tracks job status, and triggers checkpoint and savepoint actions.
2.2 ExecutionGraph – The default SchedulerNG implementation (DefaultScheduler) builds the ExecutionGraph, which contains three key elements:
ExecutionJobVertex : Represents a JobVertex from the JobGraph, grouping all parallel tasks.
ExecutionVertex : Represents a single parallel task within an ExecutionJobVertex.
Execution : Represents a concrete deployment/execution of an ExecutionVertex, which may occur multiple times.
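The containment relationship among these three elements can be pictured with a minimal sketch. The class and field names below are simplified stand-ins, not Flink's actual API; the point is only the hierarchy: one ExecutionJobVertex holds an ExecutionVertex per subtask, and each (re)deployment of a subtask produces a new Execution attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the ExecutionGraph hierarchy (illustrative only).
public class ExecutionGraphSketch {
    static class Execution {
        final int attemptNumber;
        Execution(int attemptNumber) { this.attemptNumber = attemptNumber; }
    }

    static class ExecutionVertex {
        final int subtaskIndex;
        final List<Execution> attempts = new ArrayList<>();
        ExecutionVertex(int subtaskIndex) { this.subtaskIndex = subtaskIndex; }
        // Each deployment (initial or after failover) creates a new Execution.
        Execution deploy() {
            Execution e = new Execution(attempts.size());
            attempts.add(e);
            return e;
        }
    }

    static class ExecutionJobVertex {
        final String name;
        final ExecutionVertex[] taskVertices;
        ExecutionJobVertex(String name, int parallelism) {
            this.name = name;
            this.taskVertices = new ExecutionVertex[parallelism];
            for (int i = 0; i < parallelism; i++) {
                taskVertices[i] = new ExecutionVertex(i);
            }
        }
    }

    public static void main(String[] args) {
        ExecutionJobVertex map = new ExecutionJobVertex("map", 4);
        map.taskVertices[0].deploy();                    // first deployment
        Execution retry = map.taskVertices[0].deploy();  // e.g. after failover
        System.out.println(map.taskVertices.length);     // 4
        System.out.println(retry.attemptNumber);         // 1
    }
}
```

The second deploy() call illustrates why an Execution "may occur multiple times": the ExecutionVertex is stable across restarts, while each attempt is a distinct Execution.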
The ExecutionGraph is constructed from the JobGraph topology:
```java
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology);
```

Inside attachJobGraph, construction proceeds as follows:
1. Obtain the ordered JobVertex list.
2. Create ExecutionJobVertex instances one-to-one.
3. Generate producedDataSets (IntermediateDataSet) based on producer parallelism.
4. Create the ExecutionVertex[] array according to each vertex's own parallelism.
5. Build stateBackend and checkpointStorage info.
6. Finish execution topology construction.

2.3 Execution Topology – Flink builds pipelined regions based on the ExecutionGraph. ResultPartitionType defines how data is transferred between vertices:
BLOCKING
BLOCKING_PERSISTENT
PIPELINED
PIPELINED_BOUNDED
PIPELINED_APPROXIMATE

The construction of the ExecutionTopology proceeds in three steps:
Build raw pipelined regions; vertices are grouped unless their result type is reconnectable.
Create ResultPartition information for each region, which later drives the PartitionReleaseStrategy.
Validate co‑location constraints to ensure co‑located tasks stay in the same region.
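The first step, grouping vertices into regions by edge type, can be sketched with a union-find over the edges. This is an illustration of the idea only, not Flink's actual implementation: vertices connected by pipelined edges merge into one region, while a blocking (reconnectable) edge forms a region boundary.

```java
import java.util.*;

public class RegionSketch {
    // Union-find over vertex ids: vertices joined by PIPELINED edges end up
    // in the same pipelined region; BLOCKING edges are region boundaries.
    static int[] parent;
    static int find(int v) { return parent[v] == v ? v : (parent[v] = find(parent[v])); }
    static void union(int a, int b) { parent[find(a)] = find(b); }

    /** edges: {producer, consumer, pipelined ? 1 : 0}; returns a region id per vertex. */
    static int[] buildRegions(int numVertices, int[][] edges) {
        parent = new int[numVertices];
        for (int v = 0; v < numVertices; v++) parent[v] = v;
        for (int[] e : edges) {
            if (e[2] == 1) union(e[0], e[1]); // pipelined edge: merge the two regions
        }
        int[] regions = new int[numVertices];
        for (int v = 0; v < numVertices; v++) regions[v] = find(v);
        return regions;
    }

    public static void main(String[] args) {
        // source -(pipelined)-> map -(blocking)-> sink
        int[][] edges = { {0, 1, 1}, {1, 2, 0} };
        int[] r = buildRegions(3, edges);
        System.out.println(r[0] == r[1]); // true: source and map share a region
        System.out.println(r[1] == r[2]); // false: the blocking edge splits them
    }
}
```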
2.4 Scheduling Strategy – The default strategy is PipelinedRegionSchedulingStrategy . After the ExecutionGraph is ready, the scheduler starts:
```java
@Override
public void startScheduling() {
    final Set<SchedulingPipelinedRegion> sourceRegions =
            IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                    .filter(this::isSourceRegion)
                    .collect(Collectors.toSet());
    maybeScheduleRegions(sourceRegions);
}
```

The process iterates over regions, creates deployment options for each ExecutionVertex, and delegates slot allocation to the SlotSharingExecutionSlotAllocator.
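The overall region-by-region flow, schedule source regions first, then schedule a downstream region once all of its blocking inputs have been produced, can be sketched as below. Method and variable names here are illustrative, not Flink's API (the real maybeScheduleRegions also checks consumable partitions incrementally).

```java
import java.util.*;

public class RegionSchedulerSketch {
    /**
     * pendingInputs maps each region to the set of upstream regions it still
     * waits for. Regions with no pending inputs are source regions. The input
     * sets are consumed (mutated) as regions are "deployed".
     */
    static List<Integer> scheduleOrder(Map<Integer, Set<Integer>> pendingInputs) {
        List<Integer> order = new ArrayList<>();
        Deque<Integer> ready = new ArrayDeque<>();
        for (Map.Entry<Integer, Set<Integer>> e : pendingInputs.entrySet()) {
            if (e.getValue().isEmpty()) ready.add(e.getKey()); // source regions
        }
        while (!ready.isEmpty()) {
            int region = ready.poll();
            order.add(region); // "deploy" this region
            // Its results are now consumable: unblock downstream regions.
            for (Map.Entry<Integer, Set<Integer>> e : pendingInputs.entrySet()) {
                if (e.getValue().remove(region) && e.getValue().isEmpty()) {
                    ready.add(e.getKey());
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<Integer, Set<Integer>> pending = new LinkedHashMap<>();
        pending.put(0, new HashSet<>());                    // source region
        pending.put(1, new HashSet<>(Arrays.asList(0)));    // waits for region 0
        pending.put(2, new HashSet<>(Arrays.asList(0, 1))); // waits for 0 and 1
        System.out.println(scheduleOrder(pending)); // [0, 1, 2]
    }
}
```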
2.5 Execution Slot Allocator – The allocator maps logical slots to physical shared slots. Important concepts include:
LocalInputPreferredSlotSharingStrategy : Tries to place producer and consumer tasks together.
SlotProfile : Describes resource requirements and preferred locations.
ResourceProfileRetriever : Retrieves actual resource info for an ExecutionVertex.
ExecutionSlotSharingGroup : Logical grouping of tasks that share a slot.
The main allocation flow (simplified) is:
```java
private List<SlotExecutionVertexAssignment> allocateSlots(
        final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    return executionSlotAllocator.allocateSlotsFor(
            executionVertexDeploymentOptions.stream()
                    .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                    .collect(Collectors.toList()));
}
```

During allocation, the system checks existing groups, creates new ones if necessary, matches SlotProfiles, requests physical slots from the ResourceManager, and finally assigns shared slots to all related executions.
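The core of the logical-to-physical mapping is that all vertices in one ExecutionSlotSharingGroup share a single physical slot; a new physical slot is requested only the first time a group is seen. A minimal sketch of that idea (names and types simplified, not Flink's API):

```java
import java.util.*;

public class SharedSlotSketch {
    // One physical slot per ExecutionSlotSharingGroup; each vertex in the
    // group receives a logical slot carved out of that shared physical slot.
    static Map<String, Integer> physicalSlotByGroup = new HashMap<>();
    static int nextPhysicalSlot = 0;

    /** Returns the physical slot backing the logical slot for this group. */
    static int allocateLogicalSlot(String executionSlotSharingGroup) {
        // Request a new physical slot only on the group's first allocation.
        return physicalSlotByGroup.computeIfAbsent(
                executionSlotSharingGroup, g -> nextPhysicalSlot++);
    }

    public static void main(String[] args) {
        int a = allocateLogicalSlot("group-0"); // e.g. source subtask 0
        int b = allocateLogicalSlot("group-0"); // map subtask 0 shares the slot
        int c = allocateLogicalSlot("group-1"); // subtask 1 gets a new slot
        System.out.println(a == b); // true
        System.out.println(a == c); // false
    }
}
```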
3. Problem Discussion
3.1 Why do slots become overloaded? Overload occurs when many tasks are assigned to the same shared slot, a consequence of the first-fit group selection in ExecutionSlotSharingGroup creation: each vertex joins the lowest-indexed group that does not yet hold a task from the same JobVertex. Because tasks are processed in topological order, the early groups accumulate tasks from every vertex, leading to concentration.
```java
private void findAvailableOrCreateNewExecutionSlotSharingGroupFor(
        final List<SchedulingExecutionVertex> executionVertices) {
    for (SchedulingExecutionVertex executionVertex : executionVertices) {
        final SlotSharingGroup slotSharingGroup =
                getSlotSharingGroup(executionVertex.getId());
        final List<ExecutionSlotSharingGroup> groups =
                executionSlotSharingGroups.computeIfAbsent(
                        slotSharingGroup.getSlotSharingGroupId(), k -> new ArrayList<>());

        ExecutionSlotSharingGroup group = null;
        for (ExecutionSlotSharingGroup executionSlotSharingGroup : groups) {
            if (isGroupAvailableForVertex(executionSlotSharingGroup, executionVertex.getId())) {
                group = executionSlotSharingGroup;
                break;
            }
        }

        if (group == null) {
            group = new ExecutionSlotSharingGroup();
            group.setResourceProfile(slotSharingGroup.getResourceProfile());
            groups.add(group);
        }

        addVertexToExecutionSlotSharingGroup(executionVertex, group);
    }
}
```

Mitigation strategies include adding randomness to group selection while respecting constraints, or designing a load-balanced allocation algorithm.
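A small simulation makes the imbalance, and the effect of randomization, concrete. This is illustrative code, not Flink's: under first-fit, subtask i of every JobVertex lands in group i (the lowest-indexed group with no task of the same vertex), so low-index groups collect tasks from every vertex; shuffling the candidate order spreads them.

```java
import java.util.*;

public class GroupAssignmentSketch {
    /** First-fit, mirroring the allocator loop above: subtask i of every
     *  JobVertex lands in group i. Returns the task count per group. */
    static int[] firstFitCounts(int[] parallelisms) {
        int numGroups = 0;
        for (int p : parallelisms) numGroups = Math.max(numGroups, p);
        int[] counts = new int[numGroups];
        for (int p : parallelisms) {
            for (int i = 0; i < p; i++) counts[i]++;
        }
        return counts;
    }

    /** Same constraint (one task per JobVertex per group), but each vertex
     *  picks its groups in random order. */
    static int[] shuffledCounts(int[] parallelisms, long seed) {
        Random rnd = new Random(seed);
        int numGroups = 0;
        for (int p : parallelisms) numGroups = Math.max(numGroups, p);
        int[] counts = new int[numGroups];
        for (int p : parallelisms) {
            List<Integer> order = new ArrayList<>();
            for (int g = 0; g < numGroups; g++) order.add(g);
            Collections.shuffle(order, rnd);
            for (int i = 0; i < p; i++) counts[order.get(i)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three vertices with parallelism 8, 2, 2: first-fit piles the
        // low-parallelism vertices onto groups 0 and 1.
        int[] ff = firstFitCounts(new int[] {8, 2, 2});
        System.out.println(Arrays.toString(ff)); // [3, 3, 1, 1, 1, 1, 1, 1]
        System.out.println(Arrays.toString(shuffledCounts(new int[] {8, 2, 2}, 42)));
    }
}
```

The first-fit output shows two groups carrying three tasks each while the rest carry one; the shuffled variant keeps the same total but distributes the extra tasks across random groups.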
3.2 How to avoid TM‑level overload? When many slots are placed on the same TaskManager, the TM becomes a bottleneck. Co‑location groups ensure that corresponding subtasks run on the same TM, but they do not balance load. Possible solutions:
When fine‑grained resource configuration is not used, distribute tasks evenly across slots and ensure slot‑sharing groups are randomized, which also randomizes TM placement.
With fine‑grained resources, avoid slot‑sharing and place tasks of the same JobVertex on the same TM, adding job‑vertex tags to SlotProfiles so the allocator can enforce this constraint.
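The second idea, tagging slot requests with a JobVertex identifier so the allocator routes all subtasks of one vertex to the same TaskManager, could be sketched as below. Everything here is hypothetical: Flink's SlotProfile has no such tag today, and place(), tmByTag, and freeTms are invented for illustration.

```java
import java.util.*;

public class TaggedPlacementSketch {
    // Hypothetical: a slot request carries its JobVertex id as a tag, and the
    // allocator pins the first TaskManager chosen for that tag, so every
    // subsequent request with the same tag lands on the same TM.
    static Map<String, String> tmByTag = new HashMap<>();
    static Iterator<String> freeTms;

    static String place(String jobVertexTag) {
        return tmByTag.computeIfAbsent(jobVertexTag, t -> freeTms.next());
    }

    public static void main(String[] args) {
        freeTms = Arrays.asList("tm-1", "tm-2", "tm-3").iterator();
        String a = place("map");
        String b = place("map");   // same JobVertex -> same TM
        String c = place("sink");  // different JobVertex -> next free TM
        System.out.println(a.equals(b)); // true
        System.out.println(a.equals(c)); // false
    }
}
```

In a real allocator the tag would have to be weighed against available TM capacity; this sketch only shows the constraint itself.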
4. Conclusion
The Flink community continuously evolves the task deployment chain. Understanding the internal scheduling logic helps address custom scheduling challenges and paves the way for finer‑grained operator‑level resource configuration, improving resource utilization and job stability.
DeWu Technology