RAGFlow Link Tracing: GPS‑Style Observability for LLM‑Powered Applications

This article explains why RAGFlow needs end-to-end link tracing, introduces OpenTelemetry's core concepts, shows how custom tracing utilities are implemented in Python, describes the layered architecture, provides concrete Docker and YAML configurations, and offers best-practice guidelines for performance monitoring and fault diagnosis.


Why Link Tracing Is Needed

Imagine a parcel’s journey from warehouse to delivery; each step is recorded and can be queried by a tracking number. RAGFlow’s tracing system works the same way, recording every request from entry to response so operators can see the full "journey" of a user query.

Challenges in Distributed RAGFlow

A single user request may span multiple services—API, conversation, retrieval, vector, and LLM services—each with its own potential failures, such as parameter-parsing errors, context loss, index unavailability, or slow model inference. Without tracing, diagnosing a problem is like guessing blindfolded.

OpenTelemetry Basics

What Is OpenTelemetry?

OpenTelemetry (OTel) is the de facto standard for observability, unifying tracing, metrics, and logs much as HTTP unified web communication.

Design Principles

Vendor neutrality: comparable to a USB interface that works with any device.

Language agnosticism: supports Python, Java, Go, etc.

Gradual adoption: you can instrument one component at a time.

Core Concepts

Trace: a complete request journey, composed of multiple spans.

Span: a single operation (e.g., document retrieval, LLM inference) with start/end timestamps and attributes.

Context propagation: a unique identifier passed via HTTP headers or message queues so all services can link their spans together.
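
To make context propagation concrete, the sketch below shows the idea with the standard opentelemetry-api helpers; the tracer name, endpoint URL, and function names are illustrative, not RAGFlow's actual code:

# Sketch: passing trace context across an HTTP boundary (illustrative names)
from opentelemetry import trace
from opentelemetry.propagate import inject, extract

tracer = trace.get_tracer("ragflow.example")

def call_retrieval_service(payload):
    headers = {}
    with tracer.start_as_current_span("api.call_retrieval"):
        inject(headers)  # adds the W3C traceparent header for the current span
        # requests.post("http://retrieval:8080/search", json=payload, headers=headers)

def handle_incoming(request_headers, body):
    ctx = extract(request_headers)  # rebuild the caller's trace context
    with tracer.start_as_current_span("retrieval.search", context=ctx):
        pass  # spans created here join the caller's trace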

RAGFlow’s Custom Tracing Tool

# rag/utils/trace_utils.py - RAGFlow core tracing utility
import time, functools
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.flask import FlaskInstrumentor        # optional: auto-instrument Flask endpoints
from opentelemetry.instrumentation.requests import RequestsInstrumentor  # optional: auto-instrument outgoing HTTP calls

class RAGFlowTracer:
    def __init__(self, service_name="ragflow"):
        # Register a global tracer provider and ship finished spans to the local Jaeger agent.
        trace.set_tracer_provider(TracerProvider())
        jaeger_exporter = JaegerExporter(agent_host_name="localhost", agent_port=6831)
        span_processor = BatchSpanProcessor(jaeger_exporter)  # batches exports off the request path
        trace.get_tracer_provider().add_span_processor(span_processor)
        self.tracer = trace.get_tracer_provider().get_tracer(service_name)
        self.service_name = service_name

    def trace_operation(self, operation_name=None):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                span_name = operation_name or f"{func.__module__}.{func.__name__}"
                with self.tracer.start_as_current_span(span_name) as span:
                    span.set_attribute("function.name", func.__name__)
                    span.set_attribute("function.module", func.__module__)
                    if args and hasattr(args[0], '__class__'):
                        span.set_attribute("class.name", args[0].__class__.__name__)
                    try:
                        start_time = time.time()
                        result = func(*args, **kwargs)
                        span.set_attribute("duration_ms", (time.time() - start_time) * 1000)
                        span.set_attribute("status", "success")
                        return result
                    except Exception as e:
                        span.set_attribute("status", "error")
                        span.set_attribute("error.message", str(e))
                        span.set_attribute("error.type", type(e).__name__)
                        span.record_exception(e)
                        raise
            return wrapper
        return decorator

    def trace_llm_call(self, model_name=None):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                with self.tracer.start_as_current_span("llm.call") as span:
                    if model_name:
                        span.set_attribute("llm.model", model_name)
                    if 'messages' in kwargs:
                        span.set_attribute("llm.input_length", len(str(kwargs['messages'])))
                    try:
                        start_time = time.time()
                        result = func(*args, **kwargs)
                        span.set_attribute("llm.duration_ms", (time.time() - start_time) * 1000)
                        if hasattr(result, 'usage'):
                            span.set_attribute("llm.prompt_tokens", result.usage.prompt_tokens)
                            span.set_attribute("llm.completion_tokens", result.usage.completion_tokens)
                            span.set_attribute("llm.total_tokens", result.usage.total_tokens)
                        return result
                    except Exception as e:
                        span.set_attribute("llm.error", str(e))
                        span.record_exception(e)
                        raise
            return wrapper
        return decorator

ragflow_tracer = RAGFlowTracer()

Architecture Overview

The tracing system is organized into four layers, analogous to a modern hospital:

Business Application Layer: API, conversation, retrieval, LLM services (the “departments”).

Tracing Component Layer: decorators and span management (the “vital‑sign monitors”).

Collection & Storage Layer: OpenTelemetry Collector and Jaeger (the “records office”).

Observability Analysis Layer: Jaeger UI and performance dashboards (the “health reports”).
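
As a rough sketch, the four layers map onto concrete components like this (illustrative wiring that mirrors the trace_utils code above, not RAGFlow's exact startup sequence):

# Sketch: the four layers as they appear in code (illustrative wiring)
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Collection & storage layer: batch finished spans and export them to Jaeger.
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(JaegerExporter(agent_host_name="jaeger", agent_port=6831)))
trace.set_tracer_provider(provider)

# Tracing component layer: the tracer that the decorators wrap.
tracer = trace.get_tracer("ragflow")

# Business application layer: service code runs inside spans.
with tracer.start_as_current_span("api.handle_request"):
    pass  # handle the request here

# Observability analysis layer: inspect the resulting trace in the Jaeger UI (port 16686).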

Real‑World Usage in RAGFlow

Conversation Service

# rag/app/conversation.py – example usage
from rag.utils.trace_utils import ragflow_tracer

class ConversationService:
    @ragflow_tracer.trace_operation("conversation.chat")
    def chat(self, dialog_id, messages, stream=True):
        dialog = self._get_dialog(dialog_id)
        processed = self._process_messages(messages)
        docs = self._retrieve_documents(processed)
        response = self._generate_response(processed, docs)
        return response

    @ragflow_tracer.trace_operation("conversation.retrieve_documents")
    def _retrieve_documents(self, messages):
        from rag.nlp.search import Searcher
        searcher = Searcher()
        query = messages[-1]["content"]
        return searcher.search(query, top_k=10)

    @ragflow_tracer.trace_llm_call("qwen-plus")
    def _generate_response(self, messages, context_docs):
        prompt = self._build_prompt(messages, context_docs)
        from rag.llm import LLMBundle
        llm = LLMBundle()
        return llm.chat(prompt)

Search Service

# rag/nlp/search.py – example usage
from rag.utils.trace_utils import ragflow_tracer

class Searcher:
    @ragflow_tracer.trace_operation("search.hybrid_search")
    def search(self, query, embd_id=None, top_k=1024):
        vector_results = self._vector_search(query, embd_id, top_k)
        keyword_results = self._keyword_search(query, top_k)
        merged = self._merge_results(vector_results, keyword_results)
        return self.rerank(query, merged)

    @ragflow_tracer.trace_operation("search.vector_search")
    def _vector_search(self, query, embd_id, top_k):
        query_vector = self._get_query_embedding(query)
        return self.vector_db.search(query_vector, top_k)

    @ragflow_tracer.trace_operation("search.keyword_search")
    def _keyword_search(self, query, top_k):
        return self.es_client.search(index=self.index_name, body={
            "query": {"multi_match": {"query": query, "fields": ["content", "title"]}},
            "size": top_k
        })

Configuration & Deployment

Docker Compose

# docker-compose.yml – full observability stack
version: "3.8"
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"   # Jaeger UI
      - "6831:6831/udp" # UDP collector
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    networks: [ragflow-network]

  ragflow-server:
    build: .
    environment:
      - JAEGER_AGENT_HOST=jaeger
      - JAEGER_AGENT_PORT=6831
      - OTEL_SERVICE_NAME=ragflow-server
      - OTEL_TRACES_EXPORTER=jaeger
    depends_on: [jaeger]
    networks: [ragflow-network]

networks:
  ragflow-network:
    driver: bridge

Tracing Settings (YAML)

# conf/service_conf.yaml – tracing parameters
tracing:
  enabled: true
  service_name: "ragflow"
  jaeger:
    agent_host: "localhost"
    agent_port: 6831
  sampling:
    type: "probabilistic"
    param: 0.1   # sample 10% of requests (use 1-5% in production)
  performance:
    max_queue_size: 2048
    batch_timeout: 5000          # ms
    max_export_batch_size: 512
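
These settings correspond roughly to the OpenTelemetry SDK's sampler and batch-processor options. Below is a sketch of the mapping; the cfg dict stands in for whatever loader reads service_conf.yaml:

# Sketch: wiring the YAML settings into the SDK (assumed config loading)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

cfg = {"param": 0.1, "max_queue_size": 2048, "batch_timeout": 5000, "max_export_batch_size": 512}

provider = TracerProvider(sampler=ParentBased(TraceIdRatioBased(cfg["param"])))
provider.add_span_processor(BatchSpanProcessor(
    JaegerExporter(agent_host_name="localhost", agent_port=6831),
    max_queue_size=cfg["max_queue_size"],
    schedule_delay_millis=cfg["batch_timeout"],  # batch_timeout is in milliseconds
    max_export_batch_size=cfg["max_export_batch_size"],
))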

Data Analysis & Fault Diagnosis

Typical trace data includes hierarchical spans that record latency, error status, and custom attributes. By visualizing the trace in Jaeger, engineers can pinpoint bottlenecks—for example, a 6.2 s latency in conversation.retrieve_documents caused by a 5.8 s vector search.

Performance‑Optimization Scenario

# Problem: overall response 8.5 s (slow)
# Trace shows:
# - conversation.chat: 8.5 s
#   └─ conversation.retrieve_documents: 6.2 s (bottleneck)
#       ├─ search.vector_search: 5.8 s
#       └─ search.keyword_search: 0.3 s
# Solution: add caching to vector search, reduce latency.
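
One low-risk way to apply that fix is to cache vector-search results for repeated queries; the sketch below uses functools.lru_cache, and vector_search() is a hypothetical stand-in for the real uncached call:

# Sketch: caching vector-search results for repeated queries (vector_search is hypothetical)
import functools

@functools.lru_cache(maxsize=2048)
def cached_vector_search(query: str, top_k: int = 10):
    return vector_search(query, top_k)  # a cache hit skips the expensive vector lookup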

Error‑Investigation Scenario

# Problem: intermittent request failures
# Trace shows:
# - conversation.chat: ERROR
#   └─ search.vector_search: ConnectionTimeout
#       └─ vector_db.query: "Connection to vector service timeout"
# Solution: increase vector‑service instances, raise timeout limits.

Best Practices

Sampling Strategies

Development: 100% tracing for full visibility.

Testing: 50% tracing.

Pre-release: 10% tracing.

Production: 1-5% tracing to balance overhead and insight.
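
A sketch of how those rates might be selected at startup; the environment names and variable are assumptions, and ParentBased keeps child spans consistent with their parent's sampling decision:

# Sketch: choosing a sampler per environment (environment names are assumptions)
import os
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

SAMPLE_RATES = {"dev": 1.0, "test": 0.5, "staging": 0.1, "prod": 0.05}
rate = SAMPLE_RATES.get(os.getenv("RAGFLOW_ENV", "prod"), 0.05)
sampler = ParentBased(TraceIdRatioBased(rate))  # pass to TracerProvider(sampler=...)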

Sensitive‑Data Protection

# Example: tracing a login flow without recording credentials (helper functions are illustrative)
@ragflow_tracer.trace_operation("user.login")
def user_login(username, password):
    with ragflow_tracer.tracer.start_as_current_span("user.authenticate") as span:
        span.set_attribute("user.name", username)
        span.set_attribute("login.ip", get_client_ip())
        # NEVER record password!
        try:
            result = authenticate_user(username, password)
            span.set_attribute("login.success", True)
            return result
        except AuthenticationError:
            span.set_attribute("login.success", False)
            span.set_attribute("login.error", "invalid_credentials")
            raise

Performance Alerts

# conf/monitoring.yaml – alert rules
alerts:
  - name: "slow_response"
    condition: "avg_response_time > 5000ms"
    duration: "2m"
    action: "send_alert"
    message: "RAGFlow response time too high: {{value}}ms"
  - name: "high_error_rate"
    condition: "error_rate > 5%"
    duration: "1m"
    action: "send_alert"
    message: "RAGFlow error rate exceeds 5%: {{value}}%"
  - name: "service_dependency_failure"
    condition: "dependency_success_rate < 95%"
    duration: "30s"
    action: "send_alert"
    message: "Dependency {{service_name}} availability below 95%"

Integration with Other Observability Pillars

Tracing complements metric collection (Prometheus + Grafana) and log aggregation (ELK). Together they provide a three‑pillar observability stack: metrics for health overview, logs for detailed events, and traces for request‑level flow analysis.
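
A common way to tie the pillars together is to stamp each log line with the active trace ID so a log entry can be joined to its trace in Jaeger; a minimal sketch:

# Sketch: correlating logs with traces via the active trace ID
import logging
from opentelemetry import trace

class TraceIdFilter(logging.Filter):
    def filter(self, record):
        ctx = trace.get_current_span().get_span_context()
        record.trace_id = format(ctx.trace_id, "032x") if ctx.is_valid else "-"
        return True

handler = logging.StreamHandler()
handler.addFilter(TraceIdFilter())
handler.setFormatter(logging.Formatter("%(asctime)s trace=%(trace_id)s %(message)s"))
logging.getLogger("ragflow").addHandler(handler)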

Conclusion

RAGFlow’s tracing system transforms a black‑box LLM‑augmented service into an X‑ray‑visible platform. By leveraging OpenTelemetry, custom decorators, and a layered architecture, developers gain precise insight into latency, errors, and resource usage, enabling rapid performance tuning and reliable fault isolation.
