Build a Streamable HTTP MCP Server from Scratch: Theory, Protocol Deep‑Dive and Full Python Implementation
This article explains the limitations of the original Stdio and HTTP SSE communication modes for MCP, introduces the Streamable HTTP protocol that resolves those issues, and provides a step‑by‑step Python implementation of both a Streamable HTTP MCP server and a matching client, complete with environment setup, FastAPI code, JSON‑RPC handling, and tool‑calling examples.
Introduction
The Model Context Protocol (MCP) is a key standard for enabling large‑model AI agents to call external tools. In 2025 MCP added a new communication mode called Streamable HTTP, which supersedes the older HTTP SSE transport and removes the deployment limits of Stdio.
1. Streamable HTTP Protocol Theory
1.1 Drawbacks of Stdio and HTTP SSE
Stdio runs the MCP server as a subprocess of the client and communicates via pipes. This limits the server to local execution, ties performance to the client’s hardware, and rules out distributed enterprise deployments.
HTTP SSE pushes events over a single‑direction HTTP stream. Although it removes the local‑process restriction, it suffers from four critical problems:
No support for reconnection – a broken connection loses all session state.
The server must keep a long‑lived SSE connection for each client, causing resource explosion under high concurrency.
All responses, even simple request‑response interactions, must be sent through the SSE channel, adding unnecessary complexity.
Many infrastructure components (CDN, load balancers, firewalls) do not reliably support long‑lived SSE connections.
These issues have made MCP difficult to adopt in enterprise settings.
1.2 Design and Principles of Streamable HTTP
On 2025‑05‑09 the MCP GitHub repository proposed a new standard called Streamable HTTP. It defines a request/response flow that can switch between a normal HTTP response and an SSE stream when needed. The protocol uses JSON‑RPC for message framing, includes a req_id to match requests and responses, and records each request/response pair to enable reconnection.
The protocol works as follows:
The client initiates a three‑step handshake (POST /mcp → 200 initialize, POST /mcp → 204 notifications/initialized, then waits for user input).
When a tool call is required, the client sends a tools/call request. The server streams JSON‑RPC lines; intermediate progress is sent with a stream field, and the final result is sent with a result field (see the example lines after this list).
Each line contains the original req_id, allowing the client to resume after a disconnection.
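For illustration, a tools/call exchange might stream lines like the following (payload values are illustrative and mirror the weather tool built in section 2; note that every line carries the same id as the originating request):
{"jsonrpc": "2.0", "id": 3, "stream": "Querying weather for Hangzhou…"}
{"jsonrpc": "2.0", "id": 3, "result": {"content": [{"type": "text", "text": "{\"text\": \"Sunny\", \"temperature\": \"21\"}"}], "isError": false}}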
1.3 Streamable HTTP vs. HTTP SSE
Taking the four SSE problems above in turn, Streamable HTTP resolves each of them:
Reconnection is possible because request IDs and stored responses enable state recovery.
Long‑lived connections are only kept while streaming; after the response finishes the server closes the stream.
Simple requests can use a normal HTTP response, while complex ones automatically upgrade to SSE.
All major web infrastructure now supports the protocol, removing compatibility limits.
2. Hand‑Crafting a Streamable MCP Server
2.1 Environment Setup
conda create -n mcp python=3.12
conda activate mcp
pip install uv
uv init streamable-mcp-server
cd streamable-mcp-server
uv venv
# Windows: .\.venv\Scripts\activate
# Linux: source .venv/bin/activate
2.2 Server Implementation
The server uses FastAPI to expose the /mcp endpoint. Required dependencies are installed with:
uv add openai fastapi requests
Key constants:
SERVER_NAME = "WeatherServer"
SERVER_VERSION = "1.0.0"
PROTOCOL_VERSION = "2025-05-16"
Two core functions are provided:
import asyncio
import json

import requests

async def fetch_weather(city: str) -> str:
    """Call the Seniverse weather API and return the current conditions as a JSON string."""
    try:
        url = "https://api.seniverse.com/v3/weather/now.json"
        params = {"key": "YOUR_API_KEY", "location": city, "language": "zh-Hans", "unit": "c"}
        # requests is synchronous; acceptable for a demo, but it blocks the event loop.
        response = requests.get(url, params=params)
        temperature = response.json()["results"][0]["now"]
    except Exception:
        return "error"
    return json.dumps(temperature)
async def stream_weather(city: str, req_id: int | str):
    """Yield newline-delimited JSON-RPC lines: a progress message, then the final result or an error."""
    yield json.dumps({"jsonrpc": "2.0", "id": req_id, "stream": f"Querying weather for {city}…"}).encode() + b"\n"
    await asyncio.sleep(0.3)
    data = await fetch_weather(city)
    if data == "error":
        yield json.dumps({"jsonrpc": "2.0", "id": req_id, "error": {"code": -32000, "message": data}}).encode() + b"\n"
        return
    yield json.dumps({"jsonrpc": "2.0", "id": req_id, "result": {"content": [{"type": "text", "text": data}], "isError": False}}).encode() + b"\n"
The tool registry defines a single tool get_weather with a JSON Schema that the LLM can discover via the tools/list method.
TOOLS_REGISTRY = {
    "tools": [{
        "name": "get_weather",
        "description": "Query the current weather for a city",
        "inputSchema": {
            "type": "object",
            "properties": {"city": {"type": "string", "description": "City name, e.g. 'Hangzhou'"}},
            "required": ["city"]
        }
    }],
    "nextCursor": None
}
The FastAPI routes handle initialization, tool listing, and tool calls. The tools/call route returns a StreamingResponse that yields the lines produced by stream_weather.
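The route code itself is not reproduced in this summary; a minimal /mcp endpoint consistent with the constants, TOOLS_REGISTRY, and stream_weather defined above might look like the following sketch (the dispatch structure and the error code for unknown methods are assumptions, not the article's exact code):
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse

app = FastAPI(title=SERVER_NAME)

@app.post("/mcp")
async def mcp_endpoint(request: Request):
    body = await request.json()
    method = body.get("method")
    req_id = body.get("id")

    if method == "initialize":
        # Negotiate the protocol version and advertise server info and capabilities.
        return JSONResponse({"jsonrpc": "2.0", "id": req_id, "result": {
            "protocolVersion": PROTOCOL_VERSION,
            "capabilities": {"tools": {}},
            "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION}}})

    if method == "notifications/initialized":
        # Notifications carry no id and expect no body: answer 204 No Content.
        return Response(status_code=204)

    if method == "tools/list":
        return JSONResponse({"jsonrpc": "2.0", "id": req_id, "result": TOOLS_REGISTRY})

    if method == "tools/call" and body.get("params", {}).get("name") == "get_weather":
        city = body["params"].get("arguments", {}).get("city", "")
        # Stream newline-delimited JSON-RPC lines: progress first, then the final result.
        return StreamingResponse(stream_weather(city, req_id), media_type="application/json")

    # Anything else: JSON-RPC "method not found".
    return JSONResponse({"jsonrpc": "2.0", "id": req_id,
                         "error": {"code": -32601, "message": f"Method not found: {method}"}})

if __name__ == "__main__":
    # uvicorn is not in the dependency list above; add it with `uv add uvicorn` if needed.
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)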
2.3 Running and Testing the Server
Start the server with:
uv run server.py
Four Postman requests emulate the full MCP client flow:
initialize – negotiates the protocol version.
notifications/initialized – confirms the client is ready (204 No Content).
tools/list – retrieves the get_weather definition.
tools/call with get_weather – streams the weather result line by line.
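As a reference for the Postman tests, the first and last request bodies might look like this (illustrative values; the id fields mirror those used by the client code in section 3):
{"jsonrpc": "2.0", "id": 0, "method": "initialize", "params": {"protocolVersion": "2025-05-16", "capabilities": {}, "clientInfo": {"name": "postman", "version": "0.1"}}}
{"jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": {"name": "get_weather", "arguments": {"city": "Hangzhou"}}}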
3. Hand‑Crafting an MCP Client
3.1 Model Configuration
The client uses DeepSeek‑V3‑0324 as the LLM. API keys are stored in a simple JSON config file and loaded via a Configuration class.
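Neither the Configuration class nor the LLMClient wrapper used in section 3.3 is shown in full here; a minimal sketch consistent with how they are used below could be the following (the environment-variable name and the api_key, base_url, and model config keys are assumptions):
import json
import os
from typing import Any, Dict, List, Optional

from openai import OpenAI

class Configuration:
    """Loads the JSON config file and the LLM API key (a sketch, not the article's exact code)."""
    def __init__(self, path: str = "servers_config.json") -> None:
        with open(path, "r", encoding="utf-8") as f:
            self.config: Dict[str, Any] = json.load(f)
        # Prefer an environment variable; fall back to an optional key in the config file.
        self.api_key: str = os.getenv("DEEPSEEK_API_KEY", self.config.get("api_key", ""))
        self.base_url: Optional[str] = self.config.get("base_url")
        self.model: str = self.config.get("model", "deepseek-chat")

    @property
    def servers(self) -> Dict[str, Any]:
        return self.config.get("mcpServers", {})

class LLMClient:
    """Thin wrapper over an OpenAI-compatible chat API, matching the
    LLMClient(api_key, base_url, model) / .chat(messages, tools) usage in section 3.3."""
    def __init__(self, api_key: str, base_url: Optional[str], model: str) -> None:
        self.client = OpenAI(api_key=api_key, base_url=base_url)
        self.model = model

    def chat(self, messages: List[Dict[str, Any]], tools: List[Dict[str, Any]]):
        # Pass the aggregated MCP tool schemas so the model can emit tool_calls.
        return self.client.chat.completions.create(model=self.model, messages=messages, tools=tools or None)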
3.2 HTTP MCP Server Wrapper
The HTTPMCPServer class encapsulates the four MCP operations (initialize, list tools, call tool, close). It uses httpx.AsyncClient for async HTTP calls and parses streamed JSON‑RPC lines, discarding intermediate stream messages and concatenating final result text.
import json
import logging
from typing import Any, Dict, List, Optional

import httpx

class HTTPMCPServer:
    """Communicates with a single Streamable HTTP MCP server."""
    def __init__(self, name: str, endpoint: str):
        self.name = name
        self.endpoint = endpoint.rstrip('/')
        self.session: Optional[httpx.AsyncClient] = None
        self.protocol_version = "2025-05-16"

    async def _post_json(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        assert self.session is not None
        r = await self.session.post(self.endpoint, json=payload, headers={"Accept": "application/json"})
        # Notifications are answered with 204 No Content and carry no body.
        if r.status_code == 204 or not r.content:
            return {}
        r.raise_for_status()
        return r.json()

    async def initialize(self) -> None:
        self.session = httpx.AsyncClient(timeout=httpx.Timeout(30.0))
        init_req = {"jsonrpc": "2.0", "id": 0, "method": "initialize",
                    "params": {"protocolVersion": self.protocol_version, "capabilities": {},
                               "clientInfo": {"name": "Streamable HTTP Client Demo", "version": "0.1"}}}
        r = await self._post_json(init_req)
        if "error" in r:
            raise RuntimeError(f"Initialize error: {r['error']}")
        # Complete the handshake with the initialized notification (no id, no response body).
        await self._post_json({"jsonrpc": "2.0", "method": "notifications/initialized"})

    async def list_tools(self) -> List[Dict[str, Any]]:
        req = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}
        res = await self._post_json(req)
        return res["result"]["tools"]

    async def call_tool_stream(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        req = {"jsonrpc": "2.0", "id": 3, "method": "tools/call",
               "params": {"name": tool_name, "arguments": arguments}}
        assert self.session is not None
        async with self.session.stream("POST", self.endpoint, json=req,
                                       headers={"Accept": "application/json"}) as resp:
            if resp.status_code != 200:
                raise RuntimeError(f"HTTP {resp.status_code}")
            collected: List[str] = []
            async for line in resp.aiter_lines():
                if not line:
                    continue
                chunk = json.loads(line)
                if "stream" in chunk:
                    # Intermediate progress line; skip it and wait for the final result.
                    continue
                if "error" in chunk:
                    raise RuntimeError(chunk["error"]["message"])
                if "result" in chunk:
                    for item in chunk["result"]["content"]:
                        if item["type"] == "text":
                            collected.append(item["text"])
            return "\n".join(collected)

    async def close(self) -> None:
        if self.session:
            await self.session.aclose()
            self.session = None
3.3 Multi‑Server Client and Conversation Loop
The MultiHTTPMCPClient aggregates tools from all configured servers, prefixes tool names with the server identifier, and runs an interactive chat loop. When the LLM returns a tool_calls finish reason, the client invokes the appropriate remote tool via the corresponding HTTPMCPServer, injects the tool result back into the conversation, and continues the dialogue.
class MultiHTTPMCPClient:
    def __init__(self, servers_conf: Dict[str, Any], api_key: str, base_url: Optional[str], model: str):
        self.servers = {name: HTTPMCPServer(name, cfg["endpoint"]) for name, cfg in servers_conf.items()}
        self.llm = LLMClient(api_key, base_url, model)
        self.all_tools: List[Dict[str, Any]] = []

    async def start(self):
        # Initialize every configured server and aggregate its tools under a prefixed name.
        for srv in self.servers.values():
            await srv.initialize()
            tools = await srv.list_tools()
            for t in tools:
                full_name = f"{srv.name}_{t['name']}"
                self.all_tools.append({"type": "function", "function": {
                    "name": full_name, "description": t["description"], "parameters": t["inputSchema"]}})
        logging.info("Connected servers and aggregated tools: %s",
                     [t["function"]["name"] for t in self.all_tools])

    async def call_local_tool(self, full_name: str, args: Dict[str, Any]) -> str:
        # full_name is "<server>_<tool>"; split once to recover both parts.
        srv_name, tool_name = full_name.split("_", 1)
        srv = self.servers[srv_name]
        city = args.get("city")
        if not city:
            raise ValueError("Missing city")
        return await srv.call_tool_stream(tool_name, {"city": city})

    async def chat_loop(self):
        print("🤖 HTTP MCP + Function Calling client started, type 'quit' to exit")
        messages: List[Dict[str, Any]] = []
        while True:
            user = input("You: ").strip()
            if user.lower() == "quit":
                break
            messages.append({"role": "user", "content": user})
            resp = self.llm.chat(messages, self.all_tools)
            choice = resp.choices[0]
            if choice.finish_reason == "tool_calls":
                tc = choice.message.tool_calls[0]
                tool_name = tc.function.name
                tool_args = json.loads(tc.function.arguments)
                print(f"[Calling tool] {tool_name} -> {tool_args}")
                tool_resp = await self.call_local_tool(tool_name, tool_args)
                # Feed the assistant's tool call and the tool output back into the conversation.
                messages.append(choice.message.model_dump())
                messages.append({"role": "tool", "content": tool_resp, "tool_call_id": tc.id})
                resp2 = self.llm.chat(messages, self.all_tools)
                print("AI:", resp2.choices[0].message.content)
                messages.append(resp2.choices[0].message.model_dump())
            else:
                print("AI:", choice.message.content)
                messages.append(choice.message.model_dump())

    async def close(self):
        for s in self.servers.values():
            await s.close()
3.4 Configuration and Execution
A servers_config.json file lists the server endpoints, e.g.:
{
  "mcpServers": {
    "weather": {"endpoint": "http://127.0.0.1:8000/mcp"}
  }
}
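The entry point of client.py is not reproduced above; assuming the Configuration sketch from section 3.1, a plausible main() wiring everything together could be:
import asyncio

async def main() -> None:
    config = Configuration()  # loads servers_config.json and the API key (assumed helper from section 3.1)
    client = MultiHTTPMCPClient(config.servers, config.api_key, config.base_url, config.model)
    try:
        await client.start()      # initialize each server and aggregate its tools
        await client.chat_loop()  # interactive conversation with function calling
    finally:
        await client.close()      # close all httpx sessions

if __name__ == "__main__":
    asyncio.run(main())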
Run the server and client with:
uv run server.py # starts the Streamable HTTP MCP server
uv run client.py # starts the interactive client
4. Conclusion
The Streamable HTTP protocol eliminates the reconnection, resource, and compatibility problems of HTTP SSE, making MCP suitable for enterprise‑grade AI agents. The article provides a complete, from‑scratch Python implementation of both server and client, demonstrates the full handshake, tool registration, and streaming tool execution, and offers a reusable multi‑server client template for future MCP projects.
Fun with Large Models
Master's graduate from Beijing Institute of Technology, published four top‑journal papers, previously worked as a developer at ByteDance and Alibaba. Currently researching large models at a major state‑owned enterprise. Committed to sharing concise, practical AI large‑model development experience, believing that AI large models will become as essential as PCs in the future. Let's start experimenting now!