Building a LangGraph‑Style YAML DSL Workflow Engine with Spring AI

This article walks through constructing a lightweight YAML‑based DSL workflow engine on Spring AI 1.1.2 and Ollama, showing how to define state graphs, register tools, parse and execute nodes—including conditional edges, while loops, and parallel branches—without external orchestration tools.

The Dominant Programmer

Scenario

The author extends a previous Spring AI + Ollama RAG example that supported simple tool chaining, aiming for a declarative, readable way to control execution steps, conditional branches, loops, retries, and parallelism.

Core Concepts

The workflow is modeled as a directed StateGraph consisting of Node (tool call, LLM call, condition, loop, parallel), Edge (execution order), and Conditional Edge (runtime decision based on context). A While node repeats a sub‑graph until a condition fails, while a Parallel node runs multiple branches concurrently. All nodes share a Map<String, Object> Context that stores intermediate results.
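
The graph model described above can be illustrated with a small, dependency-free sketch. It is not the article's engine: the class name MiniStateGraph, its builder methods, and the maxSteps guard are all assumptions made for illustration. The point is that a plain edge and a conditional edge can share one shape, a function from the shared context to the next node id.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch: nodes mutate a shared context, edges choose the next node. */
final class MiniStateGraph {
    private final Map<String, Consumer<Map<String, Object>>> nodes = new LinkedHashMap<>();
    // Every edge is a function of the context, so conditional and plain edges look alike.
    private final Map<String, Function<Map<String, Object>, String>> edges = new HashMap<>();

    MiniStateGraph node(String id, Consumer<Map<String, Object>> action) {
        nodes.put(id, action);
        return this;
    }

    MiniStateGraph edge(String from, String to) {
        edges.put(from, ctx -> to);
        return this;
    }

    MiniStateGraph conditionalEdge(String from, Function<Map<String, Object>, String> router) {
        edges.put(from, router);
        return this;
    }

    /** Walks from start until "end" is reached or maxSteps nodes have run. */
    Map<String, Object> run(String start, Map<String, Object> initial, int maxSteps) {
        Map<String, Object> ctx = new HashMap<>(initial);
        String current = start;
        for (int step = 0; step < maxSteps && !"end".equals(current); step++) {
            Consumer<Map<String, Object>> action = nodes.get(current);
            if (action == null) throw new IllegalStateException("Unknown node: " + current);
            action.accept(ctx);
            Function<Map<String, Object>, String> next = edges.get(current);
            // A node without an outgoing edge ends the run.
            current = next == null ? "end" : next.apply(ctx);
        }
        return ctx;
    }

    public static void main(String[] args) {
        MiniStateGraph graph = new MiniStateGraph()
            .node("increment", ctx -> ctx.merge("count", 1, (a, b) -> (Integer) a + (Integer) b))
            .node("report", ctx -> ctx.put("report", "count=" + ctx.get("count")))
            // The conditional edge loops back until the counter reaches 3.
            .conditionalEdge("increment", ctx -> (Integer) ctx.get("count") < 3 ? "increment" : "report")
            .edge("report", "end");
        System.out.println(graph.run("increment", Map.of("count", 0), 100).get("report"));
    }
}
```

Here the loop is expressed purely through a conditional edge; the engine later in the article instead uses a dedicated while node with a body list.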

YAML DSL Design

The DSL describes a workflow with two scalar fields, name and start (the id of the first node to run), and two top-level lists, nodes and edges. Nodes contain fields such as id, type, tool, input, prompt, condition, and maxIterations (default 5), plus a body list for while nodes and a branches list of ParallelBranch objects for parallel nodes. Edges only specify from and to; conditional logic lives inside the node definitions. Camel-case field names match the Java POJOs, avoiding SnakeYAML mapping issues.
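
As an illustration of the field layout described above, a workflow file might look like the following. This is a sketch rather than a file from the original article: the workflow name, the node ids (fetch, weather, cityEn, summary), and the prompt text are assumptions. The tool names and the nodeId.output context keys follow the Java code later in the article.

```yaml
name: weather-report            # hypothetical workflow name
start: fetch                    # id of the first node to execute
nodes:
  - id: fetch
    type: parallel
    branches:                   # each entry maps to one ParallelBranch
      - nodes:
          - id: weather
            type: tool
            tool: get_weather
            input: "{city}"     # quoted, because a bare { starts a YAML flow mapping
      - nodes:
          - id: cityEn
            type: tool
            tool: translate_to_english
            input: "{city}"
  - id: summary
    type: llm
    prompt: "Write a one-sentence forecast for {cityEn.output} based on: {weather.output}"
edges:
  - from: fetch
    to: summary
  - from: summary
    to: end
```

Only top-level nodes take part in edge traversal; nodes nested under branches (or under a while node's body) run in list order inside their parent.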

Tool Classes

package com.badao.ai.tools;

import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;

@Component
public class WeatherTool {
    @Tool(name = "get_weather", description = "Get the current weather for a given city")
    public String getWeather(@ToolParam(description = "City name") String city) {
        System.out.println("Weather tool invoked");
        // Simulated weather data
        return String.format("Current weather in %s: sunny, 22℃, humidity 45%%.", city);
    }
}

package com.badao.ai.tools;

import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;

@Component
public class TranslateTool {
    @Tool(name = "translate_to_english", description = "Translate Chinese text into English")
    public String translate(@ToolParam(description = "Chinese text to translate") String text) {
        System.out.println("Translate tool invoked");
        // Simulated translation; a real translation API could be plugged in here
        return "Translated: " + text + " (This is the English version.)";
    }
}

Tool Registration

package com.badao.ai.config;

import com.badao.ai.tools.WeatherTool;
import com.badao.ai.tools.TranslateTool;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

@Configuration
public class ToolConfig {
    @Bean
    public ChatClient chatClient(ChatModel chatModel) {
        return ChatClient.builder(chatModel).build();
    }

    /** Builds the map from tool name to function implementation, used directly by WorkflowEngine. */
    @Bean
    public Map<String, Function<String, String>> toolRegistry(WeatherTool weatherTool, TranslateTool translateTool) {
        Map<String, Function<String, String>> registry = new HashMap<>();
        registry.put("get_weather", city -> weatherTool.getWeather(city));
        registry.put("translate_to_english", text -> translateTool.translate(text));
        return registry;
    }
}

Workflow Definition Model

package com.badao.ai.workflow;

import java.util.List;

public class WorkflowDefinition {
    private String name;
    private String start;
    private List<Node> nodes;
    private List<Edge> edges;
    // getters & setters omitted for brevity

    public static class Node {
        private String id;
        private String type;
        private String tool;
        private String input;
        private String prompt;
        private String condition;
        private int maxIterations = 5;
        private List<Node> body;
        private List<ParallelBranch> branches;
        // getters & setters omitted
    }

    public static class ParallelBranch {
        private List<Node> nodes;
        // getters & setters omitted
    }

    public static class Edge {
        private String from;
        private String to;
        // getters & setters omitted
    }
}

Workflow Engine

package com.badao.ai.workflow;

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Component
public class WorkflowEngine {
    private final ChatClient chatClient;
    private final Map<String, Function<String, String>> toolRegistry;

    public WorkflowEngine(ChatClient chatClient, Map<String, Function<String, String>> toolRegistry) {
        this.chatClient = chatClient;
        this.toolRegistry = toolRegistry;
    }

    // ---------- Execution entry ----------
    public String execute(String workflowPath, Map<String, Object> initialContext) {
        WorkflowDefinition wf = loadWorkflow(workflowPath);
        Map<String, Object> context = new HashMap<>(initialContext);
        context.put("_nodes", new HashMap<String, Object>());
        String currentNode = wf.getStart();
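        while (currentNode != null && !currentNode.equals("end")) {
            WorkflowDefinition.Node node = findNodeById(wf, currentNode);
            // Fail fast on a dangling start id or edge target instead of silently ending the run
            if (node == null) throw new IllegalStateException("Unknown node id: " + currentNode);
            System.out.println("Executing node: " + node.getId());
            currentNode = executeNode(node, context, wf);
        }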
        // Retrieve output of the last non‑end node
        String lastOutput = null;
        Map<String, Object> nodesOutput = (Map<String, Object>) context.get("_nodes");
        if (nodesOutput != null) {
            for (WorkflowDefinition.Edge edge : wf.getEdges()) {
                if ("end".equals(edge.getTo())) {
                    lastOutput = nodesOutput.getOrDefault(edge.getFrom(), "").toString();
                    break;
                }
            }
        }
        return lastOutput != null ? lastOutput : "No output generated";
    }

    // ---------- Node dispatch ----------
    private String executeNode(WorkflowDefinition.Node node, Map<String, Object> context, WorkflowDefinition wf) {
        switch (node.getType()) {
            case "start":
                return getNextNodeId(wf, node.getId());
            case "tool":
                return executeToolNode(node, context, wf);
            case "llm":
                return executeLlmNode(node, context, wf);
            case "while":
                return executeWhileNode(node, context, wf);
            case "parallel":
                return executeParallelNode(node, context, wf);
            default:
                return getNextNodeId(wf, node.getId());
        }
    }

    // ---------- Tool node ----------
    private String executeToolNode(WorkflowDefinition.Node node, Map<String, Object> context, WorkflowDefinition wf) {
        Function<String, String> tool = toolRegistry.get(node.getTool());
        if (tool == null) throw new RuntimeException("Tool not found: " + node.getTool());
        String resolvedInput = resolveExpression(node.getInput(), context);
        String result = tool.apply(resolvedInput);
        saveOutput(node.getId(), result, context);
        return getNextNodeId(wf, node.getId());
    }

    // ---------- LLM node ----------
    private String executeLlmNode(WorkflowDefinition.Node node, Map<String, Object> context, WorkflowDefinition wf) {
        String resolvedPrompt = resolveExpression(node.getPrompt(), context);
        String response = chatClient.prompt().user(resolvedPrompt).call().content();
        saveOutput(node.getId(), response, context);
        return getNextNodeId(wf, node.getId());
    }

    // ---------- While node ----------
    private String executeWhileNode(WorkflowDefinition.Node node, Map<String, Object> context, WorkflowDefinition wf) {
        int count = 0;
        while (count < node.getMaxIterations()) {
            String condition = resolveExpression(node.getCondition(), context);
            if (condition == null || !"true".equalsIgnoreCase(condition)) break;
            for (WorkflowDefinition.Node bodyNode : node.getBody()) {
                executeNode(bodyNode, context, wf);
            }
            count++;
        }
        return getNextNodeId(wf, node.getId());
    }

    // ---------- Parallel node ----------
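    private String executeParallelNode(WorkflowDefinition.Node node, Map<String, Object> context, WorkflowDefinition wf) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (WorkflowDefinition.ParallelBranch branch : node.getBranches()) {
            List<WorkflowDefinition.Node> branchNodes = branch.getNodes();
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                // Copy under the same lock used for the merge below, so a branch never
                // reads the shared map while another branch is writing to it.
                // The copy is shallow: the nested "_nodes" map remains shared by all branches.
                Map<String, Object> branchContext;
                synchronized (context) {
                    branchContext = new HashMap<>(context);
                }
                for (WorkflowDefinition.Node branchNode : branchNodes) {
                    executeNode(branchNode, branchContext, wf);
                }
                // Merge back; if two branches write the same key, the last one to finish wins.
                synchronized (context) {
                    context.putAll(branchContext);
                }
            });
            futures.add(future);
        }
        // join() rethrows any branch failure wrapped in a CompletionException
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return getNextNodeId(wf, node.getId());
    }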

    // ---------- Helper methods ----------
    private WorkflowDefinition.Node findNodeById(WorkflowDefinition wf, String id) {
        return wf.getNodes().stream().filter(n -> n.getId().equals(id)).findFirst().orElse(null);
    }

    private String getNextNodeId(WorkflowDefinition wf, String currentId) {
        return wf.getEdges().stream()
                .filter(e -> e.getFrom().equals(currentId))
                .map(WorkflowDefinition.Edge::getTo)
                .findFirst().orElse("end");
    }

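    @SuppressWarnings("unchecked")
    private void saveOutput(String nodeId, String output, Map<String, Object> context) {
        Map<String, Object> nodes = (Map<String, Object>) context.get("_nodes");
        // Parallel branches copy the context shallowly and therefore share this map,
        // so guard the write against concurrent puts.
        synchronized (nodes) {
            nodes.put(nodeId, output);
        }
        context.put(nodeId + ".output", output);
    }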

    private String resolveExpression(String expr, Map<String, Object> context) {
        if (expr == null) return null;
        // Simple placeholder replacement, e.g., {city}
        Pattern pattern = Pattern.compile("\\{([^}]+)\\}");
        Matcher matcher = pattern.matcher(expr);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            Object value = context.get(key);
            matcher.appendReplacement(sb, value == null ? "" : Matcher.quoteReplacement(value.toString()));
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

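    private WorkflowDefinition loadWorkflow(String path) {
        // Load the YAML file from the classpath using SnakeYAML
        Yaml yaml = new Yaml();
        try (InputStream is = getClass().getClassLoader().getResourceAsStream(path)) {
            if (is == null) throw new IllegalArgumentException("Workflow not found on classpath: " + path);
            return yaml.loadAs(is, WorkflowDefinition.class);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to read workflow: " + path, e);
        }
    }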
}

Execution and Control Flow

The engine loads the YAML into a WorkflowDefinition and starts from the node whose id matches start. It follows edges one node at a time, dispatching each node to the handler for its type, until it reaches end. While nodes resolve their condition expression against the shared context and repeat their body for as long as it yields "true", up to maxIterations. Parallel nodes run each branch in its own CompletableFuture on a copy of the context; each branch merges its copy back into the main map when it finishes, and the engine waits for all of them with CompletableFuture.allOf(...).join(). The author notes that branches writing the same key overwrite one another, and suggests an aggregation strategy for production use.
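
One possible aggregation strategy is sketched below. It is not the article's implementation: the class name ParallelMergeSketch and the rule "collect conflicting values into a list" are assumptions chosen for illustration. Each branch reads an unmodifiable snapshot and returns only the keys it wrote, and the merge happens on the calling thread after the branches complete, so no locking is needed and the result does not depend on which branch finishes first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch of a deterministic merge for parallel branches. */
final class ParallelMergeSketch {

    /** Each branch maps a read-only snapshot to the (non-null) entries it wants to write. */
    static Map<String, Object> runBranches(
            Map<String, Object> context,
            List<Function<Map<String, Object>, Map<String, Object>>> branches) {
        Map<String, Object> snapshot = Collections.unmodifiableMap(new HashMap<>(context));
        List<CompletableFuture<Map<String, Object>>> futures = new ArrayList<>();
        for (Function<Map<String, Object>, Map<String, Object>> branch : branches) {
            futures.add(CompletableFuture.supplyAsync(() -> branch.apply(snapshot)));
        }
        // Collect branch writes in declaration order; join() blocks until that branch is done.
        Map<String, Object> branchWrites = new HashMap<>();
        for (CompletableFuture<Map<String, Object>> future : futures) {
            future.join().forEach((key, value) ->
                    branchWrites.merge(key, value, ParallelMergeSketch::combine));
        }
        Map<String, Object> merged = new HashMap<>(context);
        merged.putAll(branchWrites);
        return merged;
    }

    /** Conflict rule (an assumption): keep every value written to the same key, in order. */
    private static Object combine(Object existing, Object incoming) {
        List<Object> all = new ArrayList<>();
        if (existing instanceof List) {
            all.addAll((List<?>) existing);
        } else {
            all.add(existing);
        }
        all.add(incoming);
        return all;
    }

    public static void main(String[] args) {
        Map<String, Object> out = runBranches(
            Map.of("city", "Beijing"),
            List.of(
                c -> Map.of("weather.output", "sunny in " + c.get("city"), "notes", "from weather"),
                c -> Map.of("cityEn.output", "Beijing (EN)", "notes", "from translate")));
        System.out.println(out.get("notes"));
    }
}
```

The list-based rule is ambiguous when a branch legitimately writes a list value; namespacing keys by branch or rejecting conflicts outright are alternatives with the same overall structure.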

Design Considerations

Expressions use the {variable} syntax and are resolved at runtime by looking each name up in the context map; a name with no entry resolves to an empty string. Camel-case field names align with the Java POJOs, preventing SnakeYAML mapping failures. Parallel branches are wrapped in a ParallelBranch class because SnakeYAML cannot directly map a List<List<Node>>. Node types are plain strings rather than a Java enum, which keeps the YAML simple and makes it easy to add types later (e.g., HTTP or switch nodes).
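
The placeholder behavior can be tried in isolation with the standalone sketch below. The class name PlaceholderSketch is an assumption; the regular expression and the empty-string fallback mirror the engine's resolveExpression, rewritten here with the functional Matcher.replaceAll overload available since Java 9.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical standalone version of the {key} placeholder resolution. */
final class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^}]+)\\}");

    /** Replaces each {key} with context.get(key); keys without a value become "". */
    static String resolve(String template, Map<String, Object> context) {
        if (template == null) return null;
        return PLACEHOLDER.matcher(template).replaceAll(match -> {
            Object value = context.get(match.group(1));
            // quoteReplacement keeps '$' and '\' in values from being read as group references
            return value == null ? "" : Matcher.quoteReplacement(value.toString());
        });
    }

    public static void main(String[] args) {
        Map<String, Object> ctx = Map.of("city", "Beijing", "weather.output", "sunny, $22");
        System.out.println(resolve("Weather for {city}: {weather.output}", ctx));
    }
}
```

Two properties follow from this design. A key such as weather.output is a single flat key, not a path into a nested object, which matches how saveOutput stores results. And any text in braces is treated as a placeholder, so a prompt containing literal JSON braces would have that span replaced by an empty string unless an escape mechanism is added.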

Workflow visualization
Written by

The Dominant Programmer

Resources and tutorials for programmers' advanced learning journey. Advanced tracks in Java, Python, and C#. Blog: https://blog.csdn.net/badao_liumang_qizhi