Deploying a Stateful AI Agent on a Stateless Web Architecture: Challenges, Solutions, and Code Walkthrough
This article analyzes the fundamental conflict between stateful AI agents and the inherently stateless, distributed nature of modern web services; explores the mismatches in time, state, and execution models; and presents a practical Agent‑as‑API solution using FastAPI, Redis, SSE, and Kubernetes to achieve scalable, fault‑tolerant deployments.
Problem Origin
A customer‑service AI Agent built with the OpenAI Agents SDK can query order and logistics services and generate a professional reply. Run locally, the script returns the expected response:
from agents import Agent, Runner, function_tool

@function_tool
def query_order(order_id: str) -> str:
    """Look up order details."""
    return f"Order {order_id}: iPhone 16, delivered, eligible for a 7-day no-questions-asked refund"

@function_tool
def query_logistics(order_id: str) -> str:
    """Look up logistics information."""
    return f"Order {order_id}: SF Express, delivered 3 days ago"

support_agent = Agent(
    name="Customer Support Assistant",
    instructions="You are a professional e-commerce support agent. Based on the user's question, call the tools to look up order and logistics information and give an accurate reply.",
    model="gpt-4o",
    tools=[query_order, query_logistics],
)

result = Runner.run_sync(support_agent, "I want to return order #12345")
print(result.final_output)

When the manager asked to expose this as a web service, the simple "add a FastAPI endpoint, build a Docker image, deploy" plan ran into fundamental issues.
Web Is Naturally Distributed
Typical web deployment involves packaging code into a Docker image, running multiple replicas (e.g., uvicorn --workers 4 in each pod), and placing an Nginx or cloud load balancer in front. The key premise is that HTTP is stateless: each request is independent and can be handled by any instance.
An AI Agent, however, is stateful – it must retain conversation history, intermediate reasoning steps, and tool call results. This creates three concrete conflicts:
Time Model – Fast In, Fast Out vs. Slow Thinking
Web servers expect responses within milliseconds to a few seconds (Nginx's default proxy_read_timeout is 60 s, AWS ALB's idle timeout is 60 s, and typical API gateways allow 30–60 s). The refund Agent needs about 45 s for a simple request (three LLM calls plus two tool calls) and can exceed 60 s, triggering a 504 Gateway Timeout.
State Model – No Memory vs. Memory
Load balancers route each request randomly. If a user’s first request lands on Worker A and the second on Worker B, Worker B has no memory of the prior round, forcing the user to repeat information. Sticky sessions would preserve affinity but sacrifice load‑balancing benefits and are hard to maintain with auto‑scaling.
Execution Model – Deterministic vs. Non‑Deterministic
Web capacity planning assumes roughly predictable resource consumption per request. Agents decide dynamically how many tool calls and reasoning loops to perform. Example: two users ask the same kind of question. User A's query finishes in 3 s (a single tool call); User B's query involves ten orders and four reasoning loops and takes 40 s – a 13× difference that breaks traditional rate‑limiting assumptions.
Solution Landscape
Four categories of solutions were surveyed:
Framework‑provided hosting platforms (e.g., LangGraph Platform / LangSmith Deployment) – use a Checkpointer + Thread model.
General workflow engines – Temporal (durable execution) and Dapr Agents (virtual Actor model).
Agent‑as‑API (self‑hosted) – expose HTTP endpoints with FastAPI and manage state manually.
Third‑party deployment platforms – Crewship, Railway, etc.
The chosen approach was Agent‑as‑API because:
I already use FastAPI and Redis, so no new technology is required.
It allows incremental complexity – start simple, then add Temporal or Dapr if needed.
It is framework‑agnostic – works with OpenAI Agents SDK, LangGraph, CrewAI, etc.
Deployment Architecture
Core principle: remove state from the process and store it in a shared external store (Redis).
Nginx – load balancing + SSL termination (no sticky sessions).
FastAPI workers (multiple replicas) – each request loads conversation history from Redis, runs the Agent, then writes the updated history back to Redis. Workers remain stateless.
Redis – stores session state keyed by session_id with a 24 h TTL (86400 s).
OpenAI API – provides the LLM inference.
Business systems – order and logistics services accessed via the Agent’s tool functions.
This architecture lets any worker handle any request because the state is externalized.
Implementation Walk‑through
5.1 Minimal Viable Service
Expose the Agent via a FastAPI POST endpoint:
from fastapi import FastAPI
from pydantic import BaseModel
from agents import Agent, Runner, function_tool

app = FastAPI()

@function_tool
def query_order(order_id: str) -> str:
    return f"Order {order_id}: iPhone 16, delivered, eligible for a 7-day no-questions-asked refund"

@function_tool
def query_logistics(order_id: str) -> str:
    return f"Order {order_id}: SF Express, delivered 3 days ago"

support_agent = Agent(
    name="Customer Support Assistant",
    instructions="You are a professional e-commerce support agent. Based on the user's question, call the tools to look up order and logistics information and give an accurate reply.",
    model="gpt-4o",
    tools=[query_order, query_logistics],
)

class ChatRequest(BaseModel):
    message: str

@app.post("/chat")
async def chat(req: ChatRequest):
    result = await Runner.run(support_agent, req.message)
    return {"reply": result.final_output}

Run with uvicorn main:app --host 0.0.0.0 --port 8000. This works but reveals three problems: long user‑perceived latency, no memory across requests, and possible timeouts.
5.2 Streaming Output (SSE)
Use Server‑Sent Events to push incremental LLM tokens to the browser, keeping the connection alive and improving perceived latency. The client can consume the stream with EventSource or fetch + ReadableStream.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent
import json

app = FastAPI()

# support_agent is defined exactly as in section 5.1

class ChatRequest(BaseModel):
    message: str

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    async def event_stream(message: str):
        result = Runner.run_streamed(support_agent, message)
        # forward each incremental text delta as an SSE event
        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                yield f"data: {json.dumps({'type': 'delta', 'content': event.data.delta})}\n\n"
        # a final event carries the complete reply
        yield f"data: {json.dumps({'type': 'done', 'content': result.final_output})}\n\n"
    return StreamingResponse(event_stream(req.message), media_type="text/event-stream")

Because data is continuously sent, most reverse proxies treat the connection as active and do not enforce the total request timeout. (Note that Nginx buffers proxied responses by default, which can hold deltas back; setting proxy_buffering off or sending an X-Accel-Buffering: no response header disables this.)
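In the browser the stream can be consumed with EventSource or fetch plus a ReadableStream; from Python, a minimal consumer might look like this sketch (assuming httpx and the local URL):

import httpx
import json

# open the SSE stream and print deltas as they arrive
with httpx.stream(
    "POST",
    "http://localhost:8000/chat/stream",
    json={"message": "I want to return order #12345"},
    timeout=None,  # the stream stays open for the whole Agent run
) as resp:
    for line in resp.iter_lines():
        if line.startswith("data: "):
            event = json.loads(line[len("data: "):])
            if event["type"] == "delta":
                print(event["content"], end="", flush=True)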
5.3 Externalizing Conversation State
Use Redis to store the full OpenAI‑compatible message list. The SDK provides result.to_input_list() which serializes the entire round (user messages, tool calls, assistant replies). The workflow per request:
Generate or reuse session_id (client‑provided or server‑generated UUID).
Load history from Redis: GET session:{session_id}. If missing, start with an empty list.
Append the new user message to the list.
Run the Agent with the combined list.
Store the updated list back to Redis with SET session:{session_id} … EX 86400.
Return the assistant reply and the session_id to the client.
import json
import uuid

import redis
from fastapi import FastAPI
from pydantic import BaseModel
from agents import Agent, Runner

app = FastAPI()
store = redis.Redis(host="localhost", port=6379, decode_responses=True)

# support_agent is defined exactly as in section 5.1

class ChatRequest(BaseModel):
    message: str
    session_id: str | None = None

def load_history(session_id: str) -> list:
    data = store.get(f"session:{session_id}")
    return json.loads(data) if data else []

def save_history(session_id: str, history: list):
    # 24 h TTL (86400 s)
    store.set(f"session:{session_id}", json.dumps(history), ex=86400)

@app.post("/chat")
async def chat(req: ChatRequest):
    session_id = req.session_id or str(uuid.uuid4())
    history = load_history(session_id)
    input_list = history + [{"role": "user", "content": req.message}]
    result = await Runner.run(support_agent, input_list)
    new_history = result.to_input_list()
    save_history(session_id, new_history)
    return {"reply": result.final_output, "session_id": session_id}

Now any worker can serve any request while preserving full conversation context.
5.4 Production Hardening
Async task fallback: for very long‑running Agent tasks, expose a "submit‑and‑poll" API. The request returns a task_id; a background worker runs the Agent and stores the result in a separate Redis DB. The client polls /chat/result/{task_id} for status and the final reply (a client‑side sketch follows the server code below).
from fastapi import BackgroundTasks
import uuid

# a separate Redis DB (db=1) holds task status, apart from session state
task_store = redis.Redis(host="localhost", port=6379, db=1, decode_responses=True)

@app.post("/chat/async")
async def chat_async(req: ChatRequest, bg: BackgroundTasks):
    task_id = str(uuid.uuid4())
    task_store.set(f"task:{task_id}", json.dumps({"status": "running"}), ex=3600)
    bg.add_task(run_agent_task, task_id, req)
    return {"task_id": task_id}

@app.get("/chat/result/{task_id}")
async def get_result(task_id: str):
    data = task_store.get(f"task:{task_id}")
    return json.loads(data) if data else {"status": "not_found"}

async def run_agent_task(task_id: str, req: ChatRequest):
    try:
        # reuse the session logic from the normal endpoint
        session_id = req.session_id or str(uuid.uuid4())
        history = load_history(session_id)
        input_list = history + [{"role": "user", "content": req.message}]
        result = await Runner.run(support_agent, input_list)
        save_history(session_id, result.to_input_list())
        task_store.set(f"task:{task_id}", json.dumps({"status": "done", "reply": result.final_output, "session_id": session_id}), ex=3600)
    except Exception as e:
        task_store.set(f"task:{task_id}", json.dumps({"status": "error", "error": str(e)}), ex=3600)
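A polling client for the submit‑and‑poll API might look like the following sketch (again assuming httpx; the one‑second interval is arbitrary):

import httpx
import time

base = "http://localhost:8000"

# submit the task, then poll until it reaches a terminal status
task_id = httpx.post(f"{base}/chat/async", json={"message": "I want to return order #12345"}).json()["task_id"]
while True:
    data = httpx.get(f"{base}/chat/result/{task_id}").json()
    if data["status"] in ("done", "error", "not_found"):
        break
    time.sleep(1)  # back off between polls
print(data)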
Prevent infinite loops: limit the maximum number of reasoning turns with the SDK's max_turns parameter and catch MaxTurnsExceeded to return a friendly error.

from agents.exceptions import MaxTurnsExceeded

try:
    result = await Runner.run(support_agent, input_list, max_turns=10)
except MaxTurnsExceeded:
    return {"error": "The request exceeded the maximum turn limit; please simplify your question and try again", "session_id": session_id}

Authentication & rate limiting: in production, protect the API with JWT verification (or delegate to an API gateway) and apply per‑user rate limits.
from fastapi import Depends, HTTPException, Header

async def verify_token(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")
    # validate JWT here

@app.post("/chat", dependencies=[Depends(verify_token)])
async def chat(req: ChatRequest):
    ...  # same handler body as in section 5.3
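The "validate JWT here" placeholder could be filled in along these lines, as a sketch assuming PyJWT and a shared HMAC secret (both are assumptions, not part of the original code):

import jwt  # PyJWT
from fastapi import HTTPException, Header

SECRET_KEY = "change-me"  # hypothetical; load from a secret manager in production

async def verify_token(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")
    token = authorization[len("Bearer "):]
    try:
        # raises on a bad signature or an expired token
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Unauthorized")
    return payload  # e.g., carries the user id for per-user rate limiting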
Multi‑turn Conversation Flow Example
A three‑turn refund dialogue illustrates how the externalized state works across different workers:
First turn – the client sends {"message": "I want to return order #12345", "session_id": null}. The server generates session_id = "s_abc123", loads an empty history, runs the Agent (which calls query_order), stores a 4‑message history in Redis, and returns the reply plus the session_id.
Second turn – the client sends {"message": "OK, please process the refund", "session_id": "s_abc123"}. The load balancer routes to Worker B, which fetches the 4‑message history from Redis, appends the new user message, runs the Agent (now aware of order #12345), stores the updated 6‑message history, and returns a confirmation reply.
Third turn – the client asks "How long until the refund arrives?" with the same session_id. Worker C loads the 6‑message history, sees the context, and replies without additional tool calls.
All workers see a consistent conversation because the state lives in Redis, not in any worker’s memory.
Other Advanced Options
Temporal – provides durable execution with automatic checkpointing, suitable when Agent tasks must not be lost (e.g., real refund transactions).
Dapr Agents – virtual Actor model with scale‑to‑zero, ideal for many low‑traffic per‑user Agents.
A2A + Agent Gateway – standardizes Agent‑to‑Agent communication for multi‑Agent orchestration (e.g., hand‑off from customer‑service Agent to technical‑support Agent).
SPIFFE/SPIRE – gives each Agent a cryptographic identity, enabling secure OAuth‑delegated access in enterprise environments.
Conclusion – Decision Path
For most teams the pragmatic path is:
Expose the Agent as a FastAPI HTTP service.
Externalize conversation state to Redis with a 24 h TTL.
Use Server‑Sent Events for streaming output to avoid client‑side timeouts.
Add async task fallback, max‑turn limits, and JWT authentication for production robustness.
This "Agent‑as‑API + Redis" stack works with any underlying Agent framework (OpenAI Agents SDK, LangGraph, CrewAI, etc.) and resolves the core conflict of placing a stateful Agent into a stateless distributed web infrastructure.