Step‑by‑Step Guide to Building a Multi‑Agent Trading System for End‑to‑End Intelligent Decisions
This article walks through constructing a multi‑agent trading platform—analysts, researchers, traders, risk managers, and a portfolio manager—using LangChain, LangGraph, and LLMs (gpt‑4o, gpt‑4o‑mini), with real‑time data tools, shared and long‑term memory, ReAct loops, structured debates, and a final executable trade proposal.
Overview
We build a collaborative, multi‑agent trading system where specialized agents (analyst, researcher, trader, risk manager, portfolio manager) cooperate to collect 360° market intelligence, debate investment theses, and produce a machine‑readable trade signal.
Environment Setup and Tracing
First we install required libraries and configure API keys. LangSmith tracing is enabled to observe every step of the agent workflow, which is essential for debugging and production reliability.
# !pip install -U langchain langgraph langchain_openai tavily-python yfinance finnhub-python stockstats beautifulsoup4 chromadb rich
import os
from getpass import getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass(f"Enter your {var}: ")
_set_env("OPENAI_API_KEY")
_set_env("FINNHUB_API_KEY")
_set_env("TAVILY_API_KEY")
_set_env("LANGSMITH_API_KEY")
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "Deep-Trading-System"Shared Memory Design
AgentState is the central nervous system of the graph. TypedDict subclasses define sub‑states for investment debate and risk debate, storing full conversation histories, counters, and final decisions.
from typing import TypedDict
class InvestDebateState(TypedDict):
bull_history: str
bear_history: str
history: str
current_response: str
judge_decision: str
count: int
class RiskDebateState(TypedDict):
risky_history: str
safe_history: str
neutral_history: str
history: str
latest_speaker: str
current_risky_response: str
current_safe_response: str
current_neutral_response: str
judge_decision: str
count: int
class AgentState(MessagesState):
company_of_interest: str
trade_date: str
sender: str
market_report: str
sentiment_report: str
news_report: str
fundamentals_report: str
investment_debate_state: InvestDebateState
investment_plan: str
trader_investment_plan: str
risk_debate_state: RiskDebateState
final_trade_decision: strToolset for Real‑Time Data
Each agent can call a curated set of tools. The tools fetch price history, compute technical indicators, retrieve news, and perform web searches. All tools are wrapped with LangChain’s @tool decorator so the LLM can invoke them.
# Example: fetch price data from Yahoo Finance
@tool
def get_yfinance_data(symbol: Annotated[str, "公司股票代码"], start_date: Annotated[str, "开始日期,格式为 yyyy-mm-dd"], end_date: Annotated[str, "结束日期,格式为 yyyy-mm-dd"]) -> str:
ticker = yf.Ticker(symbol.upper())
data = ticker.history(start=start_date, end=end_date)
if data.empty:
return f"在 {start_date} 至 {end_date} 期间未找到符号 '{symbol}' 的数据"
return data.to_csv()
# Example: compute MACD, RSI, Bollinger Bands
@tool
def get_technical_indicators(symbol: Annotated[str, "公司股票代码"], start_date: Annotated[str, "开始日期,格式为 yyyy-mm-dd"], end_date: Annotated[str, "结束日期,格式为 yyyy-mm-dd"]) -> str:
df = yf.download(symbol, start=start_date, end=end_date, progress=False)
if df.empty:
return "没有数据可计算指标。"
stock_df = stockstats_wrap(df)
indicators = stock_df[["macd", "rsi_14", "boll", "boll_ub", "boll_lb", "close_50_sma", "close_200_sma"]]
return indicators.tail().to_csv()All tools are aggregated in a Toolkit class for convenient access.
class Toolkit:
def __init__(self, config):
self.config = config
self.get_yfinance_data = get_yfinance_data
self.get_technical_indicators = get_technical_indicators
self.get_finnhub_news = get_finnhub_news
self.get_social_media_sentiment = get_social_media_sentiment
self.get_fundamental_analysis = get_fundamental_analysis
self.get_macroeconomic_news = get_macroeconomic_news
toolkit = Toolkit(config)
print("Toolkit 类已定义并使用实时数据工具实例化。")Analyst Team Factory
A higher‑order function create_analyst_node builds LangGraph nodes for each analyst role, injecting a system prompt, the list of allowed tools, and the output field where the report will be stored.
def create_analyst_node(llm, toolkit, system_message, tools, output_field):
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个乐于助人的 AI 助手,与其他助手协作。使用提供的工具逐步回答问题。"),
("system", "你可以使用以下工具: {tool_names}.
{system_message}"),
("system", "供你参考,当前日期是 {current_date}。我们关注的股票是 {ticker}"),
MessagesPlaceholder(variable_name="messages"),
])
prompt = prompt.partial(system_message=system_message)
prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
chain = prompt | llm.bind_tools(tools)
def analyst_node(state):
prompt_with_data = prompt.partial(current_date=state["trade_date"], ticker=state["company_of_interest"])
result = prompt_with_data.invoke(state["messages"])
report = ""
if not result.tool_calls:
report = result.content
return {"messages": [result], output_field: report}
return analyst_nodeFour analyst nodes are instantiated:
Market analyst (technical indicators)
Social‑media analyst (sentiment)
News analyst (company & macro news)
Fundamentals analyst (financial health)
ReAct Loop Execution
The run_analyst function runs a single analyst’s ReAct loop for up to five reasoning steps, automatically invoking tools when the LLM requests them.
def run_analyst(analyst_node, initial_state):
state = initial_state
all_tools = [getattr(toolkit, name) for name in dir(toolkit) if callable(getattr(toolkit, name)) and not name.startswith("__")]
tool_node = ToolNode(all_tools)
for _ in range(5):
result = analyst_node(state)
if tools_condition(result) == "tools":
state = tool_node.invoke(result)
else:
state = result
break
return stateRunning Analyst Nodes (NVDA Example)
Using the ticker NVDA and a trade date two days ago, each analyst is run sequentially. Sample outputs (truncated for brevity) illustrate the generated reports.
# Market analyst report
基于对 NVDA 的技术分析,该股票表现出强劲的看涨趋势。价格持续运行在 50 日和 200 日均线之上,MACD 位于信号线上方,RSI 处于高位但未超买,布林带扩张,表明波动性增加。
| 指标 | 信号 | 洞察 |
|---|---|---|
| 移动平均线 | 看涨 | 确认上升趋势 |
| MACD | 看涨 | 正向动量 |
| RSI | 强劲 | 买盘压力仍在 |
| 布林带 | 扩张 | 可能突破 |
# Social‑media analyst report
针对 NVDA 的社交媒体情绪总体非常乐观。X(原 Twitter)和 Reddit 上的讨论围绕 AI 芯片领先地位和财报预期,几乎没有负面声音。
| 平台 | 情绪 | 关键主题 |
|---|---|---|
| X | 非常乐观 | AI 主导、上调评级 |
| Reddit | 非常乐观 | 长期持有 |
# News analyst report
NVDA 的新闻环境积极:新产品发布、汽车和企业 AI 合作伙伴关系。宏观层面通胀符合预期,科技股受益。
| 新闻类别 | 影响 | 摘要 |
|---|---|---|
| 公司特定 | 积极 | 新产品与合作 |
| 宏观经济 | 中性偏多 | 稳定通胀 |
# Fundamentals analyst report
NVDA 基本面强劲,收入增长异常,毛利率领先,现金储备充足。估值高(市盈率),但远期增长率支撑该估值。
| 指标 | 状态 | 洞察 |
|---|---|---|
| 营收增长 | 异常强劲 | AI 需求驱动 |
| 利润率 | 优秀 | 定价能力 |
| 估值 | 高 | 市场已消化增长预期 |
| 资产负债表 | 强劲 | 现金充裕 |Investment Debate (Bull vs Bear)
Two researcher agents—Bull Analyst and Bear Analyst—debate the aggregated reports. The factory create_researcher_node builds nodes that retrieve past memories, construct a rich prompt, and append each argument to investment_debate_state.
def create_researcher_node(llm, memory, role_prompt, agent_name):
def researcher_node(state):
situation_summary = f"""市场报告: {state['market_report']}
情绪报告: {state['sentiment_report']}
新闻报告: {state['news_report']}
基本面报告: {state['fundamentals_report']}"""
past_memories = memory.get_memories(situation_summary)
past_memory_str = "
".join([m['recommendation'] for m in past_memories])
prompt = f"""{role_prompt}
以下是当前的分析状态:
{situation_summary}
对话历史: {state['investment_debate_state']['history']}
对方的最新论点: {state['investment_debate_state']['current_response']}
类似过去情景的反思: {past_memory_str or '未找到过往记忆。'}
请以对话方式阐述你的论点。"""
response = llm.invoke(prompt)
argument = f"{agent_name}: {response.content}"
debate_state = state['investment_debate_state'].copy()
debate_state['history'] += "
" + argument
if agent_name == 'Bull Analyst':
debate_state['bull_history'] += "
" + argument
else:
debate_state['bear_history'] += "
" + argument
debate_state['current_response'] = argument
debate_state['count'] += 1
return {"investment_debate_state": debate_state}
return researcher_nodeRunning two rounds yields a concise record of opposing viewpoints and highlights the core conflict (strong momentum vs. high valuation risk).
--- 投资辩论第 1 轮 ---
**多头的论点:**
NVDA 的论据坚不可摧。技术面、基本面、情绪和新闻全都指向强劲上行,建议全仓买入。
**空头的反驳:**
高市盈率是重大隐患,AI 需求可能回调。建议等待显著回调后再建仓。
--- 投资辩论第 2 轮 ---
**多头的论点:**
AI 革命是结构性转变,估值高是因为增长预期。不要等待回调。
**空头的反驳:**
即使长期向好,短期仍可能出现 30‑40% 回撤。建议持有或逢低加仓。Research Manager Decision
The research manager (a strong LLM) synthesizes the debate and emits a final investment plan.
def create_research_manager(llm, memory):
def research_manager_node(state):
prompt = f"""作为研究主管,你的职责是批判性评估多头和空头的辩论,并给出最终建议(买入、卖出或持有)。
辩论历史:
{state['investment_debate_state']['history']}"""
response = llm.invoke(prompt)
return {"investment_plan": response.content}
return research_manager_node
research_manager_node = create_research_manager(deep_thinking_llm, invest_judge_memory)
manager_result = research_manager_node(initial_state)
initial_state['investment_plan'] = manager_result['investment_plan']Sample output:
在审阅了热烈的辩论后,多头的核心论点更具说服力。空头的估值担忧虽重要,但被强劲基本面压倒。
**建议:买入**
**理由:** 基本面卓越、技术动量强、情绪与新闻利好。
**策略行动:** 分步建仓:先建 50% 仓位,回调至 50 日均线时加仓至 100%;在 200 日均线下方设置止损。Trader and Risk Management
The trader node converts the investment plan into a machine‑readable proposal ending with FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL**. Risk analysts (aggressive, conservative, neutral) then debate the proposal using create_risk_debator.
def create_trader(llm, memory):
def trader_node(state, name):
prompt = f"""你是一名交易智能体。根据提供的投资计划,创建一个简洁的交易提案。
你的响应必须以 'FINAL TRANSACTION PROPOSAL: **BUY/HOLD/SELL**' 结尾。
拟议投资计划: {state['investment_plan']}"""
result = llm.invoke(prompt)
return {"trader_investment_plan": result.content, "sender": name}
return trader_node
trader_node = create_trader(quick_thinking_llm, trader_memory)
trader_result = trader_node(initial_state, "Trader")
initial_state['trader_investment_plan'] = trader_result['trader_investment_plan']Trader output example:
研究主管分步建仓的计划是审慎的,并得到了全面分析的有力支持。首次建 50% 仓位,回调至 50 日均线时加仓至剩余 50%。在 200 日均线下方设置硬止损。
最终交易提案:**买入**Risk debaters are created with role‑specific prompts and update risk_debate_state.
def create_risk_debator(llm, role_prompt, agent_name):
def risk_debator_node(state):
risk_state = state['risk_debate_state']
opponents = []
if agent_name != 'Risky Analyst' and risk_state['current_risky_response']:
opponents.append(f"激进型: {risk_state['current_risky_response']}")
if agent_name != 'Safe Analyst' and risk_state['current_safe_response']:
opponents.append(f"保守型: {risk_state['current_safe_response']}")
if agent_name != 'Neutral Analyst' and risk_state['current_neutral_response']:
opponents.append(f"中性型: {risk_state['current_neutral_response']}")
prompt = f"""{role_prompt}
以下是交易员的计划: {state['trader_investment_plan']}
辩论历史: {risk_state['history']}
对手的最新论点:
{'
'.join(opponents)}
请从你的角度批判或支持该计划。"""
response = llm.invoke(prompt).content
new_state = risk_state.copy()
new_state['history'] += f"
{agent_name}: {response}"
new_state['latest_speaker'] = agent_name
if agent_name == 'Risky Analyst':
new_state['current_risky_response'] = response
elif agent_name == 'Safe Analyst':
new_state['current_safe_response'] = response
else:
new_state['current_neutral_response'] = response
new_state['count'] += 1
return {"risk_debate_state": new_state}
return risk_debator_nodeRunning one round of risk debate produces three perspectives:
**激进型分析师的观点:**
全仓更能捕获强势上行,等待回调会错失利润。
**保守型分析师的观点:**
高估值和情绪狂热是回调信号,建议更紧的止损并保持 50% 仓位。
**中性型分析师的观点:**
交易员的分步建仓方案已平衡风险与收益,完全可行。Conclusion
The article demonstrates a complete, reproducible pipeline: environment setup → shared memory → data‑ingestion tools → analyst factories → ReAct execution → structured debate → manager synthesis → trader proposal → risk‑management debate. All components are implemented in Python with LangChain/LangGraph, making the system ready for production‑grade, observable, multi‑agent financial decision‑making.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Data Party THU
Official platform of Tsinghua Big Data Research Center, sharing the team's latest research, teaching updates, and big data news.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
