Scaling Human‑in‑the‑Loop Agents to Distributed Environments with Robust Fault Recovery

This article explains how to extend a single‑process Human‑in‑the‑Loop (HITL) agent to a distributed, multi‑user API service using FastAPI, detailing session management, interrupt handling, client and server fault‑recovery strategies, and providing concrete code snippets and architectural diagrams.

AI Large Model Application Practice
AI Large Model Application Practice
AI Large Model Application Practice
Scaling Human‑in‑the‑Loop Agents to Distributed Environments with Robust Fault Recovery

Distributed Environment Challenges and Responses

In a single‑process setup the HITL agent runs in one process, handling one user and cannot scale. Deploying the agent as an independent service requires exposing HTTP APIs and handling two fundamental shifts:

From a single context to multiple API endpoints : each interaction must be split across several HTTP requests because the interrupt mechanism forces the workflow to pause and resume.

From a single user session to multi‑user sessions : the system must isolate sessions so that concurrent users do not interfere with each other.

Implementing a Distributed HITL Agent

Key Types and Schemas

Sessions – a dictionary storing each user’s agent instance and session ID:

sessions: Dict[str, Dict[str, Union[CompiledGraph, str]]] = {
    "user_id": {
        "agent": CompiledGraph,  # create_react_agent result
        "session_id": "uuid-string"
    }
}

AgentRequest – payload sent by the client:

class AgentRequest(BaseModel):
    user_id: str
    query: str
    system_message: Optional[str] = "你会使用工具来帮助用户。如果工具使用被拒绝,请提示用户。"

AgentResponse – server reply containing status, result or interrupt data:

class AgentResponse(BaseModel):
    session_id: str
    status: str               # interrupted | completed | error
    message: Optional[str] = None
    result: Optional[Dict[str, Any]] = None
    interrupt_data: Optional[Dict[str, Any]] = None

InterruptResponse – what the client sends back after an interrupt:

class InterruptResponse(BaseModel):
    user_id: str
    session_id: str
    response_type: str        # accept | reject | edit
    args: Optional[Dict[str, Any]] = None

Server Endpoints

invoke_agent creates a session if needed, stores the agent, and calls agent.invoke. It returns an AgentResponse according to the following rules:

If the result contains __interrupt__, status = interrupted and interrupt_data is returned.

If no interrupt, status = completed and result is returned.

Any exception yields status = error with an error message.

@app.post("/agent/invoke", response_model=AgentResponse)
def invoke_agent(request: AgentRequest):
    """Start the agent and wait for completion or interruption"""
    user_id = request.user_id
    if user_id not in sessions:
        session_id = str(uuid.uuid4())
        agent = create_tavily_search_agent(user_id)
        sessions[user_id] = {"agent": agent, "session_id": session_id}
    else:
        agent = sessions[user_id]["agent"]
        session_id = sessions[user_id]["session_id"]
    messages = ...
    config = {"configurable": {"thread_id": session_id}}
    try:
        result = agent.invoke(messages, config)
        return process_agent_result(session_id, result)
    except Exception as e:
        # build error response
        ...

resume_agent validates the session, builds a command from the client’s InterruptResponse, and invokes the agent again:

@app.post("/agent/resume", response_model=AgentResponse)
def resume_agent(response: InterruptResponse):
    """Resume a previously interrupted agent execution"""
    user_id = response.user_id
    if user_id not in sessions:
        raise HTTPException(status_code=404, detail=f"用户会话 {user_id} 不存在")
    server_session_id = sessions[user_id]["session_id"]
    if server_session_id != response.session_id:
        raise HTTPException(status_code=400, detail="会话ID不匹配,可能是过期的请求")
    agent = sessions[user_id]["agent"]
    command_data = {"type": response.response_type}
    if response.args:
        command_data["args"] = response.args
    result = agent.invoke(Command(resume=command_data), config={"configurable": {"thread_id": server_session_id}})
    return process_agent_result(server_session_id, result)

A delete_agent_session endpoint lets the client clean up a session when it is no longer needed:

@app.delete("/agent/session/{user_id}")
def delete_agent_session(user_id: str):
    del sessions[user_id]

Client‑Side Fault Recovery

The client mirrors the server flow: it calls invoke_agent, processes the AgentResponse, and if the status is interrupted it prompts the user, builds an InterruptResponse, and calls resume_agent. This loop repeats until a completed or error status is received.

def main():
    user_id = ...
    while True:
        query = ...
        response = invoke_agent(user_id, query)
        process_agent_response(response, user_id)
        # break when completed or error

Key logic in process_agent_response:

If status == "interrupted", present the interrupt message to the user and collect a decision (accept/reject/edit).

Send the decision as an InterruptResponse to resume_agent.

Recursively handle the new response because another interrupt may occur.

Server‑Side Fault‑Recovery Recommendations

The in‑memory session store is vulnerable to process crashes and does not scale horizontally. Persist session metadata to an external store such as Redis or a relational database and restore it on startup. The agent instance itself can continue to rely on LangGraph’s checkpointer for state reconstruction.

Persisted session schema (simplified):

sessions = {
    "user_id": {
        "agent": agent_instance,
        "session_id": "uuid",
        "status": "idle|running|interrupted|completed|error",
        "last_response": AgentResponse,
        "last_query": "...",
        "last_updated": timestamp
    }
}

During each state transition the server updates the persisted record, enabling:

Recovery after a server restart (no loss of user sessions).

Horizontal scaling – any instance can fetch the session from the shared store.

Accurate monitoring of long‑running tasks.

With these mechanisms, the distributed HITL agent can survive client refreshes, server restarts, and network glitches while preserving a seamless human‑in‑the‑loop experience.

Full source code: https://github.com/pingcy/agent-hitl

distributed-systemsSession ManagementLangGraphfault-recoveryHuman-in-the-loop
AI Large Model Application Practice
Written by

AI Large Model Application Practice

Focused on deep research and development of large-model applications. Authors of "RAG Application Development and Optimization Based on Large Models" and "MCP Principles Unveiled and Development Guide". Primarily B2B, with B2C as a supplement.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.