Mastering RAG: Chunking, Embeddings, BM25 & Multi‑Index Retrieval in Python

This tutorial explains Retrieval‑Augmented Generation (RAG) from fundamentals to a full pipeline, covering text chunking strategies, VoyageAI embeddings, vector‑store implementation, BM25 lexical search, and a multi‑index retriever that fuses semantic and lexical results with Reciprocal Rank Fusion.

Su San Talks Tech

Preface

Feeding an 800‑page financial report directly to a large language model is expensive, slow, and yields poor results. Retrieval‑Augmented Generation (RAG) solves this by first splitting the document into small chunks and then feeding only the most relevant chunks to the model.

RAG pipeline overview

1. What is RAG

Retrieval‑Augmented Generation (RAG) is a technique that helps process documents too large to fit into a single prompt. Instead of placing the whole document in the prompt, RAG splits the text into multiple chunks and, when a question is asked, only the most relevant chunks are included.

For example, an 800‑page financial report could be placed in the prompt as:

Answer the user's question based on the document below.

<user_question>
{user_question}
</user_question>

<financial_document>
{financial_document}
</financial_document>

This approach has several limitations:

Prompt length has a hard limit.

Model performance degrades with very long prompts.

Longer prompts increase cost and latency.

RAG first preprocesses the document into small chunks, then at query time retrieves only the chunks most related to the question and inserts them into the prompt.
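The query-time step described above can be sketched as a small prompt builder. This is only an illustration, not the tutorial's own code: the function name build_rag_prompt and the <context> tag are assumptions made for this example, and the retrieved chunks are simply passed in as plain strings.

```python
def build_rag_prompt(user_question, retrieved_chunks):
    """Assemble a prompt that contains only the retrieved chunks (hypothetical helper)."""
    # Number each chunk so the individual pieces of context stay distinguishable.
    context = "\n\n".join(
        f"[chunk {i}]\n{chunk}" for i, chunk in enumerate(retrieved_chunks, start=1)
    )
    return (
        "Answer the user's question using only the context below.\n\n"
        f"<user_question>\n{user_question}\n</user_question>\n\n"
        f"<context>\n{context}\n</context>"
    )


prompt = build_rag_prompt(
    "How many bugs did the engineers fix this year?",
    [
        "The engineering team closed most open defects in Q4.",
        "Release cadence moved to weekly.",
    ],
)
print(prompt)
```

Only the selected chunks reach the model, so the prompt size depends on how many chunks are retrieved rather than on the size of the source document.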

RAG workflow illustration

Advantages of RAG include:

Claude (or any LLM) focuses only on the most relevant content.

The approach scales to very large documents.

Multiple documents can be handled.

Prompt cost is lower and inference is faster.

Challenges include the need for a preprocessing step, a search mechanism, possible loss of context, and choosing the best chunking method.

2. Text Chunking Strategies

Chunking is a critical step because the way a document is split directly affects the quality of the RAG system. A poor strategy can insert irrelevant context and cause the model to answer incorrectly.

Example: a document containing both medical research and software engineering sections. If a user asks "How many bugs did the engineers fix this year?" a naïve chunker might return a medical paragraph that contains the word "bug" but is unrelated.

The three most common strategies are:

Size‑based chunking

Structure‑based chunking

Semantic‑based chunking

Size‑based chunking

The simplest method splits the text into equal‑length strings. For a 325‑character document, three chunks of roughly 108 characters each are produced.
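The arithmetic in that example can be checked with a few lines of Python. This is a minimal sketch; the helper name split_equal is made up for this illustration, and the "document" is just 325 placeholder characters.

```python
import math


def split_equal(text, n_chunks):
    """Split text into n_chunks pieces of (nearly) equal length (hypothetical helper)."""
    # Round the chunk size up so that no characters are left over.
    size = math.ceil(len(text) / n_chunks)
    return [text[i:i + size] for i in range(0, len(text), size)]


document = "x" * 325  # stand-in for a 325-character document
chunks = split_equal(document, 3)
print([len(c) for c in chunks])  # [109, 109, 107]
```

The chunk lengths are not exactly 108 because 325 is not divisible by 3; the last chunk absorbs the remainder.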

Size‑based chunking illustration

Drawbacks:

Words and sentences may be cut in the middle.

Important surrounding context can be lost.

Section titles may become detached from their content.

Adding overlap between consecutive chunks mitigates these issues. The following implementation demonstrates a basic size‑based chunker with overlap:

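def chunk_by_char(text, chunk_size=150, chunk_overlap=20):
    # The overlap must be smaller than the chunk size, otherwise the loop never advances.
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    chunks = []
    start_idx = 0
    while start_idx < len(text):
        end_idx = min(start_idx + chunk_size, len(text))
        chunks.append(text[start_idx:end_idx])
        # Step back by chunk_overlap so the next chunk repeats the tail of this one;
        # stop once the end of the text has been reached.
        start_idx = end_idx - chunk_overlap if end_idx < len(text) else len(text)
    return chunks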
chunk_size controls the length of each chunk, and chunk_overlap defines how many characters are shared with the previous chunk.
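To make the effect of the overlap concrete, the sketch below chunks a ten-character string and verifies that each chunk starts with the tail of the previous one. The helper sliding_chunks is a compact stand-in written for this illustration (same idea, fixed step of size minus overlap), not the tutorial's function.

```python
def sliding_chunks(text, size, overlap):
    """Fixed-size chunks where neighbours share `overlap` characters (illustrative helper)."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be >= 0 and smaller than size")
    if not text:
        return []
    step = size - overlap
    # Stop early enough that the final chunk is not made up of overlap only.
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]


chunks = sliding_chunks("abcdefghij", size=4, overlap=2)
print(chunks)  # ['abcd', 'cdef', 'efgh', 'ghij']

# Every chunk begins with the last two characters of its predecessor.
for prev, cur in zip(chunks, chunks[1:]):
    assert prev[-2:] == cur[:2]
```

A sentence that straddles a chunk boundary therefore appears, at least partly, in both neighbouring chunks, at the cost of storing some text twice.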

Structure‑based chunking

This method splits the document according to its natural structure—headings, paragraphs, and sections. It works well for well‑formatted Markdown files.

Structure‑based chunking illustration

Basic implementation for Markdown:

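import re

def chunk_by_section(document_text):
    # Split wherever a line starts with a level-2 Markdown heading ("## ").
    # Note that re.split drops the matched "\n## " marker itself.
    pattern = r"\n## "
    return re.split(pattern, document_text)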

This produces clean, meaningful chunks because each chunk represents a complete section. It requires the source document to have clear structural markers.
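One variation worth considering keeps the heading marker attached to each chunk, so a section title never becomes detached from its content. The sketch below is an assumption-laden alternative, not the tutorial's code: the function name split_keep_headings is hypothetical, and it relies on re.split handling zero-width matches, which requires Python 3.7 or later.

```python
import re

# Zero-width match at the start of every line that begins with "## ".
SECTION_START = re.compile(r"^(?=## )", re.MULTILINE)


def split_keep_headings(markdown_text):
    """Split on level-2 headings while keeping each heading in its chunk (hypothetical helper)."""
    parts = SECTION_START.split(markdown_text)
    # Drop empty pieces, e.g. when the document starts directly with "## ".
    return [part.strip() for part in parts if part.strip()]


doc = "# Report\nIntro text.\n## Methods\nWe did X.\n## Results\nWe found Y.\n"
sections = split_keep_headings(doc)
for section in sections:
    print(repr(section))
```

Each returned chunk after the first starts with its own "## " heading, which gives the embedding model and the LLM a little more context about what the section covers.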

Semantic‑based chunking

The most complex method first splits the text into sentences, then uses an NLP model to assess similarity between consecutive sentences and groups related sentences into chunks. It yields the most relevant chunks but is computationally expensive.
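The grouping logic can be sketched without a real NLP model. In the example below, word-overlap (Jaccard) similarity is used purely as a stand-in for a model-based similarity score; the function names and the 0.2 threshold are assumptions chosen for this toy input. In practice the similarity function would typically compare sentence embeddings instead.

```python
import re


def jaccard_similarity(a, b):
    """Toy similarity: shared words divided by all words (stand-in for an NLP model)."""
    words_a = set(re.findall(r"\w+", a.lower()))
    words_b = set(re.findall(r"\w+", b.lower()))
    if not words_a or not words_b:
        return 0.0
    return len(words_a & words_b) / len(words_a | words_b)


def group_by_similarity(sentences, similarity, threshold=0.2):
    """Start a new chunk whenever consecutive sentences are not similar enough."""
    if not sentences:
        return []
    groups = [[sentences[0]]]
    for prev, cur in zip(sentences, sentences[1:]):
        if similarity(prev, cur) >= threshold:
            groups[-1].append(cur)   # same topic: extend the current chunk
        else:
            groups.append([cur])     # topic shift: open a new chunk
    return [" ".join(group) for group in groups]


sentences = [
    "The patient received the new drug.",
    "The drug reduced the patient fever.",
    "Engineers fixed the login bug.",
    "The login bug affected mobile users.",
]
chunks = group_by_similarity(sentences, jaccard_similarity)
for chunk in chunks:
    print(chunk)
```

With this input the two medical sentences end up in one chunk and the two engineering sentences in another, because the similarity drops below the threshold at the topic boundary.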

Sentence‑based chunking (practical compromise)

A practical middle ground is to split by sentence using a regular expression and optionally add overlap:

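import re

def chunk_by_sentence(text, max_sentences_per_chunk=2, overlap_sentences=1):
    # The overlap must be smaller than the chunk size, otherwise the loop never advances.
    if not 0 <= overlap_sentences < max_sentences_per_chunk:
        raise ValueError("overlap_sentences must be >= 0 and smaller than max_sentences_per_chunk")
    # Split after English or Chinese sentence-ending punctuation. \s* (rather than \s+)
    # is used because Chinese text has no space between sentences.
    sentences = [s for s in re.split(r"(?<=[.!?。!?])\s*", text) if s.strip()]
    chunks = []
    start_idx = 0
    while start_idx < len(sentences):
        end_idx = min(start_idx + max_sentences_per_chunk, len(sentences))
        chunks.append(" ".join(sentences[start_idx:end_idx]))
        if end_idx == len(sentences):
            break  # avoid a trailing chunk that only repeats the overlap
        start_idx += max_sentences_per_chunk - overlap_sentences
    return chunks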

Note: the regular expression handles both Chinese and English punctuation.

3. Text Embedding

After chunking, the next step is to convert each chunk into a numeric vector (embedding) so that similarity can be measured. Embeddings map words and sentences into a mathematical space that the model can understand.

Input text is fed to an embedding model.

The model outputs a list of numbers, each typically between -1 and +1.

Each dimension captures a different latent feature of the input.

Embedding illustration

We do not know the exact meaning of each dimension; the model learns these during training.
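Even without knowing what each dimension means, two embeddings can be compared by the angle between them. The sketch below uses three hand-made 3-dimensional vectors, which are invented for illustration (real embeddings have hundreds or thousands of dimensions), to show how cosine similarity separates related from unrelated inputs.

```python
import math


def cosine_similarity(vec_a, vec_b):
    """Cosine of the angle between two vectors: 1.0 = same direction, 0.0 = orthogonal."""
    dot = sum(a * b for a, b in zip(vec_a, vec_b))
    norm_a = math.sqrt(sum(a * a for a in vec_a))
    norm_b = math.sqrt(sum(b * b for b in vec_b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # convention chosen here for zero vectors
    return dot / (norm_a * norm_b)


# Toy "embeddings" (made-up numbers): two related texts and one unrelated text.
software_report = [0.9, 0.1, 0.0]
software_question = [0.8, 0.2, 0.1]
medical_report = [0.0, 0.1, 0.9]

print(cosine_similarity(software_question, software_report))
print(cosine_similarity(software_question, medical_report))
```

The first value is close to 1 and the second is close to 0, which is the signal a vector search uses to rank chunks against a query.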

Using VoyageAI for embeddings

Because Anthropic does not offer its own embedding model, the tutorial uses VoyageAI, which can be tried with a free API key. Steps:

Register on the VoyageAI website.

Obtain a free API key.

Add the key to the .env file:

VOYAGE_API_KEY="your_key_here"

Install the library and create a helper function:

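# Jupyter notebook magic; python-dotenv provides the dotenv import used below.
%pip install voyageai python-dotenv
from dotenv import load_dotenv
import voyageai

load_dotenv()  # load VOYAGE_API_KEY from the .env file into the environment
client = voyageai.Client()  # picks up VOYAGE_API_KEY from the environment

def generate_embedding(text, model="voyage-3-large", input_type="query"):
    # Use input_type="query" for search queries and "document" for stored chunks.
    result = client.embed([text], model=model, input_type=input_type)
    return result.embeddings[0]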

Test the function on a chunk:

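# Read the report as UTF-8 so that Chinese text is decoded correctly on every platform.
with open("./report_zh.md", "r", encoding="utf-8") as f:
    text = f.read()
chunks = chunk_by_section(text)
# A chunk is stored content, so embed it as a document rather than as a query.
generate_embedding(chunks[0], input_type="document")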

The resulting vector can be stored and later compared with the embedding of a user query.

4. Full RAG Pipeline

The tutorial now ties together chunking, embedding, vector storage, and similarity search.

Step 1 – Split the source text

Two example parts are used:

Medical research – "This year we made major progress on XDR‑47, a bug we have never seen before."

Software engineering – "The department invested heavily in studying various infection vectors in distributed systems."

Step 2 – Generate embeddings for all chunks

Embeddings are generated in batch to avoid rate‑limiting:

def generate_embedding(chunks, model="voyage-3-large", input_type="query"):
    """Generate embeddings for a list of strings or a single string."""
    is_list = isinstance(chunks, list)
    # Always send a list to the API (and avoid shadowing the built-in input()).
    texts = chunks if is_list else [chunks]
    result = client.embed(texts, model=model, input_type=input_type)
    # Return a list of vectors for list input, a single vector otherwise.
    return result.embeddings if is_list else result.embeddings[0]
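For a large document, a single request may contain more chunks than an embedding API accepts at once. One possible way to handle this is to slice the chunk list into fixed-size batches, as sketched below. The helper embed_in_batches, the batch_size value, and the fake_embed stub (used so that the example runs without network access) are all assumptions for illustration; check the provider's documentation for the actual batch limits.

```python
def embed_in_batches(texts, embed_fn, batch_size=64):
    """Call embed_fn on consecutive slices of texts and concatenate the results."""
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    vectors = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        vectors.extend(embed_fn(batch))  # one API call per batch
    return vectors


calls = []


def fake_embed(batch):
    """Stub embedding function: records the call and returns one tiny vector per text."""
    calls.append(len(batch))
    return [[float(len(text))] for text in batch]


chunks = ["alpha", "beta", "gamma", "delta", "epsilon"]
vectors = embed_in_batches(chunks, fake_embed, batch_size=2)
print(len(vectors), calls)  # 5 [2, 2, 1]
```

In the tutorial's setting, embed_fn would be a function like generate_embedding that accepts a list of strings and returns a list of vectors.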

Step 3 – Store embeddings in a vector store

A lightweight in‑memory vector store is implemented. Only the essential methods are shown:

import math
from typing import Optional, Any, List, Dict, Tuple

class VectorIndex:
    def __init__(self, distance_metric: str = "cosine", embedding_fn=None):
        self.vectors: List[List[float]] = []
        self.documents: List[Dict[str, Any]] = []
        self._vector_dim: Optional[int] = None
        if distance_metric not in ["cosine", "euclidean"]:
            raise ValueError("distance_metric must be 'cosine' or 'euclidean'")
        self._distance_metric = distance_metric
        self._embedding_fn = embedding_fn

    def add_document(self, document: Dict[str, Any]):
        if not self._embedding_fn:
            raise ValueError("Embedding function not provided during initialization.")
        if not isinstance(document, dict):
            raise TypeError("Document must be a dictionary.")
        if "content" not in document:
            raise ValueError("Document dictionary must contain a 'content' key.")
        content = document["content"]
        if not isinstance(content, str):
            raise TypeError("Document 'content' must be a string.")
        vector = self._embedding_fn(content)
        self.add_vector(vector=vector, document=document)

    def add_documents(self, documents: List[Dict[str, Any]]):
        if not self._embedding_fn:
            raise ValueError("Embedding function not provided during initialization.")
        if not isinstance(documents, list):
            raise TypeError("Documents must be a list of dictionaries.")
        if not documents:
            return
        contents = []
        for i, doc in enumerate(documents):
            if not isinstance(doc, dict):
                raise TypeError(f"Document at index {i} must be a dictionary.")
            if "content" not in doc:
                raise ValueError(f"Document at index {i} must contain a 'content' key.")
            if not isinstance(doc["content"], str):
                raise TypeError(f"Document 'content' at index {i} must be a string.")
            contents.append(doc["content"])
        vectors = self._embedding_fn(contents)
        for vector, document in zip(vectors, documents):
            self.add_vector(vector=vector, document=document)

    def search(self, query: Any, k: int = 1) -> List[Tuple[Dict[str, Any], float]]:
        if not self.vectors:
            return []
        if isinstance(query, str):
            if not self._embedding_fn:
                raise ValueError("Embedding function not provided for string query.")
            query_vector = self._embedding_fn(query)
        elif isinstance(query, list) and all(isinstance(x, (int, float)) for x in query):
            query_vector = query
        else:
            raise TypeError("Query must be either a string or a list of numbers.")
        if self._vector_dim is None:
            return []
        if len(query_vector) != self._vector_dim:
            raise ValueError(f"Query vector dimension mismatch. Expected {self._vector_dim}, got {len(query_vector)}")
        if k <= 0:
            raise ValueError("k must be a positive integer")
        if self._distance_metric == "cosine":
            dist_func = self._cosine_distance
        else:
            dist_func = self._euclidean_distance
        distances = []
        for i, stored_vector in enumerate(self.vectors):
            distance = dist_func(query_vector, stored_vector)
            distances.append((distance, self.documents[i]))
        distances.sort(key=lambda item: item[0])
        return [(doc, dist) for dist, doc in distances[:k]]

    def add_vector(self, vector, document: Dict[str, Any]):
        if not isinstance(vector, list) or not all(isinstance(x, (int, float)) for x in vector):
            raise TypeError("Vector must be a list of numbers.")
        if not isinstance(document, dict):
            raise TypeError("Document must be a dictionary.")
        if "content" not in document:
            raise ValueError("Document dictionary must contain a 'content' key.")
        if not self.vectors:
            self._vector_dim = len(vector)
        elif len(vector) != self._vector_dim:
            raise ValueError(f"Inconsistent vector dimension. Expected {self._vector_dim}, got {len(vector)}")
        self.vectors.append(list(vector))
        self.documents.append(document)

    def _euclidean_distance(self, vec1: List[float], vec2: List[float]) -> float:
        if len(vec1) != len(vec2):
            raise ValueError("Vectors must have the same dimension")
        return math.sqrt(sum((p - q) ** 2 for p, q in zip(vec1, vec2)))

    def _dot_product(self, vec1: List[float], vec2: List[float]) -> float:
        if len(vec1) != len(vec2):
            raise ValueError("Vectors must have the same dimension")
        return sum(p * q for p, q in zip(vec1, vec2))

    def _magnitude(self, vec: List[float]) -> float:
        return math.sqrt(sum(x * x for x in vec))

    def _cosine_distance(self, vec1: List[float], vec2: List[float]) -> float:
        if len(vec1) != len(vec2):
            raise ValueError("Vectors must have the same dimension")
        mag1 = self._magnitude(vec1)
        mag2 = self._magnitude(vec2)
        if mag1 == 0 and mag2 == 0:
            return 0.0
        if mag1 == 0 or mag2 == 0:
            return 1.0
        dot_prod = self._dot_product(vec1, vec2)
        cosine_similarity = dot_prod / (mag1 * mag2)
        cosine_similarity = max(-1.0, min(1.0, cosine_similarity))
        return 1.0 - cosine_similarity

    def __len__(self) -> int:
        return len(self.vectors)

    def __repr__(self) -> str:
        has_embed_fn = "Yes" if self._embedding_fn else "No"
        return f"VectorIndex(count={len(self)}, dim={self._vector_dim}, metric='{self._distance_metric}', has_embedding_fn='{has_embed_fn}')"

Populate the store:

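store = VectorIndex()
# Embed all chunks in one batched call, then store each vector with its text.
embeddings = generate_embedding(chunks, input_type="document")
for embedding, chunk in zip(embeddings, chunks):
    store.add_vector(embedding, {"content": chunk})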

When a user asks a question (e.g., "What did the software‑engineering department do last year?"), the query is embedded and the nearest chunks are retrieved using cosine distance.

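user_embedding = generate_embedding("What did the software engineering department do last year?")
results = store.search(user_embedding, 2)
for doc, distance in results:
    # Print the distance followed by the first 200 characters of the chunk.
    print(distance, "\n", doc["content"][:200], "\n")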

The search returns the most relevant chunks together with a distance score (lower = more similar). The tutorial notes that while this simple implementation works, real‑world scenarios may need more robustness.

5. BM25 Lexical Search

Semantic search alone may miss exact term matches. BM25 provides a classic lexical search that scores documents based on term frequency and inverse document frequency.

For example, a query containing an exact identifier such as "INC‑2023‑Q4‑011" is best served by literal term matching. A hybrid approach runs both semantic and lexical searches and merges the results.

Hybrid semantic + BM25 search illustration

BM25 steps:

Tokenize the query.

Count how often each term appears in the whole corpus.

Weight rare terms higher.

Return documents that contain more high‑weight terms.
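The "weight rare terms higher" step can be made concrete with the inverse document frequency (IDF) formula used in the implementation that follows. The three-document corpus below is invented for illustration, and whitespace splitting stands in for a real tokenizer.

```python
import math
from collections import Counter

corpus = [
    "the server restarted after the incident inc-2023-q4-011",
    "the team reviewed the deployment",
    "the report covers the quarter",
]
tokenized = [doc.split() for doc in corpus]

# Document frequency: in how many documents does each term occur at least once?
doc_freq = Counter(term for tokens in tokenized for term in set(tokens))
num_docs = len(corpus)


def idf(term):
    """BM25 inverse document frequency with +1 inside the log to keep it positive."""
    df = doc_freq[term]
    return math.log((num_docs - df + 0.5) / (df + 0.5) + 1)


print(round(idf("the"), 3))              # 0.134  (appears in every document)
print(round(idf("inc-2023-q4-011"), 3))  # 0.981  (appears in one document)
```

A match on the rare identifier contributes roughly seven times more to the score than a match on "the", which is why BM25 ranks the first document highest for that query.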

BM25 implementation

The following class implements a BM25 index with optional Chinese tokenization via jieba:

import math
import re
import jieba
from collections import Counter
from typing import Callable, Any, List, Dict, Tuple, Optional

class BM25Index:
    def __init__(self, k1: float = 1.5, b: float = 0.75, tokenizer: Optional[Callable[[str], List[str]]] = None):
        self.documents: List[Dict[str, Any]] = []
        self._corpus_tokens: List[List[str]] = []
        self._doc_len: List[int] = []
        self._doc_freqs: Dict[str, int] = {}
        self._avg_doc_len: float = 0.0
        self._idf: Dict[str, float] = {}
        self._index_built: bool = False
        self.k1 = k1
        self.b = b
        self._tokenizer = tokenizer if tokenizer else self._default_tokenizer_zh

    def _default_tokenizer(self, text: str) -> List[str]:
        text = text.lower()
        tokens = re.split(r"\W+", text)
        return [token for token in tokens if token]

    def _default_tokenizer_zh(self, text: str) -> List[str]:
        tokens = jieba.cut(text)
        return [t.lower() for t in tokens if re.match(r"\w+", t)]

    def _update_stats_add(self, doc_tokens: List[str]):
        self._doc_len.append(len(doc_tokens))
        seen_in_doc = set()
        for token in doc_tokens:
            if token not in seen_in_doc:
                self._doc_freqs[token] = self._doc_freqs.get(token, 0) + 1
                seen_in_doc.add(token)
        self._index_built = False

    def _calculate_idf(self):
        N = len(self.documents)
        self._idf = {}
        for term, freq in self._doc_freqs.items():
            idf_score = math.log(((N - freq + 0.5) / (freq + 0.5)) + 1)
            self._idf[term] = idf_score

    def _build_index(self):
        if not self.documents:
            self._avg_doc_len = 0.0
            self._idf = {}
            self._index_built = True
            return
        self._avg_doc_len = sum(self._doc_len) / len(self.documents)
        self._calculate_idf()
        self._index_built = True

    def add_document(self, document: Dict[str, Any]):
        if not isinstance(document, dict):
            raise TypeError("Document must be a dictionary.")
        if "content" not in document:
            raise ValueError("Document dictionary must contain a 'content' key.")
        content = document.get("content", "")
        if not isinstance(content, str):
            raise TypeError("Document 'content' must be a string.")
        doc_tokens = self._tokenizer(content)
        self.documents.append(document)
        self._corpus_tokens.append(doc_tokens)
        self._update_stats_add(doc_tokens)

    def add_documents(self, documents: List[Dict[str, Any]]):
        if not isinstance(documents, list):
            raise TypeError("Documents must be a list of dictionaries.")
        if not documents:
            return
        for i, doc in enumerate(documents):
            if not isinstance(doc, dict):
                raise TypeError(f"Document at index {i} must be a dictionary.")
            if "content" not in doc:
                raise ValueError(f"Document at index {i} must contain a 'content' key.")
            if not isinstance(doc["content"], str):
                raise TypeError(f"Document 'content' at index {i} must be a string.")
            content = doc["content"]
            doc_tokens = self._tokenizer(content)
            self.documents.append(doc)
            self._corpus_tokens.append(doc_tokens)
            self._update_stats_add(doc_tokens)
        self._index_built = False

    def _compute_bm25_score(self, query_tokens: List[str], doc_index: int) -> float:
        score = 0.0
        doc_term_counts = Counter(self._corpus_tokens[doc_index])
        doc_length = self._doc_len[doc_index]
        for token in query_tokens:
            if token not in self._idf:
                continue
            idf = self._idf[token]
            term_freq = doc_term_counts.get(token, 0)
            numerator = idf * term_freq * (self.k1 + 1)
            denominator = term_freq + self.k1 * (1 - self.b + self.b * (doc_length / self._avg_doc_len))
            score += numerator / (denominator + 1e-9)
        return score

    def search(self, query_text: str, k: int = 1, score_normalization_factor: float = 0.1) -> List[Tuple[Dict[str, Any], float]]:
        if not self.documents:
            return []
        if not isinstance(query_text, str):
            raise TypeError("Query text must be a string.")
        if k <= 0:
            raise ValueError("k must be a positive integer.")
        if not self._index_built:
            self._build_index()
        if self._avg_doc_len == 0:
            return []
        query_tokens = self._tokenizer(query_text)
        if not query_tokens:
            return []
        raw_scores = []
        for i in range(len(self.documents)):
            raw_score = self._compute_bm25_score(query_tokens, i)
            if raw_score > 1e-9:
                raw_scores.append((raw_score, self.documents[i]))
        raw_scores.sort(key=lambda item: item[0], reverse=True)
        normalized_results = []
        for raw_score, doc in raw_scores[:k]:
            normalized_score = math.exp(-score_normalization_factor * raw_score)
            normalized_results.append((doc, normalized_score))
        normalized_results.sort(key=lambda item: item[1])
        return normalized_results

    def __len__(self) -> int:
        return len(self.documents)

    def __repr__(self) -> str:
        return f"BM25Index(count={len(self)}, k1={self.k1}, b={self.b}, index_built={self._index_built})"

Key points of BM25:

Rare terms receive higher weight.

Common words are down‑weighted.

It excels at matching technical identifiers, IDs, and exact phrases.
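How well identifiers match depends on the tokenizer. The sketch below applies the same regular expression as the class's English tokenizer (_default_tokenizer) to a made-up sentence, showing that a hyphenated ID is broken into several tokens, each of which can then match independently.

```python
import re


def simple_tokenize(text):
    """Lowercase the text and split on runs of non-word characters."""
    return [token for token in re.split(r"\W+", text.lower()) if token]


tokens = simple_tokenize("Incident INC-2023-Q4-011 was resolved.")
print(tokens)  # ['incident', 'inc', '2023', 'q4', '011', 'was', 'resolved']
```

Because the query is tokenized the same way, a search for the full ID still hits the right document; the rare fragments such as "011" carry most of the weight. If exact whole-ID matching matters, a custom tokenizer that keeps hyphenated identifiers intact could be passed in instead.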

6. Multi‑Index RAG Pipeline

Both VectorIndex (semantic) and BM25Index (lexical) share a common API (add_document, add_documents, search). A new Retriever class coordinates them and merges results using Reciprocal Rank Fusion (RRF).

Multi‑index architecture diagram

Reciprocal Rank Fusion

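RRF combines rankings from different search methods using only each document's rank position, so the raw scores of the individual indexes, which are on different scales, never have to be compared directly. For each document d, the fused score is:

score(d) = Σ_i 1 / (k + rank_i(d))

where k is a constant (commonly 60) and rank_i(d) is the rank of document d in the i‑th index. An index that does not return the document contributes nothing to the sum.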

Documents are then sorted by descending RRF score.
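A small worked example may help here. The sketch below fuses two made-up rankings of three documents with k = 60; the function name rrf_fuse and the document labels A, B, and C are placeholders for illustration.

```python
def rrf_fuse(rankings, k=60):
    """Fuse several ranked lists of document IDs with Reciprocal Rank Fusion."""
    scores = {}
    for ranking in rankings:
        # Ranks start at 1, so the top result of each list contributes 1 / (k + 1).
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


semantic_ranking = ["A", "B", "C"]  # hypothetical vector-search order
lexical_ranking = ["B", "C", "A"]   # hypothetical BM25 order

for doc_id, score in rrf_fuse([semantic_ranking, lexical_ranking]):
    print(doc_id, round(score, 5))
```

Document B wins (1/62 + 1/61) because it sits near the top of both lists, while A (1/61 + 1/63) is pulled down by its last place in the lexical ranking, and C (1/63 + 1/62) comes last.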

Retriever implementation

from typing import Any, List, Dict, Tuple, Protocol

class SearchIndex(Protocol):
    def add_document(self, document: Dict[str, Any]) -> None: ...
    def add_documents(self, documents: List[Dict[str, Any]]) -> None: ...
    def search(self, query: Any, k: int = 1) -> List[Tuple[Dict[str, Any], float]]: ...

class Retriever:
    def __init__(self, *indexes: SearchIndex):
        if len(indexes) == 0:
            raise ValueError("At least one index must be provided")
        self._indexes = list(indexes)

    def add_document(self, document: Dict[str, Any]):
        for index in self._indexes:
            index.add_document(document)

    def add_documents(self, documents: List[Dict[str, Any]]):
        for index in self._indexes:
            index.add_documents(documents)

    def search(self, query_text: str, k: int = 1, k_rrf: int = 60) -> List[Tuple[Dict[str, Any], float]]:
        if not isinstance(query_text, str):
            raise TypeError("Query text must be a string.")
        if k <= 0:
            raise ValueError("k must be a positive integer.")
        if k_rrf < 0:
            raise ValueError("k_rrf must be non-negative.")
        # Retrieve more candidates from each index to give RRF enough data
        all_results = [index.search(query_text, k=k * 5) for index in self._indexes]
        doc_ranks: Dict[int, Dict[str, Any]] = {}
        for idx, results in enumerate(all_results):
            for rank, (doc, _) in enumerate(results):
                doc_id = id(doc)  # use object identity as a unique key
                if doc_id not in doc_ranks:
                    doc_ranks[doc_id] = {"doc_obj": doc, "ranks": [float("inf")] * len(self._indexes)}
                doc_ranks[doc_id]["ranks"][idx] = rank + 1  # ranks start at 1
        def calc_rrf_score(ranks: List[float]) -> float:
            return sum(1.0 / (k_rrf + r) for r in ranks if r != float("inf"))
        scored_docs: List[Tuple[Dict[str, Any], float]] = [
            (info["doc_obj"], calc_rrf_score(info["ranks"]))
            for info in doc_ranks.values()
        ]
        filtered_docs = [(doc, score) for doc, score in scored_docs if score > 0]
        filtered_docs.sort(key=lambda x: x[1], reverse=True)
        return filtered_docs[:k]

The retriever can be instantiated with both indexes:

vector_store = VectorIndex(embedding_fn=lambda txt: generate_embedding(txt))
bm25_store = BM25Index()
retriever = Retriever(vector_store, bm25_store)

After adding the same documents to both stores, a hybrid query such as "What happened in INC‑2023‑Q4‑011?" yields a ranked list where the RRF score reflects agreement between the semantic and lexical ranks.

Hybrid search results

Higher RRF scores indicate stronger relevance. The example shows that the combined approach returns more accurate answers than using either semantic or lexical search alone.

Extensibility

Because every index implements the SearchIndex protocol, new retrieval methods (keyword‑based, graph‑based, domain‑specific) can be added simply by providing add_document(s) and search methods. The Retriever will automatically fuse their rankings using RRF.
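As an illustration of that extension point, the sketch below defines a deliberately simple index that matches documents by case-insensitive substring. The class name SubstringIndex and its scoring rule are assumptions made for this example; it exposes the same three methods and follows the convention used by the indexes above, where a lower score means a better match.

```python
from typing import Any, Dict, List, Tuple


class SubstringIndex:
    """Toy index: ranks documents by how often the query string occurs in them."""

    def __init__(self) -> None:
        self.documents: List[Dict[str, Any]] = []

    def add_document(self, document: Dict[str, Any]) -> None:
        if "content" not in document:
            raise ValueError("Document dictionary must contain a 'content' key.")
        self.documents.append(document)

    def add_documents(self, documents: List[Dict[str, Any]]) -> None:
        for document in documents:
            self.add_document(document)

    def search(self, query: Any, k: int = 1) -> List[Tuple[Dict[str, Any], float]]:
        needle = str(query).lower()
        hits = []
        for document in self.documents:
            count = document["content"].lower().count(needle)
            if count:
                # More occurrences -> smaller score, i.e. lower is better.
                hits.append((document, 1.0 / (1 + count)))
        hits.sort(key=lambda item: item[1])
        return hits[:k]


index = SubstringIndex()
index.add_documents([
    {"content": "Incident INC-2023-Q4-011 affected the login service."},
    {"content": "Quarterly revenue grew compared with last year."},
])
results = index.search("inc-2023-q4-011", k=2)
print(results)
```

Since only the order of the returned list matters for rank fusion, an object like this could be passed to the Retriever alongside the other indexes.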

Extensible multi‑index diagram

In summary, the tutorial demonstrates a complete RAG pipeline: chunking, embedding with VoyageAI, vector storage, BM25 lexical search, and a multi‑index retriever that merges results with Reciprocal Rank Fusion, providing a robust and scalable solution for large‑document question answering.
