Build a RAG App from Scratch: Master Text Chunking, Vector Retrieval, and Coreference Resolution

This tutorial walks through building a Retrieval‑Augmented Generation (RAG) system from the ground up, covering document parsing, text chunking strategies, vector store creation with ChromaDB, semantic search, prompt engineering for LLMs, conversation memory, coreference handling, and practical optimization tips, all illustrated with complete Python code.

Data STUDIO

What is RAG?

RAG (Retrieval‑Augmented Generation) adds an external knowledge base to an LLM so that the model can consult proprietary documents such as internal reports, technical manuals, or business files instead of relying solely on its training data.
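
To make the retrieve-then-generate idea concrete, here is a minimal sketch that uses only the standard library. Word overlap stands in for real embedding similarity, and the sample knowledge base and the names `tokenize`, `retrieve`, and `build_prompt_sketch` are illustrative assumptions, not part of the pipeline built later in this tutorial.

```python
import re

# Toy knowledge base (made-up sentences, for illustration only).
KNOWLEDGE_BASE = [
    "Refunds are processed within five business days.",
    "Support is available on weekdays from nine to five.",
    "Passwords must be changed every ninety days.",
]

def tokenize(text: str) -> set[str]:
    """Lowercase the text and return its set of alphanumeric words."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def retrieve(question: str, docs: list[str], top_k: int = 1) -> list[str]:
    """Rank documents by word overlap with the question (a stand-in for vector search)."""
    q_words = tokenize(question)
    ranked = sorted(docs, key=lambda d: len(q_words & tokenize(d)), reverse=True)
    return ranked[:top_k]

def build_prompt_sketch(question: str, docs: list[str]) -> str:
    """Place the retrieved text in front of the question, as a RAG prompt does."""
    context = "\n".join(retrieve(question, docs))
    return f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"

print(build_prompt_sketch("How long are refunds processed?", KNOWLEDGE_BASE))
```

The printed prompt is what would be sent to the LLM: the model answers from the retrieved sentence rather than from its training data.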

Why Implement Your Own RAG Pipeline?

Frameworks like LangChain or LlamaIndex let you spin up a prototype quickly, but a custom implementation gives you full visibility into how documents are split, how embeddings are generated, and how the retrieval logic runs. When retrieval is inaccurate, answers are low‑quality, or costs rise, you can pinpoint the problematic stage—splitting strategy, embedding model, or retrieval parameters—rather than tweaking opaque framework settings.

Document Parsing: Converting Files to Plain Text

import os
import PyPDF2
import docx

def load_plain_text(file_path: str) -> str:
    """Load and return the full contents of a .txt file."""
    with open(file_path, 'r', encoding='utf-8') as fp:
        return fp.read()

def extract_text_from_pdf(file_path: str) -> str:
    """Read every page of a PDF and stitch the text together."""
    texts = []
    with open(file_path, 'rb') as fp:
        reader = PyPDF2.PdfReader(fp)
        for pg in reader.pages:
            page_txt = pg.extract_text() or ""
            texts.append(page_txt)
    return "\n".join(texts)

def extract_text_from_docx(file_path: str) -> str:
    """Grab all paragraphs from a .docx document."""
    doc = docx.Document(file_path)
    paras = [p.text for p in doc.paragraphs]
    return "\n".join(paras)

A router selects the appropriate parser based on file extension:

def load_document(file_path: str) -> str:
    """Load a document's text based on its file extension."""
    _, extension = os.path.splitext(file_path)
    extension = extension.lower()
    # Dispatch to the parser functions defined above.
    if extension == '.txt':
        return load_plain_text(file_path)
    elif extension == '.pdf':
        return extract_text_from_pdf(file_path)
    elif extension == '.docx':
        return extract_text_from_docx(file_path)
    else:
        raise ValueError(f"Unsupported file type: {extension}")

Text Chunking: Splitting Long Documents

Because LLMs have context length limits, documents must be broken into manageable pieces. The example function splits on sentence boundaries and respects a maximum character length:

def chunk_sentences(text: str, max_length: int = 500) -> list[str]:
    """Split text into size‑limited chunks, breaking only at sentence boundaries."""
    segments = text.replace('\n', ' ').split('. ')
    blocks = []
    buffer = []
    buffer_len = 0
    for segment in segments:
        seg = segment.strip()
        if not seg:
            continue
        if not seg.endswith('.'):  # ensure period
            seg += '.'
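        seg_len = len(seg)
        # The +1 accounts for the space that ' '.join inserts between sentences.
        if buffer and buffer_len + 1 + seg_len > max_length:
            blocks.append(' '.join(buffer))
            buffer = [seg]
            buffer_len = seg_len
        else:
            buffer_len += seg_len + (1 if buffer else 0)
            buffer.append(seg)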
    if buffer:
        blocks.append(' '.join(buffer))
    return blocks

Chunk size is a trade‑off: 200‑500 characters for precise matching, 500‑1000 for richer context, and >1000 for narrative content.
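
The effect of the size limit is easiest to see by chunking the same text at several limits. The sketch below is self-contained: `split_into_chunks` is a simplified stand-in for a sentence-aware chunker, and the sample text is synthetic, so the exact counts are only illustrative.

```python
import re

def split_into_chunks(text: str, max_length: int) -> list[str]:
    """Greedy sentence packing: add sentences to a chunk until the limit would be exceeded."""
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    chunks, current = [], ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and len(candidate) > max_length:
            chunks.append(current)
            current = sentence
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks

# Synthetic text: 40 short sentences of similar length.
sample = " ".join(f"Sentence number {i} adds a little more detail." for i in range(40))

for limit in (200, 500, 1000):
    chunks = split_into_chunks(sample, limit)
    longest = max(len(c) for c in chunks)
    print(f"max_length={limit}: {len(chunks)} chunks, longest={longest} chars")
```

Smaller limits yield more, shorter chunks that match narrow questions precisely; larger limits yield fewer chunks that each carry more surrounding context.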

Vector Store with ChromaDB

After chunking, each piece is embedded and stored for semantic search. ChromaDB provides a lightweight persistent store:

import chromadb
from chromadb.utils import embedding_functions

client = chromadb.PersistentClient(path="chroma_db")
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)
collection = client.get_or_create_collection(
    name="documents_collection",
    embedding_function=sentence_transformer_ef
)

Key components:

PersistentClient: keeps data on disk across restarts.

SentenceTransformerEmbeddingFunction: converts text to vectors.

all-MiniLM-L6-v2: a compact yet effective embedding model.
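
Once text is converted to vectors, "semantic search" reduces to comparing vectors. The following sketch shows cosine similarity, one common comparison, on hand-made three-dimensional vectors. The values are invented for illustration; a real embedding model such as all-MiniLM-L6-v2 produces 384-dimensional vectors, and the vector store may use a different distance metric depending on its configuration.

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

# Hand-made 3-dimensional "embeddings" (invented values, for illustration).
toy_vectors = {
    "refund policy": [0.9, 0.1, 0.0],
    "money back guarantee": [0.8, 0.2, 0.1],
    "office opening hours": [0.0, 0.2, 0.9],
}

query_vec = toy_vectors["refund policy"]
for label, vec in toy_vectors.items():
    print(f"{label}: {cosine_similarity(query_vec, vec):.3f}")
```

Phrases with related meaning end up with vectors pointing in similar directions, which is why "money back guarantee" scores much higher against "refund policy" than "office opening hours" does.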

Document Indexing and Batch Insertion

def build_knowledge_units(path: str):
    """Ingest a file, break it into chunks, and tag each piece with metadata."""
    try:
        raw = load_document(path)
        segments = chunk_sentences(raw)
        name = os.path.basename(path)
        metadata_records = [{"source_file": name, "segment_index": idx} for idx in range(len(segments))]
        unique_keys = [f"{name}_seg_{idx}" for idx in range(len(segments))]
        return unique_keys, segments, metadata_records
    except Exception as err:
        print(f"Failed to process '{path}': {err}")
        return [], [], []

def batch_insert_into_store(store, record_ids, contents, metadata_list):
    """Insert items into the vector store in optimized batches."""
    batch_size = 100
    for start_idx in range(0, len(contents), batch_size):
        stop_idx = min(start_idx + batch_size, len(contents))
        store.add(
            documents=contents[start_idx:stop_idx],
            metadatas=metadata_list[start_idx:stop_idx],
            ids=record_ids[start_idx:stop_idx]
        )

Running the ingest on a folder produces output such as:

► Processing customer_faqs.pdf …
✔ Loaded 51 chunks from customer_faqs.pdf

► Processing onboarding_guide.docx …
✔ Loaded 20 chunks from onboarding_guide.docx
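
The folder-level driver that produces this kind of log is not shown in the article. A minimal sketch of one possible driver follows; `ingest_folder`, `SUPPORTED_EXTENSIONS`, and the `process_file` callback are hypothetical names. In the real pipeline the callback would call `build_knowledge_units` and `batch_insert_into_store`; here a stand-in keeps the example runnable without ChromaDB.

```python
import os
import tempfile
from typing import Callable

SUPPORTED_EXTENSIONS = {".txt", ".pdf", ".docx"}

def ingest_folder(folder: str, process_file: Callable[[str], int]) -> int:
    """Pass every supported file in one folder to process_file; return the total chunk count."""
    total = 0
    for name in sorted(os.listdir(folder)):
        path = os.path.join(folder, name)
        if not os.path.isfile(path):
            continue
        if os.path.splitext(name)[1].lower() not in SUPPORTED_EXTENSIONS:
            continue  # skip file types the parsers cannot handle
        print(f"► Processing {name} …")
        count = process_file(path)
        print(f"✔ Loaded {count} chunks from {name}")
        total += count
    return total

# In the real pipeline, process_file could look like this (assumption):
# def process_file(path):
#     ids, chunks, metas = build_knowledge_units(path)
#     if chunks:
#         batch_insert_into_store(collection, ids, chunks, metas)
#     return len(chunks)

# Demo with a throwaway folder and a stand-in that pretends each file yields 3 chunks.
with tempfile.TemporaryDirectory() as folder:
    for name in ("notes.txt", "image.png"):
        with open(os.path.join(folder, name), "w", encoding="utf-8") as fp:
            fp.write("placeholder")
    total_chunks = ingest_folder(folder, process_file=lambda path: 3)
print(f"Total chunks: {total_chunks}")
```

Passing the per-file work in as a callback keeps the directory walk separate from parsing and storage, so either side can be swapped or tested on its own.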

Semantic Retrieval

def run_semantic_query(collection, query: str, top_k: int = 2):
    """Run a semantic search to find the most relevant chunks."""
    return collection.query(
        query_texts=[query],
        n_results=top_k
    )

def build_context_and_citations(results):
    """Combine matched chunks and reference their original sources."""
    combined_text = "\n\n".join(results['documents'][0])
    # Metadata keys match those written by build_knowledge_units.
    references = [
        f"{meta['source_file']} (chunk {meta['segment_index']})"
        for meta in results['metadatas'][0]
    ]
    return combined_text, references

Search results also include a distance score for each hit (lower means closer to the query) and the stored metadata, which together help you assess retrieval quality.
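
One way to use those scores is to drop weak matches before they reach the prompt. The sketch below works on a hand-written dictionary shaped like a single-query ChromaDB result (lists nested one level per query). The helper name `filter_by_distance` and the cutoff of 1.0 are assumptions; a sensible threshold depends on the embedding model and the distance metric in use.

```python
def filter_by_distance(results: dict, max_distance: float) -> list[dict]:
    """Flatten a single-query result and keep only hits at or below max_distance."""
    hits = []
    for doc, meta, dist in zip(
        results["documents"][0], results["metadatas"][0], results["distances"][0]
    ):
        if dist <= max_distance:
            hits.append({"text": doc, "metadata": meta, "distance": dist})
    return hits

# Hand-written sample in the shape returned for one query (values invented).
sample_results = {
    "ids": [["faq.pdf_seg_0", "guide.docx_seg_4"]],
    "documents": [["Refunds take five days.", "The office opens at nine."]],
    "metadatas": [[
        {"source_file": "faq.pdf", "segment_index": 0},
        {"source_file": "guide.docx", "segment_index": 4},
    ]],
    "distances": [[0.42, 1.37]],
}

for hit in filter_by_distance(sample_results, max_distance=1.0):
    print(f"{hit['distance']:.2f}  {hit['metadata']['source_file']}  {hit['text']}")
```

If every hit is filtered out, it is usually better to tell the user nothing relevant was found than to pass unrelated text to the model.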

LLM Prompt Engineering

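import os
from openai import OpenAI

# The client reads OPENAI_API_KEY from the environment when it is created,
# so the key must be set before OpenAI() is called. Prefer exporting the key
# in your shell over hardcoding it in source.
os.environ.setdefault("OPENAI_API_KEY", "your_api_key")

# Note: this rebinds the name `client` used for ChromaDB earlier; the
# `collection` object created from that client is unaffected.
client = OpenAI()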

def build_prompt(context: str, question: str) -> str:
    """Construct a focused prompt using context and a user question."""
    return f"""You are a helpful assistant. Use only the context provided below to answer.
If the answer cannot be found in the context, reply with \"I don't have that information.\"

Context:
{context}

Question: {question}

Answer:"""

def ask_openai(question: str, context: str) -> str:
    """Send the prompt to OpenAI and return the generated response."""
    prompt = build_prompt(context, question)
    try:
        reply = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[
                {"role": "system", "content": "You answer based strictly on the context provided."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=300
        )
        return reply.choices[0].message.content
    except Exception as err:
        return f"Error: {str(err)}"

The temperature parameter controls randomness; for RAG, values between 0.0 and 0.3 keep answers grounded in the retrieved documents.
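
To see why low values make output more deterministic, consider the textbook temperature-scaled softmax over candidate-token scores. This is a sketch of the general technique with invented scores, not a description of how any particular hosted API implements the parameter internally; a temperature of exactly 0 is usually treated as "always pick the top token" and is excluded here.

```python
import math

def softmax_with_temperature(logits: list[float], temperature: float) -> list[float]:
    """Turn raw scores into probabilities; lower temperature sharpens the distribution."""
    if temperature <= 0:
        raise ValueError("temperature must be positive in this sketch")
    scaled = [x / temperature for x in logits]
    peak = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

toy_logits = [2.0, 1.0, 0.5]  # invented scores for three candidate tokens
for t in (0.3, 1.0, 2.0):
    probs = softmax_with_temperature(toy_logits, t)
    print(f"T={t}: " + ", ".join(f"{p:.3f}" for p in probs))
```

At low temperature almost all probability mass sits on the highest-scoring token, so repeated calls tend to give the same answer; higher temperatures spread the mass and make less likely tokens more probable.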

Conversation Memory

import uuid
from datetime import datetime

chat_sessions = {}

def start_new_session() -> str:
    """Initialize a fresh conversation session with a unique ID."""
    session_id = str(uuid.uuid4())
    chat_sessions[session_id] = []
    return session_id

def log_message(session_id: str, sender: str, message: str):
    """Add a message to the session history."""
    if session_id not in chat_sessions:
        chat_sessions[session_id] = []
    chat_sessions[session_id].append({"role": sender, "content": message, "timestamp": datetime.now().isoformat()})

def fetch_recent_messages(session_id: str, limit: int = 5):
    """Return the last few messages from a session."""
    msgs = chat_sessions.get(session_id, [])
    return msgs[-limit:]

def prepare_history_for_model(messages: list) -> str:
    """Convert messages into a single formatted string."""
    return "\n".join(f"{msg['role'].capitalize()}: {msg['content']}" for msg in messages)

Coreference Resolution

def rewrite_query_with_context(query: str, chat_log: str, client: OpenAI) -> str:
    """Rewrites a follow‑up query as a full standalone question using prior conversation."""
    prompt = f"""Rephrase follow‑up questions to be fully self‑contained.
Refer to the chat history as needed. Return only the rewritten question.

Chat History:
{chat_log}

Follow‑up: {query}
Standalone Question:"""
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return response.choices[0].message.content
    except Exception as err:
        print(f"Failed to contextualize query: {err}")
        return query
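
Rewriting every query costs an extra LLM call. One optional refinement, sketched below under stated assumptions, is a cheap pre-check that skips the rewrite when there is no history or the query contains no referring word. The helper `needs_rewrite` and its word list are hypothetical and deliberately simple.

```python
import re

# Words that usually point back to something said earlier (illustrative, not exhaustive).
REFERRING_WORDS = {
    "it", "its", "they", "them", "their", "this", "that",
    "these", "those", "he", "she", "his", "her", "there",
}

def needs_rewrite(query: str, history: str) -> bool:
    """Return True when there is history and the query contains a referring word."""
    if not history.strip():
        return False
    words = set(re.findall(r"[a-z']+", query.lower()))
    return bool(words & REFERRING_WORDS)

print(needs_rewrite("When did it start?", "User: What does LaunchPad do?"))  # True
print(needs_rewrite("What does LaunchPad do?", ""))                          # False
print(needs_rewrite("What does LaunchPad do?", "User: Hello"))               # False
```

The trade-off is recall: a word list misses elliptical follow-ups such as "And the pricing?", so always calling the LLM rewriter remains the safer default when accuracy matters more than cost.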

Full Conversational RAG Flow

def handle_conversational_query(collection, query: str, session_id: str, n_chunks: int = 3):
    """Orchestrates the full RAG-based QA flow in a chat session."""
    # 1. Get recent chat history
    chat_log = fetch_recent_messages(session_id)
    prior_messages = prepare_history_for_model(chat_log)
    # 2. Resolve pronouns (nothing to resolve on the first turn)
    refined_query = rewrite_query_with_context(query, prior_messages, client) if chat_log else query
    print(f"[Refined Query] {refined_query}")
    # 3. Retrieve relevant chunks
    search_results = run_semantic_query(collection, refined_query, n_chunks)
    retrieved_text, citations = build_context_and_citations(search_results)
    # 4. Generate answer grounded in retrieved text
    answer = ask_openai(refined_query, retrieved_text)
    # 5. Save interaction
    log_message(session_id, "user", query)
    log_message(session_id, "assistant", answer)
    return answer, citations

Example usage:

session = start_new_session()
q1 = "What does LaunchPad do?"
reply, refs = handle_conversational_query(collection, q1, session)
print(f"Answer: {reply}\nSources: {refs}")

q2 = "When did it start?"
reply, refs = handle_conversational_query(collection, q2, session)
print(f"Answer: {reply}\nSources: {refs}")

Practical Optimizations

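Metadata-filtered search: combine semantic similarity with metadata filters (e.g., department == "HR"). This is sometimes loosely called hybrid search, although that term more often refers to mixing keyword and vector retrieval.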

Automatic citation injection: append source list to the generated answer.

Dynamic chunk size based on document type (smaller chunks for financial docs, larger for narrative text).

History summarization: when conversation becomes long, summarize past turns with an LLM before feeding them back.
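
The four ideas above can each be sketched in a few lines. Everything here is illustrative: the helper names, the `department` metadata field (which would have to be stored at indexing time), the size table, and the `summarize` callback (which would wrap an LLM call in practice) are all assumptions. `filtered_query` relies on the `where` argument of ChromaDB's `collection.query`.

```python
from typing import Callable

def filtered_query(collection, query: str, department: str, top_k: int = 3):
    """Semantic search restricted to chunks whose metadata matches a department."""
    return collection.query(
        query_texts=[query],
        n_results=top_k,
        where={"department": department},  # assumes this field exists in the metadata
    )

def append_citations(answer: str, citations: list[str]) -> str:
    """Attach a de-duplicated source list to the end of an answer."""
    unique = list(dict.fromkeys(citations))  # preserves first-seen order
    if not unique:
        return answer
    return answer + "\n\nSources:\n" + "\n".join(f"- {c}" for c in unique)

# Hypothetical size table; tune the numbers against your own documents.
CHUNK_SIZES = {"financial": 300, "manual": 600, "narrative": 1200}

def pick_chunk_size(doc_type: str, default: int = 500) -> int:
    """Look up a maximum chunk length for a document type."""
    return CHUNK_SIZES.get(doc_type, default)

def compact_history(messages: list[dict], keep_last: int,
                    summarize: Callable[[str], str]) -> list[dict]:
    """Replace all but the last keep_last messages with one summary message."""
    keep_last = max(1, keep_last)
    if len(messages) <= keep_last:
        return messages
    older, recent = messages[:-keep_last], messages[-keep_last:]
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in older)
    summary = {"role": "system",
               "content": f"Summary of earlier turns: {summarize(transcript)}"}
    return [summary] + recent

# Demo with a stand-in summarizer that only counts the messages it was given.
history = [{"role": "user", "content": f"question {i}"} for i in range(6)]
short = compact_history(
    history, keep_last=2,
    summarize=lambda text: f"{len(text.splitlines())} earlier messages",
)
print(short[0]["content"])
print(append_citations("Example answer text.", ["faq.pdf (chunk 0)", "faq.pdf (chunk 0)"]))
```

Keeping each optimization in its own small function lets you enable them one at a time and measure whether retrieval quality or cost actually improves.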

Conclusion

Building a RAG system from scratch requires more initial effort than using a ready‑made framework, but it grants complete control over each component, transparent cost accounting, and deep understanding of the retrieval‑augmented generation pipeline—benefits that become critical in complex, domain‑specific scenarios.
