Why Your AI Agent Crashes: 7 Hosting Patterns Compared

The article explains why AI agents fail when deployed under the wrong hosting model. It systematically compares seven patterns (Cron, Reactive, Daemon, Pipeline, Service, Adaptive, and Mesh), detailing each one's problem scope, typical scenario, a concrete Python or TypeScript implementation, when to choose it, and its trade-offs, and warns against the common mistake of over-engineering from the start.

AI Tech Publishing

Introduction

Agents often crash because they run in the wrong environment. The author argues that the core issue is not the code but the hosting mode, and provides a systematic comparison of seven recurring patterns to help engineers pick the right one at the right time.

1. Cron (Scheduled Task)

Problem: Periodic checks that do not need real‑time response.

Scenario: Every six hours the agent pulls the last six hours of alerts, aggregates them into a report, posts it to Slack, and writes it to a database.

# scheduled_agent.py — run via cron, e.g. "0 */6 * * *"
import anthropic
import json
from datetime import datetime, timedelta

client = anthropic.Anthropic()

def check_recent_incidents():
    # Pull alerts from the monitoring system for the last 6 hours
    # (fetch_incidents, post_to_slack, save_to_db are environment-specific helpers)
    incidents = fetch_incidents(since=datetime.now() - timedelta(hours=6))
    if not incidents:
        return
    response = client.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"Summarize these incidents and flag anything that needs follow-up:\n{json.dumps(incidents)}"
        }]
    )
    summary = response.content[0].text
    post_to_slack("#ops-summary", summary)
    save_to_db(summary, incidents)

if __name__ == "__main__":
    check_recent_incidents()

When to use: Data aggregation, periodic monitoring, report generation, cleanup tasks—any situation that only requires "run every N minutes/hours".

Trade‑offs: The agent is stateless between runs and must infer what happened since the last execution; it cannot handle real‑time events.
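One way to soften the "must infer what happened since the last execution" problem is to persist a watermark between runs. The sketch below is a minimal illustration under assumed names (`STATE_FILE`, `run_window` are hypothetical, not from the article): each run records when it finished, so the next run covers exactly the gap since then, even if cron skipped a cycle.

```python
# Hypothetical sketch: persist a watermark so each cron run covers exactly
# the window since the last successful run, with no gaps or double-counting.
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

STATE_FILE = Path("/tmp/agent_watermark.json")  # assumed location

def load_watermark(default_hours: int = 6) -> datetime:
    """Return the end time of the previous run, or a default lookback."""
    if STATE_FILE.exists():
        data = json.loads(STATE_FILE.read_text())
        return datetime.fromisoformat(data["last_run"])
    return datetime.now(timezone.utc) - timedelta(hours=default_hours)

def save_watermark(ts: datetime) -> None:
    STATE_FILE.write_text(json.dumps({"last_run": ts.isoformat()}))

def run_window() -> tuple[datetime, datetime]:
    """Compute the [since, until) window for this run and advance the watermark."""
    since = load_watermark()
    until = datetime.now(timezone.utc)
    save_watermark(until)
    return since, until
```

`fetch_incidents(since=...)` would then take `since` from `run_window()` instead of a hard-coded six-hour delta.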

2. Reactive (Event‑Driven)

Problem: Trigger only when something happens; no need for a long‑running process.

Scenario: A ticket arrives in a support system; the agent analyses the ticket, decides whether to create a Jira issue, and exits after processing.

# event_driven_agent.py — triggered by an SQS message
import json
import anthropic

client = anthropic.Anthropic()

TOOLS = [{
    "name": "create_jira_ticket",
    "description": "Create a Jira ticket for the engineering team",
    "input_schema": {
        "type": "object",
        "properties": {
            "title": {"type": "string"},
            "description": {"type": "string"},
            "priority": {"type": "string", "enum": ["low", "medium", "high", "critical"]}
        },
        "required": ["title", "description", "priority"]
    }
}]

def handle_event(event):
    """Process an incoming ticket event."""
    ticket = json.loads(event["body"])
    response = client.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=1024,
        system="You are a support triage agent. Analyze tickets and create Jira issues for the engineering team when needed.",
        tools=TOOLS,
        messages=[{
            "role": "user",
            "content": (
                f"New support ticket:\n"
                f"Subject: {ticket['subject']}\n"
                f"Body: {ticket['body']}\n"
                f"Customer tier: {ticket['tier']}"
            )
        }]
    )
    for block in response.content:
        if block.type == "tool_use" and block.name == "create_jira_ticket":
            create_jira_ticket(**block.input)  # environment-specific helper

# AWS Lambda entry point
def lambda_handler(event, context):
    for record in event["Records"]:
        handle_event(record)

When to use: Triage, PR review, alert enrichment, any workflow triggered by an external event.

Trade‑offs: Requires event infrastructure (queues, webhooks, event bus). Failure handling and dead‑letter queues become critical; Lambda’s 15‑minute timeout can be a bottleneck for complex agents.
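Because queues redeliver messages on partial failure, the handler should be idempotent, or a retried Lambda invocation can file the same Jira ticket twice. A minimal sketch of the idea, with assumed names (`handle_event_once` is hypothetical; a real deployment would track seen IDs in DynamoDB or Redis, not process memory):

```python
# Hypothetical sketch: make the event handler idempotent so queue redeliveries
# (e.g. after a partial failure) don't create duplicate Jira tickets.
_processed: set[str] = set()  # assumption: stand-in for a durable store

def handle_event_once(event: dict, handler) -> bool:
    """Run handler(event) only if this message ID hasn't been seen; return True if it ran."""
    msg_id = event["messageId"]  # SQS records carry a unique messageId
    if msg_id in _processed:
        return False
    handler(event)
    _processed.add(msg_id)  # marked done only after success, so failures are retried
    return True
```

Marking the message as processed only after the handler succeeds keeps retries safe: a crash mid-handler leaves the ID unrecorded, so the redelivery runs again.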

3. Daemon (Long‑Running)

Problem: Need to maintain conversation state across multiple turns; process exit would lose state.

Scenario: An internal chatbot that answers employee questions about policies, codebase, or deployment processes, remembering prior dialogue and per‑user preferences.

# daemon_agent.py — long-running process that holds state in memory
import anthropic
from collections import defaultdict

client = anthropic.Anthropic()

class ConversationAgent:
    def __init__(self):
        self.conversations: dict[str, list] = defaultdict(list)
        self.user_preferences: dict[str, dict] = {}

    def chat(self, user_id: str, message: str) -> str:
        self.conversations[user_id].append({"role": "user", "content": message})
        system = "You are a helpful assistant that remembers the previous conversation."
        if prefs := self.user_preferences.get(user_id):
            system += f"\nUser preferences: {prefs}"
        response = client.messages.create(
            model="claude-sonnet-4-5-20250514",
            max_tokens=1024,
            system=system,
            messages=self.conversations[user_id][-20:]  # sliding window
        )
        reply = response.content[0].text
        self.conversations[user_id].append({"role": "assistant", "content": reply})
        return reply

agent = ConversationAgent()

# Expose over HTTP, WebSocket, etc.
from fastapi import FastAPI
app = FastAPI()

@app.post("/chat")
async def chat(user_id: str, message: str):
    return {"response": agent.chat(user_id, message)}

When to use: Chatbots, interactive assistants, agents that need fast response while preserving state, or streaming data‑flow agents.

Trade‑offs: State lives only in memory; a restart loses it. Horizontal scaling requires sticky sessions or external state stores. Constant resource consumption regardless of workload.
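The "external state store" escape hatch can be introduced without rewriting the agent by hiding the conversation history behind a small interface. The sketch below is an assumed design (the `ConversationStore` class is hypothetical, not from the article): the in-memory version matches the daemon above, and a Redis- or database-backed version could replace it to survive restarts.

```python
# Hypothetical sketch: put conversation state behind a store interface so a
# restart-safe backend (Redis, a database) can replace the in-memory dict
# without touching the agent logic.
import json

class ConversationStore:
    """In-memory reference implementation; a Redis version might use RPUSH/LRANGE."""
    def __init__(self):
        self._data: dict[str, list[str]] = {}

    def append(self, user_id: str, turn: dict) -> None:
        # Turns are serialized so the same code works against a string-valued backend.
        self._data.setdefault(user_id, []).append(json.dumps(turn))

    def window(self, user_id: str, n: int = 20) -> list[dict]:
        """Return the last n turns — the same sliding window the daemon keeps."""
        return [json.loads(t) for t in self._data.get(user_id, [])[-n:]]
```

`ConversationAgent.chat` would then call `store.append(...)` and pass `store.window(user_id)` as `messages`, and sticky sessions stop being a hard requirement.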

4. Pipeline (Workflow Orchestration)

Problem: Multi‑step operations where any step may fail and need to resume from the last checkpoint instead of starting over.

Scenario: Customer onboarding: analyse profile → search docs → generate personalised welcome email → send email and create ticket. Each step may fail (e.g., email service down) and must be retried without re‑analysing the profile.

// workflow-agent.ts — Temporal workflow with durable LLM calls
import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';

const { callLLM, searchKnowledgeBase, sendEmail, createTicket } =
  proxyActivities<typeof activities>({
    startToCloseTimeout: '60 seconds',
    retry: { maximumAttempts: 3 },
  });

export async function customerOnboardingAgent(customer: Customer): Promise<OnboardingResult> {
  // Step 1: analyze the customer profile (retryable, checkpointed)
  const analysis = await callLLM(`Analyze this customer and recommend an onboarding path: ${JSON.stringify(customer)}`);

  // Step 2: search relevant docs (retryable, checkpointed)
  const docs = await searchKnowledgeBase(analysis.recommendedTopics);

  // Step 3: generate a personalized welcome (retryable, checkpointed)
  const welcome = await callLLM(`Create a personalized onboarding email using these docs: ${JSON.stringify(docs)}`);

  // Step 4: send and record (retryable, checkpointed)
  await sendEmail(customer.email, welcome);
  await createTicket({ type: 'onboarding', customerId: customer.id, status: 'started' });

  return { customerId: customer.id, path: analysis.recommendedPath };
}

When to use: Multi‑step agent workflows where failure is costly, long‑running operations that take minutes to hours, and any scenario that needs auditability and observability.

Trade‑offs: Requires Temporal or similar infrastructure; checkpoints add latency and increase code size, but provide built‑in retries, observability, and recoverability.
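The core checkpointing idea can be illustrated without Temporal. The sketch below is a deliberately tiny stand-in (the `step` helper and module-level `checkpoints` dict are hypothetical, not Temporal's API): each completed step's result is persisted, so a rerun of the whole function skips straight to the step that failed.

```python
# Hypothetical sketch of the checkpoint idea without Temporal: each completed
# step's result is cached, so a rerun skips finished steps instead of redoing them.
checkpoints: dict[str, object] = {}  # in production: durable storage, keyed per workflow run

def step(name: str, fn, *args):
    """Run fn once; on a rerun, return the cached result instead of recomputing."""
    if name in checkpoints:
        return checkpoints[name]
    result = fn(*args)
    checkpoints[name] = result  # recorded only after success
    return result
```

A Temporal workflow gives the same property plus retries, timers, and history for free, which is exactly the infrastructure cost the trade-off above describes.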

5. Service (API Mode)

Problem: Expose the agent through an existing HTTP request/response stack so clients can call it via REST or streaming.

Scenario: Customer‑facing support API; a web or mobile client POSTs a user question and receives a streamed answer. Each request is independent; state is read from an external database.

# agent_service.py — agent exposed as a streaming API
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.Anthropic()

TOOLS = [{
    "name": "lookup_order",
    "description": "Look up order details by order ID",
    "input_schema": {
        "type": "object",
        "properties": {"order_id": {"type": "string"}},
        "required": ["order_id"]
    }
}]

def run_agent(query: str, user_id: str):
    """Run the agent loop, yielding text chunks as they are produced."""
    messages = [{"role": "user", "content": query}]
    context = load_user_context(user_id)  # loaded from the DB, not from memory
    while True:
        response = client.messages.create(
            model="claude-sonnet-4-5-20250514",
            max_tokens=1024,
            system=f"You are a customer support agent.\nContext: {context}",
            tools=TOOLS,
            messages=messages,
        )
        if response.stop_reason == "end_turn":
            for block in response.content:
                if hasattr(block, "text"):
                    yield block.text
            break
        # Execute all tool calls, then append one assistant turn and one
        # tool_result turn (appending inside the loop would duplicate the
        # assistant message when the model requests several tools at once).
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                result = execute_tool(block.name, block.input)  # environment-specific helper
                tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(result)})
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

@app.post("/agent")
async def agent_endpoint(query: str, user_id: str):
    return StreamingResponse(run_agent(query, user_id), media_type="text/plain")

When to use: Customer‑facing agents, internal tools, any scenario that needs to plug into an existing service mesh.

Trade‑offs: Subject to HTTP timeout limits (30‑60 s). No built‑in persistence—if the process dies mid‑request the work is lost. Long‑running tasks often require a background queue behind the API.
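The "background queue behind the API" pattern mentioned above can be sketched in a few lines. Everything here is an assumed design (the `submit`/`worker_run`/`poll` names and the in-memory `jobs` dict are hypothetical): the endpoint returns a job ID immediately, a worker runs the agent, and the client polls for the result instead of holding an HTTP connection open past its timeout.

```python
# Hypothetical sketch of the queue-behind-the-API pattern: long agent runs are
# enqueued and the client polls a job ID rather than waiting on one request.
import uuid

jobs: dict[str, dict] = {}  # assumption: stand-in for Redis/DB plus a real worker pool

def submit(query: str) -> str:
    """Accept work immediately and return a job ID the client can poll."""
    job_id = uuid.uuid4().hex
    jobs[job_id] = {"status": "queued", "query": query, "result": None}
    return job_id

def worker_run(job_id: str, agent_fn) -> None:
    """Executed by a background worker, not by the request handler."""
    job = jobs[job_id]
    job["status"] = "running"
    job["result"] = agent_fn(job["query"])
    job["status"] = "done"

def poll(job_id: str) -> dict:
    """What a GET /agent/{job_id} endpoint would return."""
    return {"status": jobs[job_id]["status"], "result": jobs[job_id]["result"]}
```

With this shape the `/agent` POST stays fast regardless of how long the agent loop takes, at the cost of a second round trip for the client.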

6. Adaptive (Self‑Scheduling)

Problem: Fixed intervals waste resources or miss events; the agent should adjust its frequency based on the environment.

Scenario: Monitoring system that checks every hour under normal conditions but every five minutes during an anomaly.

# self_scheduling_agent.py — the agent decides when it runs next
import anthropic
import json

client = anthropic.Anthropic()

TOOLS = [{
    "name": "schedule_next_run",
    "description": "Schedule the next run of this agent",
    "input_schema": {
        "type": "object",
        "properties": {
            "delay_minutes": {"type": "integer", "description": "Minutes until the next run"},
            "reason": {"type": "string", "description": "Why this interval was chosen"}
        },
        "required": ["delay_minutes", "reason"]
    }
}]

def run_monitoring_cycle():
    # fetch_current_metrics, load_previous_analysis, save_analysis, log,
    # and schedule_job are environment-specific helpers
    metrics = fetch_current_metrics()
    previous = load_previous_analysis()
    response = client.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=1024,
        system="""You are a monitoring agent. Analyze the metrics, report anomalies, and schedule the next check.
Use shorter intervals during anomalies and longer intervals when things are stable.""",
        tools=TOOLS,
        messages=[{
            "role": "user",
            "content": f"Current metrics:\n{json.dumps(metrics)}\n\nPrevious analysis:\n{previous}"
        }]
    )
    next_delay = 60  # default: 1 hour
    for block in response.content:
        if hasattr(block, "text"):
            save_analysis(block.text)
        if block.type == "tool_use" and block.name == "schedule_next_run":
            next_delay = block.input["delay_minutes"]
            log(f"Next run in {next_delay} minutes: {block.input['reason']}")
    schedule_job("run_monitoring_cycle", delay_minutes=next_delay)

if __name__ == "__main__":
    run_monitoring_cycle()

When to use: Monitoring where check frequency should adapt, research agents with variable data‑source rates, any case where a fixed interval is either wasteful or too sparse.

Trade‑offs: Requires a scheduler that supports dynamic delays (Celery, Cloud Tasks, SQS delay queues). Aggressive scheduling can waste tokens; overly conservative scheduling can miss events. Guardrails on min/max intervals are essential.
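The min/max guardrail can be a single function applied to the model's proposed delay before it reaches the scheduler. A minimal sketch, with assumed bounds (the 5-minute floor and 4-hour ceiling are illustrative, not from the article):

```python
# Hypothetical sketch of the min/max guardrail: never trust the model's proposed
# delay blindly; clamp it to a sane operational range before scheduling.
MIN_DELAY_MIN = 5      # assumed floor: never poll more often than every 5 minutes
MAX_DELAY_MIN = 240    # assumed ceiling: never go quieter than 4 hours

def clamp_delay(proposed, default: int = 60) -> int:
    """Return a scheduling delay in minutes that respects the guardrails."""
    if not isinstance(proposed, int) or proposed <= 0:
        return default  # malformed tool output falls back to the default interval
    return max(MIN_DELAY_MIN, min(MAX_DELAY_MIN, proposed))
```

In `run_monitoring_cycle` above, `next_delay = clamp_delay(block.input["delay_minutes"])` would bound both the token-burning and the event-missing failure modes in one place.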

7. Mesh (Multi‑Agent Grid)

Problem: Different domains need to cooperate, each with its own lifecycle and tooling.

Scenario: Release approval: a Security agent reviews risk, a Compliance agent checks regulations, and a Release Coordinator aggregates decisions before final approval.

# multi_agent_mesh.py — agents communicate over an event bus
# (publish_event and parse_decision are environment-specific helpers)
import anthropic
import json

client = anthropic.Anthropic()

class SecurityAgent:
    def handle_event(self, event: dict):
        if event["type"] != "release.proposed":
            return
        response = client.messages.create(
            model="claude-sonnet-4-5-20250514",
            max_tokens=512,
            system="You are a security review agent. Analyze the change for security risks.",
            messages=[{"role": "user", "content": json.dumps(event["payload"])}]
        )
        publish_event({
            "type": "review.security",
            "release_id": event["payload"]["release_id"],
            "decision": parse_decision(response.content[0].text),
            "analysis": response.content[0].text
        })

class ComplianceAgent:
    def handle_event(self, event: dict):
        if event["type"] != "release.proposed":
            return
        response = client.messages.create(
            model="claude-sonnet-4-5-20250514",
            max_tokens=512,
            system="You are a compliance agent. Check the change against regulatory requirements.",
            messages=[{"role": "user", "content": json.dumps(event["payload"])}]
        )
        publish_event({
            "type": "review.compliance",
            "release_id": event["payload"]["release_id"],
            "decision": parse_decision(response.content[0].text),
            "analysis": response.content[0].text
        })

class ReleaseCoordinator:
    """Collects review verdicts and makes the final decision."""
    def __init__(self):
        self.reviews: dict[str, list] = {}

    def handle_event(self, event: dict):
        if not event["type"].startswith("review."):
            return
        release_id = event["release_id"]
        self.reviews.setdefault(release_id, []).append(event)
        if len(self.reviews[release_id]) >= 2:  # all reviewers have reported
            all_approved = all(r["decision"] == "approved" for r in self.reviews[release_id])
            publish_event({
                "type": "release.approved" if all_approved else "release.blocked",
                "release_id": release_id,
                "reviews": self.reviews.pop(release_id)
            })

When to use: Situations that truly require distinct domains to collaborate, where each agent may need different models or toolsets and independent scaling.

Trade‑offs: Operational complexity rises sharply; failure modes multiply. Reported failure rate for multi‑agent systems in production is 41‑86 % (mostly coordination crashes). Start with a single agent and split only when a clear need emerges.
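One concrete coordination failure in the mesh above: if a reviewer agent crashes, the coordinator waits forever for its second review. A common guardrail is a review deadline. The sketch below is an assumed design (the `resolve` function and 300-second timeout are hypothetical, not from the article):

```python
# Hypothetical sketch of one coordination guardrail: give each release a review
# deadline so a crashed reviewer blocks the release explicitly instead of forever.
import time

REVIEW_TIMEOUT_S = 300   # assumed deadline
EXPECTED_REVIEWERS = 2

def resolve(reviews: list[dict], started_at: float, now=None) -> str:
    """Return 'approved', 'blocked', or 'pending' for one release."""
    now = time.time() if now is None else now
    if len(reviews) >= EXPECTED_REVIEWERS:
        return "approved" if all(r["decision"] == "approved" for r in reviews) else "blocked"
    if now - started_at > REVIEW_TIMEOUT_S:
        return "blocked"  # fail closed: a missing review counts as a failed review
    return "pending"
```

The coordinator would call `resolve` on a timer as well as on each incoming event; failing closed keeps a silent reviewer from silently approving a release.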

Common Mistake

Most teams pick the most complex hosting mode in the first week because they anticipate future needs. This leads to systems that never ship, while the promised future never arrives. The recommended evolution is to start with a simple Cron job, move to Reactive only when the periodic approach proves insufficient, and only then consider Workflow orchestration or Mesh architectures. Each step should be driven by a concrete pain point, not imagined complexity.

Tags: AI agents, cron, event-driven, workflow orchestration, adaptive scheduling, hosting patterns, multi-agent mesh