Boost Elasticsearch 8.x Search with Vector Embeddings

This article explains how vector embeddings enhance Elasticsearch 8.x search, walks through the concepts of dense vectors, shows step‑by‑step Python and Logstash pipelines for generating and storing embeddings, compares their pros and cons, and offers guidance on selecting the right approach for large‑scale log data.

Mingyi World Elasticsearch
Mingyi World Elasticsearch
Mingyi World Elasticsearch
Boost Elasticsearch 8.x Search with Vector Embeddings

Vector Embedding Overview

A vector embedding converts text, images or other data into a multi‑dimensional numeric array that captures semantic similarity. For example, a query for “新能源 小米” can return “小米 SU7” because the two phrases have nearby vectors in the embedding space.

Embedding Workflow in Elasticsearch

Generate embeddings

Use an AI model (e.g., OpenAI embedding model, a Transformer, or a locally hosted model such as Ollama’s all‑minilm) to transform raw text into a numeric vector.

Store vectors

Save the generated vector in an Elasticsearch document using the dense_vector field type.

Vector query (KNN)

Convert a search query into a vector and perform a nearest‑neighbor (k‑Nearest Neighbor) search based on vector distance instead of keyword matching.

Typical pipeline steps

Extract key fields (e.g., title, description).

Generate embeddings with a Python library such as HuggingFace or sentence‑transformers.

Index the vectors in Elasticsearch via the dense_vector field.

Execute KNN queries to retrieve semantically similar documents.

Python Implementation

The Python script uses the elasticsearch and requests libraries to:

Initialize an Elasticsearch client from a configuration file.

Scroll through documents that lack an embedding field.

Call the local Ollama service ( http://localhost:11434/api/embeddings) with {"model": "all‑minilm", "prompt": <em>text</em>} to obtain an embedding vector.

Update each document with the embedding, a processing timestamp, and a status flag using an inline painless script that avoids overwriting existing embeddings.

from elasticsearch import Elasticsearch, helpers
import requests, configparser, warnings, time, logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore")

def init_es_client(config_path='./conf/config.ini'):
    cfg = configparser.ConfigParser()
    cfg.read(config_path)
    return Elasticsearch(
        hosts=[cfg.get('elasticsearch', 'ES_HOST')],
        basic_auth=(cfg.get('elasticsearch', 'ES_USER'), cfg.get('elasticsearch', 'ES_PASSWORD')),
        verify_certs=False,
        ca_certs='conf/http_ca.crt'
    )

EMBEDDING_SERVICE_URL = "http://localhost:11434/api/embeddings"

def fetch_documents_from_elasticsearch(es_client, index="logs", batch_size=25, query=None):
    query = query or {
        "query": {"bool": {"must_not": {"exists": {"field": "embedding"}}}},
        "size": batch_size,
        "sort": [{"@timestamp": "asc"}]
    }
    resp = es_client.search(index=index, body=query, scroll="1m")
    scroll_id = resp["_scroll_id"]
    docs = resp["hits"]["hits"]
    while docs:
        for doc in docs:
            yield doc
        resp = es_client.scroll(scroll_id=scroll_id, scroll="1m")
        scroll_id = resp["_scroll_id"]
        docs = resp["hits"]["hits"]

def fetch_embeddings(text):
    try:
        r = requests.post(EMBEDDING_SERVICE_URL, json={"model": "all‑minilm", "prompt": text}, timeout=10)
        r.raise_for_status()
        return r.json().get("embedding")
    except requests.exceptions.RequestException as e:
        logger.error("Error fetching embedding: %s", str(e))
        return None

def update_document_in_elasticsearch(es_client, doc_id, index="logs", embedding=None):
    body = {
        "script": {
            "source": '''
                if (ctx._source.containsKey("embedding_processed_at") && ctx._source.embedding_processed_at != null) { ctx.op = "noop"; }
                else {
                    ctx._source.embedding = params.embedding;
                    ctx._source.embedding_processed_at = params.timestamp;
                    ctx._source.processing_status = params.status;
                    if (params.error_message != null) { ctx._source.error_message = params.error_message; }
                }
            ''',
            "params": {
                "embedding": embedding if embedding else null,
                "timestamp": time.strftime('%Y-%m-%dT%H:%M:%SZ'),
                "status": "failed" if embedding is None else "success",
                "error_message": null if embedding else "嵌入生成失败"
            }
        }
    }
    es_client.update(index=index, id=doc_id, body=body)

def process_documents(es_client, batch_size=25):
    for doc in fetch_documents_from_elasticsearch(es_client, batch_size=batch_size):
        doc_id = doc["_id"]
        text = doc["_source"].get("content", "")
        emb = fetch_embeddings(text)
        update_document_in_elasticsearch(es_client, doc_id, embedding=emb)

if __name__ == "__main__":
    es = init_es_client()
    process_documents(es)

The embedding model all‑minilm belongs to the Sentence‑Transformers family (MiniLM‑L6‑v2) and is optimized for short‑text embeddings with low latency and resource consumption.

Logstash Implementation

Input – Elasticsearch

input {
  elasticsearch {
    hosts => ["https://127.0.0.1:9200"]
    user => "elastic"
    password => "changeme"
    ssl_enabled => true
    ca_file => "E:\logstash-8.15.3\config\http_ca.crt"
    index => "logs_20250409"
    query => '{"query":{"bool":{"must_not":{"exists":{"field":"embedding"}}}}'
    schedule => "*/1 * * * *"
    docinfo => true
    docinfo_target => "[@metadata]"
    size => 25
  }
}

Filter – Call Embedding Service

filter {
  http {
    url => "http://localhost:11434/api/embeddings"
    verb => "POST"
    body_format => "json"
    body => {"model" => "all‑minilm", "prompt" => "%{[content]}"}
    target_body => "embedding_response"
  }
}

Output – Update Elasticsearch

output {
  elasticsearch {
    hosts => ["https://127.0.0.1:9200"]
    user => "elastic"
    password => "changme"
    ssl_enabled => true
    cacert => "E:\logstash-8.15.3\config\http_ca.crt"
    index => "logs_20250409"
    document_id => "%{[@metadata][_id]}"
    action => "update"
    doc_as_upsert => true
    retry_on_conflict => 5
  }
}

Advantages

Strong scalability – pipeline workers can be increased easily.

Built‑in fault tolerance – automatic retries and error handling.

Declarative configuration – minimal code required.

Optimized for high‑throughput log streams.

Disadvantages

Debugging is harder – error investigation is less flexible.

Limited customisation – cannot embed complex native ML logic.

Tight coupling to Elasticsearch – replacement costs are high.

Method Selection

When to use Python

Suitable when the workflow requires complex custom logic, fine‑grained control over each processing step, or integration with systems beyond Elasticsearch.

When to use Logstash

Ideal for high‑volume log ingestion where scalability, low development effort, and an out‑of‑the‑box ETL solution tuned for Elasticsearch are priorities.

Conclusion

For large‑scale, high‑throughput log data, Logstash generally provides a better fit. Python offers greater flexibility for advanced customisation or direct machine‑learning integration.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

PythonElasticsearchkNNdense_vectorSearchLogstashVector Embedding
Mingyi World Elasticsearch
Written by

Mingyi World Elasticsearch

The leading WeChat public account for Elasticsearch fundamentals, advanced topics, and hands‑on practice. Join us to dive deep into the ELK Stack (Elasticsearch, Logstash, Kibana, Beats).

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.