How EasyScheduler Powers Scalable Big Data Workflow Management
EasyScheduler is an open‑source big‑data workflow scheduler that uses a decentralized architecture with Master and Worker nodes coordinated via ZooKeeper, supporting DAG‑based task definitions, various task types, fault tolerance, priority handling, distributed locks, and remote logging, all illustrated with detailed component diagrams.
Terminology
DAG (Directed Acyclic Graph) is the core structure of a workflow; tasks are arranged as a DAG and traversed topologically from nodes with zero indegree.
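Kahn's algorithm for topological sorting works this way: start from the zero-indegree nodes, then repeatedly release successors. Below is a minimal Java sketch. It assumes tasks are identified by name and the DAG is given as a map from each task to its successors. `DagOrder` and `topologicalOrder` are hypothetical names, not EasyScheduler's API.

```java
import java.util.*;

// Hypothetical helper (not EasyScheduler's API): orders DAG tasks with Kahn's algorithm.
final class DagOrder {
    // successors maps each task to the tasks that depend on it.
    static List<String> topologicalOrder(Map<String, List<String>> successors) {
        // Count incoming edges for every task that appears anywhere in the graph.
        Map<String, Integer> indegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : successors.entrySet()) {
            indegree.putIfAbsent(e.getKey(), 0);
            for (String next : e.getValue()) {
                indegree.merge(next, 1, Integer::sum);
            }
        }
        // Tasks with zero indegree have no predecessors and can start immediately.
        Deque<String> ready = new ArrayDeque<>();
        indegree.forEach((task, deg) -> { if (deg == 0) ready.add(task); });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            order.add(task);
            // Finishing a task removes its outgoing edges; successors whose
            // indegree drops to zero become ready.
            for (String next : successors.getOrDefault(task, List.of())) {
                if (indegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        // Any task never reached lies on a cycle, so the graph was not a DAG.
        if (order.size() != indegree.size()) {
            throw new IllegalArgumentException("workflow graph contains a cycle");
        }
        return order;
    }
}
```

A real scheduler would not compute the whole order up front. It would release each successor as soon as its last predecessor finishes, and that is what lets independent branches run in parallel.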
Process definition: a visual DAG created by dragging task nodes and establishing relationships.
Process instance: an instantiated process definition, started manually or by schedule.
Task instance: the concrete execution state of a task node.
Task types: SHELL, SQL, SUB_PROCESS, PROCEDURE, MR, SPARK, PYTHON, DEPENDENT (a SUB_PROCESS node refers to another process definition, which can also be started on its own).
Scheduling methods: cron‑based timed scheduling and manual scheduling.
Supported command types include starting a workflow, executing from the current node, recovering fault-tolerant workflows, resuming paused flows, executing from a failed node, compensation, scheduling, re-run, pause, stop, and resuming waiting threads. Of these, recovering fault-tolerant workflows and resuming waiting threads are used internally by the scheduler and cannot be invoked externally.
Timed scheduling uses a Quartz distributed scheduler with visual cron expression generation.
Dependencies: besides simple predecessor‑successor links, the system provides a task‑dependency node for custom cross‑process dependencies.
Priority: process‑instance and task‑instance priorities are supported; if not set, FIFO order is used.
Email alerts: SQL task result emails, workflow instance result alerts, and fault‑tolerance notifications.
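Failure strategies: "continue" (keep running the other parallel tasks regardless of the failure; the workflow ends as failed once they finish) and "end" (on the first failure, kill all running parallel tasks and end the workflow as failed).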
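The following sketch makes the two strategies concrete by showing the decision taken when one parallel branch fails. `FailureStrategy`, `FailureHandler`, and `tasksToKill` are hypothetical names. Under either strategy the failed task's own downstream tasks cannot run, so the sketch covers only what happens to the other running branches.

```java
import java.util.*;

enum FailureStrategy { CONTINUE, END }

// Hypothetical hook invoked when a task fails.
final class FailureHandler {
    // Returns the other running tasks that should be killed after `failed` fails.
    static Set<String> tasksToKill(FailureStrategy strategy, String failed, Set<String> running) {
        if (strategy == FailureStrategy.CONTINUE) {
            // "continue": other parallel branches run to completion; nothing is killed.
            return Set.of();
        }
        // "end": kill every other running task so the workflow fails immediately.
        Set<String> kill = new TreeSet<>(running);
        kill.remove(failed);
        return kill;
    }
}
```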
Compensation (backfill): reruns a workflow over a historical date interval, either in parallel or serially.
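A serial or parallel backfill over a date interval might look like the following sketch. It assumes one workflow run per calendar day and a caller-supplied `run` callback. `Backfill` and its methods are hypothetical.

```java
import java.time.LocalDate;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

// Hypothetical backfill helper; assumes one workflow run per calendar day.
final class Backfill {
    // Every day in the closed interval [start, end].
    static List<LocalDate> days(LocalDate start, LocalDate end) {
        List<LocalDate> out = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            out.add(d);
        }
        return out;
    }

    // Serial: a day's run starts only after the previous day's run has returned.
    static void serial(LocalDate start, LocalDate end, Consumer<LocalDate> run) {
        for (LocalDate d : days(start, end)) {
            run.accept(d);
        }
    }

    // Parallel: all days are submitted at once and share a bounded thread pool.
    static void parallel(LocalDate start, LocalDate end, Consumer<LocalDate> run, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (LocalDate d : days(start, end)) {
            pool.submit(() -> run.accept(d));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Serial mode fits runs where each day depends on the previous day's output. Parallel mode finishes sooner but assumes the days are independent.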
System Architecture
MasterServer adopts a decentralized design. It is responsible for splitting DAGs into tasks, submitting and monitoring those tasks, and checking the health of the other MasterServers and WorkerServers. On startup it registers an ephemeral node in ZooKeeper and relies on ZooKeeper watches for fault tolerance.
Key components inside MasterServer:
Distributed Quartz component for starting and stopping timed tasks.
MasterSchedulerThread: scans the command table and dispatches business logic based on command type.
MasterExecThread: performs DAG task splitting, submission monitoring, and various command‑type logic.
MasterTaskExecThread: persists task information.
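MasterSchedulerThread's scan-and-dispatch role can be sketched as a loop that drains pending commands and routes each by type. This is a single-threaded simplification. The command table is modeled as an in-memory queue, and `CmdType` (a three-value subset), `Command`, and `SchedulerLoop` are hypothetical.

```java
import java.util.*;

// Hypothetical subset of command types; the real set is larger.
enum CmdType { START_PROCESS, PAUSE, STOP }

// Hypothetical row of the command table.
record Command(long id, CmdType type, long processId) {}

final class SchedulerLoop {
    // Stand-in for the command table in the database.
    private final Deque<Command> commandTable = new ArrayDeque<>();
    // Records where each command was routed, so the behavior is observable.
    final List<String> dispatched = new ArrayList<>();

    void insert(Command c) {
        commandTable.add(c);
    }

    // One scan: consume pending commands in insertion order and route each by type.
    void scanOnce() {
        Command c;
        while ((c = commandTable.poll()) != null) {
            switch (c.type()) {
                // A new process instance would be created and handed to a MasterExecThread.
                case START_PROCESS -> dispatched.add("start " + c.processId());
                // Pause/stop would be signalled to the MasterExecThread already running it.
                case PAUSE -> dispatched.add("pause " + c.processId());
                case STOP -> dispatched.add("stop " + c.processId());
            }
        }
    }
}
```

Per the Distributed Lock Practice section, only one Master runs the Scheduler at a time, so a scan like this would not race with other Masters over the same command rows.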
WorkerServer also follows a decentralized model. It is responsible for executing tasks and serving task logs. On startup it registers an ephemeral node in ZooKeeper and maintains a heartbeat.
Key components inside WorkerServer:
FetchTaskThread: continuously fetches tasks from the Task Queue and invokes the appropriate executor.
LoggerServer: an RPC service offering log segment view, refresh, and download.
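FetchTaskThread's fetch-and-dispatch loop can be sketched with a `BlockingQueue` standing in for the Task Queue, which EasyScheduler keeps in ZooKeeper. `FetchLoop`, its `Task` record, and the executor map keyed by task type are hypothetical.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

// Hypothetical worker loop: take a task, pick the executor for its type, run it.
final class FetchLoop implements Runnable {
    // Hypothetical task record: type selects the executor, payload is the script/SQL.
    record Task(String type, String payload) {}

    private final BlockingQueue<Task> queue;
    private final Map<String, Consumer<Task>> executors;

    FetchLoop(BlockingQueue<Task> queue, Map<String, Consumer<Task>> executors) {
        this.queue = queue;
        this.executors = executors;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Task t = queue.take(); // blocks until a task is available
                Consumer<Task> exec = executors.get(t.type());
                if (exec != null) {
                    exec.accept(t);
                } // unknown types are skipped here; a real worker would record a failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop fetching on shutdown
        }
    }
}
```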
ZooKeeper provides cluster management, fault tolerance, event listening, and distributed locks for the system, and it also backs the Task Queue. To keep external dependencies to a minimum, the system deliberately does not rely on Redis.
The Alert module stores, queries, and sends alerts over two channels: email and SNMP (SNMP is not yet implemented).
The API layer provides a unified RESTful interface for UI requests, covering workflow creation, definition, query, modification, publishing, taking offline, manual start, stop, pause, resume, and execution from a specific node.
The UI offers visual operation pages for the system.
Design Philosophy
Centralized vs Decentralized
In a centralized design, the Master distributes tasks and monitors the health of the Slaves. If the Master fails, the cluster loses its coordinator and collapses. For this reason, most Master/Slave systems run a standby Master, either hot or cold, with automatic or manual failover.
Decentralized design eliminates a distinguished manager node; all nodes are peers, reducing single‑point‑of‑failure risk but increasing communication complexity.
EasyScheduler achieves decentralization by registering Master and Worker nodes in ZooKeeper, forming leader‑less clusters, and using ZooKeeper distributed locks to elect a Master or Worker as the temporary manager for task execution.
Distributed Lock Practice
EasyScheduler uses ZooKeeper to ensure that only one Master runs the Scheduler and only one Worker submits tasks at a time.
(Figures not reproduced: core lock acquisition algorithm; Scheduler thread lock flowchart.)
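This kind of mutual exclusion is typically built on ZooKeeper's standard lock recipe. Each contender creates an ephemeral sequential node under a lock path. The contender with the lowest sequence number holds the lock, and every other contender watches only its immediate predecessor. The sketch below simulates that recipe in memory. It is not ZooKeeper client code, and `SimulatedZkLock` and its methods are hypothetical. In production, a client library such as Apache Curator's `InterProcessMutex` implements the same recipe.

```java
import java.util.*;

// In-memory stand-in for a ZooKeeper lock directory (hypothetical, for illustration).
final class SimulatedZkLock {
    private final TreeMap<Long, String> children = new TreeMap<>(); // sequence -> owner
    private long nextSeq = 0;

    // Mimics creating an EPHEMERAL_SEQUENTIAL child: sequence numbers only grow.
    synchronized long enqueue(String owner) {
        long seq = nextSeq++;
        children.put(seq, owner);
        return seq;
    }

    // The contender holding the smallest sequence number owns the lock.
    synchronized boolean holdsLock(long seq) {
        return !children.isEmpty() && children.firstKey() == seq;
    }

    // Node to watch: the immediate predecessor, so each release wakes exactly one
    // waiter instead of every contender (avoids the "herd effect").
    synchronized OptionalLong predecessor(long seq) {
        Long prev = children.lowerKey(seq);
        return prev == null ? OptionalLong.empty() : OptionalLong.of(prev);
    }

    // An explicit release, or an expired session, deletes the ephemeral node.
    synchronized void delete(long seq) {
        children.remove(seq);
    }
}
```

The crash case is why ephemeral nodes matter. If the Master holding the lock dies, its session expires, its node disappears, and the next contender takes over without manual intervention.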
Thread Starvation and Deadlock
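If a large DAG contains many sub-processes, each parent process holds a Master thread while it waits for its sub-processes, and those sub-processes need threads from the same limited pool. Once the pool is exhausted, parents wait on children that can never be scheduled, and the entire workflow stalls.
Three mitigation strategies were considered. The one adopted introduces a "resource insufficient" command type. When the thread pool runs out, the main flow is paused, which frees its thread, and it resumes once a thread becomes available again.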
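The pause-instead-of-block idea can be sketched with a thread counter for a single Master. This is a hypothetical model: `MasterThreadPool` and its methods are illustrative names, and the class counts threads rather than running real ones.

```java
import java.util.*;

// Hypothetical model of one Master's bounded pool of process-execution threads.
final class MasterThreadPool {
    private int free;
    private final Deque<Long> suspended = new ArrayDeque<>(); // paused parents, FIFO

    MasterThreadPool(int threads) { this.free = threads; }

    synchronized int free() { return free; }

    // A top-level process instance asks for a thread.
    synchronized boolean startProcess() {
        if (free == 0) return false;
        free--;
        return true;
    }

    // A running parent needs a thread for a sub-process. If none is spare, the parent
    // is paused ("resource insufficient") instead of blocking, and its thread passes
    // to the child, so no parent waits on a child that cannot be scheduled.
    synchronized boolean startSubProcess(long parentId) {
        if (free > 0) {
            free--;
            return true;  // child got a spare thread; parent keeps running
        }
        suspended.add(parentId);
        return false;     // parent paused; child reuses the parent's thread
    }

    // A process finished. A paused parent, if any, takes the freed thread, and its id
    // is returned so the caller can issue the resume command.
    synchronized OptionalLong onThreadFreed() {
        Long parent = suspended.poll();
        if (parent == null) {
            free++;
            return OptionalLong.empty();
        }
        return OptionalLong.of(parent);
    }
}
```

With a pool of one thread, a design where the parent blocks while holding its thread would deadlock immediately. Here the parent is paused, the child runs on the freed thread, and the parent resumes when the child finishes.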
Fault Tolerance
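Service fault tolerance relies on ZooKeeper watchers. Each server registers an ephemeral node, so its node disappears when the server's session ends and the watchers fire. When a Master or Worker node disappears this way, the system triggers fault tolerance for the affected process instances and task instances.
After Master failover, the Scheduler thread traverses the DAG for tasks that are running or already submitted. It keeps monitoring the running ones. For each submitted one, it checks whether the task is still in the Task Queue and resubmits it if it is missing.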
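The monitor-or-resubmit decision can be sketched as follows. The sketch assumes that the recovered Scheduler acts only on tasks in a running or submitted state. `TaskState`, `MasterFailover`, `plan`, and the string actions are hypothetical.

```java
import java.util.*;

// Hypothetical task states; only SUBMITTED and RUNNING matter for failover.
enum TaskState { SUBMITTED, RUNNING, SUCCESS, FAILURE }

final class MasterFailover {
    record TaskInstance(long id, TaskState state) {}

    // For each task of a recovered process instance, decide whether to keep
    // monitoring it or to put it back on the Task Queue.
    static Map<Long, String> plan(List<TaskInstance> tasks, Set<Long> queuedIds) {
        Map<Long, String> actions = new LinkedHashMap<>();
        for (TaskInstance t : tasks) {
            switch (t.state()) {
                case RUNNING -> actions.put(t.id(), "monitor");
                // A submitted task missing from the queue must be resubmitted.
                case SUBMITTED -> actions.put(t.id(), queuedIds.contains(t.id()) ? "monitor" : "resubmit");
                default -> { } // finished tasks need no action
            }
        }
        return actions;
    }
}
```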
Task failure retry is automatic at the task level. For example, a Shell task configured with a retry count of 3 is re-run up to three more times after it fails. Process-level recovery is manual: the user reruns the process either from the failed node or from the start.
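Task-level retry amounts to a bounded loop. Below is a minimal sketch. It assumes each task carries a retry count and a retry interval, and `Retry.withRetries` and its parameter names are hypothetical.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: maxRetries and intervalMillis stand in for a task's
// configured retry count and retry interval.
final class Retry {
    // Runs the task once, then retries up to maxRetries more times on failure,
    // sleeping intervalMillis between attempts. Rethrows the last failure.
    static <T> T withRetries(Callable<T> task, int maxRetries, long intervalMillis) throws Exception {
        if (maxRetries < 0) throw new IllegalArgumentException("maxRetries must be >= 0");
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries && intervalMillis > 0) {
                    Thread.sleep(intervalMillis);
                }
            }
        }
        throw last;
    }
}
```

With `maxRetries = 3`, a task that keeps failing runs four times in total: the first attempt plus three retries.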
Priority Design
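Priorities are defined for process instances (HIGHEST, HIGH, MEDIUM, LOW, LOWEST) and for tasks (the same five levels). The ordering rules, from strongest to weakest, are: priority across different process instances > order among process instances of equal priority > task priority within the same process instance > task submission order within the same process instance.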
Priority information is encoded into a string (processInstancePriority_processInstanceId_taskPriority_taskId) and stored in ZooKeeper; lexical comparison yields the highest‑priority task.
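Lexical comparison reproduces the intended order only if every field has a fixed width. Otherwise process instance `10` sorts before `9`. Below is a minimal sketch. It assumes priorities are encoded as their ordinal (HIGHEST = 0 through LOWEST = 4, so smaller is more urgent) and that ids are zero-padded to a hypothetical width of 10 digits. `TaskKey` and `Priority` are illustrative names.

```java
import java.util.*;

enum Priority { HIGHEST, HIGH, MEDIUM, LOW, LOWEST } // ordinal 0 = most urgent

final class TaskKey {
    // Builds processInstancePriority_processInstanceId_taskPriority_taskId with
    // fixed-width fields, so String order equals the intended priority order.
    static String of(Priority procPri, long procInstId, Priority taskPri, long taskId) {
        return String.format("%d_%010d_%d_%010d",
                procPri.ordinal(), procInstId, taskPri.ordinal(), taskId);
    }

    // The lexically smallest key identifies the next task to dispatch.
    static String next(Collection<String> keys) {
        return Collections.min(keys);
    }
}
```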
Logging
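Because the UI and the Workers may run on different machines, task logs cannot be read as local files. Two options were considered: indexing logs in Elasticsearch or fetching them remotely over gRPC. gRPC was chosen to keep EasyScheduler lightweight.
A custom Logback FileAppender writes a separate log file for each task instance, and a custom Filter ensures that only task-related log events are captured.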
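```java
// TaskLogAppender.java
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;

/**
 * task log appender
 */
public class TaskLogAppender extends FileAppender<ILoggingEvent> {

    // File currently being written to; other members are elided in the source.
    private String currentlyActiveFile;

    @Override
    protected void append(ILoggingEvent event) {
        if (currentlyActiveFile == null) {
            currentlyActiveFile = getFile();
        }
        String activeFile = currentlyActiveFile;
        // The filter only admits threads named "TaskLogInfo-...", so the second
        // "-"-separated segment of the thread name serves as the log id.
        String threadName = event.getThreadName();
        String[] threadNameArr = threadName.split("-");
        String logId = threadNameArr[1];
        // ... (elided in the source)
        super.subAppend(event);
    }
}

// TaskLogFilter.java
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;

/**
 * task log filter
 */
public class TaskLogFilter extends Filter<ILoggingEvent> {

    // Accept only events logged from task threads; drop everything else.
    @Override
    public FilterReply decide(ILoggingEvent event) {
        if (event.getThreadName().startsWith("TaskLogInfo-")) {
            return FilterReply.ACCEPT;
        }
        return FilterReply.DENY;
    }
}
```
Conclusion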
This article provides an overview of the architecture and implementation ideas of EasyScheduler, a distributed big‑data workflow scheduling system.
21CTO
21CTO (21CTO.com) offers developers community, training, and services, making it your go‑to learning and service platform.
