Production‑Ready AI Agent Architecture: High Availability, Asynchrony, Caching, Cost & Security

After mastering core AI Agent capabilities, this article shows how to transform a prototype into a production‑grade service by covering a full architecture overview, stateless design, health‑check and graceful shutdown, asynchronous task queues, multi‑level caching, token‑cost optimization, model fallback, input/output filtering, rate limiting, monitoring, and deployment recommendations for different scales.

Coder Trainee
Coder Trainee
Coder Trainee
Production‑Ready AI Agent Architecture: High Availability, Asynchrony, Caching, Cost & Security

Production‑Level Agent Architecture Overview

Client → API Gateway → Load Balancer → Agent service cluster (multiple instances) → Redis cache, MQ, DB → LLM service with multi‑model fallback.

High‑Availability Design

Stateless Design

class StatelessAgent:
    """Stateless Agent – state stored in Redis"""
    def __init__(self, redis_client):
        self.redis = redis_client
        self.llm = ChatOpenAI()
    async def chat(self, session_id: str, message: str) -> str:
        # Load history from Redis
        history = self.redis.get(f"session:{session_id}")
        # Call LLM
        response = await self.llm.acall(message, history)
        # Save updated history
        self.redis.setex(f"session:{session_id}", 3600, response)
        return response

Health Checks & Graceful Shutdown

from fastapi import FastAPI
from contextlib import asynccontextmanager
app = FastAPI()

@app.get("/health/liveness")
async def liveness():
    return {"status": "alive"}

@app.get("/health/readiness")
async def readiness():
    checks = {"llm": await check_llm_health(), "redis": await check_redis_health()}
    if all(checks.values()):
        return {"status": "ready", "checks": checks}
    return {"status": "not_ready", "checks": checks}, 503

@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_agent()  # start‑up init
    yield
    await shutdown_agent()  # graceful shutdown

Asynchronous Processing & Queues

Synchronous vs Asynchronous

Synchronous : simple, real‑time; drawbacks – blocking, prone to timeout; suitable for simple Q&A.

Asynchronous : high concurrency, non‑blocking; drawbacks – more complex, requires polling; suitable for long tasks or batch processing.

Asynchronous Task Queue (Celery + Redis)

