Building Multi‑Agent Collaboration Systems: AutoGen, CrewAI, and a Custom Orchestration Framework

This article walks through the design, pitfalls, and best‑practice solutions for multi‑agent LLM systems, comparing AutoGen, CrewAI, and a self‑built orchestration stack, and provides concrete architecture diagrams, code samples, evaluation metrics, and a checklist for production deployment.

MaGe Linux Operations

Introduction – Bug that caused endless loops

In Q4 of the previous year a research‑assistant agent was built to replace junior analysts for competitor research. The initial design used three agents (Researcher, Writer, Reviewer) orchestrated with AutoGen GroupChat. After three days of execution the system exhibited five critical failures:

Infinite loop : Reviewer kept requesting more data, leading to 47 rounds and $14 token cost.

Hallucination spread : Researcher cited a non‑existent paper, Writer trusted it, and Reviewer failed to catch the error.

No tool‑failure fallback : A timeout in the search API halted the entire workflow.

Context explosion : Each round appended the full history, reaching >8000 tokens by round 30.

No human‑in‑the‑loop : User requests for additional information were never handled.

The system was rewritten with an explicit state machine, tool registry, message bus, and cost‑circuit‑breaker.

When is a multi‑agent system required?

Single‑intent, single‑answer tasks (e.g., translation) – no multi‑agent needed.

Fixed multi‑step flows (e.g., SQL generation + execution) – chainable calls suffice.

Tasks requiring multiple roles, tool usage, and iterative refinement (e.g., research reports) – must use multi‑agent.

Human‑in‑the‑loop review – must use multi‑agent.

Parallelizable data aggregation – strongly recommended.

Failure‑fallback (e.g., search API down) – must use multi‑agent.

Single‑Agent vs Multi‑Agent – concrete example

Single‑Agent prompt (fails):

You are an AI researcher. Search the latest Qwen and Llama 3 evaluations, then compare their Chinese ability and output a Markdown report.

Problems:

The model's knowledge cutoff predates the latest evaluations, and a single call can run only one search.

Report quality depends entirely on the model; no division of labor.

Cannot run searches in parallel.

Multi‑Agent solution :

Planner : Decompose into three sub‑tasks – search Qwen, search Llama 3, find benchmark list.

Researcher × 2 : Run the two searches in parallel.

Writer : Merge results and draft the comparison.

Reviewer : Fact‑check and request revisions if needed (max 2 rounds).

Result is markedly better, provided the orchestration is correctly designed.
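
The division of labor above can be sketched with plain functions standing in for the agents. This is a minimal illustration, not the article's implementation: plan, research, write, and review are hypothetical stubs (a real system would call an LLM and a search tool), and only the control flow is the point: parallel research, then a review loop capped at two rounds.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_REVIEW_ROUNDS = 2  # same cap as the Reviewer step above


def plan(query: str) -> list[str]:
    # Stub planner: a real one would ask an LLM to decompose the query.
    return [f"search Qwen: {query}", f"search Llama 3: {query}"]


def research(subtask: str) -> str:
    # Stub researcher: a real one would call a search tool.
    return f"notes for [{subtask}]"


def write(notes: list[str], feedback: list[str]) -> str:
    # Stub writer: merges notes and records which feedback was addressed.
    draft = "\n".join(notes)
    if feedback:
        draft += "\n(revised: " + "; ".join(feedback) + ")"
    return draft


def review(draft: str) -> list[str]:
    # Stub reviewer: rejects any draft that has not been revised yet.
    return [] if "(revised" in draft else ["add sources"]


def run_pipeline(query: str) -> tuple[str, int]:
    subtasks = plan(query)
    with ThreadPoolExecutor(max_workers=len(subtasks)) as pool:
        # map() runs the searches concurrently and keeps the input order.
        notes = list(pool.map(research, subtasks))
    draft = write(notes, feedback=[])
    rounds = 0
    while rounds < MAX_REVIEW_ROUNDS:
        issues = review(draft)
        if not issues:
            break
        rounds += 1
        draft = write(notes, feedback=issues)
    return draft, rounds
```

Because the loop is bounded by MAX_REVIEW_ROUNDS rather than by the reviewer's own judgment, a reviewer that never approves costs at most two rewrites.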

Overall Architecture – Five‑Layer Orchestration Model

┌────────────────────────────────────────────────────────────────┐
│ ① Interface Layer (Web UI / API / Slack / CLI)               │
│    • Starts tasks, shows progress, collects feedback, human‑in‑the‑loop │
└────────────────────────────────────────────────────────────────┘
        ↓ ↑ (bidirectional)
┌────────────────────────────────────────────────────────────────┐
│ ② Orchestration Layer (State machine / task queue)          │
│    • Decides next actor, termination, human intervention    │
└────────────────────────────────────────────────────────────────┘
        ↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ③ Message Bus (Redis Streams / Kafka / in‑memory queue)      │
│    • Carries task messages, tool‑call messages, state events │
└────────────────────────────────────────────────────────────────┘
        ↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ④ Agent Pool (Researcher, Writer, Reviewer, …)            │
│    • Each agent = LLM + system prompt + tools + memory      │
└────────────────────────────────────────────────────────────────┘
        ↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ⑤ Tool & Data Layer (search API, DB, code exec, file system)│
│    • Registered in a ToolRegistry with timeout, retry, fallback│
└────────────────────────────────────────────────────────────────┘

Data flow for a single "competitor research" task:

[User] "Research domestic LLM inference frameworks"
  ↓
[Orchestrator] create Task, state=initialized → running
  ↓
[Orchestrator] invoke Planner → produces sub‑tasks: search vLLM, search TGI, search TensorRT‑LLM
  ↓
[Orchestrator] push sub‑tasks to Message Bus (parallel)
  ↓
[Researcher #1] consumes task → calls search tool → produces vllm_research.md
[Researcher #2] consumes task → calls search tool → produces tgi_research.md
[Researcher #3] consumes task → calls search tool → produces trtllm_research.md
  ↓ (wait for all three)
[Orchestrator] trigger Writer → merges results → draft_v1.md
  ↓
[Orchestrator] trigger Reviewer → reviews draft, may request up to 2 revisions
  ↓
[Orchestrator] state=completed, return final output
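
The fan-out and "wait for all three" step in this flow can be sketched with an in-memory queue standing in for the message bus. The names researcher_worker and fan_out_fan_in are hypothetical, and the worker only fabricates an artifact name instead of calling a search tool; Redis Streams or Kafka would replace queue.Queue in a real deployment.

```python
import queue
import threading


def researcher_worker(tasks: queue.Queue, results: queue.Queue) -> None:
    while True:
        topic = tasks.get()
        if topic is None:  # sentinel: no more work for this worker
            tasks.task_done()
            return
        # Placeholder for "call search tool, write <topic>_research.md".
        results.put((topic, f"{topic}_research.md"))
        tasks.task_done()


def fan_out_fan_in(topics: list[str], num_workers: int = 3) -> dict[str, str]:
    tasks: queue.Queue = queue.Queue()
    results: queue.Queue = queue.Queue()
    workers = [
        threading.Thread(target=researcher_worker, args=(tasks, results))
        for _ in range(num_workers)
    ]
    for worker in workers:
        worker.start()
    for topic in topics:  # fan out: one message per sub-task
        tasks.put(topic)
    for _ in workers:  # one sentinel per worker
        tasks.put(None)
    tasks.join()  # fan in: block until every message has been processed
    for worker in workers:
        worker.join()
    artifacts: dict[str, str] = {}
    while not results.empty():
        topic, artifact = results.get()
        artifacts[topic] = artifact
    return artifacts
```

The orchestrator only proceeds to the Writer once fan_out_fan_in returns, which is the in-process equivalent of the "(wait for all three)" barrier.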

Core Processes

4.1 Task Definition (Pydantic model)

import uuid
from typing import Literal

from pydantic import BaseModel, Field


# SubTask is defined first so ResearchTask can reference it directly.
class SubTask(BaseModel):
    sub_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
    role: Literal["planner", "researcher", "writer", "reviewer"]
    input: dict = Field(default_factory=dict)
    output: str | None = None
    state: Literal["pending", "running", "completed", "failed"] = "pending"
    retries: int = 0
    max_retries: int = 2


class ResearchTask(BaseModel):
    task_id: str
    user_id: str | None = None  # whom to notify when human input is needed
    user_query: str
    task_type: Literal["competitor_research", "code_review", "data_analysis"]
    subtasks: list[SubTask] = Field(default_factory=list)
    state: Literal["initialized", "running", "waiting_human", "completed", "failed"] = "initialized"
    budget_usd: float = 1.0  # cost circuit-breaker threshold
    spent_usd: float = 0.0
    human_intervention_required: bool = False
    artifacts: dict[str, str] = Field(default_factory=dict)
    final_output: str | None = None
    error: str | None = None

4.2 Orchestrator State Machine (transitions library)

from transitions import Machine


class OrchestratorStateMachine:
    states = ["initialized", "planning", "researching", "writing", "reviewing", "waiting_human", "completed", "failed"]

    def __init__(self, task):
        self.task = task
        # Machine attaches `state` and one method per trigger (plus to_<state>() shortcuts) to this object.
        self.machine = Machine(model=self, states=OrchestratorStateMachine.states, initial="initialized")
        # Happy path
        self.machine.add_transition("start_planning", "initialized", "planning")
        self.machine.add_transition("plan_done", "planning", "researching")
        self.machine.add_transition("research_done", "researching", "writing")
        self.machine.add_transition("draft_done", "writing", "reviewing")
        self.machine.add_transition("review_done", "reviewing", "completed")
        # Revision loop
        self.machine.add_transition("needs_revision", "reviewing", "writing")
        # Human-in-the-loop: pause from any state, resume in researching
        self.machine.add_transition("request_human", "*", "waiting_human")
        self.machine.add_transition("human_responded", "waiting_human", "researching")
        # Failure paths, reachable from any state
        self.machine.add_transition("fail", "*", "failed")
        self.machine.add_transition("budget_exceeded", "*", "failed")
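
To show what the library enforces, here is a dependency-free sketch of the same idea: a transition table plus a guard that rejects anything not in the table. It is an illustration only, not how the transitions library works internally. TinyStateMachine and InvalidTransition are hypothetical names, and the draft_done trigger is an assumption added so the writing to reviewing step has a name.

```python
# (current state, trigger) -> next state
ALLOWED = {
    ("initialized", "start_planning"): "planning",
    ("planning", "plan_done"): "researching",
    ("researching", "research_done"): "writing",
    ("writing", "draft_done"): "reviewing",  # hypothetical trigger name
    ("reviewing", "needs_revision"): "writing",
    ("reviewing", "review_done"): "completed",
}
TERMINAL = {"completed", "failed"}


class InvalidTransition(Exception):
    """Raised when a trigger is not allowed in the current state."""


class TinyStateMachine:
    def __init__(self) -> None:
        self.state = "initialized"
        self.invalid_attempts = 0  # feeds an "invalid state transitions" metric

    def fire(self, trigger: str) -> str:
        # "fail" is allowed from every non-terminal state.
        if trigger == "fail" and self.state not in TERMINAL:
            self.state = "failed"
            return self.state
        next_state = ALLOWED.get((self.state, trigger))
        if next_state is None:
            self.invalid_attempts += 1
            raise InvalidTransition(f"{trigger!r} not allowed in state {self.state!r}")
        self.state = next_state
        return next_state
```

Counting rejected triggers in one place makes it cheap to alert on any orchestration bug that tries to skip a stage.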

4.3 Agent Base Interface

from abc import ABC, abstractmethod


class MaxRevisionExceededError(Exception):
    """Raised when the reviewer has rejected the draft too many times."""


class BaseAgent(ABC):
    # `llm` is assumed to expose complete(system=..., user=..., ...); adapt this to your client.
    def __init__(self, name: str, role: str, llm, tools: list["Tool"], prompt: str):
        self.name = name
        self.role = role
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.system_prompt = prompt

    @abstractmethod
    def run(self, subtask, context):
        """Execute the sub-task and return its artifact."""
        raise NotImplementedError

    def call_tool(self, tool_name: str, **kwargs) -> str:
        # Guard against the model naming a tool this agent does not have.
        if tool_name not in self.tools:
            raise KeyError(f"{self.name} has no tool named {tool_name!r}")
        return self.tools[tool_name].run(**kwargs)


class ResearcherAgent(BaseAgent):
    def run(self, subtask, context) -> str:
        # Step 1: let the model pick a tool and a search query.
        plan = self.llm.complete(
            system=self.system_prompt,
            user=f"Task: {subtask.input['query']}\nAvailable tools: {list(self.tools.keys())}",
            response_format={"search_query": str, "tool": str},
        )
        # Step 2: run the tool, then turn the raw results into structured notes.
        results = self.call_tool(plan["tool"], query=plan["search_query"])
        return self.llm.complete(
            system="You are a researcher. Organize the search results into structured notes.",
            user=results,
        )


class WriterAgent(BaseAgent):
    def run(self, subtask, context) -> str:
        research_results = "\n\n".join(context["research_results"])
        user = f"Research material:\n{research_results}\n\nWrite a report for: {context['user_query']}"
        if context.get("feedback"):
            user += f"\n\nAddress these reviewer issues: {context['feedback']}"
        return self.llm.complete(system=self.system_prompt, user=user, max_tokens=3000)


class ReviewerAgent(BaseAgent):
    def run(self, subtask, context) -> dict:
        review = self.llm.complete(
            system=self.system_prompt + "\nOutput JSON: {passed: bool, issues: [...]}",
            user=context["draft"],
            response_format={"passed": bool, "issues": list},
        )
        if not review["passed"]:
            # The counter only accumulates if the caller passes the same context dict across rounds.
            context["revision_count"] = context.get("revision_count", 0) + 1
            if context["revision_count"] >= 2:
                raise MaxRevisionExceededError()
        return review

4.4 Tool Registry with Timeout, Retry, Fallback

import logging
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Callable

logger = logging.getLogger(__name__)


class ToolExecutionError(Exception):
    """Raised when a tool fails on every attempt and has no fallback."""


class Tool:
    def __init__(self, name: str, func: Callable, timeout_s: float = 30.0, max_retries: int = 2, fallback: Callable | None = None):
        self.name = name
        self.func = func
        self.timeout_s = timeout_s
        self.max_retries = max_retries
        self.fallback = fallback

    def run(self, **kwargs) -> str:
        last_err = None
        for attempt in range(self.max_retries + 1):
            # No `with` block here: its implicit shutdown(wait=True) would block until a hung call returns.
            ex = ThreadPoolExecutor(max_workers=1)
            try:
                return ex.submit(self.func, **kwargs).result(timeout=self.timeout_s)
            except FutureTimeoutError:
                last_err = f"tool {self.name} timeout after {self.timeout_s}s"
                logger.warning(f"[{self.name}] attempt {attempt+1} timeout")
            except Exception as e:
                last_err = f"tool {self.name} error: {e}"
                logger.warning(f"[{self.name}] attempt {attempt+1} error: {e}")
                if attempt < self.max_retries:
                    time.sleep(2 ** attempt)  # exponential backoff before the next attempt
            finally:
                ex.shutdown(wait=False)  # a timed-out call keeps running in its thread
        if self.fallback:
            logger.warning(f"[{self.name}] all attempts failed, using fallback")
            return self.fallback(**kwargs)
        raise ToolExecutionError(last_err)

# Example search tool. serp_api and knowledge_base stand for clients configured elsewhere.
def web_search(query: str, num_results: int = 5) -> str:
    return serp_api.search(query, num_results=num_results)

def fallback_search(query: str, num_results: int = 5) -> str:
    """When the external API fails, use a local knowledge base as fallback."""
    return knowledge_base.search(query, top_k=num_results)

search_tool = Tool(name="web_search", func=web_search, timeout_s=10.0, max_retries=2, fallback=fallback_search)
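
The architecture mentions a ToolRegistry, and the metrics section cites it as the source of the tool failure rate, but the article does not show it. One possible shape, as a sketch: a name-to-callable map that counts calls and failures. The class layout and method names here are assumptions, and plain callables are registered instead of Tool objects to keep the example self-contained.

```python
from typing import Callable


class ToolRegistry:
    def __init__(self) -> None:
        self._tools: dict[str, Callable[..., str]] = {}
        self.calls: dict[str, int] = {}
        self.failures: dict[str, int] = {}

    def register(self, name: str, func: Callable[..., str]) -> None:
        if name in self._tools:
            raise ValueError(f"tool {name!r} is already registered")
        self._tools[name] = func
        self.calls[name] = 0
        self.failures[name] = 0

    def call(self, name: str, **kwargs) -> str:
        func = self._tools[name]  # KeyError for unknown (possibly hallucinated) tool names
        self.calls[name] += 1
        try:
            return func(**kwargs)
        except Exception:
            self.failures[name] += 1
            raise

    def failure_rate(self, name: str) -> float:
        calls = self.calls[name]
        return self.failures[name] / calls if calls else 0.0
```

Routing every tool call through one object gives a single place to measure failures and to reject tool names the model invented.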

4.5 Orchestrator Main Loop – Cost Circuit‑Breaker & Human‑in‑the‑Loop

import logging

logger = logging.getLogger(__name__)


class BudgetExceededError(Exception):
    """Raised when a task has spent more than its budget."""


class Orchestrator:
    # CostTracker, notifier, wait_for_response, _run_parallel and _count_rounds
    # are assumed to be defined elsewhere in the project.
    def __init__(self, agents: dict, tool_registry, message_bus):
        self.agents = agents
        self.tools = tool_registry
        self.bus = message_bus
        self.cost_tracker = CostTracker()
        self.sm = None  # one state machine per task, created in run()

    def run(self, task):
        self.sm = OrchestratorStateMachine(task)
        try:
            # Planning
            self.sm.start_planning()
            task.state = "running"
            task.subtasks = self.agents["planner"].run(
                SubTask(sub_id="plan", role="planner", input={"query": task.user_query}), context={})
            self.sm.plan_done()

            # Research (parallel)
            self._run_parallel(task, role="researcher")
            self._check_budget(task)

            # Writing
            self.sm.research_done()
            writer_ctx = {
                "research_results": [s.output for s in task.subtasks if s.role == "researcher"],
                "user_query": task.user_query,
            }
            draft = self.agents["writer"].run(SubTask(sub_id="write-1", role="writer", input={}), context=writer_ctx)
            task.artifacts["draft_v1"] = draft
            self._check_budget(task)

            # Reviewing (at most 2 rounds; after that the last revision is accepted as-is)
            self.sm.to_reviewing()  # to_<state>() methods are auto-generated by transitions
            for round_i in range(2):
                review = self.agents["reviewer"].run(
                    SubTask(sub_id=f"review-{round_i + 1}", role="reviewer", input={}), context={"draft": draft})
                self._check_budget(task)
                if review["passed"]:
                    break
                self.sm.needs_revision()  # reviewing -> writing
                draft = self.agents["writer"].run(
                    SubTask(sub_id=f"write-{round_i + 2}", role="writer", input={}),
                    context={**writer_ctx, "feedback": review["issues"]})
                task.artifacts[f"draft_v{round_i + 2}"] = draft
                self.sm.to_reviewing()  # writing -> reviewing
            task.final_output = draft
            self.sm.review_done()
            task.state = "completed"
        except MaxRevisionExceededError:
            self._fail(task, "Reviewer rejected the draft too many times")
        except Exception as e:  # includes BudgetExceededError
            self._fail(task, str(e))
        finally:
            logger.info(f"[task={task.task_id}] done, spent=${task.spent_usd:.2f}, rounds={self._count_rounds(task)}")

    def _fail(self, task, message: str):
        task.error = message
        task.state = "failed"
        self.sm.fail()

    def _check_budget(self, task):
        if task.spent_usd > task.budget_usd:
            raise BudgetExceededError(f"spent ${task.spent_usd:.2f} of ${task.budget_usd:.2f} budget")

    def request_human(self, task, question: str):
        self.sm.request_human()
        task.state = "waiting_human"
        task.human_intervention_required = True
        notifier.send(user_id=task.user_id, message=question)
        response = wait_for_response(task.task_id, timeout=300)  # seconds
        self.sm.human_responded()
        task.state = "running"
        return response

AutoGen vs CrewAI vs Custom Orchestration – Selection Guidance

AutoGen – best for code-generation pipelines that need strong tool integration. Key configuration points: max_round (on GroupChat) is a hard limit that prevents loops; speaker_selection_method="round_robin" is more deterministic than "auto"; max_consecutive_auto_reply caps consecutive replies from a single agent; human_input_mode="TERMINATE" or "ALWAYS" enables user intervention.

CrewAI – suited for report-type workflows with a fixed number of agents. Important settings: process=Process.sequential (on Crew) gives deterministic execution; allow_delegation=False stops agents from delegating endlessly; max_iter (the per-agent iteration cap) must be set to bound execution; context=[prev_task] (on Task) explicitly declares task dependencies.

Custom orchestration – required when scaling to dozens of agents, needing strict cost control, complex state handling, or extensive human‑in‑the‑loop interaction. Provides full control over state machine, message bus, tool registry, and cost circuit‑breaker.

Post‑Launch Evaluation Metrics

Effectiveness

Task completion rate ≥ 85 % (source: orchestrator logs).

Average rounds ≤ 8 (source: orchestrator logs).

Human intervention rate ≤ 20 % (source: user behavior logs).

Final draft human score ≥ 4.0/5.0 (source: reviewer + business side).

Hallucination rate ≤ 5 % (source: spot checks + LLM‑as‑judge).

Cost

Average cost per task ≤ $0.50 (source: LLM API usage).

Monthly total cost stays within budget (source: PromptLayer / custom metrics).

Token utilization (output/input) ≥ 0.30 (source: same as above).

Cache hit rate ≥ 30 % (source: Redis).

Stability

Average task latency ≤ 60 s (source: orchestrator).

P95 latency ≤ 120 s (source: orchestrator).

Tool failure rate ≤ 2 % (source: ToolRegistry).

Loop/timeout rate (max_round trigger) ≤ 1 % (source: orchestrator).

Invalid state transitions = 0 (source: orchestrator).
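
Most of these thresholds can be computed directly from per-task log records. The sketch below assumes each record is a dict with state, rounds, spent_usd, and latency_s keys; those field names are hypothetical and should be mapped to whatever the orchestrator actually logs. P95 uses the nearest-rank method.

```python
import math


def p95(values: list[float]) -> float:
    # Nearest-rank percentile: the smallest value with at least 95% of samples at or below it.
    ordered = sorted(values)
    rank = math.ceil(0.95 * len(ordered))
    return ordered[rank - 1]


def summarize(records: list[dict]) -> dict[str, float]:
    total = len(records)
    latencies = [r["latency_s"] for r in records]
    completed = sum(1 for r in records if r["state"] == "completed")
    return {
        "completion_rate": completed / total,
        "avg_rounds": sum(r["rounds"] for r in records) / total,
        "avg_cost_usd": sum(r["spent_usd"] for r in records) / total,
        "avg_latency_s": sum(latencies) / total,
        "p95_latency_s": p95(latencies),
    }
```

With only a handful of records the P95 is simply the slowest task, so the latency targets become meaningful only once a few dozen tasks have been logged.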

Common Pitfalls & Solutions

Pitfall 1 – Infinite loop caused by AutoGen speaker selection

Symptom : 47 rounds, token burn.

Root cause : speaker_selection_method="auto" lets the LLM decide the next speaker, leading to polite hand‑offs that never terminate.

Fixes :

Set a hard limit with max_round (AutoGen) or max_iter (CrewAI).

Use speaker_selection_method="round_robin" or a custom selector.

Add an explicit state‑machine transition that forces failed after N rounds.
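
The second and third fixes can be combined in a framework-independent selector: speakers rotate in a fixed order, and a hard cap raises instead of allowing one more turn. RoundRobinSelector and RoundLimitExceeded are hypothetical names for this sketch; it does not use or mirror AutoGen's own selector API.

```python
from itertools import cycle


class RoundLimitExceeded(Exception):
    """Raised when the conversation hits its hard round limit."""


class RoundRobinSelector:
    def __init__(self, speakers: list[str], max_round: int) -> None:
        self._order = cycle(speakers)  # fixed, deterministic rotation
        self.max_round = max_round
        self.rounds = 0

    def next_speaker(self) -> str:
        if self.rounds >= self.max_round:
            raise RoundLimitExceeded(f"stopped after {self.max_round} rounds")
        self.rounds += 1
        return next(self._order)
```

The orchestrator can catch RoundLimitExceeded and move the task to failed, so the limit is enforced by code rather than by the model's willingness to stop.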

Pitfall 2 – Context explosion

Symptom : 8000+ tokens in a single call at round 30.

Root cause : Full history is always packed into the prompt.

Solution : Implement a sliding‑window + summarization strategy.

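class ContextManager:
    """Keep the last N turns verbatim and fold older messages into a running summary."""

    def __init__(self, summarizer, max_recent_turns=5, summary_max_tokens=500):
        # summarizer(text, max_tokens) -> str is supplied by the caller, e.g. a cheap LLM call.
        self.summarizer = summarizer
        self.max_recent = max_recent_turns
        self.summary_tokens = summary_max_tokens
        self.history = []
        self.summary = ""

    def add(self, role: str, content: str):
        self.history.append({"role": role, "content": content})
        window = self.max_recent * 2  # one turn = a request message plus a reply
        if len(self.history) > window:
            old, self.history = self.history[:-window], self.history[-window:]
            self.summary = self._summarize(old)

    def _summarize(self, old_messages: list) -> str:
        # Fold the previous summary and the evicted messages into a new summary.
        evicted = "\n".join(f"{m['role']}: {m['content']}" for m in old_messages)
        text = f"Previous summary:\n{self.summary}\n\nOlder messages:\n{evicted}"
        return self.summarizer(text, self.summary_tokens)

    def get_context(self) -> str:
        recent = "\n".join(f"{m['role']}: {m['content']}" for m in self.history)
        return f"Conversation summary:\n{self.summary}\n\nRecent dialogue:\n{recent}"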
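
A rough calculation shows why the window matters. It assumes about 270 tokens per message (8000 tokens divided by 30 rounds, from the symptom above), one message per round, and the ContextManager defaults of a 10-message window and a 500-token summary. These are simplifying assumptions, not measurements.

```python
TOKENS_PER_MESSAGE = 270  # assumption: roughly 8000 tokens / 30 rounds
WINDOW_MESSAGES = 10      # max_recent_turns=5, two messages per turn
SUMMARY_TOKENS = 500      # summary_max_tokens


def full_history_prompt(round_no: int) -> int:
    # Every earlier message is re-sent on every call.
    return TOKENS_PER_MESSAGE * round_no


def windowed_prompt(round_no: int) -> int:
    # Only the recent window is re-sent, plus a summary once messages have been evicted.
    recent = min(round_no, WINDOW_MESSAGES) * TOKENS_PER_MESSAGE
    summary = SUMMARY_TOKENS if round_no > WINDOW_MESSAGES else 0
    return recent + summary


def cumulative(prompt_fn, rounds: int) -> int:
    # Total input tokens billed across all rounds.
    return sum(prompt_fn(r) for r in range(1, rounds + 1))
```

Under these assumptions the prompt at round 30 drops from 8,100 to 3,200 tokens, and cumulative input tokens over 30 rounds drop from 125,550 to 78,850. The full-history total grows quadratically with the number of rounds, while the windowed total grows linearly once the window is full.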

Pitfall 3 – Tool hallucination & no fallback

Symptom : Search tool returns nothing, agent fabricates data.

Fix : Ensure the tool returns an explicit empty result and force the agent to acknowledge missing information.

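class Tool:
    def run(self, **kwargs) -> str:
        # _safe_call stands for the timeout/retry/fallback logic from section 4.4.
        result = self._safe_call(**kwargs)
        # Replace empty results with an explicit marker the agent cannot mistake for data.
        if not result or "no results" in result.lower():
            return "[No search results] Please rephrase the query or admit lack of information."
        return result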

Add to the agent prompt: "If the tool returns empty, state that you do not know; do not fabricate."

Pitfall 4 – Reviewer always passes

Root cause : LLM tends to be agreeable.

Solution : Use a strict system prompt that requires factual citations and JSON‑formatted fail‑fast output.

reviewer_system_prompt = """You are an editor. Review must obey:
1. Every fact must have a source citation; otherwise fail.
2. Numbers must be explicit, not vague.
3. Logical chain must be complete.
Output JSON: {\"passed\": bool, \"issues\": [{\"type\": \"...\", \"location\": \"...\"}]}
If any issue exists, set passed=false.
"""

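A strict prompt helps only if the consuming code is equally strict. The sketch below parses the reviewer's reply fail-closed: anything that is not valid JSON, lacks an issues list, or reports passed=true alongside issues is treated as a failed review. parse_review is a hypothetical helper, not part of any framework.

```python
import json


def _fail(issue_type: str) -> dict:
    return {"passed": False, "issues": [{"type": issue_type, "location": "reviewer output"}]}


def parse_review(raw: str) -> dict:
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return _fail("invalid_json")
    if not isinstance(data, dict):
        return _fail("not_an_object")
    issues = data.get("issues")
    if not isinstance(issues, list):
        return _fail("missing_issues_field")
    # Pass only when the model said so explicitly AND listed no issues.
    passed = data.get("passed") is True and not issues
    return {"passed": passed, "issues": issues}
```

Treating malformed output as a rejection means an agreeable or confused reviewer costs one extra revision round instead of silently approving a bad draft.
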
Pitfall 5 – Cost runaway

Symptoms : Monthly bill jumps from $500 to $8000.

Root causes : Repeated searches, repeated rewrites, repeated reviews, unbounded context growth.

Mitigations : Implement a cost circuit‑breaker.

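import logging

logger = logging.getLogger(__name__)


class BudgetExceededError(Exception):
    """Raised when the next call would push spending past the budget."""


class CostCircuitBreaker:
    def __init__(self, budget_usd: float):
        if budget_usd <= 0:
            raise ValueError("budget_usd must be positive")
        self.budget = budget_usd
        self.spent = 0.0

    def check(self, estimated_cost: float):
        # Call before each LLM or tool call, with a worst-case estimate.
        if self.spent + estimated_cost > self.budget:
            raise BudgetExceededError(f"Budget of ${self.budget:.2f} would be exceeded (current ${self.spent:.2f}).")

    def record(self, actual_cost: float):
        # Call after each call, with the cost reported by the API.
        self.spent += actual_cost
        if self.spent > self.budget * 0.8:
            logger.warning(f"⚠️ Cost reached {self.spent/self.budget:.0%} of budget")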
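
The estimated_cost passed to check() has to come from somewhere. A simple worst-case estimate multiplies the prompt size and the max_tokens cap by per-token prices, as sketched below. The prices are hypothetical placeholders, not any provider's actual rates, and estimate_cost_usd and affordable are illustrative names.

```python
PRICE_PER_1M_INPUT_USD = 2.50    # hypothetical price, replace with your model's rate
PRICE_PER_1M_OUTPUT_USD = 10.00  # hypothetical price, replace with your model's rate


def estimate_cost_usd(input_tokens: int, max_output_tokens: int) -> float:
    # Worst case: the model uses its entire output allowance.
    return (
        input_tokens * PRICE_PER_1M_INPUT_USD
        + max_output_tokens * PRICE_PER_1M_OUTPUT_USD
    ) / 1_000_000


def affordable(spent_usd: float, budget_usd: float, input_tokens: int, max_output_tokens: int) -> bool:
    # Same comparison the circuit breaker's check() performs.
    return spent_usd + estimate_cost_usd(input_tokens, max_output_tokens) <= budget_usd
```

At these placeholder prices, an 8,000-token prompt with max_tokens=3000 is estimated at $0.05, so a $1.00 task budget covers roughly twenty such worst-case calls.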

Optimization Directions

Move from serial agents to explicit parallelism using a message bus and worker pool.

Upgrade manual state machine to a visual state‑machine platform.

Replace human spot‑check with LLM‑as‑judge plus occasional manual verification.

Introduce prompt versioning and automated prompt optimization.

Implement model routing (small model for simple tasks) and eventually agent distillation.

Final takeaway: The difficulty of a multi‑agent system lies not in the LLM calls themselves but in the orchestration – a robust state machine, message bus, tool registry, and cost‑circuit‑breaker let you scale from three agents to dozens without the system collapsing.