Building a Multimodal RAG System with LangChain 1.0: Core Architecture and Smart Q&A Development
This article walks through the design and implementation of a multimodal Retrieval‑Augmented Generation (RAG) system using LangChain 1.0, detailing a front‑end/back‑end separated architecture, FastAPI service setup, multimodal data handling, conversation history management, streaming responses, and Postman testing to verify the intelligent Q&A module.
Core Architecture and Technology Stack
The project implements a multimodal RAG conversational system that accepts text, image, audio, and PDF inputs and provides four core capabilities: intelligent Q&A, image analysis, audio transcription, and PDF parsing. A modern front‑end/back‑end separated architecture is adopted to illustrate enterprise‑grade design.
The backend consists of four independent agent modules, each handling a specific multimodal input type.
Smart Q&A Agent Construction
1. Environment Setup and Dependency Import
import json
import uvicorn
from typing import List, Dict, Any, AsyncGenerator
from datetime import datetime
from pydantic import BaseModel, Field
from fastapi import HTTPException, FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.messages import BaseMessage

2. Multimodal Model Initialization
The full‑modal model Qwen3‑Omni‑30B‑A3B‑Instruct released by Alibaba in September 2025 is accessed via the SiliconFlow API using init_chat_model:
def get_chat_model():
    try:
        model = init_chat_model(
            model="Qwen/Qwen3-Omni-30B-A3B-Instruct",
            model_provider="openai",
            base_url="https://api.siliconflow.cn/v1/",
            api_key="YOUR_SILICONFLOW_API_KEY",
        )
        return model
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Model initialization failed: {str(e)}")

3. Data Structure Definition
class ContentBlock(BaseModel):
    type: str = Field(description="Content type: text, image, audio")
    content: str = Field(description="Content data")

class MessageRequest(BaseModel):
    content_blocks: List[ContentBlock] = Field(default=[], description="Content blocks")
    history: List[Dict[str, Any]] = Field(default=[], description="Conversation history")

class MessageResponse(BaseModel):
    content: str
    timestamp: str
    role: str

4. Multimodal Message Construction
def create_multimodal_message(request: MessageRequest) -> HumanMessage:
    """Build a multimodal message from the request's content blocks."""
    message_content = []
    # Collect the content blocks (this installment handles text; later ones add image/audio)
    for block in request.content_blocks:
        if block.type == "text":
            message_content.append({"type": "text", "text": block.content})
    # Return the full block list; returning only the first block's text would drop
    # additional blocks and raise an IndexError on an empty request
    return HumanMessage(content=message_content)

5. Conversation History Management
def convert_history_to_messages(history: List[Dict[str, Any]]) -> List[BaseMessage]:
    """Convert history records into LangChain message format, with multimodal support."""
    messages = []
    # Add the system message
    system_prompt = (
        "You are a professional multimodal RAG assistant capable of conversing with users. "
        "Please answer questions in a professional, accurate, and friendly manner."
    )
    messages.append(SystemMessage(content=system_prompt))
    # Convert the history messages
    for msg in history:
        content = msg.get("content", "")
        content_blocks = msg.get("content_blocks", [])
        message_content = []
        if msg["role"] == "user":
            for block in content_blocks:
                if block.get("type") == "text":
                    message_content.append({"type": "text", "text": block.get("content", "")})
            messages.append(HumanMessage(content=message_content))
        elif msg["role"] == "assistant":
            messages.append(AIMessage(content=content))
    return messages

6. Streaming Response Generation
async def generate_streaming_response(messages: List[BaseMessage]) -> AsyncGenerator[str, None]:
    """Generate a streaming response as server-sent events."""
    try:
        model = get_chat_model()
        full_response = ""
        async for chunk in model.astream(messages):
            if hasattr(chunk, "content") and chunk.content:
                content = chunk.content
                full_response += content
                data = {"type": "content_delta", "content": content, "timestamp": datetime.now().isoformat()}
                yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
        final_data = {"type": "message_complete", "full_content": full_response, "timestamp": datetime.now().isoformat()}
        yield f"data: {json.dumps(final_data, ensure_ascii=False)}\n\n"
    except Exception as e:
        error_data = {"type": "error", "error": str(e), "timestamp": datetime.now().isoformat()}
        yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"

7. API Endpoints
Two FastAPI routes are provided: a streaming endpoint /api/chat/stream and a synchronous endpoint /api/chat.
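Before the route code, it is worth pausing on the wire format: the streaming endpoint emits server-sent events, each framed as a data: line carrying a JSON payload and terminated by a blank line. A minimal, self-contained sketch of that framing (the event dict below is illustrative, not captured output):

```python
import json
from datetime import datetime

def sse_frame(event: dict) -> str:
    # Each server-sent event is a "data: <json>" line terminated by a blank line
    return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"

frame = sse_frame({"type": "content_delta", "content": "你好", "timestamp": datetime.now().isoformat()})
print(frame)
```

ensure_ascii=False keeps Chinese text readable in the raw stream; clients split on blank lines and strip the data: prefix.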
async def chat_stream(request: MessageRequest):
    """Streaming chat endpoint (multimodal)."""
    try:
        messages = convert_history_to_messages(request.history)
        current_message = create_multimodal_message(request)
        messages.append(current_message)
        return StreamingResponse(
            generate_streaming_response(messages),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

async def chat_sync(request: MessageRequest):
    """Synchronous chat endpoint (multimodal)."""
    try:
        messages = convert_history_to_messages(request.history)
        current_message = create_multimodal_message(request)
        messages.append(current_message)
        model = get_chat_model()
        response = await model.ainvoke(messages)
        return MessageResponse(content=response.content, role="assistant", timestamp=datetime.now().isoformat())
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

FastAPI Service Setup
app = FastAPI(
    title="Multimodal RAG Workbench API",
    description="Intelligent conversation API built on LangChain 1.0",
    version="1.0.0",
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/api/chat/stream")
async def chat_stream(request: MessageRequest):
    ...

@app.post("/api/chat")
async def chat_sync(request: MessageRequest):
    ...

if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=8000)

Postman Testing
To verify the service, run the server with python main.py and use Postman to send POST requests to http://localhost:8000/api/chat/stream with Content-Type: application/json. A single-round test payload:
{
  "content_blocks": [{"type": "text", "content": "What is artificial intelligence?"}],
  "history": []
}

The streaming response returns multiple content_delta chunks followed by a message_complete payload, confirming correct operation.
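On the client side, consuming this stream amounts to splitting on blank lines, stripping the data: prefix, and accumulating content_delta events. A minimal sketch (the sample frames below are illustrative, not captured output):

```python
import json

def parse_sse_stream(raw: str) -> str:
    """Accumulate content_delta events from a raw SSE stream into the full reply."""
    full = ""
    for frame in raw.split("\n\n"):
        frame = frame.strip()
        if not frame.startswith("data: "):
            continue
        event = json.loads(frame[len("data: "):])
        if event["type"] == "content_delta":
            full += event["content"]
    return full

# Illustrative frames in the shape emitted by generate_streaming_response
raw = (
    'data: {"type": "content_delta", "content": "Artificial intelligence"}\n\n'
    'data: {"type": "content_delta", "content": " is..."}\n\n'
    'data: {"type": "message_complete", "full_content": "Artificial intelligence is..."}\n\n'
)
print(parse_sse_stream(raw))  # Artificial intelligence is...
```

A production client would read the response incrementally rather than buffering the whole body, but the framing logic is the same.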
A multi‑round test includes a prior conversation history to demonstrate memory retention. The response correctly references the user’s name from the earlier exchange, showing that the system preserves context.
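The multi-round request body uses the same schema, with prior turns carried in history in the shape convert_history_to_messages expects (user turns read from content_blocks, assistant turns from content). A hypothetical example payload (the name and wording are illustrative):

```python
# Illustrative multi-round payload matching the MessageRequest schema;
# the name "Xiao Ming" is a made-up example, not from the original test
payload = {
    "content_blocks": [{"type": "text", "content": "What is my name?"}],
    "history": [
        {
            "role": "user",
            "content": "My name is Xiao Ming.",
            "content_blocks": [{"type": "text", "content": "My name is Xiao Ming."}],
        },
        {"role": "assistant", "content": "Nice to meet you, Xiao Ming!"},
    ],
}
print(payload["history"][0]["role"])  # user
```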
Conclusion
The article provides a complete, reproducible guide for building the core smart Q&A component of a multimodal RAG system with LangChain 1.0, covering architecture design, model integration, data modeling, streaming mechanics, FastAPI service creation, and end-to-end testing. Subsequent installments will extend the system to image analysis and audio transcription.
Fun with Large Models
Master's graduate from Beijing Institute of Technology, published four top‑journal papers, previously worked as a developer at ByteDance and Alibaba. Currently researching large models at a major state‑owned enterprise. Committed to sharing concise, practical AI large‑model development experience, believing that AI large models will become as essential as PCs in the future. Let's start experimenting now!