Production‑Ready AI Agent Architecture: High Availability, Asynchrony, Caching, Cost & Security
Once an AI agent's core capabilities are in place, the next step is turning the prototype into a production‑grade service. This article covers the overall architecture, stateless design, health checks and graceful shutdown, asynchronous task queues, multi‑level caching, token‑cost optimization, model fallback, input/output filtering, rate limiting, monitoring, and deployment recommendations for different scales.
Production‑Level Agent Architecture Overview
Client → API Gateway → Load Balancer → Agent service cluster (multiple instances) → Redis cache, MQ, DB → LLM service with multi‑model fallback.
High‑Availability Design
Stateless Design
import json


class StatelessAgent:
    """Stateless agent: all conversation state is kept in Redis, none on the instance.

    Because nothing session-specific lives in the process, any replica can
    handle any request for any session.

    Each session is stored under ``session:<session_id>`` as a JSON list of
    turns::

        [{"role": "user", "content": "..."},
         {"role": "assistant", "content": "..."}, ...]

    Args:
        redis_client: Client exposing ``get(key)`` and ``setex(key, ttl, value)``.
            NOTE(review): these calls are synchronous and block the event loop
            inside ``chat``; with ``redis.asyncio`` they would need ``await``.
        llm: Chat model exposing ``async acall(message, history)``; defaults to
            ``ChatOpenAI()``. TODO confirm ``acall``'s signature against the
            model wrapper actually in use.
        session_ttl: Seconds a session survives after its last message.
        max_turns: Number of most recent user/assistant exchanges to keep;
            ``0`` or less keeps the full history.
    """

    def __init__(self, redis_client, llm=None, session_ttl: int = 3600, max_turns: int = 20):
        self.redis = redis_client
        self.llm = llm if llm is not None else ChatOpenAI()
        self.session_ttl = session_ttl
        self.max_turns = max_turns

    @staticmethod
    def _session_key(session_id: str) -> str:
        """Return the Redis key that holds one session's history."""
        return f"session:{session_id}"

    def _load_history(self, session_id: str) -> list:
        """Return the stored turn list, or ``[]`` if it is absent or unreadable."""
        raw = self.redis.get(self._session_key(session_id))
        if raw is None:
            return []
        try:
            history = json.loads(raw)
        except ValueError:
            # Not JSON (e.g. a value holding only a bare reply string): start a
            # fresh history instead of failing the whole chat request.
            return []
        return history if isinstance(history, list) else []

    async def chat(self, session_id: str, message: str) -> str:
        """Answer ``message`` using the session's history and persist the new exchange.

        Returns:
            The model's reply.
        """
        history = self._load_history(session_id)
        response = await self.llm.acall(message, history)
        # Persist prior turns plus this exchange (not just the reply), so the
        # next request for this session sees the full conversation.
        updated = history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response},
        ]
        if self.max_turns > 0:
            # Two entries per exchange; bounds prompt size and Redis value size.
            updated = updated[-2 * self.max_turns:]
        self.redis.setex(
            self._session_key(session_id),
            self.session_ttl,
            json.dumps(updated, ensure_ascii=False),
        )
        return response


# Health Checks & Graceful Shutdown
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.responses import JSONResponse

