Boost Elasticsearch with Vector Embeddings: Python & Logstash Step‑by‑Step Guide

This article explains how vector embeddings enhance Elasticsearch for semantic search and recommendation, walks through the complete workflow of generating, storing, and querying embeddings, and provides detailed Python and Logstash implementations with code samples, pros and cons, and guidance on choosing the right approach.

dbaplus Community

What Are Vector Embeddings?

Vector embedding converts text, images, or other data into fixed-length arrays of numbers (vectors) arranged so that semantically similar items lie close together, enabling machines to recognise relationships beyond simple keyword matching.
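
"Close together" is usually measured with a similarity function such as cosine similarity. The sketch below is illustrative only: the three-dimensional vectors are invented toy values (real models such as all-minilm produce 384 dimensions), and `cosine_similarity` is a hypothetical helper, not part of any library used in this article.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 = same direction, 0.0 = unrelated."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("a zero vector has no direction")
    return dot / (norm_a * norm_b)


# Hypothetical toy embeddings, invented for illustration.
car = [0.9, 0.1, 0.0]
electric_vehicle = [0.8, 0.2, 0.1]
banana = [0.0, 0.1, 0.9]

print(round(cosine_similarity(car, electric_vehicle), 3))
print(round(cosine_similarity(car, banana), 3))
```

The "car" and "electric vehicle" vectors point in almost the same direction and score far higher than "car" and "banana", even though the words themselves share nothing.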

Using Vector Embeddings in Elasticsearch

Elasticsearch can store these vectors in a dense_vector field and run k‑nearest‑neighbor (kNN) searches to retrieve semantically related results. For example, a query about "Xiaomi new-energy vehicles" can return documents about the "Xiaomi SU7" even if they never contain the phrase "new energy".
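
An explicit mapping ensures the vector field is indexed as dense_vector with the right number of dimensions. A minimal sketch, assuming the all-minilm model served by Ollama (which returns 384-dimensional vectors) and the `content`, `embedding`, and status field names written by the Python script later in this article; `similarity: "cosine"` and `index: True` enable approximate kNN search on the field. The helper name `build_embedding_mapping` is hypothetical.

```python
def build_embedding_mapping(dims: int = 384) -> dict:
    """Return index mappings for documents that carry a text field plus its embedding."""
    return {
        "properties": {
            "content": {"type": "text"},           # source text that gets embedded
            "embedding": {
                "type": "dense_vector",
                "dims": dims,                      # must equal the model's output size
                "index": True,                     # build the kNN search structure
                "similarity": "cosine",
            },
            "embedding_processed_at": {"type": "date"},
            "processing_status": {"type": "keyword"},
        }
    }
```

With the 8.x Python client the mapping could be applied when the index is created, e.g. `es.indices.create(index="logs", mappings=build_embedding_mapping())`. The `dims` value of an existing field cannot be changed, so switching to a model with a different output size requires a new index.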

Typical Workflow

Generate embeddings – Use an AI model (e.g., an OpenAI embedding model or a sentence‑transformer model such as all‑minilm) to turn raw text into vectors.

Store vectors in Elasticsearch – Index the vectors in a dedicated dense_vector field alongside the source document.

Query with vectors – Convert the query into a vector and perform a KNN search based on vector distance.

Overall process – Extract key data, generate embeddings via a service, store them, and query using Elasticsearch’s KNN feature.
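
The query step can be sketched as a kNN search request. A minimal sketch, assuming Elasticsearch 8.x, where the search API accepts a top-level `knn` section: `k` is how many nearest neighbours to return and `num_candidates` is how many candidates each shard considers (higher values improve recall at some cost in speed). The helper name and default values are assumptions, and `query_vector` must come from the same model used at index time.

```python
def build_knn_clause(query_vector: list[float], field: str = "embedding",
                     k: int = 5, num_candidates: int = 50) -> dict:
    """Build the `knn` section of an Elasticsearch 8.x search request."""
    if num_candidates < k:
        raise ValueError("num_candidates must be >= k")
    return {
        "field": field,                 # the dense_vector field to search
        "query_vector": query_vector,   # embedding of the user's query text
        "k": k,
        "num_candidates": num_candidates,
    }
```

With the 8.x Python client this could be sent as `es.search(index="logs", knn=build_knn_clause(query_vector))`, where `query_vector` is, for instance, the output of the script's `fetch_embeddings()` for the user's query text.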

Python Implementation

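The following Python script demonstrates a complete end‑to‑end solution using the elasticsearch client and a local Ollama embedding service. It scrolls through documents that have no embedding field yet, requests an embedding for each document's content field, and writes the vector back with a scripted update.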

import configparser
import logging
import time
import warnings

import requests
from elasticsearch import Elasticsearch

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Silences all warnings (including TLS warnings); narrow this filter in production.
warnings.filterwarnings("ignore")

def init_es_client(config_path='./conf/config.ini'):
    """Build an Elasticsearch client from the [elasticsearch] section of an INI file."""
    config = configparser.ConfigParser()
    config.read(config_path)
    es_host = config.get('elasticsearch', 'ES_HOST')
    es_user = config.get('elasticsearch', 'ES_USER')
    es_password = config.get('elasticsearch', 'ES_PASSWORD')
    # Verify the server certificate against the cluster CA; ca_certs only
    # takes effect when certificate verification is enabled.
    es = Elasticsearch(hosts=[es_host], basic_auth=(es_user, es_password), ca_certs='conf/http_ca.crt')
    return es

EMBEDDING_SERVICE_URL = "http://localhost:11434/api/embeddings"

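def fetch_documents_from_elasticsearch(es_client, index="logs", batch_size=25, query=None, keep_alive="5m"):
    """Yield hits that have no 'embedding' field yet, oldest first, using the scroll API."""
    query = query or {"query": {"bool": {"must_not": {"exists": {"field": "embedding"}}}}, "size": batch_size, "sort": [{"@timestamp": "asc"}]}
    # keep_alive must outlast one batch: 25 docs x up to 10 s per embedding call can exceed 1 minute.
    response = es_client.search(index=index, body=query, scroll=keep_alive)
    scroll_id = response["_scroll_id"]
    try:
        documents = response["hits"]["hits"]
        while documents:
            for doc in documents:
                yield doc
            response = es_client.scroll(scroll_id=scroll_id, scroll=keep_alive)
            scroll_id = response["_scroll_id"]
            documents = response["hits"]["hits"]
    finally:
        # Free the server-side scroll context instead of waiting for it to expire.
        es_client.clear_scroll(scroll_id=scroll_id)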

def fetch_embeddings(text):
    """Ask the Ollama embedding service for a vector; return None on failure."""
    try:
        resp = requests.post(EMBEDDING_SERVICE_URL, json={"model": "all-minilm", "prompt": text}, timeout=10)
        resp.raise_for_status()
        embedding = resp.json().get("embedding")
        # Log only the vector size; logging full vectors floods the log.
        logger.debug("embedding dimensions: %d", len(embedding or []))
        return embedding
    except requests.exceptions.RequestException as e:
        logger.error("Error fetching embedding: %s", e)
        return None

def update_document_in_elasticsearch(es_client, doc_id, index="logs", embedding=None):
    """Store the embedding (or a failure marker); no-op if the document was already processed."""
    ok = bool(embedding)  # None and an empty list both count as failures
    body = {"script": {"source": '''
        if (ctx._source.containsKey("embedding_processed_at") && ctx._source.embedding_processed_at != null) {
            ctx.op = "noop";
        } else {
            ctx._source.embedding = params.embedding;
            ctx._source.embedding_processed_at = params.timestamp;
            ctx._source.processing_status = params.status;
            if (params.error_message != null) { ctx._source.error_message = params.error_message; }
        }
    ''', "params": {"embedding": embedding if ok else None,
                     # Use UTC so the trailing "Z" in the timestamp is accurate.
                     "timestamp": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
                     "status": "success" if ok else "failed",
                     "error_message": None if ok else "Embedding generation failed"}}}
    es_client.update(index=index, id=doc_id, body=body)

def process_documents(es_client, batch_size=25):
    for doc in fetch_documents_from_elasticsearch(es_client, batch_size=batch_size):
        doc_id = doc["_id"]
        text_content = doc["_source"].get("content", "")
        embedding = fetch_embeddings(text_content)
        update_document_in_elasticsearch(es_client, doc_id, embedding=embedding)

if __name__ == "__main__":
    es = init_es_client(config_path='./conf/config.ini')
    process_documents(es, batch_size=25)
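
The script sends one update request per document. One possible refinement, sketched under assumptions: collect results and send them through the client's bulk helper. This version uses a plain partial-document update (`doc`) instead of the script's no-op guard, so it overwrites any earlier result; failed documents get no `embedding` field and are therefore picked up again by the next run. The function name `build_update_actions` is hypothetical.

```python
import time
from typing import Iterable, Iterator, Optional


def build_update_actions(results: Iterable[tuple[str, Optional[list[float]]]],
                         index: str = "logs") -> Iterator[dict]:
    """Yield bulk 'update' actions from (doc_id, embedding-or-None) pairs."""
    for doc_id, embedding in results:
        doc = {
            # UTC timestamp in the same format the script uses.
            "embedding_processed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "processing_status": "success" if embedding else "failed",
        }
        if embedding:
            doc["embedding"] = embedding
        else:
            doc["error_message"] = "Embedding generation failed"
        # Partial-document update: only the fields in `doc` are changed.
        yield {"_op_type": "update", "_index": index, "_id": doc_id, "doc": doc}
```

The actions could then be sent with `from elasticsearch import helpers` and `helpers.bulk(es, build_update_actions(pairs), chunk_size=25)`, where `pairs` is an iterable of `(doc_id, embedding)` tuples.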

Key advantages of the Python approach are flexibility, fine‑grained control, and easy integration with AI models. Its drawbacks are limited throughput (the script handles one document at a time, and Python's GIL limits CPU‑bound parallelism), higher development effort, and greater resource consumption.
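
Because most of the script's time is spent waiting on HTTP calls to the embedding service, the one-at-a-time loop limits throughput more than the GIL does: CPython releases the GIL while a thread blocks on network I/O, so a small thread pool can overlap those waits. A minimal sketch with `concurrent.futures`; `embed_concurrently` is a hypothetical helper, `embed` stands in for the script's `fetch_embeddings`, and `max_workers=4` is an arbitrary starting point to tune against what the Ollama server can sustain.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Optional


def embed_concurrently(docs: Iterable[dict],
                       embed: Callable[[str], Optional[list[float]]],
                       max_workers: int = 4) -> list[tuple[str, Optional[list[float]]]]:
    """Return (doc_id, embedding) pairs, calling `embed` for several hits in parallel."""
    docs = list(docs)
    texts = [d["_source"].get("content", "") for d in docs]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, so they line up with `docs`.
        embeddings = list(pool.map(embed, texts))
    return [(d["_id"], e) for d, e in zip(docs, embeddings)]
```

Pass one batch of hits at a time (for example, 25 documents from the scroll) rather than the whole result set, since the function materialises its input as a list.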

Logstash Implementation

Logstash provides a declarative, ETL‑style pipeline for large‑scale log processing and can call the same Ollama embedding service through its http filter plugin.

input {
  elasticsearch {
    hosts => ["https://127.0.0.1:9200"]
    user => "elastic"
    password => "changeme"
    ssl_enabled => true
    ssl_certificate_authorities => ["E:\logstash-8.15.3-windows-x86_64\logstash-8.15.3\config\http_ca.crt"]
    index => "logs_20250409"
    query => '{"query":{"bool":{"must_not":{"exists":{"field":"embedding"}}}}}'
    schedule => "*/1 * * * *"
    docinfo => true
    docinfo_target => "[@metadata]"
    size => 25
  }
}

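filter {
  # POST the document's content field to Ollama; the JSON reply lands in [embedding_response].
  http {
    url => "http://localhost:11434/api/embeddings"
    verb => "POST"
    body_format => "json"
    body => {
      "model" => "all-minilm"
      "prompt" => "%{[content]}"
    }
    target_body => "[embedding_response]"
  }
  # Move the vector into the field the input query checks, then drop the raw reply.
  mutate {
    rename => { "[embedding_response][embedding]" => "embedding" }
    remove_field => ["embedding_response"]
  }
}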

output {
  elasticsearch {
    hosts => ["https://127.0.0.1:9200"]
    user => "elastic"
    password => "changeme"
    ssl_enabled => true
    ssl_certificate_authorities => ["E:\logstash-8.15.3-windows-x86_64\logstash-8.15.3\config\http_ca.crt"]
    index => "logs_20250409"
    document_id => "%{[@metadata][_id]}"
    action => "update"
    doc_as_upsert => true
    retry_on_conflict => 5
  }
}
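
The input re-runs its query every minute (`schedule => "*/1 * * * *"`), and because the query only matches documents without an `embedding` field, documents drop out of later runs once the pipeline has written that field. A minimal Python sketch of the same filter, assuming you want to build the query string programmatically or monitor progress from a script; `missing_embedding_query` is a hypothetical helper name.

```python
import json


def missing_embedding_query(field: str = "embedding") -> dict:
    """Same filter as the Logstash input: documents with no value in `field`."""
    return {"bool": {"must_not": {"exists": {"field": field}}}}


# The Logstash elasticsearch input takes the whole request body as a JSON string.
logstash_query = json.dumps({"query": missing_embedding_query()}, separators=(",", ":"))
print(logstash_query)
```

With the 8.x Python client, `es.count(index="logs_20250409", query=missing_embedding_query())["count"]` would report how many documents the next scheduled run still has to process; documents whose embedding request failed stay in that count and are retried on a later run.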

Logstash’s strengths are high scalability, built‑in fault tolerance, and little code to write; its weaknesses are limited support for custom logic, harder debugging, and tight coupling to Elasticsearch.

Choosing the Right Approach

If you need fine‑grained control, complex custom logic, or direct integration with machine‑learning models, the Python solution is preferable. For high‑throughput log ingestion with low development effort and strong resilience, Logstash is the better fit.

Conclusion

Both methods enable semantic search in Elasticsearch, but the optimal choice depends on your workload size, required customisation, and operational constraints.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by dbaplus Community

Enterprise-level professional community for Database, BigData, and AIOps. Daily original articles, weekly online tech talks, monthly offline salons, and quarterly XCOPS&DAMS conferences—delivered by industry experts.
