RAGFlow Search Engine Deep Dive: Multi‑Path Retrieval, Fusion, and Reranking
This article provides a detailed technical analysis of RAGFlow's search engine: the coordinating Searcher class, adaptive multi-path retrieval (vector, keyword, and knowledge-graph), intelligent fusion with weighted scoring, caching and performance monitoring, and both built-in and model-driven reranking to achieve high-precision results.
Searcher class
The Searcher class in rag/nlp/search.py coordinates all retrieval sub‑modules. Its initializer creates a full‑text queryer and connects to the configured vector store.
class Searcher:
    def __init__(self, config):
        self.qryr = FulltextQueryer()
        self.dataStore = self._get_storage()

    def search(self, req, idx_names, kb_ids, emb_mdl=None, highlight=False, rank_feature=None):
        # core search logic
        ...

Design highlights
Intelligent weight allocation: vector weight 0.95, keyword weight 0.05 (experimentally tuned).
Adaptive strategy: switches between pure keyword mode and hybrid mode based on the presence of an embedding model.
Scalable scoring: supports custom rank_feature for business‑specific relevance.
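To make the 0.95/0.05 weighting concrete, here is a minimal sketch of the arithmetic (the helper name is an assumption for illustration; RAGFlow applies the weighting inside its fusion expression, not through a standalone function):

```python
def weighted_fusion(vector_score: float, keyword_score: float,
                    w_vec: float = 0.95, w_kw: float = 0.05) -> float:
    """Combine two normalized relevance scores (0..1) with fixed weights."""
    return vector_score * w_vec + keyword_score * w_kw

# A chunk that ranks highly in vector search dominates the fused score:
# weighted_fusion(0.9, 0.2) ≈ 0.855 + 0.01 = 0.865
```

With weights this lopsided, keyword relevance acts mostly as a tie-breaker between semantically similar candidates.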
Backend connection management
Searcher automatically selects the appropriate backend (Elasticsearch, Infinity, OpenSearch) according to config["vector_db"].
if config.get("vector_db") == "elasticsearch":
    from rag.utils.es_conn import ESConnection
    self.dataStore = ESConnection(config)
elif config.get("vector_db") == "infinity":
    from rag.utils.infinity_conn import InfinityConnection
    self.dataStore = InfinityConnection(config)
elif config.get("vector_db") == "opensearch":
    from rag.utils.opensearch_conn import OpenSearchConnection
    self.dataStore = OpenSearchConnection(config)
else:
    # default backend
    from rag.utils.es_conn import ESConnection
    self.dataStore = ESConnection(config)

Elasticsearch – Enterprise-grade, mature; excellent hybrid retrieval; medium deployment difficulty.
Infinity – High‑performance vector search; very fast, memory‑efficient; easy deployment.
OpenSearch – AWS/cloud‑native; similar to ES with better cloud integration; medium deployment difficulty.
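The if/elif chain above can equivalently be written as a dispatch table, which keeps the Elasticsearch fallback explicit. A sketch with stand-in connector classes (the real ones live in rag.utils.es_conn, rag.utils.infinity_conn, and rag.utils.opensearch_conn):

```python
# Stand-in connector classes for illustration only.
class _Conn:
    def __init__(self, config):
        self.config = config

class ESConnection(_Conn): pass
class InfinityConnection(_Conn): pass
class OpenSearchConnection(_Conn): pass

BACKENDS = {
    "elasticsearch": ESConnection,
    "infinity": InfinityConnection,
    "opensearch": OpenSearchConnection,
}

def get_storage(config: dict):
    # Unknown or missing vector_db values fall back to Elasticsearch,
    # matching the else branch of the original selection logic.
    cls = BACKENDS.get(config.get("vector_db"), ESConnection)
    return cls(config)
```

A table like this also makes it easy to register a new backend without touching the selection code.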
Multi‑path retrieval
RAGFlow runs parallel retrieval strategies: vector semantic search, keyword BM25 search, and optional knowledge‑graph lookup. The search method builds a list of matchExprs and sends them to the data store.
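The parallelism can be sketched with a thread pool; the retriever callables below are placeholders standing in for the vector, BM25, and knowledge-graph paths (this is an illustrative pattern, not RAGFlow's actual executor code):

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel_retrieval(query: str, retrievers: dict) -> dict:
    """Run each retrieval strategy concurrently and collect results by name."""
    with ThreadPoolExecutor(max_workers=len(retrievers)) as pool:
        futures = {name: pool.submit(fn, query) for name, fn in retrievers.items()}
        return {name: fut.result() for name, fut in futures.items()}

# Placeholder retrievers returning toy candidate lists:
results = run_parallel_retrieval("configure vector db", {
    "vector": lambda q: [{"id": "c1", "score": 0.92}],
    "keyword": lambda q: [{"id": "c2", "score": 7.1}],
})
```

Each stream returns its own candidate list, keyed by strategy name, ready for the fusion step described below.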
if emb_mdl is None:
    matchText, keywords = self.qryr.question(qst, min_match=0.3)
    res = self.dataStore.search(src, [], filters, [matchText], orderBy, offset, limit, idx_names, kb_ids)
else:
    matchText, keywords = self.qryr.question(qst, min_match=0.3)
    matchDense = self.get_vector(qst, emb_mdl, topk, similarity)
    fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05,0.95"})
    matchExprs = [matchText, matchDense, fusionExpr]
    res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit, idx_names, kb_ids)

Vector retrieval
Transforms the query into a high‑dimensional embedding and performs cosine similarity search.
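Cosine similarity itself reduces to a normalized dot product. A minimal self-contained sketch (not RAGFlow code; the backend computes this over indexed vectors):

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between two vectors: dot product over norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0
```

Identical directions score 1.0, orthogonal vectors 0.0, which is why a similarity floor such as 0.1 can be used to drop near-irrelevant candidates.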
def get_vector(self, txt, emb_mdl, topk=10, similarity=0.1):
    qv, _ = emb_mdl.encode_queries(txt)
    vector_column_name = f"q_{len(qv)}_vec"
    return MatchDenseExpr(vector_column_name, qv, 'float', 'cosine', topk, {"similarity": similarity})

Full‑text retrieval
Uses BM25 with boosted fields and fuzzy matching to guarantee exact term coverage.
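The boosting scheme can be previewed in isolation: exact matches get the highest boost (^3 for multi-word terms, ^2 for single words), phrase proximity ^2, and fuzzy matches ^1. A hedged sketch of the per-keyword clause builder (extracted for illustration; in RAGFlow this logic lives inside FulltextQueryer.question):

```python
def build_clause(kwd: str) -> str:
    """Build a Lucene-style boosted clause for one keyword."""
    exact = f'"{kwd}"'
    fuzzy = f'{kwd}~2'          # fuzzy match with edit distance 2
    if len(kwd.split()) > 1:
        phrase = f'"{kwd}"~2'   # phrase match with slop 2
        return f'({exact})^3 OR ({phrase})^2 OR ({fuzzy})^1'
    return f'({exact})^2 OR ({fuzzy})^1'

# build_clause("vector") -> '("vector")^2 OR (vector~2)^1'
```

Ranking exact hits above fuzzy ones this way keeps typo tolerance without letting approximate matches crowd out literal ones.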
import re

class FulltextQueryer:
    def question(self, txt, min_match=0.3):
        otxt = re.sub(r"[\r\n\t ]+", " ", txt).strip()
        if not otxt:
            return None, []
        tks = self._tokenize_and_extract_keywords(otxt)
        keywords = list(set(tks))
        qs = []
        for kwd in keywords:
            if len(kwd) < 2:
                continue
            exact_match = f'"{kwd}"'
            fuzzy_match = f'{kwd}~2'
            if len(kwd.split()) > 1:
                phrase_match = f'"{kwd}"~2'
                qs.append(f'({exact_match})^3 OR ({phrase_match})^2 OR ({fuzzy_match})^1')
            else:
                qs.append(f'({exact_match})^2 OR ({fuzzy_match})^1')
        if qs:
            query = " OR ".join([f"({t})" for t in qs])
            return MatchTextExpr(self.query_fields, query, 100, {"minimum_should_match": min_match}), keywords
        return None, keywords

Result fusion
After parallel retrieval, FusionExpr merges candidates, normalizes scores, adds position weighting, and applies a diversity bonus for chunks appearing in multiple retrieval streams.
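The per-stream scoring can be checked with a toy calculation: min-max normalization blended 80/20 with a reciprocal-rank position weight (a sketch of the arithmetic only; the function name is an assumption, the real logic sits inline in _fuse_results):

```python
def stream_score(score: float, min_s: float, max_s: float, rank: int) -> float:
    """Blend a min-max normalized score (80%) with a reciprocal-rank weight (20%)."""
    score_range = max_s - min_s if max_s != min_s else 1.0
    normalized = (score - min_s) / score_range
    position_weight = 1.0 / (rank + 1)
    return normalized * 0.8 + position_weight * 0.2

# Top hit (rank 0) with the maximum raw score gets the full 1.0 (up to rounding);
# a bottom-scoring hit at rank 9 keeps only its small positional credit (0.02).
```

Normalizing per stream matters because BM25 and cosine scores live on incompatible scales; the position term preserves each backend's own ordering even when raw scores cluster tightly.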
def _fuse_results(self, retrieval_results):
    chunk_scores = {}
    for retrieval_type, results in retrieval_results.items():
        if not results:
            continue
        scores = [hit["score"] for hit in results]
        max_score = max(scores)
        min_score = min(scores)
        score_range = max_score - min_score if max_score != min_score else 1.0
        for rank, hit in enumerate(results):
            chunk_id = hit["id"]
            normalized_score = (hit["score"] - min_score) / score_range if score_range > 0 else 1.0
            position_weight = 1.0 / (rank + 1)
            final_score = normalized_score * 0.8 + position_weight * 0.2
            if chunk_id not in chunk_scores:
                chunk_scores[chunk_id] = {"scores": {}, "max_score": 0.0, "chunk_data": hit, "retrieval_types": []}
            chunk_scores[chunk_id]["scores"][retrieval_type] = final_score
            chunk_scores[chunk_id]["max_score"] = max(chunk_scores[chunk_id]["max_score"], final_score)
            chunk_scores[chunk_id]["retrieval_types"].append(retrieval_type)
    fused_chunks = []
    for chunk_id, data in chunk_scores.items():
        diversity_bonus = len(data["retrieval_types"]) * 0.1
        weighted_score = sum(score * self.hybrid_weights.get(rt, 1.0) for rt, score in data["scores"].items())
        total_weight = sum(self.hybrid_weights.get(rt, 1.0) for rt in data["scores"].keys())
        final_score = (weighted_score / total_weight) + diversity_bonus if total_weight > 0 else data["max_score"] + diversity_bonus
        chunk = Chunk(id=chunk_id, content=data["chunk_data"]["content"], score=final_score,
                      metadata=data["chunk_data"].get("metadata", {}),
                      retrieval_info={"types": data["retrieval_types"], "scores": data["scores"]})
        fused_chunks.append(chunk)
    fused_chunks.sort(key=lambda x: x.score, reverse=True)
    self.logger.info(f"Fused {len(fused_chunks)} unique chunks from {sum(len(r) for r in retrieval_results.values())} total results")
    return fused_chunks

Reranking
Rerank refines the fused list using either a built‑in hybrid similarity function or a dedicated rerank model. The process computes token similarity, vector similarity, and optional ranking features, then produces a final score.
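The built-in path blends token overlap and vector similarity with default weights of 0.3 and 0.7. A sketch of just that weighting step (the helper name is an assumption; the real combination happens inside hybrid_similarity):

```python
def blend_similarity(token_sim: float, vector_sim: float,
                     tkweight: float = 0.3, vtweight: float = 0.7) -> float:
    """Weighted blend of token-overlap and vector (semantic) similarity."""
    return token_sim * tkweight + vector_sim * vtweight

# blend_similarity(0.5, 0.9) ≈ 0.15 + 0.63 = 0.78
```

Weighting the vector term more heavily favors semantic relevance, while the token term anchors the score to literal query-term overlap.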
def rerank(self, sres, query, tkweight=0.3, vtweight=0.7, cfield="content_ltks", rank_feature=None):
    # extract vectors, build token list, compute hybrid similarity, add rank_feature scores
    sim, tksim, vtsim = self.qryr.hybrid_similarity(
        sres.query_vector, ins_embd, keywords, ins_tw, tkweight, vtweight)
    rank_fea = self._rank_feature_scores(rank_feature, sres)
    return sim + rank_fea, tksim, vtsim

def rerank_by_model(self, rerank_mdl, sres, query, tkweight=0.3, vtweight=0.7, rank_feature=None):
    # prepare model input, call model, combine scores
    ...

Performance optimisation & monitoring
Multi‑level cache: L1 query results, L2 vector embeddings, @lru_cache for repeated vectorisation.
Metrics collection records query count, average latency, cache hit rate, and error rate.
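The running-average update used by the metrics collector can be verified in isolation; this sketch extracts the incremental-mean formula into a standalone function (the name is an assumption for illustration):

```python
def update_avg(avg: float, count: int, new_value: float) -> float:
    """Extend a mean over `count` samples with one new sample, without storing history."""
    count += 1
    return (avg * (count - 1) + new_value) / count

# Feeding in three latencies reproduces their plain mean:
avg = 0.0
for i, latency_ms in enumerate([100.0, 200.0, 300.0]):
    avg = update_avg(avg, i, latency_ms)
# avg is now 200.0
```

Keeping only the running mean and count avoids retaining per-query history, at the cost of weighting old and new queries equally (an exponential moving average would favor recent latency instead).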
from functools import lru_cache
from typing import List

class SearchCache:
    def __init__(self, config):
        self.query_cache = {}
        self.vector_cache = {}
        self.cache_ttl = config.get("cache_ttl", 3600)

    @lru_cache(maxsize=5000)
    def get_cached_vector(self, text: str) -> List[float]:
        # actual implementation provided elsewhere
        return None

class SearchMetrics:
    def __init__(self):
        self.query_count = 0
        self.avg_response_time = 0
        self.cache_hit_rate = 0
        self.error_rate = 0

    def log_search_metrics(self, query, results, search_time, cache_hit):
        self.query_count += 1
        self.avg_response_time = (self.avg_response_time * (self.query_count - 1) + search_time) / self.query_count
        self.logger.info(f"Search metrics - query: '{query[:50]}...', time: {search_time:.1f}ms, results: {len(results)}")

Real‑world case study
A sample query “How to configure RAGFlow’s vector database?” demonstrates logging, parallel recall (28 vector candidates, 15 keyword candidates), reranking of the top‑10, and final precision of 85‑90 % after model‑based rerank.
[INFO] Start search: "How to configure RAGFlow's vector database?"
[DEBUG] Vector recall: 28 candidates (120ms)
[DEBUG] Full‑text recall: 15 candidates (35ms)
[DEBUG] Rerank processing: Top‑10 results (89ms)
[INFO] Search completed: total 297ms, returned 10 results

Summary
RAGFlow combines multi‑path retrieval, intelligent fusion, and sophisticated reranking to achieve both high recall and high precision. Adaptive weighting, backend‑agnostic connectors, and built‑in performance tools make the search pipeline production‑ready and extensible for future AI‑driven applications.