# Upper bound on each readiness dependency check, so a hung dependency makes
# the probe fail quickly instead of timing out at the orchestrator.
READINESS_CHECK_TIMEOUT_S = 2.0


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the agent before serving requests and release it on shutdown.

    FastAPI only runs this when it is passed as ``FastAPI(lifespan=...)``,
    which is why it is defined before ``app`` below.
    NOTE(review): ``init_agent`` / ``shutdown_agent`` are not shown here;
    presumably async callables defined elsewhere.
    """
    await init_agent()  # start-up init
    try:
        yield
    finally:
        # Make sure clean-up runs even if the lifespan is exited via an exception.
        await shutdown_agent()  # graceful shutdown


app = FastAPI(lifespan=lifespan)


async def _probe(check) -> bool:
    """Await one async health check; an exception or a timeout counts as unhealthy."""
    try:
        return bool(await asyncio.wait_for(check(), timeout=READINESS_CHECK_TIMEOUT_S))
    except Exception:
        return False


@app.get("/health/liveness")
async def liveness():
    """Liveness probe: returns a static payload whenever the app can serve requests."""
    return {"status": "alive"}


@app.get("/health/readiness")
async def readiness():
    """Readiness probe: HTTP 200 when the LLM and Redis checks pass, HTTP 503 otherwise.

    FastAPI does not understand Flask-style ``(body, status)`` tuples (it would
    serialise the tuple with status 200), so the failure path returns an
    explicit ``JSONResponse``.
    NOTE(review): ``check_llm_health`` / ``check_redis_health`` are not shown
    here; presumably async callables returning truthy when healthy.
    """
    llm_ok, redis_ok = await asyncio.gather(
        _probe(check_llm_health), _probe(check_redis_health)
    )
    checks = {"llm": llm_ok, "redis": redis_ok}
    if all(checks.values()):
        return {"status": "ready", "checks": checks}
    return JSONResponse(status_code=503, content={"status": "not_ready", "checks": checks})


# Asynchronous Processing & Queues
Synchronous vs Asynchronous
Synchronous: simple and real‑time; drawbacks – blocks the caller and is prone to timeouts; suitable for simple Q&A.
Asynchronous: high concurrency and non‑blocking; drawbacks – more complex, and the client must poll (or be notified) for the result; suitable for long‑running tasks or batch processing.
Asynchronous Task Queue (Celery + Redis)
import json
import uuid

from celery import Celery
from pydantic import BaseModel

# Named ``celery_app`` so it does not shadow the FastAPI ``app`` created in the
# health-check section; the HTTP endpoints below are registered on that app.
celery_app = Celery('agent_tasks', broker='redis://localhost:6379/0')

# How long a finished task's status payload stays readable.
TASK_RESULT_TTL_SECONDS = 3600


def _result_key(task_id: str) -> str:
    """Return the Redis key under which a task's final status payload is stored."""
    return f"task_result:{task_id}"


@celery_app.task(bind=True, max_retries=3, soft_time_limit=60)
def process_agent_task(self, task_id: str, user_input: str):
    """Run the agent on ``user_input`` in a Celery worker and store the outcome in Redis.

    Failures are retried up to ``max_retries`` times, 60 s apart. When retries
    are exhausted a ``{"status": "failed", ...}`` payload is stored, so
    ``GET /agent/result/{task_id}`` does not report "pending" forever.
    NOTE(review): ``Agent`` and ``redis_client`` are not defined in this
    snippet; presumably module-level objects created elsewhere.

    Returns:
        The stored payload dict.
    """
    try:
        result = Agent().invoke(user_input)
    except Exception as e:
        if self.request.retries < self.max_retries:
            # retry() raises celery.exceptions.Retry; ``raise`` makes the
            # control flow explicit (nothing after this line runs).
            raise self.retry(exc=e, countdown=60)
        payload = {"status": "failed", "error": str(e)}
    else:
        payload = {"status": "completed", "result": result}
    # default=str: the agent's return type is not fixed here, so anything JSON
    # cannot encode natively is stored as its string form.
    redis_client.setex(
        _result_key(task_id),
        TASK_RESULT_TTL_SECONDS,
        json.dumps(payload, ensure_ascii=False, default=str),
    )
    return payload


class AgentRequest(BaseModel):
    """JSON body for ``POST /agent/async``."""

    message: str


# FastAPI endpoints. Plain ``def`` (not ``async def``): Celery's ``.delay()``
# and the redis client are blocking, so FastAPI runs these in its threadpool
# instead of stalling the event loop.
@app.post("/agent/async")
def agent_async(request: AgentRequest):
    """Enqueue an agent run on the Celery workers and return its id immediately."""
    task_id = str(uuid.uuid4())
    process_agent_task.delay(task_id, request.message)
    return {"task_id": task_id, "status": "pending"}


@app.get("/agent/result/{task_id}")
def get_result(task_id: str):
    """Return the stored completed/failed payload, or ``{"status": "pending"}`` if none yet."""
    raw = redis_client.get(_result_key(task_id))
    if raw is None:
        return {"status": "pending"}
    return json.loads(raw)


# Caching Strategies
Multi‑Level Cache
import hashlib
import time
from collections import OrderedDict
from typing import Optional


