5 Essential Design Principles for Building High‑Quality RAG Systems
This article outlines five critical design principles for building high‑quality Retrieval‑Augmented Generation (RAG) systems: document chunking strategy, embedding model selection, hybrid retrieval architecture, metadata filtering with multi‑level indexes, and reranking mechanisms. Concrete code snippets and evaluation metrics accompany each principle.
1. Document Chunking Strategy
1.1 Limitations of Fixed‑size Chunking
Fixed‑size chunking (e.g., 512 tokens) splits text at arbitrary boundaries, breaking sentences, code structures, or table rows:
```python
def naive_chunking(text: str, chunk_size: int = 512) -> list[str]:
    tokens = text.split()  # naive whitespace tokenization
    return [' '.join(tokens[i:i + chunk_size]) for i in range(0, len(tokens), chunk_size)]
```

This approach:
- May cut sentences or paragraphs arbitrarily
- May split functions or classes in the middle of code
- May divide a table row into two parts
1.2 Semantic‑aware Intelligent Chunking
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,
    chunk_overlap=100,  # overlap keeps context continuity
    length_function=len,
    separators=["\n\n", "\n", "。", "!", "?", " ", ""],
)
```
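Splitting a document is then a single call (document_text stands in for your raw input):

```python
chunks = text_splitter.split_text(document_text)
```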
1.3 Special Content Chunking
Code files: split by function, class, or logical unit rather than by line count.
```python
import ast

def split_code_by_function(code: str) -> list[dict]:
    """Split code into functions/classes while preserving structure"""
    try:
        tree = ast.parse(code)
        chunks = []
        lines = code.split('\n')
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                start = node.lineno - 1
                end = node.end_lineno
                chunks.append({
                    "content": '\n'.join(lines[start:end]),
                    "type": type(node).__name__,
                    "name": node.name,
                })
        return chunks
    except Exception:
        # Fall back to a single chunk if the code cannot be parsed
        return [{"content": code, "type": "unknown", "name": "unknown"}]
```
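For example, running it over a module (utils.py is a hypothetical file):

```python
with open("utils.py") as f:
    chunks = split_code_by_function(f.read())
print([(c["type"], c["name"]) for c in chunks])
```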
Table data: treat the whole table as a single retrieval unit.
```python
def process_table_as_unit(table_element) -> dict:
    """Convert a table into an independent retrieval unit.

    table_element is assumed to be a parsed table object exposing
    to_markdown(), .rows, and .headers (e.g., from a document parser).
    """
    return {
        "content": table_element.to_markdown(),
        "metadata": {
            "type": "table",
            "row_count": len(table_element.rows),
            "header": table_element.headers,
        },
    }
```
2. Vector Embedding Model Selection
2.1 Comparison of General‑Purpose Embedding Models
- text-embedding-3-large – 3072‑dim, high MTEB score, suited for general use.
- text-embedding-3-small – 1536‑dim, medium MTEB score, cost‑sensitive scenarios.
- cohere-embed-v4 – 1024‑dim, high MTEB score, multilingual applications.
- BGE‑M3 – 1024‑dim, high MTEB score, Chinese‑English bilingual use.
2.2 Domain‑Adapted Embeddings
For vertical domains (medical, legal, finance), a domain‑adapted model often outperforms generic embeddings.
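Such a model can be loaded and sanity‑checked with the sentence-transformers library (a minimal sketch; normalization matters whenever cosine similarity is used downstream):

```python
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("thenlper/gte-large-zh")
# normalize_embeddings=True makes dot product equivalent to cosine similarity
embeddings = model.encode(
    ["How is the user login endpoint authenticated?"],
    normalize_embeddings=True,
    batch_size=32,
)
```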
A typical configuration:

```python
embedding_config = {
    "model": "thenlper/gte-large-zh",  # Chinese-optimized
    "dimension": 1024,
    "normalize": True,  # required for cosine similarity
    "batch_size": 32,
}
```

2.3 Embedding Quality Validation
```python
import numpy as np

def cosine_similarity(a, b) -> float:
    a, b = np.asarray(a), np.asarray(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def evaluate_embedding_quality(embedder, test_cases: list[dict]) -> dict:
    """Assess embedding model performance on a test set"""
    correct = 0
    for case in test_cases:
        query_emb = embedder.encode(case["query"])
        doc_emb = embedder.encode(case["positive_doc"])
        neg_emb = embedder.encode(case["negative_doc"])
        pos_sim = cosine_similarity(query_emb, doc_emb)
        neg_sim = cosine_similarity(query_emb, neg_emb)
        if pos_sim > neg_sim:
            correct += 1
    return {"accuracy": correct / len(test_cases)}
```
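Each test case pairs a query with one known‑relevant and one irrelevant document; any embedder exposing an encode() method works (a toy example):

```python
test_cases = [{
    "query": "How do I reset my password?",
    "positive_doc": "To reset your password, open Settings and choose Reset Password.",
    "negative_doc": "Quarterly revenue grew by 12% year over year.",
}]
print(evaluate_embedding_quality(model, test_cases))  # model: e.g., the SentenceTransformer above
```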
3. Hybrid Retrieval Architecture
3.1 Complementarity of Sparse and Dense Retrieval
- Dense (vector) retrieval – excels at semantic similarity, captures synonyms and polysemy.
- Sparse (BM25/TF‑IDF) retrieval – excels at exact keyword matching, captures proper nouns and technical terms.
```python
import numpy as np
from rank_bm25 import BM25Okapi

class HybridRetriever:
    def __init__(self, vector_store, documents: list[str]):
        self.vector_store = vector_store
        self.documents = documents
        tokenized_docs = [doc.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

    def _bm25_retrieve(self, query: str, k: int) -> list[tuple[str, float]]:
        """BM25 search with scores normalized to [0, 1]."""
        tokenized_query = query.lower().split()
        scores = self.bm25.get_scores(tokenized_query)
        top_idx = np.argsort(scores)[::-1][:k]
        max_score = scores.max() or 1.0  # guard against all-zero scores
        return [(self.documents[i], float(scores[i] / max_score)) for i in top_idx]

    def retrieve(self, query: str, k: int = 10, alpha: float = 0.5) -> list[dict]:
        """Hybrid retrieval where alpha=0 → pure BM25, alpha=1 → pure vector"""
        # Vector search (expanded to 2k candidates); assumes the vector store
        # attaches a normalized similarity score in each result's metadata
        vector_results = self.vector_store.similarity_search(query, k=k * 2)
        vector_scores = {r.page_content: r.metadata.get("score", 1.0) for r in vector_results}
        # BM25 search (also 2k candidates)
        bm25_results = dict(self._bm25_retrieve(query, k * 2))
        # Fuse scores
        all_docs = set(vector_scores) | set(bm25_results)
        fused = []
        for doc in all_docs:
            vs = vector_scores.get(doc, 0.0)
            bs = bm25_results.get(doc, 0.0)
            fused.append((doc, alpha * vs + (1 - alpha) * bs))
        fused.sort(key=lambda x: x[1], reverse=True)
        return [{"content": doc, "score": score} for doc, score in fused[:k]]
```
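Construction and querying (vector_store and documents are assumed to come from your indexing pipeline):

```python
retriever = HybridRetriever(vector_store, documents)
results = retriever.retrieve("how to rotate API keys", k=5, alpha=0.6)  # slight semantic bias
```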
3.2 Keyword Cache to Accelerate Sparse Retrieval
```python
import hashlib
import json

import redis

class CachedHybridRetriever(HybridRetriever):
    def __init__(self, *args, cache: redis.Redis, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = cache

    def _get_cache_key(self, query: str) -> str:
        return f"bm25:{hashlib.md5(query.encode()).hexdigest()}"

    def _bm25_retrieve(self, query: str, k: int) -> list[tuple[str, float]]:
        # Overrides the helper from HybridRetriever with a Redis cache layer
        cache_key = self._get_cache_key(query)
        cached = self.cache.get(cache_key)
        if cached:
            # JSON turns tuples into lists, so restore them on the way out
            return [tuple(pair) for pair in json.loads(cached)]
        result = super()._bm25_retrieve(query, k)
        self.cache.setex(cache_key, 3600, json.dumps(result))  # 1-hour TTL
        return result
```
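Wiring it up (assumes a Redis instance on localhost):

```python
cache = redis.Redis(host="localhost", port=6379)
retriever = CachedHybridRetriever(vector_store, documents, cache=cache)
```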
4. Metadata Filtering and Index Design
4.1 Metadata Structure Design
```python
document_metadata = {
    "id": "doc_001",
    "source": "api_docs",
    "source_url": "https://api.example.com/v1/users",
    "created_at": "2024-03-15",
    "updated_at": "2024-11-20",
    "version": "2.1.0",
    "category": "user_management",
    "tags": ["users", "authentication", "crud"],
    "language": "zh",
    "author": "backend_team",
    "chunk_index": 3,
}
```
4.2 Multi‑Level Index Architecture
Two‑stage retrieval: first filter candidate IDs in a metadata index (Elasticsearch or Solr), then perform vector similarity search on the reduced set.
```python
from elasticsearch import Elasticsearch

class MultiIndexRetriever:
    def __init__(self, es_client: Elasticsearch):
        self.es = es_client

    def retrieve_with_filter(self, query: str, filters: dict, k: int = 10) -> list[dict]:
        must = [{"multi_match": {"query": query, "fields": ["content^2", "title"]}}]
        filter_clauses = []
        if filters.get("category"):
            filter_clauses.append({"term": {"category": filters["category"]}})
        if filters.get("date_range"):
            filter_clauses.append({"range": {"updated_at": {
                "gte": filters["date_range"]["start"],
                "lte": filters["date_range"]["end"],
            }}})
        if filters.get("tags"):
            filter_clauses.append({"terms": {"tags": filters["tags"]}})
        body = {"query": {"bool": {"must": must, "filter": filter_clauses}}, "size": k}
        response = self.es.search(index="documents", body=body)
        return [hit["_source"] for hit in response["hits"]["hits"]]
```
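For example (assumes a local Elasticsearch with a "documents" index mapped to the metadata fields above):

```python
retriever = MultiIndexRetriever(Elasticsearch("http://localhost:9200"))
hits = retriever.retrieve_with_filter(
    "How do I create a user?",
    filters={"category": "user_management", "tags": ["users", "crud"]},
    k=10,
)
```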
4.3 Hierarchical Index Design
Level 1 stores structured metadata for fast filtering; Level 2 stores high‑dimensional vectors (e.g., Pinecone, Milvus). Query flow (a sketch follows the steps):
1. Apply user filters in the metadata index to obtain candidate document IDs.
2. Restrict vector search to those IDs.
3. Re‑rank the final top‑k results.
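A minimal sketch of that flow; metadata_index and vector_index are hypothetical wrappers (in practice, an Elasticsearch query followed by a vector search with an ID filter expression), and reranker is a cross‑encoder such as the one in section 5.1:

```python
def two_stage_search(query: str, filters: dict, k: int = 10) -> list[dict]:
    # Stage 1: metadata filter shrinks the candidate set (hypothetical helper)
    candidate_ids = metadata_index.filter_ids(filters)  # e.g., 500 IDs out of 5M docs
    # Stage 2: vector similarity restricted to those IDs (hypothetical helper)
    hits = vector_index.search(query, allowed_ids=candidate_ids, k=k * 3)
    # Stage 3: rerank the reduced set and keep the top k
    return reranker.rerank(query, [h["content"] for h in hits], top_k=k)
```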
5. Reranking Mechanisms
5.1 Cross‑Encoder Reranking
```python
from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self, model_name: str = "BAAI/bge-reranker-large"):
        self.model = CrossEncoder(model_name, max_length=512)

    def rerank(self, query: str, documents: list[str], top_k: int = 5) -> list[dict]:
        pairs = [[query, doc] for doc in documents]
        scores = self.model.predict(pairs)
        ranked = sorted(zip(documents, scores), key=lambda x: x[1], reverse=True)
        return [{"content": doc, "score": float(score)} for doc, score in ranked[:top_k]]
```
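Typical usage on the output of a first‑stage retriever (results as produced by the hybrid retriever in section 3):

```python
reranker = Reranker()  # defaults to BAAI/bge-reranker-large
top = reranker.rerank("how to rotate API keys",
                      [r["content"] for r in results], top_k=5)
```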
5.2 Cascade Reranking Strategy
```python
class CascadeReranker:
    def __init__(self, retrievers: list, rerankers: list):
        self.retrievers = retrievers  # multiple retrievers
        self.rerankers = rerankers    # rerank models, ordered coarse to fine

    def retrieve_and_rerank(self, query: str, filters: dict = None,
                            initial_k: int = 50, final_k: int = 5) -> list[dict]:
        # Stage 1: collect candidates from all retrievers
        candidates = {}
        for retriever in self.retrievers:
            # Retrievers are assumed to accept a filters kwarg; skip it when unset
            kwargs = {"filters": filters} if filters else {}
            results = retriever.retrieve(query, k=initial_k, **kwargs)
            for r in results:
                doc_id = r["content"]
                if doc_id not in candidates:
                    candidates[doc_id] = {"content": doc_id, "scores": []}
                candidates[doc_id]["scores"].append(r["score"])
        candidate_docs = [c["content"] for c in candidates.values()]
        if not self.rerankers:
            # No reranker configured: fall back to the raw candidate order
            return [{"content": doc, "score": 0.0} for doc in candidate_docs[:final_k]]
        # Stage 2: coarse rerank with the first (fast) model
        coarse = self.rerankers[0].rerank(query, candidate_docs, top_k=20)
        # Stage 3: fine rerank with the second (stronger) model, if present
        if len(self.rerankers) >= 2:
            candidate_docs = [r["content"] for r in coarse]
            return self.rerankers[1].rerank(query, candidate_docs, top_k=final_k)
        return coarse[:final_k]
```
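Wiring the cascade (the model names are real bge-reranker checkpoints; retriever is assumed to be the hybrid retriever from section 3):

```python
cascade = CascadeReranker(
    retrievers=[retriever],
    rerankers=[
        Reranker("BAAI/bge-reranker-base"),   # fast coarse pass
        Reranker("BAAI/bge-reranker-large"),  # slower fine pass
    ],
)
answers = cascade.retrieve_and_rerank("How do I create a user?", final_k=5)
```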
6. Quality Assurance and Continuous Optimization
6.1 Offline Evaluation Metrics
```python
def evaluate_rag_system(rag_pipeline, test_dataset: list[dict]) -> dict:
    """Evaluate RAG performance on a held-out test set"""
    results = {"retrieval_precision": [], "retrieval_recall": [],
               "generation_fluency": [],  # fill in with a fluency scorer if available
               "answer_relevance": []}
    for case in test_dataset:
        retrieved = rag_pipeline.retrieve(case["query"])
        relevant = set(case["relevant_docs"])
        retrieved_set = set(d["content"] for d in retrieved)
        hits = len(retrieved_set & relevant)
        results["retrieval_precision"].append(hits / len(retrieved_set) if retrieved_set else 0.0)
        results["retrieval_recall"].append(hits / len(relevant))
        answer = rag_pipeline.generate(case["query"], retrieved)
        # compute_answer_relevance is an external scoring helper (e.g., LLM-as-judge)
        results["answer_relevance"].append(compute_answer_relevance(answer, case["query"]))
    # Average each metric, skipping any list that was never filled
    return {k: sum(v) / len(v) for k, v in results.items() if v}
```
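A test case pairs a query with documents known to be relevant; rag_pipeline stands in for your retrieve‑and‑generate stack (a toy example):

```python
test_dataset = [{
    "query": "How do I delete a user?",
    "relevant_docs": ["DELETE /v1/users/{id} removes the user and revokes all tokens."],
}]
print(evaluate_rag_system(rag_pipeline, test_dataset))
```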
6.2 Online Monitoring Metrics
- Retrieval recall inferred from user clicks/feedback.
- Answer satisfaction score.
- P99 retrieval latency.
- Vector index storage growth rate.
Key Takeaways
- Chunking must respect semantic boundaries; use overlap and special handling for code and tables.
- Select embedding models based on dimension, MTEB benchmark, and domain requirements; validate quality before production.
- Hybrid retrieval combines dense semantic matching with sparse exact matching; tune the α weight (e.g., 0.5) and cache frequent BM25 results.
- Metadata‑driven multi‑level indexing reduces candidate set size and improves latency.
- Cross‑encoder reranking improves relevance; cascade reranking balances efficiency and accuracy.
- Continuously measure offline recall/precision and monitor online latency and user satisfaction to guide iterative improvements.