Building a Java DAG-Based Task Orchestration Framework from Scratch

This article explains how to design and implement a task‑orchestration workflow using a directed acyclic graph (DAG) in Java, covering graph representation, core data structures, concurrency handling with executors, persistence to a database, and platform‑level visualisation.

I recently needed a framework that supports task-orchestration workflows, so these are my notes on how to implement one.

Task Orchestration Workflow

Task orchestration means arranging atomic "tasks" in a custom order, possibly with dependencies, to form a workflow. For example, we want to run Task A and Task C concurrently, then run Task B after A finishes, and finally run Task D after both B and C complete.
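Before building anything custom, the A/B/C/D example above can be expressed directly with the JDK's CompletableFuture. The following is only a minimal sketch to make the ordering concrete; the class name AbcdWorkflow and the "task bodies" (which merely record their own name) are assumptions for illustration, not part of the framework described in this article.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class AbcdWorkflow {
    /** Runs A and C concurrently, B after A, D after B and C; returns the completion order. */
    static List<String> run() {
        List<String> finished = new CopyOnWriteArrayList<>();

        // A and C have no dependencies, so both start right away.
        CompletableFuture<Void> a = CompletableFuture.runAsync(() -> finished.add("A"));
        CompletableFuture<Void> c = CompletableFuture.runAsync(() -> finished.add("C"));

        // B depends only on A.
        CompletableFuture<Void> b = a.thenRunAsync(() -> finished.add("B"));

        // D depends on both B and C.
        CompletableFuture<Void> d = CompletableFuture.allOf(b, c)
                .thenRunAsync(() -> finished.add("D"));

        d.join(); // block until the whole workflow is done
        return finished;
    }
}
```

Hand-wiring futures like this works for a fixed workflow, but it does not scale to workflows defined at runtime, which is what motivates a graph-based framework.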

Directed Acyclic Graph (DAG)

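A graph consists of vertices (nodes) and edges (connections). In a directed graph every edge has a direction; if no path leads from a vertex back to itself, the graph is a Directed Acyclic Graph (DAG). This fits task orchestration well: vertices are tasks, an edge from x to y means that y depends on x, and the absence of cycles guarantees that no task ends up waiting on itself.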
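Whether a set of dependencies really is acyclic can be checked with a topological sort. The sketch below uses Kahn's algorithm, a standard technique that the original article does not mention; the class name TopologicalSort and the map-based edge representation are assumptions made for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TopologicalSort {
    /**
     * Returns the vertices in an order that respects every edge, or throws
     * if the graph contains a cycle. edges maps a vertex to its direct successors.
     */
    static List<String> sort(Map<String, List<String>> edges) {
        // Count incoming edges for every vertex that appears anywhere in the graph.
        Map<String, Integer> inDegree = new HashMap<>();
        edges.forEach((from, successors) -> {
            inDegree.putIfAbsent(from, 0);
            for (String to : successors) {
                inDegree.merge(to, 1, Integer::sum);
            }
        });

        // Vertices without incoming edges can run first.
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((vertex, degree) -> {
            if (degree == 0) ready.add(vertex);
        });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            order.add(vertex);
            // "Remove" the vertex: each successor loses one incoming edge.
            for (String next : edges.getOrDefault(vertex, List.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }

        // Vertices on a cycle never reach in-degree 0, so they are missing from the result.
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("cycle detected");
        }
        return order;
    }
}
```

For the example workflow (A to B, B to D, C to D) any returned order places A before B and both B and C before D.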

Graphs can be stored as an adjacency matrix or an adjacency list. The matrix uses a 2‑D array where Array[x][y] = 1 indicates an edge from x to y.

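An adjacency list stores only the neighbours of each vertex. That saves space on sparse graphs but makes the check "is there an edge from x to y?" slower, because the list of x has to be scanned; for simplicity the matrix is often chosen. The framework below keeps a set of parents and a set of children on every node, which is closer to the adjacency-list approach.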
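The two storage options can be compared on the example workflow. This is a small illustrative sketch; the class name GraphStorage and the vertex numbering (A=0, B=1, C=2, D=3) are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class GraphStorage {
    // Vertex indices for the example workflow.
    static final int A = 0, B = 1, C = 2, D = 3;

    /** Adjacency matrix: m[x][y] == 1 means there is an edge x -> y. */
    static int[][] matrix() {
        int[][] m = new int[4][4];
        m[A][B] = 1;
        m[B][D] = 1;
        m[C][D] = 1;
        return m;
    }

    /** Adjacency list: list.get(x) holds the direct successors of x. */
    static List<List<Integer>> adjacencyList() {
        List<List<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            list.add(new ArrayList<>());
        }
        list.get(A).add(B);
        list.get(B).add(D);
        list.get(C).add(D);
        return list;
    }

    /** Matrix lookup is a single array access. */
    static boolean hasEdge(int[][] m, int x, int y) {
        return m[x][y] == 1;
    }

    /** List lookup scans the successors of x. */
    static boolean hasEdge(List<List<Integer>> list, int x, int y) {
        return list.get(x).contains(y);
    }
}
```

The matrix always needs one cell per vertex pair (16 cells here for 3 edges), while the list grows only with the number of edges.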

A Task Orchestration Framework

After understanding DAG basics we can implement a simple framework.

We first define the core data structures: a Dag representing the whole graph and a Node representing each vertex with its parents and children.

// Dag: the whole graph, with every node indexed by its value
public final class DefaultDag<T, R> implements Dag<T, R> {
    private final Map<T, Node<T, R>> nodes = new HashMap<>();
    // ... other methods
}

// Node: one vertex (task) of the graph
public final class Node<T, R> {
    /** incoming dependencies for this node */
    private Set<Node<T, R>> parents = new LinkedHashSet<>();
    /** outgoing dependencies for this node */
    private Set<Node<T, R>> children = new LinkedHashSet<>();
    // ... other fields and methods
}

We add dependencies between tasks:

public void addDependency(final T evalFirstNode, final T evalLaterNode) {
    Node<T, R> firstNode = createNode(evalFirstNode);
    Node<T, R> afterNode = createNode(evalLaterNode);
    addEdges(firstNode, afterNode);
}

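private Node<T, R> createNode(final T value) {
    // Reuse the node already registered for this value. Creating a fresh node
    // on every call would give ("A","B") and ("B","D") two unrelated B vertices.
    return nodes.computeIfAbsent(value, v -> new Node<>(v));
}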

private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
    if (!firstNode.equals(afterNode)) {
        firstNode.getChildren().add(afterNode);
        afterNode.getParents().add(firstNode);
    }
}
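To see the pieces above working together, here is a compact, self-contained sketch of the same idea. MiniDag, its nested Node class, and the roots() helper are hypothetical names invented for this example; they are simplified stand-ins, not the framework's actual classes.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class MiniDag<T> {
    /** One vertex with links in both directions. */
    static final class Node<V> {
        final V value;
        final Set<Node<V>> parents = new LinkedHashSet<>();
        final Set<Node<V>> children = new LinkedHashSet<>();

        Node(V value) {
            this.value = value;
        }
    }

    private final Map<T, Node<T>> nodes = new LinkedHashMap<>();

    /** Declares that 'later' may only run after 'first'. */
    void addDependency(T first, T later) {
        // Get-or-create, so a value maps to exactly one node.
        Node<T> firstNode = nodes.computeIfAbsent(first, v -> new Node<>(v));
        Node<T> laterNode = nodes.computeIfAbsent(later, v -> new Node<>(v));
        if (firstNode != laterNode) {
            firstNode.children.add(laterNode);
            laterNode.parents.add(firstNode);
        }
    }

    /** Nodes without parents: the places where execution can start. */
    Set<T> roots() {
        Set<T> result = new LinkedHashSet<>();
        for (Node<T> node : nodes.values()) {
            if (node.parents.isEmpty()) {
                result.add(node.value);
            }
        }
        return result;
    }

    int size() {
        return nodes.size();
    }
}
```

Adding the dependencies A to B, B to D and C to D yields four nodes, with A and C as the roots.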

To execute the workflow we wrap the DAG with a thread‑pool executor.

// Task orchestration thread pool
public class DefaultDexecutor<T, R> {
    // execution engine (a framework type wrapping the thread pool; the JDK's
    // ExecutorService is not generic) and two retry executors
    private final ExecutionEngine<T, R> executionEngine;
    private final ExecutorService immediatelyRetryExecutor;
    private final ScheduledExecutorService scheduledRetryExecutor;
    // execution state
    private final ExecutorState<T, R> state;
    // ... other fields and methods
}

public class DefaultExecutorState<T, R> {
    // underlying graph
    private final Dag<T, R> graph;
    // completed nodes
    private final Collection<Node<T, R>> processedNodes;
    // pending nodes
    private final Collection<Node<T, R>> unProcessedNodes;
    // errored tasks
    private final Collection<ExecutionResult<T, R>> erroredTasks;
    // final results
    private final Collection<ExecutionResult<T, R>> executionResults;
}

Execution proceeds by breadth‑first traversal of the DAG, submitting ready nodes to the executor while respecting parent dependencies.

private void doProcessNodes(final Set<Node<T, R>> nodes) {
    for (Node<T, R> node : nodes) {
        // processedNodes is the shared state that implements the "concurrent wait":
        // a node is ready only if it has not run yet and all of its parents are done
        if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
            Task<T, R> task = newTask(node);
            this.executionEngine.submit(task);
            // ... handle result
            ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
            if (executionResult.isSuccess()) {
                state.markProcessingDone(node);
                // children can only become ready once this node is marked done
                doExecute(node.getChildren());
            }
        }
    }
}
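The snippet above is abridged, so it may help to see a complete, runnable version of the same scheduling rule: a task is submitted as soon as its last parent finishes. The sketch below is an independent illustration built on the JDK's ExecutorService with per-task counters; DagRunner and its method signatures are assumptions for this example and not the framework's engine. It assumes every task appears as a key in the parents map and that the graph is acyclic (otherwise it would wait forever).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class DagRunner {
    /**
     * parents maps each task name to the tasks it depends on (empty list for roots).
     * Returns the order in which the tasks completed.
     */
    static List<String> run(Map<String, List<String>> parents, int threads) {
        // pending: how many parents of each task are still unfinished.
        Map<String, AtomicInteger> pending = new ConcurrentHashMap<>();
        Map<String, List<String>> children = new ConcurrentHashMap<>();
        parents.forEach((task, deps) -> {
            pending.put(task, new AtomicInteger(deps.size()));
            for (String dep : deps) {
                children.computeIfAbsent(dep, k -> new ArrayList<>()).add(task);
            }
        });

        List<String> finished = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(parents.size());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Roots are taken from the immutable input, not from the live counters,
            // so a task can never be submitted twice.
            for (Map.Entry<String, List<String>> entry : parents.entrySet()) {
                if (entry.getValue().isEmpty()) {
                    submit(entry.getKey(), pool, pending, children, finished, done);
                }
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the workflow", e);
        } finally {
            pool.shutdown();
        }
        return finished;
    }

    private static void submit(String task, ExecutorService pool,
                               Map<String, AtomicInteger> pending,
                               Map<String, List<String>> children,
                               List<String> finished, CountDownLatch done) {
        pool.submit(() -> {
            finished.add(task); // stand-in for the real task body
            for (String child : children.getOrDefault(task, List.of())) {
                // Whoever finishes the last parent submits the child.
                if (pending.get(child).decrementAndGet() == 0) {
                    submit(child, pool, pending, children, finished, done);
                }
            }
            done.countDown();
        });
    }
}
```

Unlike the abridged loop, this version never blocks a worker while waiting for a result: independent branches such as A and C genuinely run in parallel, and D is triggered by whichever of B or C completes last.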

Example usage:

DefaultDexecutor<String, String> executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addDependency("B", "D");
executor.addDependency("C", "D");
executor.execute();

Task Orchestration Platformization

To turn the framework into a platform we need a visual drag-and-drop UI and persistent storage.

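Each task vertex is stored as one row in a relational database table with the fields task_id, workflow_id, task_name, task_status, result, and task_parents. task_parents holds a comma-separated list of parent IDs, with -1 marking a task that has no parents. In the sample data every task has task_status 0, i.e. it has not run yet; the readiness query further down treats 1 as success.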

Sample data for the example workflow:

task_id  workflow_id  task_name  task_status  result  task_parents
1        1            A          0            -       -1
2        1            B          0            -       1
3        1            C          0            -       -1
4        1            D          0            -       2,3
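Because task_parents packs several IDs into one string, application code has to split it before the graph can be rebuilt. The following sketch shows one way to do that; TaskRow, parseParents and childrenOf are hypothetical names for this example, and the "-1 means no parents" convention is taken from the sample data above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class TaskRow {
    final long taskId;
    final String taskName;
    final List<Long> parentIds;

    TaskRow(long taskId, String taskName, String taskParents) {
        this.taskId = taskId;
        this.taskName = taskName;
        this.parentIds = parseParents(taskParents);
    }

    /** "-1" means "no parents"; otherwise the column holds IDs such as "2,3". */
    static List<Long> parseParents(String column) {
        if (column == null || column.isBlank() || column.trim().equals("-1")) {
            return List.of();
        }
        return Arrays.stream(column.split(","))
                .map(String::trim)
                .map(Long::valueOf)
                .collect(Collectors.toList());
    }

    /** Names of the tasks that list parentId as a parent, compared as numbers. */
    static List<String> childrenOf(long parentId, List<TaskRow> rows) {
        return rows.stream()
                .filter(row -> row.parentIds.contains(parentId))
                .map(row -> row.taskName)
                .collect(Collectors.toList());
    }
}
```

Comparing parsed numbers avoids a trap of plain substring matching on the raw column, where a search for parent 3 would also match a parent such as 13.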

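To start execution we query the tasks that have no parents and submit them to the thread pool:

SELECT * FROM task WHERE workflow_id = 1 AND task_parents = '-1'

For subsequent steps we locate the children of a finished task by matching on task_parents. A bare substring match such as LIKE '%3%' would also hit parent IDs like 13 or 30, so the pattern should include the separating commas, for example (MySQL syntax):

SELECT * FROM task WHERE workflow_id = 1 AND CONCAT(',', task_parents, ',') LIKE '%,3,%'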

Task D has to wait until both of its parents have finished. Before submitting it we count the parents that have not yet succeeded:

SELECT COUNT(1) FROM task WHERE task_id IN (2,3) AND task_status != 1

D is submitted only when the count is 0, that is, when every parent has task_status = 1 (success).
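The same readiness rule can be expressed in application code once the statuses have been loaded. This is a minimal in-memory sketch of the count query above; ReadinessCheck and the constant SUCCESS = 1 are assumptions, the latter inferred from the `!= 1` filter in that query.

```java
import java.util.List;
import java.util.Map;

final class ReadinessCheck {
    /** Assumed status code for a successfully finished task. */
    static final int SUCCESS = 1;

    /** Counts the parents that have not succeeded yet; unknown IDs count as unfinished. */
    static long unfinishedParents(List<Long> parentIds, Map<Long, Integer> statusById) {
        return parentIds.stream()
                .filter(id -> statusById.getOrDefault(id, 0) != SUCCESS)
                .count();
    }

    /** A task may be submitted only when no parent is left unfinished. */
    static boolean isReady(List<Long> parentIds, Map<Long, Integer> statusById) {
        return unfinishedParents(parentIds, statusById) == 0;
    }
}
```

In a real platform this check and the subsequent submission would have to happen atomically (for example inside one transaction), since B and C can finish at almost the same moment.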

Retry logic differs between framework and platform: the framework can retry immediately or via a scheduled executor, while the platform typically relies on user‑initiated retries from the UI.
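The two framework-side retry styles can be sketched with plain JDK classes. The Retry helper below, including its method names and parameters, is a hypothetical illustration of "retry immediately" versus "retry via a scheduled executor"; it is not the framework's actual retry code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class Retry {
    /** Immediate retry: call again right away, up to maxAttempts calls in total. */
    static <R> R immediately(Callable<R> task, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }

    /** Scheduled retry: the first call runs now, each retry runs delayMillis later. */
    static <R> CompletableFuture<R> scheduled(Callable<R> task, int maxAttempts, long delayMillis,
                                              ScheduledExecutorService scheduler) {
        CompletableFuture<R> result = new CompletableFuture<>();
        attempt(task, 1, maxAttempts, delayMillis, scheduler, result);
        return result;
    }

    private static <R> void attempt(Callable<R> task, int n, int maxAttempts, long delayMillis,
                                    ScheduledExecutorService scheduler, CompletableFuture<R> result) {
        try {
            result.complete(task.call());
        } catch (Exception e) {
            if (n >= maxAttempts) {
                result.completeExceptionally(e);
            } else {
                // Hand the next attempt to the scheduler instead of blocking this thread.
                scheduler.schedule(
                        () -> attempt(task, n + 1, maxAttempts, delayMillis, scheduler, result),
                        delayMillis, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

Immediate retries suit transient glitches, while delayed retries give a failing downstream system time to recover without occupying a worker thread in the meantime.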

With these extensions the task‑orchestration framework becomes a full‑featured platform offering visual workflow design, status monitoring, and manual retry capabilities.

Source: https://fredal.xin/task-scheduling-based-on-dag