class MultiLevelCache:
    """Exact-match cache. L1 is an in-process LRU, L2 is Redis; on a miss callers fall back to the LLM.

    L1 entries carry their own expiry (at most ``local_ttl`` seconds, never
    longer than the Redis TTL). This way a replica does not keep serving a
    value long after Redis has expired or replaced it. L1 holds at most
    ``local_max_size`` entries, and the least recently used entry is evicted
    first, so memory stays bounded.

    Args:
        redis_client: Client exposing ``get`` / ``setex``; defaults to
            ``redis.Redis(decode_responses=True)``.
        local_max_size: Maximum number of L1 entries; ``0`` disables L1.
        local_ttl: Maximum seconds an entry may be served from L1.
    """

    def __init__(self, redis_client=None, local_max_size: int = 1024, local_ttl: float = 60.0):
        self.redis_client = (
            redis_client if redis_client is not None else redis.Redis(decode_responses=True)
        )
        self.local_max_size = local_max_size
        self.local_ttl = local_ttl
        # cache key -> (value, time.monotonic() deadline); order = recency.
        self.local_cache = OrderedDict()

    def _get_key(self, query: str) -> str:
        """Return the namespaced Redis key for ``query`` (MD5 used only as a compact fingerprint)."""
        return f"agent:cache:{hashlib.md5(query.encode()).hexdigest()}"

    def _local_put(self, key: str, value: str, ttl: float) -> None:
        """Insert/refresh an L1 entry and evict least-recently-used entries beyond the size bound."""
        if self.local_max_size <= 0:
            return
        self.local_cache[key] = (value, time.monotonic() + min(ttl, self.local_ttl))
        self.local_cache.move_to_end(key)
        while len(self.local_cache) > self.local_max_size:
            self.local_cache.popitem(last=False)

    def get(self, query: str) -> Optional[str]:
        """Return the cached response for ``query``, or ``None`` on a miss in both levels."""
        key = self._get_key(query)
        entry = self.local_cache.get(key)
        if entry is not None:
            value, expires_at = entry
            if expires_at > time.monotonic():
                self.local_cache.move_to_end(key)
                return value
            del self.local_cache[key]
        cached = self.redis_client.get(key)
        # ``is not None``: an empty-string response is a valid cached value.
        if cached is not None:
            self._local_put(key, cached, self.local_ttl)
        return cached

    def set(self, query: str, response: str, ttl: int = 3600):
        """Store ``response`` in Redis for ``ttl`` seconds and in L1 for at most ``min(ttl, local_ttl)``."""
        key = self._get_key(query)
        # Write Redis first so a Redis failure does not leave an L1-only entry.
        self.redis_client.setex(key, ttl, response)
        self._local_put(key, response, ttl)
from typing import Optional


class SemanticCache:
    """Vector-based semantic cache: returns a stored response for a sufficiently similar query.

    Args:
        threshold: Minimum relevance score (exclusive) for a hit. LangChain
            normalises relevance scores to [0, 1], higher = more similar; the
            exact mapping depends on the collection's distance metric, so
            calibrate this on real queries.
    """

    def __init__(self, threshold: float = 0.95):
        self.embeddings = OpenAIEmbeddings()
        # Use the same embedding model for stored queries and lookups; without
        # ``embedding_function`` Chroma would embed stored texts with its own
        # default model, putting them in a different vector space.
        self.vectorstore = Chroma(embedding_function=self.embeddings)
        self.threshold = threshold

    def get(self, query: str) -> Optional[str]:
        """Return the response cached for the most similar stored query, or ``None``."""
        # Plain similarity_search* methods return Documents without a score;
        # this variant returns (Document, relevance_score) pairs.
        results = self.vectorstore.similarity_search_with_relevance_scores(query, k=1)
        if not results:
            return None
        doc, score = results[0]
        if score > self.threshold:
            return doc.metadata["response"]
        return None

    def set(self, query: str, response: str):
        """Index ``query`` and remember ``response`` in its metadata."""
        # Chroma metadata values must be scalars, so the embedding is not stored
        # there; the vector store computes and keeps it itself.
        self.vectorstore.add_texts([query], metadatas=[{"response": response}])


