Building Multi‑Agent Collaboration Systems: AutoGen, CrewAI, and a Custom Orchestration Framework
This article walks through the design, pitfalls, and best‑practice solutions for multi‑agent LLM systems, comparing AutoGen, CrewAI, and a self‑built orchestration stack, and provides concrete architecture diagrams, code samples, evaluation metrics, and a checklist for production deployment.
Introduction – Bug that caused endless loops
In Q4 of the previous year a research‑assistant agent was built to replace junior analysts for competitor research. The initial design used three agents (Researcher, Writer, Reviewer) orchestrated with AutoGen GroupChat. After three days of execution the system exhibited five critical failures:
Infinite loop : Reviewer kept requesting more data, leading to 47 rounds and $14 in token costs.
Hallucination spread : Researcher cited a non‑existent paper, Writer trusted it, and Reviewer failed to catch the error.
No tool‑failure fallback : A timeout in the search API halted the entire workflow.
Context explosion : Each round appended the full history, reaching >8000 tokens by round 30.
No human‑in‑the‑loop : User requests for additional information were never handled.
The system was rewritten with an explicit state machine, tool registry, message bus, and cost‑circuit‑breaker.
When is a multi‑agent system required?
Single‑intent, single‑answer tasks (e.g., translation) – no multi‑agent needed.
Fixed multi‑step flows (e.g., SQL generation + execution) – chainable calls suffice.
Tasks requiring multiple roles, tool usage, and iterative refinement (e.g., research reports) – must use multi‑agent.
Human‑in‑the‑loop review – must use multi‑agent.
Parallelizable data aggregation – strongly recommended.
Failure‑fallback (e.g., search API down) – must use multi‑agent.
Single‑Agent vs Multi‑Agent – concrete example
Single‑Agent prompt (fails):
You are an AI researcher. Search the latest Qwen and Llama 3 evaluations, then compare their Chinese ability and output a Markdown report.
Problems:
Model knowledge cutoff + only one search per call.
Report quality depends entirely on the model; no division of labor.
Cannot run searches in parallel.
Multi‑Agent solution :
Planner : Decompose into three sub‑tasks – search Qwen, search Llama 3, find benchmark list.
Researcher × 2 : Run the two searches in parallel.
Writer : Merge results and draft the comparison.
Reviewer : Fact‑check and request revisions if needed (max 2 rounds).
Result is markedly better, provided the orchestration is correctly designed.
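The division of labor above can be sketched in a few lines. This is a minimal illustration, not the article's implementation: `plan`, `research`, and `write` are hypothetical stubs standing in for LLM-backed agents, and the two-worker thread pool mirrors "Researcher × 2". The Reviewer step is left out to keep the sketch short.

```python
from concurrent.futures import ThreadPoolExecutor


def plan(query: str) -> list[str]:
    """Stub planner: a real one would ask an LLM to decompose the query."""
    return ["search Qwen", "search Llama 3", "find benchmark list"]


def research(subtask: str) -> str:
    """Stub researcher: a real one would call a search tool and summarize."""
    return f"notes for '{subtask}'"


def write(notes: list[str]) -> str:
    """Stub writer: merges the research notes into one draft."""
    return "\n".join(f"- {n}" for n in notes)


def run_pipeline(query: str) -> str:
    subtasks = plan(query)
    # Fan out to two researcher workers; map() returns results in input order.
    with ThreadPoolExecutor(max_workers=2) as pool:
        notes = list(pool.map(research, subtasks))
    return write(notes)


draft = run_pipeline("Compare Qwen and Llama 3 on Chinese tasks")
print(draft)
```

Threads fit here because the real work is I/O-bound (search and LLM calls); CPU-bound steps would need a different pool.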
Overall Architecture – Five‑Layer Orchestration Model
┌────────────────────────────────────────────────────────────────┐
│ ① Interface Layer (Web UI / API / Slack / CLI) │
│ • Starts tasks, shows progress, collects feedback, human‑in‑the‑loop │
└────────────────────────────────────────────────────────────────┘
↓ ↑ (bidirectional)
┌────────────────────────────────────────────────────────────────┐
│ ② Orchestration Layer (State machine / task queue) │
│ • Decides next actor, termination, human intervention │
└────────────────────────────────────────────────────────────────┘
↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ③ Message Bus (Redis Streams / Kafka / in‑memory queue) │
│ • Carries task messages, tool‑call messages, state events │
└────────────────────────────────────────────────────────────────┘
↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ④ Agent Pool (Researcher, Writer, Reviewer, …) │
│ • Each agent = LLM + system prompt + tools + memory │
└────────────────────────────────────────────────────────────────┘
↓ ↑
┌────────────────────────────────────────────────────────────────┐
│ ⑤ Tool & Data Layer (search API, DB, code exec, file system)│
│ • Registered in a ToolRegistry with timeout, retry, fallback│
└────────────────────────────────────────────────────────────────┘
Data flow for a single "competitor research" task:
[User] "Research domestic LLM inference frameworks"
↓
[Orchestrator] create Task, state=initialized → running
↓
[Orchestrator] invoke Planner → produces sub‑tasks: search vLLM, search TGI, search TensorRT‑LLM
↓
[Orchestrator] push sub‑tasks to Message Bus (parallel)
↓
[Researcher #1] consumes task → calls search tool → produces vllm_research.md
[Researcher #2] consumes task → calls search tool → produces tgi_research.md
[Researcher #3] consumes task → calls search tool → produces trtllm_research.md
↓ (wait for all three)
[Orchestrator] trigger Writer → merges results → draft_v1.md
↓
[Orchestrator] trigger Reviewer → reviews draft, may request up to 2 revisions
↓
[Orchestrator] state=completed, return final output
Core Processes
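Before the individual components, the fan-out and "wait for all three" steps of the data flow above can be sketched with an in-memory queue standing in for the message bus. This is an assumption-laden sketch: `researcher_worker` and `fan_out_fan_in` are hypothetical names, and the worker only produces an artifact name instead of calling a search tool.

```python
import queue
import threading


def researcher_worker(tasks: queue.Queue, results: queue.Queue) -> None:
    """Consume topics until a None sentinel arrives; publish one artifact per topic."""
    while True:
        topic = tasks.get()
        if topic is None:
            break
        # Stub for "call search tool, write notes": only the artifact name is produced.
        results.put((topic, f"{topic}_research.md"))


def fan_out_fan_in(topics: list[str]) -> dict[str, str]:
    tasks: queue.Queue = queue.Queue()
    results: queue.Queue = queue.Queue()
    workers = [
        threading.Thread(target=researcher_worker, args=(tasks, results))
        for _ in topics
    ]
    for w in workers:
        w.start()
    for topic in topics:
        tasks.put(topic)      # fan out: one message per sub-task
    for _ in workers:
        tasks.put(None)       # one stop sentinel per worker, queued after the topics
    for w in workers:
        w.join()              # fan in: wait for all researchers
    return dict(results.get() for _ in topics)


artifacts = fan_out_fan_in(["vllm", "tgi", "trtllm"])
print(sorted(artifacts.values()))
```

With Redis Streams or Kafka the same shape holds, but delivery guarantees and consumer groups replace the sentinel trick.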
4.1 Task Definition (Pydantic model)
from typing import Literal

from pydantic import BaseModel, Field


# SubTask is defined first so ResearchTask can reference it directly.
class SubTask(BaseModel):
    sub_id: str
    # "planner" is included because the orchestrator in 4.5 builds a planner sub-task.
    role: Literal["planner", "researcher", "writer", "reviewer"]
    input: dict
    output: str | None = None
    state: Literal["pending", "running", "completed", "failed"] = "pending"
    retries: int = 0
    max_retries: int = 2


class ResearchTask(BaseModel):
    task_id: str
    user_query: str
    # Recipient for human-in-the-loop notifications (read by request_human in 4.5).
    user_id: str | None = None
    task_type: Literal["competitor_research", "code_review", "data_analysis"]
    subtasks: list[SubTask] = Field(default_factory=list)
    state: Literal["initialized", "running", "waiting_human", "completed", "failed"] = "initialized"
    budget_usd: float = 1.0  # per-task ceiling for the cost circuit-breaker
    spent_usd: float = 0.0
    human_intervention_required: bool = False
    artifacts: dict[str, str] = Field(default_factory=dict)  # artifact name -> content
    final_output: str | None = None
    error: str | None = None
4.2 Orchestrator State Machine (transitions library)
from transitions import Machine


class OrchestratorStateMachine:
    states = ["initialized", "planning", "researching", "writing", "reviewing", "waiting_human", "completed", "failed"]

    def __init__(self, task):
        self.task = task
        # Machine attaches a `state` attribute and one trigger method per transition to this object.
        self.machine = Machine(model=self, states=OrchestratorStateMachine.states, initial="initialized")
        self.machine.add_transition("start_planning", "initialized", "planning")
        self.machine.add_transition("plan_done", "planning", "researching")
        self.machine.add_transition("research_done", "researching", "writing")
        # writing -> reviewing has no named trigger; callers use the auto-generated to_reviewing().
        self.machine.add_transition("review_done", "reviewing", "completed")
        self.machine.add_transition("needs_revision", "reviewing", "writing")
        # "*" means the trigger is valid from any state.
        self.machine.add_transition("request_human", "*", "waiting_human")
        self.machine.add_transition("human_responded", "waiting_human", "researching")
        self.machine.add_transition("fail", "*", "failed")
        self.machine.add_transition("budget_exceeded", "*", "failed")
4.3 Agent Base Interface
from abc import ABC, abstractmethod


class MaxRevisionExceededError(Exception):
    """Raised when the reviewer rejects the draft too many times."""


class BaseAgent(ABC):
    # `llm` is any client exposing complete(system=..., user=..., ...); it is not a specific library API.
    def __init__(self, name: str, role: str, llm, tools: list["Tool"], prompt: str):
        self.name = name
        self.role = role
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.system_prompt = prompt

    @abstractmethod
    def run(self, subtask, context):
        """Execute the sub-task and return its artifact."""
        raise NotImplementedError

    def call_tool(self, tool_name: str, **kwargs) -> str:
        tool = self.tools[tool_name]
        return tool.run(**kwargs)


class ResearcherAgent(BaseAgent):
    def run(self, subtask, context) -> str:
        # Step 1: let the LLM choose a tool and a search query.
        plan = self.llm.complete(
            system=self.system_prompt,
            user=f"Task: {subtask.input['query']}\nAvailable tools: {list(self.tools.keys())}",
            response_format={"search_query": str, "tool": str},
        )
        # Step 2: run the chosen tool, then organize the raw results into notes.
        results = self.call_tool(plan["tool"], query=plan["search_query"])
        return self.llm.complete(system="You are a researcher. Organize the search results into structured notes.", user=results)


class WriterAgent(BaseAgent):
    def run(self, subtask, context) -> str:
        research_results = "\n".join(context["research_results"])
        user = f"Research material:\n{research_results}\nWrite a report for: {context['user_query']}"
        # On a revision pass, forward the reviewer's issues so the rewrite can address them.
        if context.get("feedback"):
            user += f"\nAddress these review issues: {context['feedback']}"
        return self.llm.complete(system=self.system_prompt, user=user, max_tokens=3000)


class ReviewerAgent(BaseAgent):
    def run(self, subtask, context) -> dict:
        draft = context["draft"]
        review = self.llm.complete(
            system=self.system_prompt + "\nOutput JSON: {passed: bool, issues: [...]}",
            user=draft,
            response_format={"passed": bool, "issues": list},
        )
        if not review["passed"]:
            # The counter only accumulates if the caller reuses the same context dict.
            context["revision_count"] = context.get("revision_count", 0) + 1
            if context["revision_count"] >= 2:
                raise MaxRevisionExceededError()
        return review
4.4 Tool Registry with Timeout, Retry, Fallback
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Callable, Optional

logger = logging.getLogger(__name__)


class ToolExecutionError(Exception):
    """Raised when a tool fails on every attempt and has no fallback."""


class Tool:
    def __init__(self, name: str, func: Callable, timeout_s: float = 30.0, max_retries: int = 2, fallback: Optional[Callable] = None):
        self.name = name
        self.func = func
        self.timeout_s = timeout_s
        self.max_retries = max_retries
        self.fallback = fallback

    def run(self, **kwargs) -> str:
        last_err = None
        for attempt in range(self.max_retries + 1):
            # No `with` block: its exit would wait for a hung call and defeat the timeout.
            # A timed-out call keeps running in its thread; it is abandoned, not killed.
            ex = ThreadPoolExecutor(max_workers=1)
            try:
                fut = ex.submit(self.func, **kwargs)
                return fut.result(timeout=self.timeout_s)
            except FutureTimeoutError:
                last_err = f"tool {self.name} timeout after {self.timeout_s}s"
                logger.warning(f"[{self.name}] attempt {attempt+1} timeout")
            except Exception as e:
                last_err = f"tool {self.name} error: {e}"
                logger.warning(f"[{self.name}] attempt {attempt+1} error: {e}")
            finally:
                ex.shutdown(wait=False)
            if attempt < self.max_retries:
                time.sleep(2 ** attempt)  # exponential backoff: 1 s, 2 s, ...
        if self.fallback:
            logger.warning(f"[{self.name}] falling back to fallback")
            return self.fallback(**kwargs)
        raise ToolExecutionError(last_err)


# Example search tool. `serp_api` and `knowledge_base` are placeholder clients, not defined here.
def web_search(query: str, num_results: int = 5) -> str:
    return serp_api.search(query, num_results=num_results)


def fallback_search(query: str, num_results: int = 5) -> str:
    """When the external API fails, use a local knowledge base as fallback."""
    return knowledge_base.search(query, top_k=num_results)


search_tool = Tool(name="web_search", func=web_search, timeout_s=10.0, max_retries=2, fallback=fallback_search)
4.5 Orchestrator Main Loop – Cost Circuit‑Breaker & Human‑in‑the‑Loop
class Orchestrator:
    # CostTracker, BudgetExceededError, notifier, wait_for_response, _run_parallel and
    # _count_rounds are referenced here but not shown in this article.
    def __init__(self, agents: dict, tool_registry, message_bus):
        self.agents = agents
        self.tools = tool_registry
        self.bus = message_bus
        self.cost_tracker = CostTracker()
        self.sm = None  # built per task in run(); no task exists at construction time

    def run(self, task):
        self.sm = OrchestratorStateMachine(task)
        try:
            # Planning (assumes SubTask.role accepts "planner")
            self.sm.start_planning()
            task.subtasks = self.agents["planner"].run(
                SubTask(sub_id="plan", role="planner", input={"query": task.user_query}), context={}
            )
            self.sm.plan_done()
            # Research (parallel)
            self._run_parallel(task, role="researcher")
            self._check_budget(task)
            # Writing
            self.sm.research_done()
            writer_ctx = {
                "user_query": task.user_query,
                "research_results": [s.output for s in task.subtasks if s.role == "researcher"],
            }
            draft = self.agents["writer"].run(SubTask(sub_id="write-1", role="writer", input={}), context=writer_ctx)
            task.artifacts["draft_v1"] = draft
            # Reviewing (max 2 rounds)
            for round_i in range(2):
                self.sm.to_reviewing()  # auto-transition generated by the transitions library
                review = self.agents["reviewer"].run(
                    SubTask(sub_id=f"review-{round_i + 1}", role="reviewer", input={}), context={"draft": draft}
                )
                if review["passed"]:
                    task.final_output = draft
                    self.sm.review_done()
                    return
                self._check_budget(task)
                draft = self.agents["writer"].run(
                    SubTask(sub_id=f"write-{round_i + 2}", role="writer", input={}),
                    context={**writer_ctx, "feedback": review["issues"]},
                )
                task.artifacts[f"draft_v{round_i + 2}"] = draft
            # Both review rounds are used up: accept the last revision without another review.
            task.final_output = draft
            self.sm.review_done()
        except MaxRevisionExceededError:
            task.error = "Reviewer rejected the draft too many times"
            self.sm.fail()
        except BudgetExceededError as be:
            task.error = str(be)
            self.sm.fail()
        except Exception as e:
            task.error = str(e)
            self.sm.fail()
        finally:
            logger.info(f"[task={task.task_id}] done, spent=${task.spent_usd:.2f}, rounds={self._count_rounds(task)}")

    def _check_budget(self, task):
        if task.spent_usd > task.budget_usd:
            raise BudgetExceededError(f"spent ${task.spent_usd:.2f} of ${task.budget_usd:.2f} budget")

    def request_human(self, task, question: str):
        # Pause the workflow, notify the user, and block for up to 300 s.
        # Requires a user_id on the task.
        self.sm.request_human()
        task.human_intervention_required = True
        notifier.send(user_id=task.user_id, message=question)
        response = wait_for_response(task.task_id, timeout=300)
        self.sm.human_responded()
        return response
AutoGen vs CrewAI vs Custom Orchestration – Selection Guidance
AutoGen – best for code‑generation pipelines that need strong tool integration. Key configuration points:
max_round – hard limit on GroupChat rounds, to prevent loops.
speaker_selection_method="round_robin" – more deterministic than "auto".
max_consecutive_auto_reply – caps continuous replies from a single agent.
human_input_mode="TERMINATE" or "ALWAYS" – enables user intervention.
CrewAI – suited for report‑type workflows with a fixed number of agents. Important settings:
process=Process.sequential – deterministic execution.
allow_delegation=False – prevents agents from endlessly delegating.
max_iter – per‑agent iteration cap; set it explicitly to bound execution.
context=[prev_task] – explicitly declares task dependencies.
Custom orchestration – required when scaling to dozens of agents, needing strict cost control, complex state handling, or extensive human‑in‑the‑loop interaction. Provides full control over state machine, message bus, tool registry, and cost circuit‑breaker.
Post‑Launch Evaluation Metrics
Effectiveness
Task completion rate ≥ 85 % (source: orchestrator logs).
Average rounds ≤ 8 (source: orchestrator logs).
Human intervention rate ≤ 20 % (source: user behavior logs).
Final draft human score ≥ 4.0/5.0 (source: reviewer + business side).
Hallucination rate ≤ 5 % (source: spot checks + LLM‑as‑judge).
Cost
Average cost per task ≤ $0.50 (source: LLM API usage).
Monthly total cost stays within budget (source: PromptLayer / custom metrics).
Token utilization (output/input) ≥ 0.30 (source: same as above).
Cache hit rate ≥ 30 % (source: Redis).
Stability
Average task latency ≤ 60 s (source: orchestrator).
P95 latency ≤ 120 s (source: orchestrator).
Tool failure rate ≤ 2 % (source: ToolRegistry).
Loop/timeout rate (max_round trigger) ≤ 1 % (source: orchestrator).
Invalid state transitions = 0 (source: orchestrator).
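Several of these targets can be computed directly from per-task records. The sketch below is illustrative only: `TaskRecord`, `summarize`, and `violations` are hypothetical names, the sample records are made up for the example, and the P95 uses the nearest-rank method (other definitions interpolate).

```python
import math
from dataclasses import dataclass


@dataclass
class TaskRecord:
    completed: bool
    rounds: int
    latency_s: float
    cost_usd: float


def p95(values: list[float]) -> float:
    """Nearest-rank 95th percentile."""
    ordered = sorted(values)
    rank = math.ceil(0.95 * len(ordered))
    return ordered[rank - 1]


def summarize(records: list[TaskRecord]) -> dict[str, float]:
    n = len(records)
    return {
        "completion_rate": sum(r.completed for r in records) / n,
        "avg_rounds": sum(r.rounds for r in records) / n,
        "avg_cost_usd": sum(r.cost_usd for r in records) / n,
        "p95_latency_s": p95([r.latency_s for r in records]),
    }


def violations(summary: dict[str, float]) -> list[str]:
    """Names of metrics that miss the targets listed above."""
    checks = {
        "completion_rate": summary["completion_rate"] >= 0.85,
        "avg_rounds": summary["avg_rounds"] <= 8,
        "avg_cost_usd": summary["avg_cost_usd"] <= 0.50,
        "p95_latency_s": summary["p95_latency_s"] <= 120,
    }
    return [name for name, ok in checks.items() if not ok]


# Made-up sample data, for illustration only.
records = [
    TaskRecord(True, 4, 30.0, 0.20),
    TaskRecord(True, 6, 45.0, 0.35),
    TaskRecord(False, 8, 110.0, 0.60),
    TaskRecord(True, 5, 50.0, 0.25),
]
summary = summarize(records)
print(summary, violations(summary))
```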
Common Pitfalls & Solutions
Pitfall 1 – Infinite loop caused by AutoGen speaker selection
Symptom : 47 rounds, token burn.
Root cause : speaker_selection_method="auto" lets the LLM decide the next speaker, leading to polite hand‑offs that never terminate.
Fixes :
Set a hard limit with max_round (AutoGen GroupChat) or max_iter (CrewAI agents).
Use speaker_selection_method="round_robin" or a custom selector.
Add an explicit state‑machine transition that forces failed after N rounds.
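Independent of any framework, the first two fixes amount to a deterministic speaker order plus a round cap. A minimal sketch, assuming agents are plain callables that receive the transcript; `run_chat`, `MaxRoundsExceeded`, and the stub agents are hypothetical names.

```python
from itertools import cycle


class MaxRoundsExceeded(RuntimeError):
    """Raised when the conversation hits the round cap without terminating."""


def run_chat(agents, is_done, max_rounds=8):
    """agents: ordered list of (name, callable(transcript) -> str)."""
    transcript = []
    speakers = cycle(agents)  # fixed round-robin order, no LLM-chosen speaker
    for _ in range(max_rounds):
        name, agent = next(speakers)
        message = agent(transcript)
        transcript.append((name, message))
        if is_done(message):
            return transcript
    raise MaxRoundsExceeded(f"no termination after {max_rounds} rounds")


def researcher(transcript):
    return "data"


def writer(transcript):
    return "draft"


def approving_reviewer(transcript):
    return "APPROVED"


def picky_reviewer(transcript):
    return "need more data"


def approved(message):
    return message == "APPROVED"


ok = run_chat([("researcher", researcher), ("writer", writer), ("reviewer", approving_reviewer)], approved)

try:
    run_chat([("researcher", researcher), ("writer", writer), ("reviewer", picky_reviewer)], approved, max_rounds=6)
    stopped = None
except MaxRoundsExceeded as exc:
    stopped = str(exc)
```

The caller can map `MaxRoundsExceeded` to the state machine's `fail` trigger, which is the third fix in the list.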
Pitfall 2 – Context explosion
Symptom : 8000+ tokens in a single call at round 30.
Root cause : Full history is always packed into the prompt.
Solution : Implement a sliding‑window + summarization strategy.
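class ContextManager:
    def __init__(self, max_recent_turns=5, summary_max_tokens=500):
        self.max_recent = max_recent_turns
        self.summary_tokens = summary_max_tokens
        self.history = []
        self.summary = ""

    def add(self, role: str, content: str):
        self.history.append({"role": role, "content": content})
        keep = self.max_recent * 2  # assuming two messages per turn
        if len(self.history) > keep:
            old = self.history[:-keep]
            # Fold the evicted messages into the running summary.
            self.summary = self._summarize(self.summary, old)
            self.history = self.history[-keep:]

    def _summarize(self, previous_summary: str, old_messages: list) -> str:
        # Placeholder, not part of the original: a real version would ask an LLM for a
        # summary of at most `summary_tokens` tokens. Here the text is only truncated,
        # assuming roughly 4 characters per token.
        parts = [previous_summary] + [f"{m['role']}: {m['content']}" for m in old_messages]
        return " ".join(p for p in parts if p)[-self.summary_tokens * 4:]

    def get_context(self) -> str:
        recent = "\n".join(f"{m['role']}: {m['content']}" for m in self.history)
        return f"Conversation summary:\n{self.summary}\nRecent dialogue:\n{recent}"
Pitfall 3 – Tool hallucination & no fallback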
Symptom : Search tool returns nothing, agent fabricates data.
Fix : Ensure the tool returns an explicit empty result and force the agent to acknowledge missing information.
class Tool:
    def run(self, **kwargs) -> str:
        result = self._safe_call(**kwargs)
        if not result or "no results" in result.lower():
            return "[No search results] Please rephrase the query or admit lack of information."
        return result
Add to the agent prompt: "If the tool returns empty, state that you do not know; do not fabricate."
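A prompt instruction alone can still be ignored, so the agent code can also check for the marker before any summarization call. A minimal sketch, assuming the `[No search results]` prefix returned by the tool above; `research`, `search_stub`, and `summarize_stub` are hypothetical names standing in for the real tool and LLM call.

```python
NO_RESULTS = "[No search results]"


def search_stub(query: str) -> str:
    """Hypothetical tool: returns the marker for anything it does not know."""
    known = {"vllm": "placeholder search snippet about vLLM"}
    return known.get(query.lower(), f"{NO_RESULTS} Please rephrase the query or admit lack of information.")


def summarize_stub(raw: str) -> str:
    """Hypothetical LLM call that turns raw search output into notes."""
    return f"notes: {raw}"


def research(query: str, search=search_stub, summarize=summarize_stub) -> dict:
    raw = search(query)
    if raw.startswith(NO_RESULTS):
        # Short-circuit: an empty result is never handed to the LLM to "summarize".
        return {"query": query, "found": False, "notes": "No information found."}
    return {"query": query, "found": True, "notes": summarize(raw)}


hit = research("vLLM")
miss = research("a framework that does not exist")
```

Downstream agents can then branch on `found` instead of trusting free text.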
Pitfall 4 – Reviewer always passes
Root cause : LLM tends to be agreeable.
Solution : Use a strict system prompt that requires factual citations and JSON‑formatted fail‑fast output.
reviewer_system_prompt = """You are an editor. Every review must follow these rules:
1. Every fact must have a source citation; otherwise fail.
2. Numbers must be explicit, not vague.
3. The logical chain must be complete.
Output JSON: {"passed": bool, "issues": [{"type": "...", "location": "..."}]}
If any issue exists, set passed=false.
"""
Pitfall 5 – Cost runaway
Symptoms : Monthly bill jumps from $500 to $8000.
Root causes : Repeated searches, repeated rewrites, repeated reviews, unbounded context growth.
Mitigations : Implement a cost circuit‑breaker.
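import logging

logger = logging.getLogger(__name__)


class BudgetExceededError(Exception):
    """Raised when a call would push spending past the budget."""


class CostCircuitBreaker:
    def __init__(self, budget_usd: float):
        self.budget = budget_usd
        self.spent = 0.0

    def check(self, estimated_cost: float):
        # Call before each LLM or tool request; refuses work that would overshoot the budget.
        if self.spent + estimated_cost > self.budget:
            raise BudgetExceededError(f"Budget of ${self.budget:.2f} would be exceeded (current ${self.spent:.2f}).")

    def record(self, actual_cost: float):
        # Call after each request with the billed amount; warns once spending passes 80 %.
        self.spent += actual_cost
        if self.spent > self.budget * 0.8:
            logger.warning(f"⚠️ Cost reached {self.spent/self.budget:.0%} of budget")
Optimization Directions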
Move from serial agents to explicit parallelism using a message bus and worker pool.
Upgrade manual state machine to a visual state‑machine platform.
Replace human spot‑check with LLM‑as‑judge plus occasional manual verification.
Introduce prompt versioning and automated prompt optimization.
Implement model routing (small model for simple tasks) and eventually agent distillation.
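Of these directions, model routing is the easiest to prototype. The sketch below shows one possible rule set and is not a recommendation: the model names, the 2000-character threshold, and the `Route` and `route_model` names are all assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Route:
    model: str
    reason: str


def route_model(prompt: str, needs_tools: bool = False, max_small_chars: int = 2000) -> Route:
    """Send short, tool-free prompts to a small model and everything else to a large one."""
    if needs_tools:
        return Route("large-model", "tool use requested")
    if len(prompt) > max_small_chars:
        return Route("large-model", "prompt longer than small-model limit")
    return Route("small-model", "short prompt, no tools")


simple = route_model("Translate 'hello' into French.")
tooling = route_model("Find the latest vLLM release notes.", needs_tools=True)
lengthy = route_model("x" * 5000)
```

Returning the reason alongside the model makes routing decisions easy to log and audit against the cost metrics.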
Final takeaway: The difficulty of a multi‑agent system lies not in the LLM calls themselves but in the orchestration – a robust state machine, message bus, tool registry, and cost‑circuit‑breaker let you scale from three agents to dozens without the system collapsing.
MaGe Linux Operations