RAGFlow Link Tracing: GPS‑Style Observability for LLM‑Powered Applications
The article explains why RAGFlow needs end‑to‑end link tracing, introduces OpenTelemetry’s core concepts, shows how custom tracing utilities are implemented in Python, describes the layered architecture, provides concrete Docker and YAML configurations, and offers best‑practice guidelines for performance monitoring and fault diagnosis.
Why Link Tracing Is Needed
Imagine a parcel’s journey from warehouse to delivery; each step is recorded and can be queried by a tracking number. RAGFlow’s tracing system works the same way, recording every request from entry to response so operators can see the full "journey" of a user query.
Challenges in Distributed RAGFlow
A single user request may span multiple services (API, conversation, retrieval, vector, and LLM), each with its own potential failure modes: parameter-parsing errors, context loss, index unavailability, or slow model inference. Without tracing, diagnosing a problem is like guessing while blindfolded.
OpenTelemetry Basics
What Is OpenTelemetry?
OpenTelemetry (OTel) is the de‑facto standard for observability, unifying tracing, metrics, and logs much like HTTP unified web communication.
Design Principles
Vendor neutrality: comparable to a USB interface that works with any device.
Language agnosticism: supports Python, Java, Go, etc.
Gradual adoption: you can instrument one component at a time.
Core Concepts
Trace: a complete request journey, composed of multiple spans.
Span: a single operation (e.g., document retrieval, LLM inference) with start/end timestamps and attributes.
Context propagation: the trace context (trace and span identifiers) passed along via HTTP headers or message queues so every service can attach its spans to the same trace.
RAGFlow’s Custom Tracing Tool
# rag/utils/trace_utils.py - RAGFlow core tracing utility
import time
import functools

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

class RAGFlowTracer:
    def __init__(self, service_name="ragflow"):
        # Register a global tracer provider and ship finished spans to the
        # local Jaeger agent in batches.
        trace.set_tracer_provider(TracerProvider())
        jaeger_exporter = JaegerExporter(agent_host_name="localhost", agent_port=6831)
        trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(jaeger_exporter))
        self.tracer = trace.get_tracer_provider().get_tracer(service_name)
        self.service_name = service_name

    def trace_operation(self, operation_name=None):
        """Decorator that wraps a function call in a named span."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                span_name = operation_name or f"{func.__module__}.{func.__name__}"
                with self.tracer.start_as_current_span(span_name) as span:
                    span.set_attribute("function.name", func.__name__)
                    span.set_attribute("function.module", func.__module__)
                    if args and hasattr(args[0], '__class__'):
                        span.set_attribute("class.name", args[0].__class__.__name__)
                    try:
                        start_time = time.time()
                        result = func(*args, **kwargs)
                        span.set_attribute("duration_ms", (time.time() - start_time) * 1000)
                        span.set_attribute("status", "success")
                        return result
                    except Exception as e:
                        span.set_attribute("status", "error")
                        span.set_attribute("error.message", str(e))
                        span.set_attribute("error.type", type(e).__name__)
                        span.record_exception(e)
                        raise
            return wrapper
        return decorator

    def trace_llm_call(self, model_name=None):
        """Decorator specialized for LLM calls: records model, input size, and token usage."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                with self.tracer.start_as_current_span("llm.call") as span:
                    if model_name:
                        span.set_attribute("llm.model", model_name)
                    if 'messages' in kwargs:
                        span.set_attribute("llm.input_length", len(str(kwargs['messages'])))
                    try:
                        start_time = time.time()
                        result = func(*args, **kwargs)
                        span.set_attribute("llm.duration_ms", (time.time() - start_time) * 1000)
                        if hasattr(result, 'usage'):
                            span.set_attribute("llm.prompt_tokens", result.usage.prompt_tokens)
                            span.set_attribute("llm.completion_tokens", result.usage.completion_tokens)
                            span.set_attribute("llm.total_tokens", result.usage.total_tokens)
                        return result
                    except Exception as e:
                        span.set_attribute("llm.error", str(e))
                        span.record_exception(e)
                        raise
            return wrapper
        return decorator
ragflow_tracer = RAGFlowTracer()

Architecture Overview
The tracing system is organized into four layers, analogous to a modern hospital:
Business Application Layer : API, conversation, retrieval, LLM services (the “departments”).
Tracing Component Layer : decorators and span management (the “vital‑sign monitors”).
Collection & Storage Layer : OpenTelemetry Collector and Jaeger (the “records office”).
Observability Analysis Layer : Jaeger UI and performance dashboards (the “health reports”).
Real‑World Usage in RAGFlow
Conversation Service
# rag/app/conversation.py – example usage
from rag.utils.trace_utils import ragflow_tracer

class ConversationService:
    @ragflow_tracer.trace_operation("conversation.chat")
    def chat(self, dialog_id, messages, stream=True):
        dialog = self._get_dialog(dialog_id)
        processed = self._process_messages(messages)
        docs = self._retrieve_documents(processed)
        response = self._generate_response(processed, docs)
        return response

    @ragflow_tracer.trace_operation("conversation.retrieve_documents")
    def _retrieve_documents(self, messages):
        from rag.nlp.search import Searcher
        searcher = Searcher()
        query = messages[-1]["content"]
        return searcher.search(query, top_k=10)

    @ragflow_tracer.trace_llm_call("qwen-plus")
    def _generate_response(self, messages, context_docs):
        prompt = self._build_prompt(messages, context_docs)
        from rag.llm import LLMBundle
        llm = LLMBundle()
        return llm.chat(prompt)

Search Service
# rag/nlp/search.py – example usage
from rag.utils.trace_utils import ragflow_tracer

class Searcher:
    @ragflow_tracer.trace_operation("search.hybrid_search")
    def search(self, query, embd_id=None, top_k=1024):
        # embd_id (embedding-model id) is optional so callers that rely on
        # the default model, like the conversation service above, can omit it.
        vector_results = self._vector_search(query, embd_id, top_k)
        keyword_results = self._keyword_search(query, top_k)
        merged = self._merge_results(vector_results, keyword_results)
        return self.rerank(query, merged)

    @ragflow_tracer.trace_operation("search.vector_search")
    def _vector_search(self, query, embd_id, top_k):
        query_vector = self._get_query_embedding(query)
        return self.vector_db.search(query_vector, top_k)

    @ragflow_tracer.trace_operation("search.keyword_search")
    def _keyword_search(self, query, top_k):
        return self.es_client.search(index=self.index_name, body={
            "query": {"multi_match": {"query": query, "fields": ["content", "title"]}},
            "size": top_k
        })

Configuration & Deployment
Docker Compose
# docker-compose.yml – full observability stack
version: "3.8"
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"   # Jaeger UI
      - "6831:6831/udp" # agent port (UDP)
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    networks: [ragflow-network]
  ragflow-server:
    build: .
    environment:
      - JAEGER_AGENT_HOST=jaeger
      - JAEGER_AGENT_PORT=6831
      - OTEL_SERVICE_NAME=ragflow-server
      - OTEL_TRACES_EXPORTER=jaeger
    depends_on: [jaeger]
    networks: [ragflow-network]
networks:
  ragflow-network:
    driver: bridge

Tracing Settings (YAML)
# conf/service_conf.yaml – tracing parameters
tracing:
  enabled: true
  service_name: "ragflow"
  jaeger:
    agent_host: "localhost"
    agent_port: 6831
  sampling:
    type: "probabilistic"
    param: 0.1  # 10% of requests; 1-5% recommended in production
  performance:
    max_queue_size: 2048
    batch_timeout: 5000
    max_export_batch_size: 512

Data Analysis & Fault Diagnosis
Typical trace data includes hierarchical spans that record latency, error status, and custom attributes. By visualizing the trace in Jaeger, engineers can pinpoint bottlenecks—for example, a 6.2 s latency in conversation.retrieve_documents caused by a 5.8 s vector search.
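This kind of bottleneck hunting can also be scripted against Jaeger's HTTP API (`GET /api/traces`, the endpoint the UI itself uses; it is internal and may change between versions). The helper below is my own sketch: a pure function that picks the slowest span out of one trace in the JSON shape that endpoint returns, plus a hypothetical fetch wrapper:

```python
def slowest_span(trace):
    """Return (operation_name, duration_ms) for the slowest span in one
    trace, where `trace` is one element of the "data" array returned by
    Jaeger's GET /api/traces. Jaeger reports durations in microseconds."""
    worst = max(trace["spans"], key=lambda s: s["duration"])
    return worst["operationName"], worst["duration"] / 1000.0

def fetch_traces(service="ragflow", limit=20, base="http://localhost:16686"):
    # Hypothetical wrapper: requires the `requests` package and a running
    # Jaeger instance; shown only to indicate where the dicts come from.
    import requests
    resp = requests.get(f"{base}/api/traces",
                        params={"service": service, "limit": limit})
    resp.raise_for_status()
    return resp.json()["data"]
```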
Performance‑Optimization Scenario
# Problem: overall response 8.5 s (slow)
# Trace shows:
# - conversation.chat: 8.5 s
#   └─ conversation.retrieve_documents: 6.2 s (bottleneck)
#      ├─ search.vector_search: 5.8 s
#      └─ search.keyword_search: 0.3 s
# Solution: add caching to vector search to reduce latency.

Error‑Investigation Scenario
# Problem: intermittent request failures
# Trace shows:
# - conversation.chat: ERROR
#   └─ search.vector_search: ConnectionTimeout
#      └─ vector_db.query: "Connection to vector service timeout"
# Solution: add vector-service instances and raise timeout limits.

Best Practices
Sampling Strategies
Development: 100 % tracing for full visibility.
Testing: 50 % tracing.
Pre‑release: 10 % tracing.
Production: 1‑5 % tracing to balance overhead and insight.
Sensitive‑Data Protection
@ragflow_tracer.trace_operation("user.login")
def user_login(username, password):
    with ragflow_tracer.tracer.start_as_current_span("user.authenticate") as span:
        span.set_attribute("user.name", username)
        span.set_attribute("login.ip", get_client_ip())
        # NEVER record the password!
        try:
            result = authenticate_user(username, password)
            span.set_attribute("login.success", True)
            return result
        except AuthenticationError:
            span.set_attribute("login.success", False)
            span.set_attribute("login.error", "invalid_credentials")
            raise

Performance Alerts
# conf/monitoring.yaml – alert rules
alerts:
  - name: "slow_response"
    condition: "avg_response_time > 5000ms"
    duration: "2m"
    action: "send_alert"
    message: "RAGFlow response time too high: {{value}}ms"
  - name: "high_error_rate"
    condition: "error_rate > 5%"
    duration: "1m"
    action: "send_alert"
    message: "RAGFlow error rate exceeds 5%: {{value}}%"
  - name: "service_dependency_failure"
    condition: "dependency_success_rate < 95%"
    duration: "30s"
    action: "send_alert"
    message: "Dependency {{service_name}} availability below 95%"

Integration with Other Observability Pillars
Tracing complements metric collection (Prometheus + Grafana) and log aggregation (ELK). Together they provide a three‑pillar observability stack: metrics for health overview, logs for detailed events, and traces for request‑level flow analysis.
Conclusion
RAGFlow’s tracing system transforms a black‑box LLM‑augmented service into an X‑ray‑visible platform. By leveraging OpenTelemetry, custom decorators, and a layered architecture, developers gain precise insight into latency, errors, and resource usage, enabling rapid performance tuning and reliable fault isolation.