# Cache Policy Selection
Exact match – FAQ / fixed questions – TTL 24 h.
Semantic match – similar questions – TTL 12 h.
Result cache – identical parameters – TTL 1 h.
Warm‑up cache – hot questions – no TTL (kept until explicitly invalidated).
Cost Optimization
Token Optimization
class CostOptimizer:
    """Helpers for keeping per-request token consumption (and therefore spend) down."""

    # USD per 1K tokens. Keep in sync with the table in estimate_monthly_cost.
    RATES = {
        "gpt-4": {"prompt": 0.03, "completion": 0.06},
        "gpt-4-turbo": {"prompt": 0.01, "completion": 0.03},
        "gpt-3.5-turbo": {"prompt": 0.001, "completion": 0.002},
    }
    # Unknown models are priced as the most expensive entry, so estimates err high.
    DEFAULT_MODEL = "gpt-4"
    # Appended to truncated text; Chinese for "(truncated)".
    TRUNCATION_MARKER = "...(已截断)"

    def __init__(self, max_tokens_per_request=2000):
        # Stored for callers; no method in this class reads it yet.
        self.max_tokens_per_request = max_tokens_per_request

    def truncate_context(self, context: str, max_chars: int = 2000) -> str:
        """Return ``context`` unchanged if it fits, else its first ``max_chars`` chars plus a marker.

        The marker is appended after the cut, so truncated output is slightly
        longer than ``max_chars``.
        """
        if len(context) <= max_chars:
            return context
        return context[:max_chars] + self.TRUNCATION_MARKER

    def smart_prompt(self, user_input: str, history: list) -> str:
        """Build a compact prompt from the last 3 history items and the first 500 chars of input."""
        recent = history[-3:] if history else []
        compressed = user_input[:500]
        return self._build_prompt(compressed, recent)

    def _build_prompt(self, user_input: str, history: list) -> str:
        """Default layout: one history item per line, then the user input. Override as needed.

        Items are rendered with ``str()``; presumably they are strings or
        message objects with a useful ``str`` — confirm against callers.
        """
        return "\n".join([*(str(turn) for turn in history), user_input])

    def estimate_cost(self, prompt_tokens: int, completion_tokens: int, model: str = "gpt-4") -> float:
        """Return the USD cost of one call with the given token counts."""
        rate = self.RATES.get(model, self.RATES[self.DEFAULT_MODEL])
        return (prompt_tokens / 1000) * rate["prompt"] + (completion_tokens / 1000) * rate["completion"]


# Model Fallback Strategy
import logging

_router_log = logging.getLogger(__name__)


class ModelRouter:
    """Route each request to a model tier and fall back to cheaper tiers on failure.

    ``_needs_reasoning(user_input) -> bool`` and
    ``async _call_model(model, user_input, context) -> str`` are not defined
    here; a subclass (or the instance) must provide them.
    """

    def __init__(self):
        self.models = {"primary": "gpt-4", "secondary": "gpt-3.5-turbo", "fallback": "claude-instant"}
        # Relative cost per model; informational, not read by the routing logic.
        self.costs = {"gpt-4": 1.0, "gpt-3.5-turbo": 0.1, "claude-instant": 0.05}

    async def route(self, user_input: str, context: dict) -> str:
        """Answer with the primary model when reasoning is needed, else the secondary.

        If the chosen model raises, the request goes through ``fallback``
        (skipping the model that just failed) instead of propagating the error.
        """
        # "Short and simple -> secondary, needs reasoning -> primary, otherwise
        # -> secondary" reduces to: primary iff reasoning is needed.
        if self._needs_reasoning(user_input):
            model = self.models["primary"]
        else:
            model = self.models["secondary"]
        try:
            return await self._call_model(model, user_input, context)
        except Exception:
            _router_log.warning("model %s failed; trying fallbacks", model, exc_info=True)
            return await self.fallback(user_input, context, exclude=model)

    async def fallback(self, user_input: str, context: dict, *, exclude=None) -> str:
        """Try the secondary then the fallback model (skipping ``exclude``).

        Returns:
            The first successful answer, or a "service busy, please retry
            later" message (Chinese) if every model fails.
        """
        for model in (self.models["secondary"], self.models["fallback"]):
            if model == exclude:
                continue
            try:
                return await self._call_model(model, user_input, context)
            except Exception:
                # Best-effort chain: record the failure and try the next model.
                _router_log.warning("fallback model %s failed", model, exc_info=True)
        return "服务繁忙,请稍后重试"


# Security Protection
Input/Output Filtering
import re


