Boost Elasticsearch with Vector Embeddings: Python & Logstash Step‑by‑Step Guide
This article explains how vector embeddings enhance Elasticsearch for semantic search and recommendation, walks through the complete workflow of generating, storing, and querying embeddings, and provides detailed Python and Logstash implementations with code samples, pros and cons, and guidance on choosing the right approach.
What Are Vector Embeddings?
A vector embedding maps text, images, or other data to a fixed-length array of numbers (a vector) so that semantically similar inputs end up close together in vector space. This lets machines recognize relationships that simple keyword matching misses.
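"Close together" is usually measured with cosine similarity, one of the similarity functions Elasticsearch supports alongside dot product and Euclidean (l2_norm) distance. The following is a minimal pure-Python sketch; the three-dimensional vectors are invented for illustration, whereas real models such as all-minilm produce 384 dimensions.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between vectors a and b (1.0 means same direction)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


# Hypothetical toy vectors: an EV-related query, an EV-related document,
# and an unrelated recipe document.
ev_query = [0.9, 0.1, 0.3]
su7_doc = [0.8, 0.2, 0.35]
recipe_doc = [0.05, 0.9, 0.1]
```

With these toy values, `cosine_similarity(ev_query, su7_doc)` is much larger than `cosine_similarity(ev_query, recipe_doc)`, which is exactly the property a kNN search ranks on.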
Using Vector Embeddings in Elasticsearch
Elasticsearch can store these vectors in a dense_vector field and run k-nearest-neighbor (kNN) searches to retrieve semantically related results. For example, a query about "new energy Xiaomi" can return "Xiaomi SU7" even when the query's exact keywords do not appear in the document.
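A dense_vector field has to be mapped before vectors are written, and a kNN search needs a query vector plus the `k` and `num_candidates` parameters. The sketch below only builds the request bodies as plain dictionaries, assuming the elasticsearch-py 8.x client and a 384-dimension model such as all-minilm; the helper names are hypothetical.

```python
def build_embedding_mapping(dims=384, similarity="cosine"):
    """Index mapping with a text field and a dense_vector 'embedding' field."""
    return {
        "properties": {
            "content": {"type": "text"},
            "embedding": {
                "type": "dense_vector",
                "dims": dims,        # must equal the embedding model's output size
                "index": True,       # build the HNSW graph used for approximate kNN
                "similarity": similarity,
            },
        }
    }


def build_knn_query(query_vector, k=5, num_candidates=50, field="embedding"):
    """Top-level 'knn' section for the _search API."""
    if num_candidates < k:
        # Elasticsearch rejects requests where num_candidates is smaller than k.
        raise ValueError("num_candidates must be >= k")
    return {
        "field": field,
        "query_vector": list(query_vector),
        "k": k,
        "num_candidates": num_candidates,
    }
```

With a connected client `es`, these would be used roughly as `es.indices.create(index="logs", mappings=build_embedding_mapping())` and `es.search(index="logs", knn=build_knn_query(query_vector))`. A larger `num_candidates` trades speed for recall.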
Typical Workflow
Generate embeddings – Use an AI model (e.g., an OpenAI embedding model or an open-source sentence-transformer such as all-MiniLM, available in Ollama as all-minilm) to turn raw text into vectors.
Store vectors in Elasticsearch – Index the vectors in a dedicated dense_vector field whose dims match the model's output size (384 for all-minilm).
Query with vectors – Convert the query text into a vector with the same model, then run a kNN search that ranks documents by vector similarity.
Overall process – Extract key data, generate embeddings via a service, store them, and query using Elasticsearch’s KNN feature.
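To make the three steps concrete without a cluster, here is a self-contained sketch that mimics the workflow in memory. It is an illustration under loud assumptions: `toy_embed` is a hypothetical stand-in that hashes words into buckets (it captures word overlap, not meaning), and `InMemoryVectorStore` performs exact brute-force kNN, whereas Elasticsearch uses an approximate HNSW index.

```python
import hashlib
import heapq
import math

DIMS = 256  # toy dimension; all-minilm would produce 384


def toy_embed(text):
    """Hypothetical embedding stand-in: counts words into hashed buckets.
    It only reflects shared words, not real semantic similarity."""
    vec = [0.0] * DIMS
    for word in text.lower().split():
        bucket = int(hashlib.md5(word.encode("utf-8")).hexdigest(), 16) % DIMS
        vec[bucket] += 1.0
    return vec


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


class InMemoryVectorStore:
    """Exact (brute-force) kNN store standing in for an Elasticsearch index."""

    def __init__(self, embed):
        self.embed = embed
        self.docs = []  # list of (doc_id, text, vector)

    def add(self, doc_id, text):
        # Steps 1 and 2: generate the embedding and store it next to the text.
        self.docs.append((doc_id, text, self.embed(text)))

    def knn(self, query_text, k=3):
        # Step 3: embed the query with the SAME function, then rank by similarity.
        qv = self.embed(query_text)
        scored = ((cosine(qv, vec), doc_id) for doc_id, _, vec in self.docs)
        return [doc_id for _, doc_id in heapq.nlargest(k, scored)]
```

Swapping `toy_embed` for a real model and the list for a dense_vector index gives the architecture the rest of this article builds.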
Python Implementation
The following Python script implements the embedding side of the workflow (fetch documents that lack an embedding, generate one, and write it back) using the official elasticsearch client and a local Ollama embedding service.
```python
import configparser
import logging
import time

import requests
from elasticsearch import Elasticsearch

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Ollama's embeddings endpoint; "all-minilm" returns 384-dimensional vectors.
EMBEDDING_SERVICE_URL = "http://localhost:11434/api/embeddings"
EMBEDDING_MODEL = "all-minilm"


def init_es_client(config_path="./conf/config.ini"):
    """Create an Elasticsearch client from the [elasticsearch] section of an INI file."""
    config = configparser.ConfigParser()
    config.read(config_path)
    es_host = config.get("elasticsearch", "ES_HOST")
    es_user = config.get("elasticsearch", "ES_USER")
    es_password = config.get("elasticsearch", "ES_PASSWORD")
    # Verify the server certificate against the cluster CA; the original
    # verify_certs=False silently ignored ca_certs and disabled TLS checks.
    return Elasticsearch(
        hosts=[es_host],
        basic_auth=(es_user, es_password),
        ca_certs="conf/http_ca.crt",
    )


def fetch_documents_from_elasticsearch(es_client, index="logs", batch_size=25, query=None):
    """Yield documents that do not yet have an embedding, using the scroll API."""
    search_body = query or {
        "query": {"bool": {"must_not": {"exists": {"field": "embedding"}}}},
        "size": batch_size,
        "sort": [{"@timestamp": "asc"}],
    }
    response = es_client.search(index=index, scroll="1m", **search_body)
    scroll_id = response["_scroll_id"]
    try:
        documents = response["hits"]["hits"]
        while documents:
            yield from documents
            response = es_client.scroll(scroll_id=scroll_id, scroll="1m")
            scroll_id = response["_scroll_id"]
            documents = response["hits"]["hits"]
    finally:
        # Release the server-side scroll context instead of waiting for it to expire.
        es_client.clear_scroll(scroll_id=scroll_id)


def fetch_embeddings(text):
    """Return the embedding vector for `text`, or None if the service call fails."""
    try:
        resp = requests.post(
            EMBEDDING_SERVICE_URL,
            json={"model": EMBEDDING_MODEL, "prompt": text},
            timeout=10,
        )
        resp.raise_for_status()
        embedding = resp.json().get("embedding")
        # Log the vector size rather than dumping hundreds of floats at INFO level.
        logger.debug("Received embedding with %d dimensions", len(embedding or []))
        return embedding
    except requests.exceptions.RequestException as e:
        logger.error("Error fetching embedding: %s", e)
        return None


def update_document_in_elasticsearch(es_client, doc_id, index="logs", embedding=None):
    """Store the embedding (or a failure marker) on the document, at most once."""
    script = {
        "source": """
            if (ctx._source.containsKey("embedding_processed_at") && ctx._source.embedding_processed_at != null) {
                ctx.op = "noop";  // already processed: leave the document untouched
            } else {
                ctx._source.embedding = params.embedding;
                ctx._source.embedding_processed_at = params.timestamp;
                ctx._source.processing_status = params.status;
                if (params.error_message != null) { ctx._source.error_message = params.error_message; }
            }
        """,
        "params": {
            "embedding": embedding or None,
            # time.gmtime() makes the trailing "Z" (UTC) accurate.
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "status": "success" if embedding else "failed",
            "error_message": None if embedding else "Embedding generation failed",
        },
    }
    es_client.update(index=index, id=doc_id, script=script)


def process_documents(es_client, index="logs", batch_size=25):
    """Embed every document that lacks an embedding, one document at a time."""
    for doc in fetch_documents_from_elasticsearch(es_client, index=index, batch_size=batch_size):
        text_content = doc["_source"].get("content", "")
        embedding = fetch_embeddings(text_content)
        update_document_in_elasticsearch(es_client, doc["_id"], index=index, embedding=embedding)


if __name__ == "__main__":
    es = init_es_client(config_path="./conf/config.ini")
    process_documents(es, batch_size=25)
```
Key advantages of the Python approach are flexibility, fine-grained control, and easy integration with AI models. Its drawbacks are limited scalability (the script embeds one document at a time, and the GIL limits CPU-bound parallelism within a single process), higher development effort, and greater resource consumption.
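Because each embedding call mostly waits on the network, threads can overlap that waiting even under the GIL, and batching the writes avoids one update request per document. The sketch below assumes a list of scroll hits and any `embed` callable (for example `fetch_embeddings` above); `embed_concurrently` and `to_bulk_update_actions` are hypothetical helpers. Note that they use a plain partial update rather than the script's process-at-most-once logic.

```python
from concurrent.futures import ThreadPoolExecutor


def embed_concurrently(docs, embed, max_workers=8):
    """Call `embed` on each document's content using a thread pool.

    Network-bound calls release the GIL while waiting, so threads overlap
    the latency. Returns (doc_id, embedding) pairs in input order."""
    texts = [doc["_source"].get("content", "") for doc in docs]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        embeddings = list(pool.map(embed, texts))
    return [(doc["_id"], emb) for doc, emb in zip(docs, embeddings)]


def to_bulk_update_actions(pairs, index="logs"):
    """Turn (doc_id, embedding) pairs into partial-update actions for
    elasticsearch.helpers.bulk. Failed embeddings (None or empty) are skipped
    so they are picked up again on the next run."""
    for doc_id, embedding in pairs:
        if embedding:
            yield {
                "_op_type": "update",
                "_index": index,
                "_id": doc_id,
                "doc": {"embedding": embedding, "processing_status": "success"},
            }
```

A possible usage, assuming `batch` is a list of hits: `helpers.bulk(es, to_bulk_update_actions(embed_concurrently(batch, fetch_embeddings)))`, with `helpers` imported from the elasticsearch package. Keep `max_workers` modest so the embedding service is not overloaded.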
Logstash Implementation
Logstash provides a declarative, ETL‑style pipeline for large‑scale log processing and can call the same Ollama embedding service.
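```conf
input {
  elasticsearch {
    hosts => ["https://127.0.0.1:9200"]
    user => "elastic"
    password => "changeme"
    ssl_enabled => true
    # Cluster CA certificate (replaces the deprecated ca_file option)
    ssl_certificate_authorities => ["E:\logstash-8.15.3-windows-x86_64\logstash-8.15.3\config\http_ca.crt"]
    index => "logs_20250409"
    # Only fetch documents that do not have an embedding yet
    query => '{"query":{"bool":{"must_not":{"exists":{"field":"embedding"}}}}}'
    # Re-run the query every minute
    schedule => "*/1 * * * *"
    # Keep the source _id in [@metadata][_id] so the output updates the same document
    docinfo => true
    docinfo_target => "[@metadata]"
    size => 25
  }
}

filter {
  # Call the Ollama embedding API with the document's content field
  http {
    url => "http://localhost:11434/api/embeddings"
    verb => "POST"
    body_format => "json"
    body => {
      "model" => "all-minilm"
      "prompt" => "%{[content]}"
    }
    target_body => "embedding_response"
  }
  # Move the vector to the top-level "embedding" field; without this, the
  # input query keeps matching (and re-embedding) the same documents
  mutate {
    rename => { "[embedding_response][embedding]" => "embedding" }
    remove_field => ["embedding_response"]
  }
}

output {
  elasticsearch {
    hosts => ["https://127.0.0.1:9200"]
    user => "elastic"
    password => "changeme"
    ssl_enabled => true
    # Cluster CA certificate (replaces the deprecated cacert option)
    ssl_certificate_authorities => ["E:\logstash-8.15.3-windows-x86_64\logstash-8.15.3\config\http_ca.crt"]
    index => "logs_20250409"
    document_id => "%{[@metadata][_id]}"
    action => "update"
    doc_as_upsert => true
    retry_on_conflict => 5
  }
}
```
Logstash’s strengths are high scalability, built-in fault tolerance, and minimal code; its weaknesses are limited support for custom logic, harder debugging, and tight coupling to Elasticsearch.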
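Because the input re-runs its query every minute, a quick way to confirm that the pipeline is actually writing embeddings (rather than re-reading the same documents) is to watch how many documents still lack the field. The following is a sketch assuming the elasticsearch-py 8.x client's `count` API. `count_pending` and `check_progress` are hypothetical helpers, and the polling parameters are illustrative.

```python
import time

# Same condition the Logstash input uses to select unprocessed documents.
MISSING_EMBEDDING_QUERY = {"bool": {"must_not": {"exists": {"field": "embedding"}}}}


def count_pending(es_client, index):
    """Number of documents in `index` that still lack an 'embedding' field."""
    return es_client.count(index=index, query=MISSING_EMBEDDING_QUERY)["count"]


def check_progress(es_client, index, polls=3, interval_s=60, sleep=time.sleep):
    """Sample the pending count `polls` times, `interval_s` seconds apart.

    Returns (samples, healthy), where healthy means the backlog is empty or
    shrank between the first and last sample."""
    samples = [count_pending(es_client, index)]
    for _ in range(polls - 1):
        sleep(interval_s)
        samples.append(count_pending(es_client, index))
    healthy = samples[-1] == 0 or samples[-1] < samples[0]
    return samples, healthy
```

For example, `check_progress(es, "logs_20250409")` samples for about two minutes. A flat, non-zero count while Logstash is running suggests that documents are being fetched but never gain an `embedding` field.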
Choosing the Right Approach
If you need fine‑grained control, complex custom logic, or direct integration with machine‑learning models, the Python solution is preferable. For high‑throughput log ingestion with low development effort and strong resilience, Logstash is the better fit.
Conclusion
Both methods enable semantic search in Elasticsearch, but the optimal choice depends on your workload size, required customisation, and operational constraints.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
dbaplus Community
