Build a RAG App from Scratch: Master Text Chunking, Vector Retrieval, and Coreference Resolution
This tutorial walks through building a Retrieval‑Augmented Generation (RAG) system from the ground up, covering document parsing, text chunking strategies, vector store creation with ChromaDB, semantic search, prompt engineering for LLMs, conversation memory, coreference handling, and practical optimization tips, all illustrated with complete Python code.
What is RAG?
RAG (Retrieval‑Augmented Generation) adds an external knowledge base to an LLM so that the model can consult proprietary documents such as internal reports, technical manuals, or business files instead of relying solely on its training data.
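The retrieve-then-generate idea can be sketched in a few lines before any real components are introduced. This is a toy illustration only: word overlap stands in for vector search, the two knowledge-base sentences are invented sample data, and the function names (`toy_retrieve`, `toy_augment`) are hypothetical.

```python
import re

def tokenize(text: str) -> set[str]:
    """Lowercase a string and return its set of alphanumeric words."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def toy_retrieve(question: str, documents: list[str], top_k: int = 1) -> list[str]:
    """Rank documents by words shared with the question (a stand-in for vector search)."""
    q_words = tokenize(question)
    ranked = sorted(documents, key=lambda d: len(q_words & tokenize(d)), reverse=True)
    return ranked[:top_k]

def toy_augment(question: str, passages: list[str]) -> str:
    """Place the retrieved passages in front of the question to form the LLM prompt."""
    context = "\n".join(passages)
    return f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"

# Invented sample data, not taken from any real knowledge base.
knowledge_base = [
    "The onboarding guide says new hires receive a laptop on day one.",
    "Expense reports must be filed within 30 days of purchase.",
]
question = "When must expense reports be filed?"
passages = toy_retrieve(question, knowledge_base)
prompt = toy_augment(question, passages)
print(prompt)  # this prompt is what would be sent to the LLM
```

The rest of the tutorial replaces each toy step with a real one: parsing and chunking build the knowledge base, ChromaDB performs the retrieval, and an OpenAI model generates the answer.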
Why Implement Your Own RAG Pipeline?
Frameworks like LangChain or LlamaIndex let you spin up a prototype quickly, but a custom implementation gives you full visibility into how documents are split, how embeddings are generated, and how the retrieval logic runs. When retrieval is inaccurate, answers are low‑quality, or costs rise, you can pinpoint the problematic stage—splitting strategy, embedding model, or retrieval parameters—rather than tweaking opaque framework settings.
Document Parsing: Converting Files to Plain Text
import os
import PyPDF2
import docx  # provided by the python-docx package

def load_plain_text(file_path: str) -> str:
    """Load and return the full contents of a .txt file."""
    with open(file_path, 'r', encoding='utf-8') as fp:
        return fp.read()

def extract_text_from_pdf(file_path: str) -> str:
    """Read every page of a PDF and stitch the text together."""
    texts = []
    with open(file_path, 'rb') as fp:
        reader = PyPDF2.PdfReader(fp)
        for pg in reader.pages:
            # extract_text() can return None for pages without a text layer
            page_txt = pg.extract_text() or ""
            texts.append(page_txt)
    return "\n".join(texts)

def extract_text_from_docx(file_path: str) -> str:
    """Grab all paragraphs from a .docx document."""
    doc = docx.Document(file_path)
    paras = [p.text for p in doc.paragraphs]
    return "\n".join(paras)

A router selects the appropriate parser based on file extension:
def load_document(file_path: str) -> str:
    """Load a document's text based on its file extension."""
    _, extension = os.path.splitext(file_path)
    extension = extension.lower()
    # Dispatch to the parsers defined above
    if extension == '.txt':
        return load_plain_text(file_path)
    elif extension == '.pdf':
        return extract_text_from_pdf(file_path)
    elif extension == '.docx':
        return extract_text_from_docx(file_path)
    else:
        raise ValueError(f"Unsupported file type: {extension}")

Text Chunking: Splitting Long Documents
Because LLMs have context length limits, documents must be broken into manageable pieces. The example function splits on sentence boundaries and respects a maximum character length:
def chunk_sentences(text: str, max_length: int = 500) -> list[str]:
    """Split text into size-limited chunks, breaking only at sentence boundaries.

    A single sentence longer than max_length is kept whole as its own chunk.
    """
    segments = text.replace('\n', ' ').split('. ')
    blocks = []
    buffer = []
    buffer_len = 0
    for segment in segments:
        seg = segment.strip()
        if not seg:
            continue
        if seg[-1] not in '.!?':  # split('. ') drops the period; restore it
            seg += '.'
        # Count the space that ' '.join() inserts between sentences
        added_len = len(seg) + (1 if buffer else 0)
        if buffer and buffer_len + added_len > max_length:
            blocks.append(' '.join(buffer))
            buffer = [seg]
            buffer_len = len(seg)
        else:
            buffer.append(seg)
            buffer_len += added_len
    if buffer:
        blocks.append(' '.join(buffer))
    return blocks

Chunk size is a trade-off: 200-500 characters for precise matching, 500-1000 for richer context, and >1000 for narrative content.
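To get a feel for this trade-off, it can help to chunk the same text at several sizes and compare the results. The sketch below uses a simplified, self-contained packer (`pack_sentences` is a hypothetical helper, not the tutorial's function) and synthetic sentences, so the numbers it prints only illustrate the trend.

```python
def pack_sentences(sentences: list[str], max_length: int) -> list[str]:
    """Greedily pack sentences into chunks of at most max_length characters."""
    chunks, current = [], ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and len(candidate) > max_length:
            chunks.append(current)   # current chunk is full; start a new one
            current = sentence
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks

# Synthetic sample: 40 short sentences (invented data).
sentences = [f"Sentence number {i:02d} carries a little information." for i in range(40)]

chunk_counts = {}
for size in (200, 500, 1000):
    chunks = pack_sentences(sentences, size)
    chunk_counts[size] = len(chunks)
    longest = max(len(c) for c in chunks)
    print(f"max_length={size}: {len(chunks)} chunks, longest={longest} chars")
```

Smaller limits produce more, tighter chunks that match narrow questions well; larger limits produce fewer chunks that each carry more surrounding context into the prompt.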
Vector Store with ChromaDB
After chunking, each piece is embedded and stored for semantic search. ChromaDB provides a lightweight persistent store:
import chromadb
from chromadb.utils import embedding_functions

# Data is written to the chroma_db directory and survives restarts
client = chromadb.PersistentClient(path="chroma_db")

# Requires the sentence-transformers package
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)

collection = client.get_or_create_collection(
    name="documents_collection",
    embedding_function=sentence_transformer_ef
)

Key components:
PersistentClient: keeps data across restarts.
SentenceTransformerEmbeddingFunction: converts text to vectors.
all-MiniLM-L6-v2: a compact yet effective embedding model.
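What "converting text to vectors" buys you is that related texts end up close together in vector space. The following sketch shows the idea with cosine similarity on hand-made 3-dimensional vectors; these values are invented for illustration (real all-MiniLM-L6-v2 embeddings have 384 dimensions), and ChromaDB computes its own distances internally rather than calling anything like this helper.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine of the angle between two vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # similarity is undefined for a zero vector; treat as unrelated
    return dot / (norm_a * norm_b)

# Invented toy "embeddings"; a real model would produce these from the text.
vectors = {
    "refund policy": [0.9, 0.1, 0.0],
    "how to get money back": [0.8, 0.2, 0.1],
    "office parking rules": [0.0, 0.2, 0.9],
}

query_vec = vectors["refund policy"]
for text, vec in vectors.items():
    print(f"{text!r}: {cosine_similarity(query_vec, vec):.3f}")
```

The two refund-related phrases share no words, yet their vectors point in nearly the same direction, which is what lets semantic search succeed where keyword matching would miss.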
Document Indexing and Batch Insertion
def build_knowledge_units(path: str):
    """Ingest a file, break it into chunks, and tag each piece with metadata."""
    try:
        raw = load_document(path)
        segments = chunk_sentences(raw)
        name = os.path.basename(path)
        metadata_records = [{"source_file": name, "segment_index": idx} for idx in range(len(segments))]
        # IDs must be unique within the collection
        unique_keys = [f"{name}_seg_{idx}" for idx in range(len(segments))]
        return unique_keys, segments, metadata_records
    except Exception as err:
        print(f"Failed to process '{path}': {err}")
        return [], [], []

def batch_insert_into_store(store, record_ids, contents, metadata_list):
    """Insert items into the vector store in fixed-size batches."""
    batch_size = 100
    for start_idx in range(0, len(contents), batch_size):
        stop_idx = min(start_idx + batch_size, len(contents))
        store.add(
            documents=contents[start_idx:stop_idx],
            metadatas=metadata_list[start_idx:stop_idx],
            ids=record_ids[start_idx:stop_idx]
        )

Running the ingest on a folder produces output such as:
► Processing customer_faqs.pdf …
✔ Loaded 51 chunks from customer_faqs.pdf
► Processing onboarding_guide.docx …
✔ Loaded 20 chunks from onboarding_guide.docx

Semantic Retrieval
def run_semantic_query(collection, query: str, top_k: int = 2):
    """Run a semantic search to find the most relevant chunks."""
    return collection.query(
        query_texts=[query],
        n_results=top_k
    )

def build_context_and_citations(results):
    """Combine matched chunks and reference their original sources."""
    # Index [0] selects the results for the first (and only) query text
    combined_text = "\n".join(results['documents'][0])
    # Keys match the metadata written at indexing time
    references = [
        f"{meta['source_file']} (chunk {meta['segment_index']})"
        for meta in results['metadatas'][0]
    ]
    return combined_text, references

Search results include distances (smaller values mean closer matches) and the stored source metadata, which together help you assess retrieval quality.
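One way to use those distances is to flatten a query result into ranked rows and drop weak matches before they reach the prompt. The sketch below works on a hand-built dictionary that mimics the nested-list shape of a ChromaDB query result, so it runs without a database; the `rank_hits` helper, the sample rows, and the 1.0 cutoff are all illustrative assumptions, and a sensible cutoff depends on the embedding model and distance metric in use.

```python
from typing import Optional

def rank_hits(results: dict, max_distance: Optional[float] = None) -> list[dict]:
    """Flatten the first query's results into hits sorted by ascending distance."""
    hits = []
    rows = zip(results["documents"][0], results["metadatas"][0], results["distances"][0])
    for text, meta, distance in rows:
        if max_distance is not None and distance > max_distance:
            continue  # too far from the query to be useful context
        hits.append({
            "text": text,
            "source": meta["source_file"],
            "segment": meta["segment_index"],
            "distance": distance,
        })
    return sorted(hits, key=lambda hit: hit["distance"])

# Hand-built stand-in for a query result (invented data).
mock_results = {
    "ids": [["faq.pdf_seg_3", "guide.docx_seg_0", "faq.pdf_seg_9"]],
    "documents": [["Refunds take 5 days.", "Welcome to the team.", "Contact support by email."]],
    "metadatas": [[
        {"source_file": "faq.pdf", "segment_index": 3},
        {"source_file": "guide.docx", "segment_index": 0},
        {"source_file": "faq.pdf", "segment_index": 9},
    ]],
    "distances": [[0.42, 1.35, 0.88]],
}

kept_hits = rank_hits(mock_results, max_distance=1.0)
for hit in kept_hits:
    print(f"{hit['distance']:.2f}  {hit['source']} (chunk {hit['segment']}): {hit['text']}")
```

Printing rows like these for a handful of test questions is a quick way to spot whether poor answers come from retrieval or from generation.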
LLM Prompt Engineering
import os
from openai import OpenAI

# Set the key before creating the client: OpenAI() reads OPENAI_API_KEY at construction.
# Prefer exporting the variable in your shell over hardcoding it here.
os.environ.setdefault("OPENAI_API_KEY", "your_api_key")
client = OpenAI()  # note: rebinds `client`, which earlier held the ChromaDB client

def build_prompt(context: str, question: str) -> str:
    """Construct a focused prompt using context and a user question."""
    return f"""You are a helpful assistant. Use only the context provided below to answer.
If the answer cannot be found in the context, reply with "I don't have that information."
Context:
{context}
Question: {question}
Answer:"""

def ask_openai(question: str, context: str) -> str:
    """Send the prompt to OpenAI and return the generated response."""
    prompt = build_prompt(context, question)
    try:
        reply = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[
                {"role": "system", "content": "You answer based strictly on the context provided."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=300
        )
        return reply.choices[0].message.content
    except Exception as err:
        return f"Error: {str(err)}"

The temperature parameter controls randomness; for RAG, values between 0.0 and 0.3 help keep answers grounded in the retrieved documents.
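Why low temperatures make output more predictable can be seen in the common softmax-with-temperature formulation, where token scores are divided by the temperature before being turned into probabilities. The sketch below is conceptual only: the three logits are invented, treating a temperature of 0 as a pure greedy choice is an assumption of this example, and nothing here claims to mirror how any particular provider implements sampling.

```python
import math

def softmax_with_temperature(logits: list[float], temperature: float) -> list[float]:
    """Convert raw token scores into probabilities, sharpened or flattened by temperature."""
    if temperature <= 0:
        # Assumed convention for this sketch: all probability on the top-scoring token
        best = max(range(len(logits)), key=lambda i: logits[i])
        return [1.0 if i == best else 0.0 for i in range(len(logits))]
    scaled = [score / temperature for score in logits]
    peak = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(s - peak) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.5]  # invented scores for three candidate tokens
for temperature in (0.0, 0.3, 1.0):
    probs = softmax_with_temperature(logits, temperature)
    print(f"T={temperature}: {[round(p, 3) for p in probs]}")
```

As the temperature drops, probability mass concentrates on the top-scoring token, so the model is less likely to wander away from what the retrieved context supports.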
Conversation Memory
import uuid
from datetime import datetime

# In-memory store: session_id -> list of messages (lost when the process exits)
chat_sessions = {}

def start_new_session() -> str:
    """Initialize a fresh conversation session with a unique ID."""
    session_id = str(uuid.uuid4())
    chat_sessions[session_id] = []
    return session_id

def log_message(session_id: str, sender: str, message: str):
    """Add a message to the session history."""
    if session_id not in chat_sessions:
        chat_sessions[session_id] = []
    chat_sessions[session_id].append({
        "role": sender,
        "content": message,
        "timestamp": datetime.now().isoformat()
    })

def fetch_recent_messages(session_id: str, limit: int = 5):
    """Return the last few messages from a session."""
    msgs = chat_sessions.get(session_id, [])
    return msgs[-limit:]

def prepare_history_for_model(messages: list) -> str:
    """Convert messages into a single formatted string."""
    return "\n".join(f"{msg['role'].capitalize()}: {msg['content']}" for msg in messages)

Coreference Resolution
def rewrite_query_with_context(query: str, chat_log: str, client: OpenAI) -> str:
    """Rewrite a follow-up query as a full standalone question using prior conversation."""
    prompt = f"""Rephrase follow-up questions to be fully self-contained.
Refer to the chat history as needed. Return only the rewritten question.
Chat History:
{chat_log}
Follow-up: {query}
Standalone Question:"""
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return response.choices[0].message.content
    except Exception as err:
        # Fall back to the original query so retrieval can still proceed
        print(f"Failed to contextualize query: {err}")
        return query

Full Conversational RAG Flow
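The rewrite step costs one extra LLM call per turn. As an optional refinement to the flow, a cheap pronoun check could decide whether a follow-up needs rewriting at all. The sketch below is a rough heuristic, not a real coreference resolver: the `needs_rewrite` helper and its word list are assumptions made for illustration, and a false positive only costs an unnecessary rewrite call.

```python
import re

# Small, deliberately incomplete list of words that often point back to earlier turns.
REFERRING_WORDS = {
    "it", "its", "they", "them", "their", "this", "that",
    "these", "those", "he", "she", "him", "her", "his",
}

def needs_rewrite(query: str, has_history: bool) -> bool:
    """Guess whether a query refers to earlier turns and should be rewritten."""
    if not has_history:
        return False  # nothing to resolve against on the first turn
    words = re.findall(r"[a-z]+", query.lower())
    return any(word in REFERRING_WORDS for word in words)

for text, has_history in [
    ("What does LaunchPad do?", True),
    ("When did it start?", True),
    ("When did it start?", False),
]:
    print(f"{text!r} (history={has_history}) -> rewrite: {needs_rewrite(text, has_history)}")
```

A check like this would sit just before the call to the rewriting function, passing the query through unchanged whenever it returns False.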
def handle_conversational_query(collection, query: str, session_id: str, n_chunks: int = 3):
    """Orchestrate the full RAG-based QA flow in a chat session."""
    # 1. Get recent chat history
    recent_messages = fetch_recent_messages(session_id)
    chat_log = prepare_history_for_model(recent_messages)
    # 2. Resolve pronouns (nothing to resolve on the first turn)
    if chat_log:
        refined_query = rewrite_query_with_context(query, chat_log, client)
    else:
        refined_query = query
    print(f"[Refined Query] {refined_query}")
    # 3. Retrieve relevant chunks
    search_results = run_semantic_query(collection, refined_query, n_chunks)
    retrieved_text, citations = build_context_and_citations(search_results)
    # 4. Generate answer grounded in retrieved text
    answer = ask_openai(refined_query, retrieved_text)
    # 5. Save interaction
    log_message(session_id, "user", query)
    log_message(session_id, "assistant", answer)
    return answer, citations

Example usage:
session = start_new_session()

q1 = "What does LaunchPad do?"
reply, refs = handle_conversational_query(collection, q1, session)
print(f"Answer: {reply}\nSources: {refs}")

# "it" is resolved against the previous turn before retrieval
q2 = "When did it start?"
reply, refs = handle_conversational_query(collection, q2, session)
print(f"Answer: {reply}\nSources: {refs}")

Practical Optimizations
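Filtered search: restrict semantic search with metadata filters (e.g., department == "HR"). This is sometimes loosely called hybrid search, a term that more often means blending keyword and vector scores.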
Automatic citation injection: append source list to the generated answer.
Dynamic chunk size based on document type (smaller chunks for financial docs, larger for narrative text).
History summarization: when conversation becomes long, summarize past turns with an LLM before feeding them back.
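The first three ideas can be sketched as small helpers. Everything here is illustrative: the `department` metadata field is hypothetical (the indexing code in this tutorial stores only `source_file` and `segment_index`, so such a field would have to be added at ingest time), the chunk sizes per document type are placeholder values, and the helper names are invented. The filter relies on the `where` argument of ChromaDB's `collection.query`.

```python
def filtered_query(collection, query: str, department: str, top_k: int = 3):
    """Semantic search restricted to chunks whose metadata matches a department."""
    return collection.query(
        query_texts=[query],
        n_results=top_k,
        where={"department": department},  # hypothetical metadata field
    )

def append_citations(answer: str, references: list[str]) -> str:
    """Append a de-duplicated source list to a generated answer."""
    if not references:
        return answer
    unique_refs = list(dict.fromkeys(references))  # drops repeats, keeps order
    source_lines = "\n".join(f"- {ref}" for ref in unique_refs)
    return f"{answer}\n\nSources:\n{source_lines}"

# Placeholder sizes in characters; tune them against your own documents.
CHUNK_SIZES = {"financial": 300, "manual": 600, "narrative": 1200}

def pick_chunk_size(doc_type: str, default: int = 500) -> int:
    """Choose a chunk size for a document type, falling back to a default."""
    return CHUNK_SIZES.get(doc_type, default)

cited = append_citations(
    "Refunds take 5 days.",
    ["faq.pdf (chunk 3)", "faq.pdf (chunk 3)", "faq.pdf (chunk 9)"],
)
print(cited)
print(pick_chunk_size("financial"), pick_chunk_size("unknown"))
```

History summarization follows the same pattern as query rewriting: once a session grows past some length, send the older turns to the LLM with a summarization prompt and keep only the summary plus the most recent messages.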
Conclusion
Building a RAG system from scratch requires more initial effort than using a ready‑made framework, but it grants complete control over each component, transparent cost accounting, and deep understanding of the retrieval‑augmented generation pipeline—benefits that become critical in complex, domain‑specific scenarios.
Data STUDIO