class ContentFilter:
    """Keyword/regex filtering of user input and model output.

    A first line of defence only: stripping patterns cannot reliably stop
    prompt injection, so pair this with model-side guardrails.
    """

    SENSITIVE_WORDS = ["暴力", "色情", "政治敏感词"]
    INJECTION_PATTERNS = [r"ignore previous instructions", r"system\s*:", r"你是一个.*助手"]

    # Mainland-China mobile number (11 digits, 13x-19x) that is not part of a
    # longer digit run such as an ID card or order number.
    _PHONE_RE = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")
    # ASCII classes on purpose: ``\w`` also matches CJK characters, which would
    # pull adjacent Chinese text into the match. Allows dots/plus in the local
    # part and multi-label domains.
    _EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9-]+(?:\.[A-Za-z0-9-]+)+")

    def filter_input(self, user_input: str) -> str:
        """Strip known injection phrases, reject sensitive words, and cap length.

        Raises:
            ValueError: if the input contains a word from ``SENSITIVE_WORDS``.
        """
        for p in self.INJECTION_PATTERNS:
            user_input = re.sub(p, "", user_input, flags=re.IGNORECASE)
        for w in self.SENSITIVE_WORDS:
            if w in user_input:
                raise ValueError(f"输入包含敏感词: {w}")
        return user_input.strip()[:2000]

    def filter_output(self, output: str) -> str:
        """Redact PII and mask sensitive words in model output."""
        output = self._redact_pii(output)
        output = self._check_sensitive(output)
        return output

    def _redact_pii(self, text: str) -> str:
        """Replace phone numbers and email addresses with fixed placeholders."""
        text = self._PHONE_RE.sub('138****0000', text)
        # Placeholder replaces a literal that an email-obfuscation step had
        # mangled into "[email protected]".
        text = self._EMAIL_RE.sub('***@***.***', text)
        return text

    def _check_sensitive(self, text: str) -> str:
        """Mask each sensitive word with asterisks of the same length.

        Masks rather than raises so a model reply is still delivered.
        """
        for w in self.SENSITIVE_WORDS:
            text = text.replace(w, "*" * len(w))
        return text


# Rate Limiting & Quotas
import time


class TokenBucket:
    """Token-bucket limiter: up to ``capacity`` tokens, refilled at ``refill_rate`` tokens/second.

    Starts full. Not synchronised; share one instance across threads only with
    external locking.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        # Monotonic clock: wall-clock steps (NTP corrections, manual changes)
        # must not drain the bucket (clock goes back) or refill it (goes forward).
        self.last_refill = time.monotonic()

    def consume(self, tokens: int = 1) -> bool:
        """Refill for the elapsed time, then take ``tokens`` if available.

        Returns:
            True if the tokens were taken (request allowed), else False.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
from typing import Dict


class RateLimiter:
    """Per-user and per-API-key request limits built on ``TokenBucket``.

    Args:
        user_limit_per_min: Burst size and per-minute rate for each user.
        api_key_limit_per_min: Burst size and per-minute rate for each API key.

    NOTE(review): buckets live in this process's memory. With N replicas the
    effective limit is N times the configured one, and ``limiters`` grows with
    every distinct user/key seen. For multi-instance deployments keep the
    counters in a shared store such as Redis.
    """

    def __init__(self, user_limit_per_min: int = 100, api_key_limit_per_min: int = 1000):
        self.limiters: Dict[str, TokenBucket] = {}
        self.user_limit_per_min = user_limit_per_min
        self.api_key_limit_per_min = api_key_limit_per_min

    def _bucket(self, key: str, per_minute: int) -> "TokenBucket":
        """Return the bucket for ``key``, creating a full one on first use."""
        bucket = self.limiters.get(key)
        if bucket is None:
            bucket = TokenBucket(per_minute, per_minute / 60)
            self.limiters[key] = bucket
        return bucket

    def check_limit(self, user_id: str, api_key: str) -> bool:
        """Return True if both the user's and the API key's budgets allow one more request."""
        user_bucket = self._bucket(f"user:{user_id}", self.user_limit_per_min)
        api_bucket = self._bucket(f"api:{api_key}", self.api_key_limit_per_min)
        if not user_bucket.consume():
            return False
        if not api_bucket.consume():
            # The request is rejected, so it must not count against the user's
            # quota: give back the token just taken.
            user_bucket.tokens = min(user_bucket.capacity, user_bucket.tokens + 1)
            return False
        return True


# Monitoring & Alerting
from dataclasses import dataclass
import logging
@dataclass(init=True, repr=True, eq=True)
class Alert:
    """One threshold breach raised by the agent monitor."""

    name: str  # identifier such as "high_error_rate"
    level: str  # severity label: "info", "warning" or "critical"
    message: str  # human-readable description
    value: float  # observed metric value
    threshold: float  # limit the observed value exceeded
