Building Production‑Ready RAG with Vector Databases: Deep Dive into Chroma, Pinecone, Milvus and Optimizations
This article explains why Retrieval‑Augmented Generation is needed, compares popular vector databases, provides step‑by‑step Docker and Python examples for Chroma, Pinecone, and Milvus, and shows how to optimize a full RAG agent with hybrid search, reranking, and caching.
1. Why RAG?
Large language models have several limitations:
Knowledge cutoff (e.g., GPT‑4 stops at 2023)
Cannot access private data such as internal documents
Hallucination – may fabricate facts
Limited context window (finite token count per request)
High cost to retrain or fine‑tune
Retrieval‑Augmented Generation (RAG) addresses these issues by fetching relevant external information and feeding it to the LLM.
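The retrieve-then-generate flow can be sketched without any external service. The snippet below is a minimal illustration, not a production design: `embed` is a toy bag-of-words stand-in for a real embedding model, and `retrieve`, `build_prompt`, and the sample documents are hypothetical names and data introduced only for this example.

```python
import math
import re
from collections import Counter


def embed(text: str) -> Counter:
    """Toy 'embedding': word counts (a stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(question: str, docs: list[str], k: int = 2) -> list[str]:
    """Similarity search: rank documents against the question, keep the top k."""
    q = embed(question)
    return sorted(docs, key=lambda d: cosine(q, embed(d)), reverse=True)[:k]


def build_prompt(question: str, snippets: list[str]) -> str:
    """Augmented prompt: retrieved snippets first, then the user question."""
    context = "\n".join(f"- {s}" for s in snippets)
    return f"Answer using only the context below.\n\nContext:\n{context}\n\nQuestion: {question}"


docs = [
    "Employees receive 5 vacation days after 1 year of service.",
    "Employees receive 10 vacation days after 3 years of service.",
    "The office is closed on public holidays.",
]
question = "How many vacation days after 3 years?"
top = retrieve(question, docs, k=2)
prompt = build_prompt(question, top)
print(prompt)  # this string is what would be sent to the LLM
```

In a real system the toy `embed` is replaced by an embedding model and the sorted scan by a vector database query; the shape of the flow stays the same.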
┌─────────────────────────────────────────────────────────────────┐
│ RAG Workflow │
├─────────────────────────────────────────────────────────────────┤
│ User Question: "How many vacation days does the company give?" │
│ ↓ Embedding (vectorize the question) │
│ ↓ Similarity Search in a vector DB │
│ ↓ Augmented Prompt (include retrieved snippets) │
│ ↓ LLM generates answer │
└─────────────────────────────────────────────────────────────────┘
2. Vector‑Database Comparison
Chroma – Embedded, lightweight, no deployment needed; ideal for development, testing, or small projects.
Pinecone – Managed cloud service, high availability, auto‑scaling; suited for production and enterprise use.
Milvus – Distributed, high‑performance, feature‑rich; best for large‑scale scenarios.
Qdrant – Rust‑based, cloud‑native, high performance; also for large‑scale workloads.
PgVector – PostgreSQL extension, reuses existing PG infrastructure; fits stacks already using PostgreSQL.
Redis – In‑memory, ultra‑fast, good for caching and real‑time retrieval.
3. Chroma Practical (Development / Testing)
3.1 Docker Deployment
# docker-compose.yml
version: '3.8'
services:
  chroma:
    image: chromadb/chroma:latest
    container_name: chroma
    ports:
      - "8000:8000"
    volumes:
      - chroma-data:/chroma/chroma
    environment:
      - IS_PERSISTENT=TRUE
      - ANONYMIZED_TELEMETRY=FALSE
    command: uvicorn chromadb.app:app --reload --workers 1 --host 0.0.0.0 --port 8000
volumes:
  chroma-data:
3.2 Python Client
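# chroma_client.py
import os

import chromadb
from chromadb.utils import embedding_functions

# Connect to the Chroma server started by docker-compose
client = chromadb.HttpClient(host="localhost", port=8000)

# Documents and queries are embedded with OpenAI's embedding API
collection = client.create_collection(
    name="knowledge_base",
    embedding_function=embedding_functions.OpenAIEmbeddingFunction(
        api_key=os.getenv("OPENAI_API_KEY"),
        model_name="text-embedding-ada-002"
    )
)
collection.add(
    documents=[
        "Employees get 5 days of annual leave after 1 year of service",
        "Employees get 10 days of annual leave after 3 years of service",
        "Employees get 15 days of annual leave after 5 years of service",
    ],
    metadatas=[{"source": "policy", "category": "vacation"}] * 3,
    ids=["doc1", "doc2", "doc3"]
)
results = collection.query(query_texts=["How many days of annual leave are there?"], n_results=2)
4. Pinecone Practical (Production)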
4.1 Register and Create Index
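# 1. Register Pinecone (free tier available)
# 2. Create index (the dimension must match the embedding model; all-MiniLM-L6-v2, used in the client code, outputs 384-dimensional vectors)
pinecone create index --name knowledge-index --dimension 384 --metric cosine
4.2 Python Client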
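# pinecone_client.py
# Note: pinecone.init() is the legacy client API; newer client releases use `from pinecone import Pinecone` instead.
import pinecone
from sentence_transformers import SentenceTransformer

pinecone.init(api_key="your-api-key", environment="us-west1-gcp")
index = pinecone.Index("knowledge-index")

# all-MiniLM-L6-v2 produces 384-dimensional vectors, so the index must have dimension 384
model = SentenceTransformer('all-MiniLM-L6-v2')
documents = [{"id": "doc1", "text": "Employees get 5 days of annual leave after 1 year of service", "metadata": {"source": "policy"}},
             {"id": "doc2", "text": "Employees get 10 days of annual leave after 3 years of service", "metadata": {"source": "policy"}}]
vectors = []
for doc in documents:
    vector = model.encode(doc["text"]).tolist()
    # Keep the text in metadata so it can be shown alongside search results
    vectors.append((doc["id"], vector, {**doc["metadata"], "text": doc["text"]}))
index.upsert(vectors=vectors)

query_text = "How many days of annual leave are there?"
query_vector = model.encode(query_text).tolist()
results = index.query(vector=query_vector, top_k=3, include_metadata=True)
for match in results['matches']:
    print(f"Score: {match['score']}, ID: {match['id']}, Text: {match['metadata']['text']}")
5. Milvus Practical (Large‑Scale)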
5.1 Docker Deployment
# docker-compose-milvus.yml
version: '3.5'
services:
  etcd:
    image: quay.io/coreos/etcd:v3.5.5
    container_name: milvus-etcd
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
    volumes:
      - etcd-data:/etcd
  minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    container_name: milvus-minio
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    command: minio server /data
    volumes:
      - minio-data:/data
  standalone:
    image: milvusdb/milvus:v2.3.3
    container_name: milvus-standalone
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    ports:
      - "19530:19530"
    depends_on:
      - etcd
      - minio
volumes:
  etcd-data:
  minio-data:
5.2 Python Client
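The client code in this section builds an IVF_FLAT index with an `nlist` parameter and searches it with `nprobe`. As a rough illustration of what those two knobs control, here is a toy inverted-file search in plain Python. It is a sketch under simplifying assumptions, not how Milvus is implemented: the centroids are fixed by hand (a real index learns `nlist` centroids by clustering), Euclidean distance is used instead of cosine, and all function names are hypothetical.

```python
import math


def build_ivf(vectors, centroids):
    """Assign each vector id to the bucket of its nearest centroid."""
    buckets = {c: [] for c in range(len(centroids))}
    for vid, vec in enumerate(vectors):
        nearest = min(range(len(centroids)), key=lambda c: math.dist(vec, centroids[c]))
        buckets[nearest].append(vid)
    return buckets


def ivf_search(query, vectors, centroids, buckets, nprobe, limit):
    """Scan only the nprobe buckets whose centroids are closest to the query."""
    probe = sorted(range(len(centroids)), key=lambda c: math.dist(query, centroids[c]))[:nprobe]
    candidates = [vid for c in probe for vid in buckets[c]]
    return sorted(candidates, key=lambda vid: math.dist(query, vectors[vid]))[:limit]


centroids = [(0.0, 0.0), (10.0, 10.0)]            # nlist = 2
vectors = [(1.0, 1.0), (4.0, 4.0), (6.0, 6.0), (9.0, 9.0)]
buckets = build_ivf(vectors, centroids)

query = (4.9, 4.9)
fast = ivf_search(query, vectors, centroids, buckets, nprobe=1, limit=2)
exact = ivf_search(query, vectors, centroids, buckets, nprobe=2, limit=2)
print(fast, exact)
```

With `nprobe=1` only the first bucket is scanned, so the true second-nearest vector (id 2, which sits just across the bucket boundary) is missed; with `nprobe=2` every bucket is scanned and the result is exact. That is the recall versus latency trade-off the parameter tunes.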
# milvus_client.py
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from sentence_transformers import SentenceTransformer

connections.connect(host='localhost', port='19530')

# dim must match the embedding model: all-MiniLM-L6-v2 outputs 384-dimensional vectors
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=500)
]
schema = CollectionSchema(fields)
collection = Collection("knowledge_base", schema)
index_params = {"metric_type": "COSINE", "index_type": "IVF_FLAT", "params": {"nlist": 128}}
collection.create_index("embedding", index_params)

# Insert example data (column-based: one list per field, in schema order)
model = SentenceTransformer('all-MiniLM-L6-v2')
texts = ["Employees get 5 days of annual leave after 1 year of service",
         "Employees get 10 days of annual leave after 3 years of service"]
embeddings = model.encode(texts)
collection.insert([
    [1, 2],               # ids
    embeddings.tolist(),  # embeddings
    texts                 # texts
])

# The collection must be loaded into memory before searching
collection.load()
query_text = "number of annual leave days"
query_vector = model.encode(query_text).tolist()
results = collection.search(data=[query_vector], anns_field="embedding",
                            param={"metric_type": "COSINE", "params": {"nprobe": 10}}, limit=3, output_fields=["text"])
for hit in results[0]:
    print(hit.distance, hit.entity.get("text"))
6. RAG System Optimizations
6.1 Document Processing Pipeline
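The pipeline in this section splits documents with `chunk_size` and `chunk_overlap`. To make those two parameters concrete, here is a dependency-free sketch of fixed-size chunking with overlap. It is a simplification: `RecursiveCharacterTextSplitter` additionally prefers to cut at separator boundaries, which this hypothetical `chunk_text` helper does not attempt.

```python
def chunk_text(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> list[str]:
    """Split text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


text = "".join(str(i % 10) for i in range(25))  # "0123456789012345678901234"
chunks = chunk_text(text, chunk_size=10, chunk_overlap=3)
for c in chunks:
    print(c)
```

Each chunk repeats the last three characters of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk when the overlap is large enough.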
# document_pipeline.py
from langchain_community.document_loaders import TextLoader, PyPDFLoader, CSVLoader, UnstructuredMarkdownLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter


class DocumentProcessor:
    """Document processing pipeline"""

    def __init__(self):
        # Separators are tried in order; "。" and "," let the splitter also handle Chinese text
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500, chunk_overlap=50,
            separators=["\n\n", "\n", "。", ",", " ", ""])

    def load_document(self, file_path: str):
        # Pick a loader based on the file extension
        if file_path.endswith('.txt'):
            loader = TextLoader(file_path, encoding='utf-8')
        elif file_path.endswith('.pdf'):
            loader = PyPDFLoader(file_path)
        elif file_path.endswith('.csv'):
            loader = CSVLoader(file_path)
        elif file_path.endswith('.md'):
            loader = UnstructuredMarkdownLoader(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_path}")
        return loader.load()

    def split_documents(self, documents):
        return self.text_splitter.split_documents(documents)

    def process(self, file_path: str):
        docs = self.load_document(file_path)
        chunks = self.split_documents(docs)
        print(f"✅ Loaded {len(docs)} documents")
        print(f"✅ Split into {len(chunks)} chunks")
        return chunks
6.2 Hybrid Search (Vector + Keyword)
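The score fusion used in this section is a weighted sum, hybrid = alpha * vector + (1 - alpha) * bm25, applied after min-max normalization so that both score types share a 0 to 1 range. A dependency-free sketch with made-up scores shows the arithmetic. The helper names are hypothetical, and a constant score list is mapped to zeros here rather than guarded with a small epsilon.

```python
def min_max(scores: list[float]) -> list[float]:
    """Rescale scores to [0, 1]; a constant list maps to all zeros."""
    lo, hi = min(scores), max(scores)
    if hi == lo:
        return [0.0] * len(scores)
    return [(s - lo) / (hi - lo) for s in scores]


def fuse(vector_scores: list[float], bm25_scores: list[float], alpha: float = 0.5) -> list[float]:
    """Weighted sum of normalized scores; alpha=1.0 means vector search only."""
    v = min_max(vector_scores)
    b = min_max(bm25_scores)
    return [alpha * x + (1 - alpha) * y for x, y in zip(v, b)]


# Made-up scores for three documents
vector_scores = [0.82, 0.80, 0.40]   # doc 0 narrowly wins on semantics
bm25_scores = [0.0, 6.0, 3.0]        # doc 1 wins clearly on keywords

hybrid = fuse(vector_scores, bm25_scores, alpha=0.5)
best = max(range(len(hybrid)), key=hybrid.__getitem__)
print(hybrid, best)
```

Document 0 leads on vector similarity alone, but document 1 takes first place once its strong keyword match is blended in. Raising `alpha` toward 1.0 shifts the ranking back toward pure vector search.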
# hybrid_search.py
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
import numpy as np


class HybridSearch:
    """Hybrid retrieval: vector similarity + BM25 keyword search"""

    def __init__(self, embedding_model_name='all-MiniLM-L6-v2'):
        self.embedding_model = SentenceTransformer(embedding_model_name)
        self.documents = []
        self.embeddings = None
        self.bm25 = None

    def index(self, documents):
        self.documents = documents
        texts = [doc.page_content for doc in documents]
        # Normalized embeddings make the dot product equal to cosine similarity
        self.embeddings = self.embedding_model.encode(texts, normalize_embeddings=True)
        # Whitespace tokenization; languages without spaces (e.g. Chinese) need a word segmenter
        tokenized_docs = [text.split() for text in texts]
        self.bm25 = BM25Okapi(tokenized_docs)
        print(f"✅ Indexed {len(documents)} documents")

    def search(self, query: str, top_k: int = 5, alpha: float = 0.5):
        """alpha weights the vector score; (1 - alpha) weights the BM25 score."""
        query_emb = self.embedding_model.encode([query], normalize_embeddings=True)[0]
        vector_scores = np.dot(self.embeddings, query_emb)
        bm25_scores = self.bm25.get_scores(query.split())
        # Min-max normalize both score sets to [0, 1] so they are comparable
        vector_scores = (vector_scores - vector_scores.min()) / (vector_scores.max() - vector_scores.min() + 1e-8)
        bm25_scores = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-8)
        hybrid_scores = alpha * vector_scores + (1 - alpha) * bm25_scores
        indices = np.argsort(hybrid_scores)[::-1][:top_k]
        results = []
        for i in indices:
            results.append({
                "content": self.documents[i].page_content,
                "score": float(hybrid_scores[i]),
                "vector_score": float(vector_scores[i]),
                "bm25_score": float(bm25_scores[i])
            })
        return results
6.3 Rerank (Result Re‑ordering)
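Reranking is a two-stage idea: a cheap first stage returns a generous candidate list, and a more expensive scorer reorders only those candidates. The sketch below shows that control flow with a pluggable scoring function. `overlap_score` is a deliberately crude stand-in so the example runs without a model; in practice the scorer would be a cross-encoder. All names here are hypothetical.

```python
from typing import Callable


def rerank(query: str, candidates: list[dict],
           score_fn: Callable[[str, str], float], top_k: int = 3) -> list[dict]:
    """Score each (query, passage) pair and return the top_k candidates, best first.

    Returns new dicts, leaving the input candidates unmodified.
    """
    scored = [{**c, "rerank_score": float(score_fn(query, c["content"]))} for c in candidates]
    scored.sort(key=lambda c: c["rerank_score"], reverse=True)
    return scored[:top_k]


def overlap_score(query: str, passage: str) -> float:
    """Stand-in scorer: fraction of query words that appear in the passage."""
    q = set(query.lower().split())
    p = set(passage.lower().split())
    return len(q & p) / len(q) if q else 0.0


# First-stage results, ordered by a first-stage score that got the order wrong
candidates = [
    {"content": "Public holidays are listed on the intranet", "score": 0.9},
    {"content": "Vacation days increase after 3 years", "score": 0.7},
]
top = rerank("vacation days after 3 years", candidates, overlap_score, top_k=1)
print(top[0]["content"])
```

Because the second-stage scorer sees the query and passage together, it can correct ordering mistakes made by the first stage, at the cost of one scoring call per candidate.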
# rerank.py
from sentence_transformers import CrossEncoder


class Reranker:
    """Result reranking using a cross‑encoder"""

    def __init__(self, model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'):
        self.model = CrossEncoder(model_name)

    def rerank(self, query: str, candidates: list, top_k: int = 3):
        # Score every (query, passage) pair jointly, then sort by that score
        pairs = [(query, cand['content']) for cand in candidates]
        scores = self.model.predict(pairs)
        for i, cand in enumerate(candidates):
            cand['rerank_score'] = float(scores[i])
        candidates.sort(key=lambda x: x['rerank_score'], reverse=True)
        return candidates[:top_k]
6.4 Cache Strategy
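The caching idea in this section, hashing the query into a key and expiring entries after a TTL, can be tried without a Redis server. The class below is a minimal in-process sketch under stated assumptions: `InMemoryQueryCache` is a hypothetical name, it normalizes case and whitespace before hashing (an addition for illustration), and it accepts an injectable clock so expiry can be demonstrated without waiting.

```python
import hashlib
import time


class InMemoryQueryCache:
    """Dictionary-backed query cache with per-entry expiry."""

    def __init__(self, ttl_seconds: float = 3600, clock=time.monotonic):
        self.ttl = ttl_seconds
        self._clock = clock
        self._store = {}  # key -> (expires_at, value)

    @staticmethod
    def _key(query: str) -> str:
        # Normalize case and whitespace so trivially different queries share a key
        normalized = " ".join(query.lower().split())
        return "rag_cache:" + hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get(self, query: str):
        key = self._key(query)
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # drop the expired entry
            return None
        return value

    def set(self, query: str, value) -> None:
        self._store[self._key(query)] = (self._clock() + self.ttl, value)


# Demonstration with a fake clock that is advanced by hand
now = [0.0]
cache = InMemoryQueryCache(ttl_seconds=10, clock=lambda: now[0])
cache.set("How many  vacation days?", {"answer": "5 days"})
hit = cache.get("how many vacation days?")   # same key after normalization
now[0] = 11.0                                # move past the TTL
miss = cache.get("how many vacation days?")
print(hit, miss)
```

An in-process cache like this disappears on restart and is not shared between workers, which is the main reason to move to a Redis-backed cache in production.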
# cache.py
import hashlib
import json

import redis


class QueryCache:
    """Simple Redis‑backed query cache"""

    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.ttl = 3600  # 1 hour

    def _get_key(self, query: str) -> str:
        # MD5 is used only to derive a short, fixed-length key, not for security
        return f"rag_cache:{hashlib.md5(query.encode()).hexdigest()}"

    def get(self, query: str):
        key = self._get_key(query)
        cached = self.redis_client.get(key)
        if cached is not None:
            return json.loads(cached)
        return None

    def set(self, query: str, result):
        key = self._get_key(query)
        self.redis_client.setex(key, self.ttl, json.dumps(result, ensure_ascii=False))

    def clear(self, pattern: str = None):
        # scan_iter walks the keyspace incrementally instead of blocking Redis like KEYS
        match = f"rag_cache:{pattern}*" if pattern else "rag_cache:*"
        for key in self.redis_client.scan_iter(match=match):
            self.redis_client.delete(key)
7. Complete RAG Agent Implementation
# complete_rag_agent.py
import os

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader

load_dotenv()  # reads OPENAI_API_KEY from a .env file


class RAGAgent:
    """Basic production‑grade RAG agent"""

    def __init__(self, knowledge_path: str):
        # knowledge_path must point to a single UTF-8 text file (TextLoader does not read directories)
        self.knowledge_path = knowledge_path
        self.vectorstore = None
        self.qa_chain = None
        self._init_vectorstore()
        self._init_chain()

    def _init_vectorstore(self):
        print("📚 Loading knowledge base…")
        loader = TextLoader(self.knowledge_path, encoding='utf-8')
        documents = loader.load()
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        docs = splitter.split_documents(documents)
        embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma.from_documents(documents=docs, embedding=embeddings, persist_directory="./chroma_db")
        print(f"✅ Vector store ready with {len(docs)} chunks")

    def _init_chain(self):
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            retriever=self.vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 4}),
            return_source_documents=True,
            verbose=True
        )

    def ask(self, question: str) -> dict:
        # RetrievalQA expects its input under the "query" key
        result = self.qa_chain.invoke({"query": question})
        return {
            "question": question,
            "answer": result['result'],
            "sources": [doc.metadata.get('source', 'unknown') for doc in result['source_documents']]
        }
# Advanced version with cache, MMR retrieval and custom prompt
class AdvancedRAGAgent:
    """RAG agent with caching, MMR retrieval and prompt engineering"""

    def __init__(self, knowledge_path: str):
        # knowledge_path is a directory; every .txt file under it is indexed
        self.knowledge_path = knowledge_path
        self.vectorstore = None
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)
        self.cache = {}  # in-process cache keyed by the exact question string
        self._init_vectorstore()

    def _init_vectorstore(self):
        from langchain_community.document_loaders import DirectoryLoader, TextLoader
        loader = DirectoryLoader(self.knowledge_path, glob="**/*.txt", loader_cls=TextLoader, loader_kwargs={'encoding': 'utf-8'})
        documents = loader.load()
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        docs = splitter.split_documents(documents)
        embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma.from_documents(docs, embeddings)

    def ask_with_retrieval(self, question: str, top_k: int = 5) -> dict:
        if question in self.cache:
            print("✅ Cache hit")
            return self.cache[question]
        # MMR picks top_k diverse results out of fetch_k nearest candidates
        retriever = self.vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": top_k, "fetch_k": 10})
        docs = retriever.invoke(question)
        context = "\n\n".join([doc.page_content for doc in docs])
        prompt = f"""Reference the following information to answer the question:
{context}
Question: {question}
Requirements:
1. Use only the above information.
2. State explicitly if the information is insufficient.
3. Keep the answer concise and accurate.
"""
        response = self.llm.invoke(prompt)
        result = {"question": question, "answer": response.content,
                  "sources": [doc.metadata.get('source', 'unknown') for doc in docs]}
        self.cache[question] = result
        return result
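if __name__ == "__main__":
    os.makedirs("knowledge", exist_ok=True)
    with open("knowledge/policy.txt", "w", encoding='utf-8') as f:
        f.write("""Company annual leave policy:
- After 1 year of service: 5 days of annual leave
- After 3 years of service: 10 days of annual leave
- After 5 years of service: 15 days of annual leave
- Annual leave can be carried over to the next year, accumulating up to 20 days
""")
    # RAGAgent loads a single file; AdvancedRAGAgent takes the "knowledge" directory instead
    agent = RAGAgent("knowledge/policy.txt")
    questions = ["How many days of annual leave does the company give?",
                 "How much annual leave is there after 3 years of service?",
                 "Can annual leave be accumulated?"]
    for q in questions:
        print(f"\n👤 User: {q}")
        res = agent.ask(q)
        print(f"🤖 Agent: {res['answer']}")
        print(f"📖 Sources: {res['sources']}")
8. Best‑Practice Checklist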
print(f"📖 Sources: {res['sources']}")8. Best‑Practice Checklist
Document Processing: clean noise, unify format, perform semantic chunking.
Metadata: add source, timestamps, categories.
Indexing: choose appropriate embedding dimension and algorithm (IVF, HNSW, etc.).
Retrieval: use similarity or MMR, consider hybrid vector + keyword search.
Rerank: apply a cross‑encoder for final ordering.
Cache: store frequent query results to cut latency.
Generation: craft structured prompts, cite sources, and provide fallback messages when information is missing.
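The generation item in the checklist (structured prompt, cited sources, fallback when nothing relevant is found) can be sketched as follows. This is an illustrative outline only: the `FALLBACK` message, the chunk dictionary layout, and the `llm` callable are assumptions made for the example, not part of any library.

```python
from typing import Callable, Optional

FALLBACK = "I could not find this in the knowledge base."


def build_prompt(question: str, chunks: list[dict]) -> Optional[str]:
    """Number each retrieved chunk so the model can cite it as [1], [2], ..."""
    if not chunks:
        return None
    context = "\n".join(
        f"[{i}] ({c['source']}) {c['text']}" for i, c in enumerate(chunks, start=1)
    )
    return (
        "Answer the question using only the numbered context.\n"
        "Cite the context numbers you used, e.g. [1].\n"
        f"If the context is insufficient, reply exactly: {FALLBACK}\n\n"
        f"Context:\n{context}\n\nQuestion: {question}"
    )


def answer(question: str, chunks: list[dict], llm: Callable[[str], str]) -> str:
    """Skip the LLM call entirely when retrieval returned nothing."""
    prompt = build_prompt(question, chunks)
    if prompt is None:
        return FALLBACK
    return llm(prompt)


chunks = [{"source": "policy.txt", "text": "After 3 years of service: 10 days of annual leave"}]
prompt = build_prompt("How much leave after 3 years?", chunks)
print(prompt)
# A stand-in LLM is used here; with no chunks, the fallback is returned without calling it
print(answer("What is the dress code?", [], llm=lambda p: "unused"))
```

Returning the fallback before calling the model saves a request and removes one opportunity for the model to answer from its own memory.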
Performance Comparison (summary)
Pure vector search – Accuracy ~70 %, latency ~50 ms, low cost, good for rapid prototyping.
Vector + BM25 – Accuracy ~80 %, latency ~100 ms, low cost, suitable for general use.
Vector + BM25 + Rerank – Accuracy ~90 %, latency ~200 ms, medium cost, needed for high‑precision tasks.
Vector + BM25 + Rerank + Cache – Accuracy ~90 % (unchanged by caching), latency ~10 ms on cache hits, low cost, ideal for high‑frequency queries.
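The figures above are rough; accuracy and latency depend on the corpus and hardware. A small harness such as the one below can measure both for any retriever on a labeled query set. It is a sketch under stated assumptions: "accuracy" is taken to mean hit rate at k, `keyword_retriever` is a toy stand-in for a real retrieval pipeline, and the corpus and labels are invented for the example.

```python
import time


def hit_rate_at_k(retriever, labeled_queries, k=3):
    """Fraction of queries whose known relevant document id appears in the top k results."""
    hits = sum(1 for query, relevant_id in labeled_queries if relevant_id in retriever(query)[:k])
    return hits / len(labeled_queries)


def mean_latency_ms(retriever, queries, repeats=5):
    """Average wall-clock time per query in milliseconds."""
    start = time.perf_counter()
    for _ in range(repeats):
        for q in queries:
            retriever(q)
    return (time.perf_counter() - start) * 1000 / (repeats * len(queries))


corpus = {
    "doc1": "5 vacation days after 1 year",
    "doc2": "10 vacation days after 3 years",
    "doc3": "vacation days can accumulate up to 20",
}


def keyword_retriever(query):
    """Toy retriever: rank document ids by the number of shared words."""
    q = set(query.lower().split())
    return sorted(corpus, key=lambda d: len(q & set(corpus[d].split())), reverse=True)


labeled = [("vacation after 3 years", "doc2"), ("can vacation accumulate", "doc3")]
rate = hit_rate_at_k(keyword_retriever, labeled, k=1)
latency = mean_latency_ms(keyword_retriever, [q for q, _ in labeled])
print(f"hit rate@1 = {rate:.2f}, mean latency = {latency:.3f} ms")
```

Running the same labeled set through each pipeline variant (vector only, hybrid, hybrid plus rerank) gives a like-for-like comparison on your own data.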
9. Next Episode Preview
AI Agent from Intro to Practice (6): Deep dive into Function Calling – covering principles, multi‑tool coordination, complex parameter passing, and tool‑chain orchestration.
Coder Trainee
Experienced in Java and Python, we share and learn together. For submissions or collaborations, DM us.