from celery import Celery
app = Celery('agent_tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, soft_time_limit=60)
def process_agent_task(self, task_id: str, user_input: str):
    try:
        agent = Agent()
        result = agent.invoke(user_input)
        redis_client.setex(f"task_result:{task_id}", 3600, result)
        return {"status": "completed", "result": result}
    except Exception as e:
        self.retry(exc=e, countdown=60)
        return {"status": "failed", "error": str(e)}

# FastAPI endpoint
@app.post("/agent/async")
async def agent_async(request: Request, background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    background_tasks.add_task(process_agent_task, task_id, request.message)
    return {"task_id": task_id, "status": "pending"}

@app.get("/agent/result/{task_id}")
async def get_result(task_id: str):
    result = redis_client.get(f"task_result:{task_id}")
    if result:
        return {"status": "completed", "result": result}
    return {"status": "pending"}

Caching Strategies

Multi‑Level Cache

class MultiLevelCache:
    """L1: in‑process dict, L2: Redis, fallback to LLM"""
    def __init__(self):
        self.redis_client = redis.Redis(decode_responses=True)
        self.local_cache = {}
    def _get_key(self, query: str) -> str:
        return f"agent:cache:{hashlib.md5(query.encode()).hexdigest()}"
    def get(self, query: str) -> Optional[str]:
        if query in self.local_cache:
            return self.local_cache[query]
        key = self._get_key(query)
        cached = self.redis_client.get(key)
        if cached:
            self.local_cache[query] = cached
            return cached
        return None
    def set(self, query: str, response: str, ttl: int = 3600):
        key = self._get_key(query)
        self.local_cache[query] = response
        self.redis_client.setex(key, ttl, response)

class SemanticCache:
    """Vector‑based semantic cache"""
    def __init__(self, threshold: float = 0.95):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma()
        self.threshold = threshold
    def get(self, query: str) -> Optional[str]:
        query_emb = self.embeddings.embed_query(query)
        results = self.vectorstore.similarity_search_by_vector(query_emb, k=1)
        if results and results[0].score > self.threshold:
            return results[0].metadata["response"]
        return None
    def set(self, query: str, response: str):
        emb = self.embeddings.embed_query(query)
        self.vectorstore.add_texts([query], metadatas=[{"response": response, "embedding": emb}])

Cache Policy Selection

Exact match – FAQ / fixed questions – TTL 24 h.

Semantic match – similar questions – TTL 12 h.

Result cache – identical parameters – TTL 1 h.

Warm‑up cache – hot questions – TTL permanent.

Cost Optimization

Token Optimization

class CostOptimizer:
    """Control token consumption"""
    def __init__(self, max_tokens_per_request=2000):
        self.max_tokens_per_request = max_tokens_per_request
    def truncate_context(self, context: str, max_chars: int = 2000) -> str:
        return context if len(context) <= max_chars else context[:max_chars] + "...(已截断)"
    def smart_prompt(self, user_input: str, history: list) -> str:
        recent = history[-3:] if history else []
        compressed = user_input[:500]
        return self._build_prompt(compressed, recent)
    def estimate_cost(self, prompt_tokens: int, completion_tokens: int, model: str = "gpt-4") -> float:
        rates = {"gpt-4": {"prompt": 0.03, "completion": 0.06},
                 "gpt-3.5-turbo": {"prompt": 0.001, "completion": 0.002}}
        rate = rates.get(model, rates["gpt-4"])
        return (prompt_tokens/1000)*rate["prompt"] + (completion_tokens/1000)*rate["completion"]

Model Fallback Strategy

class ModelRouter:
    """Multi‑model routing with fallback"""
    def __init__(self):
        self.models = {"primary": "gpt-4", "secondary": "gpt-3.5-turbo", "fallback": "claude-instant"}
        self.costs = {"gpt-4": 1.0, "gpt-3.5-turbo": 0.1, "claude-instant": 0.05}
    async def route(self, user_input: str, context: dict) -> str:
        if len(user_input) < 50 and not self._needs_reasoning(user_input):
            model = self.models["secondary"]
        elif self._needs_reasoning(user_input):
            model = self.models["primary"]
        else:
            model = self.models["secondary"]
        return await self._call_model(model, user_input, context)
    async def fallback(self, user_input: str, context: dict) -> str:
        for model in [self.models["secondary"], self.models["fallback"]]:
            try:
                return await self._call_model(model, user_input, context)
            except Exception:
                continue
        return "服务繁忙,请稍后重试"

Security Protection

Input/Output Filtering

class ContentFilter:
    SENSITIVE_WORDS = ["暴力", "色情", "政治敏感词"]
    INJECTION_PATTERNS = [r"ignore previous instructions", r"system\s*:", r"你是一个.*助手"]
    def filter_input(self, user_input: str) -> str:
        for p in self.INJECTION_PATTERNS:
            user_input = re.sub(p, "", user_input, flags=re.IGNORECASE)
        for w in self.SENSITIVE_WORDS:
            if w in user_input:
                raise ValueError(f"输入包含敏感词: {w}")
        return user_input.strip()[:2000]
    def filter_output(self, output: str) -> str:
        output = self._redact_pii(output)
        output = self._check_sensitive(output)
        return output
    def _redact_pii(self, text: str) -> str:
        text = re.sub(r'1[3-9]\d{9}', '138****0000', text)
        text = re.sub(r'\b\w+@\w+\.\w+\b', '[email protected]', text)
        return text

Rate Limiting & Quotas

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.time()
    def consume(self, tokens: int = 1) -> bool:
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

class RateLimiter:
    def __init__(self):
        self.limiters: Dict[str, TokenBucket] = {}
    def check_limit(self, user_id: str, api_key: str) -> bool:
        user_key = f"user:{user_id}"
        if user_key not in self.limiters:
            self.limiters[user_key] = TokenBucket(100, 100/60)  # 100 req/min
        api_key_key = f"api:{api_key}"
        if api_key_key not in self.limiters:
            self.limiters[api_key_key] = TokenBucket(1000, 1000/60)  # 1000 req/min
        return self.limiters[user_key].consume() and self.limiters[api_key_key].consume()

Monitoring & Alerting

from dataclasses import dataclass
import logging

@dataclass
class Alert:
    name: str
    level: str  # info, warning, critical
    message: str
    value: float
    threshold: float

class AgentMonitor:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.metrics = {"requests_total": 0, "requests_success": 0,
                        "requests_failed": 0, "avg_latency_ms": 0,
                        "token_usage": 0}
    def record_request(self, success: bool, latency_ms: float, tokens: int):
        self.metrics["requests_total"] += 1
        if success:
            self.metrics["requests_success"] += 1
        else:
            self.metrics["requests_failed"] += 1
        self.metrics["avg_latency_ms"] = (self.metrics["avg_latency_ms"]*0.9 + latency_ms*0.1)
        self.metrics["token_usage"] += tokens
        self._check_alerts()
    def _check_alerts(self):
        alerts = []
        error_rate = self.metrics["requests_failed"] / max(1, self.metrics["requests_total"])
        if error_rate > 0.05:
            alerts.append(Alert("high_error_rate", "critical", "错误率过高", error_rate, 0.05))
        if self.metrics["avg_latency_ms"] > 5000:
            alerts.append(Alert("high_latency", "warning", "响应延迟过高", self.metrics["avg_latency_ms"], 5000))
        for a in alerts:
            self._send_alert(a)
    def _send_alert(self, alert: Alert):
        self.logger.warning(f"[{alert.level}] {alert.name}: {alert.message}")
        # Integration points for DingTalk, WeChat Work, Email can be added

Cost Estimation Formula

def estimate_monthly_cost(daily_requests: int, avg_tokens_per_request: int,
                         model: str = "gpt-4", cache_hit_rate: float = 0.3) -> dict:
    rates = {"gpt-4": {"prompt": 0.03, "completion": 0.06},
             "gpt-4-turbo": {"prompt": 0.01, "completion": 0.03},
             "gpt-3.5-turbo": {"prompt": 0.001, "completion": 0.002}}
    monthly_requests = daily_requests * 30
    cached = monthly_requests * cache_hit_rate
    llm_calls = monthly_requests - cached
    # Assume prompt:completion = 2:1
    prompt_tokens = avg_tokens_per_request * 2 / 3
    completion_tokens = avg_tokens_per_request * 1 / 3
    rate = rates.get(model, rates["gpt-3.5-turbo"])
    cost_per_req = (prompt_tokens/1000)*rate["prompt"] + (completion_tokens/1000)*rate["completion"]
    total = llm_calls * cost_per_req
    return {"model": model,
            "monthly_requests": monthly_requests,
            "cache_hit_rate": cache_hit_rate,
            "llm_calls": llm_calls,
            "cost_per_request_usd": round(cost_per_req, 4),
            "estimated_monthly_cost_usd": round(total, 2)}
# Example: estimate_monthly_cost(10000, 500, "gpt-3.5-turbo", 0.3) → ~12 USD

Deployment Recommendations

Personal project (concurrency < 10): single‑node deployment with in‑process cache.

Small team (10‑100 concurrent users): multiple replicas, Redis cache, basic monitoring.

Enterprise (100‑1000): clustered agents, message queue, multi‑model routing, full‑stack monitoring.

Large scale (> 1000): Kubernetes with auto‑scaling, multi‑region deployment.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

High AvailabilityCachingCost OptimizationSecurityAI AgentAsynchronous ProcessingProduction Architecture
Coder Trainee
Written by

Coder Trainee

Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.