class AgentMonitor:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.metrics = {"requests_total": 0, "requests_success": 0,
"requests_failed": 0, "avg_latency_ms": 0,
"token_usage": 0}
def record_request(self, success: bool, latency_ms: float, tokens: int):
self.metrics["requests_total"] += 1
if success:
self.metrics["requests_success"] += 1
else:
self.metrics["requests_failed"] += 1
self.metrics["avg_latency_ms"] = (self.metrics["avg_latency_ms"]*0.9 + latency_ms*0.1)
self.metrics["token_usage"] += tokens
self._check_alerts()
def _check_alerts(self):
alerts = []
error_rate = self.metrics["requests_failed"] / max(1, self.metrics["requests_total"])
if error_rate > 0.05:
alerts.append(Alert("high_error_rate", "critical", "错误率过高", error_rate, 0.05))
if self.metrics["avg_latency_ms"] > 5000:
alerts.append(Alert("high_latency", "warning", "响应延迟过高", self.metrics["avg_latency_ms"], 5000))
for a in alerts:
self._send_alert(a)
def _send_alert(self, alert: Alert):
self.logger.warning(f"[{alert.level}] {alert.name}: {alert.message}")
# Integration points for DingTalk, WeChat Work, Email can be addedCost Estimation Formula
def estimate_monthly_cost(daily_requests: int, avg_tokens_per_request: int,
                          model: str = "gpt-4", cache_hit_rate: float = 0.3,
                          *, days_per_month: int = 30) -> dict:
    """Estimate monthly LLM spend for a traffic level.

    Args:
        daily_requests: Requests per day.
        avg_tokens_per_request: Prompt + completion tokens per LLM call,
            split 2:1 between prompt and completion.
        model: Pricing key. Unknown models are priced as ``gpt-4`` (the most
            expensive entry) so the estimate errs high rather than silently low.
        cache_hit_rate: Fraction of requests answered from cache without an
            LLM call; must be in [0, 1].
        days_per_month: Days used to scale daily traffic to a month.

    Returns:
        dict with ``model``, ``monthly_requests``, ``cache_hit_rate``,
        ``llm_calls``, ``cost_per_request_usd`` (4 dp) and
        ``estimated_monthly_cost_usd`` (2 dp).

    Raises:
        ValueError: if ``cache_hit_rate`` is outside [0, 1].
    """
    if not 0.0 <= cache_hit_rate <= 1.0:
        raise ValueError(f"cache_hit_rate must be in [0, 1], got {cache_hit_rate}")
    # USD per 1K tokens.
    rates = {"gpt-4": {"prompt": 0.03, "completion": 0.06},
             "gpt-4-turbo": {"prompt": 0.01, "completion": 0.03},
             "gpt-3.5-turbo": {"prompt": 0.001, "completion": 0.002}}
    monthly_requests = daily_requests * days_per_month
    cached = monthly_requests * cache_hit_rate
    llm_calls = monthly_requests - cached
    # Assume prompt:completion = 2:1
    prompt_tokens = avg_tokens_per_request * 2 / 3
    completion_tokens = avg_tokens_per_request * 1 / 3
    rate = rates.get(model, rates["gpt-4"])
    cost_per_req = (prompt_tokens / 1000) * rate["prompt"] + (completion_tokens / 1000) * rate["completion"]
    total = llm_calls * cost_per_req
    return {"model": model,
            "monthly_requests": monthly_requests,
            "cache_hit_rate": cache_hit_rate,
            "llm_calls": llm_calls,
            "cost_per_request_usd": round(cost_per_req, 4),
            "estimated_monthly_cost_usd": round(total, 2)}
# Example: estimate_monthly_cost(10000, 500, "gpt-3.5-turbo", 0.3) → ~140 USD
#   (300,000 requests/month, 210,000 LLM calls at ~$0.00067 each)

# Deployment Recommendations
Personal project (concurrency < 10): single‑node deployment with in‑process cache.
Small team (10‑100 concurrent users): multiple replicas, Redis cache, basic monitoring.
Enterprise (100‑1000): clustered agents, message queue, multi‑model routing, full‑stack monitoring.
Large scale (> 1000): Kubernetes with auto‑scaling, multi‑region deployment.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Coder Trainee
Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
