How to Connect Python to Elasticsearch for Powerful Search and Data Ingestion

This guide walks through installing the Python Elasticsearch client, building a reusable class with CRUD methods, importing data from MongoDB, writing a simple Baidu Baike crawler, and scaling the workflow with Celery and Flask for a complete search‑engine pipeline.

Python Crawling & Data Mining
Python Crawling & Data Mining
Python Crawling & Data Mining
How to Connect Python to Elasticsearch for Powerful Search and Data Ingestion

Elasticsearch Overview

Elasticsearch is an open‑source search engine built on top of Apache Lucene™.

Install the Python client:

pip install elasticsearch

Python Wrapper Class

from elasticsearch import Elasticsearch

class elasticSearch():
    def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
        # self.es = Elasticsearch([ip], http_auth=('username', 'password'), port=9200)
        self.es = Elasticsearch("localhost:9200")
        self.index_type = index_type
        self.index_name = index_name

    def create_index(self):
        if self.es.indices.exists(index=self.index_name):
            self.es.indices.delete(index=self.index_name)
        self.es.indices.create(index=self.index_name, ignore=400)

    def delete_index(self):
        try:
            self.es.indices.delete(index=self.index_name)
        except:
            pass

    def get_doc(self, uid):
        return self.es.get(index=self.index_name, id=uid)

    def insert_one(self, doc: dict):
        self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

    def insert_array(self, docs: list):
        for doc in docs:
            self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

    def search(self, query, count: int = 30):
        dsl = {
            "query": {"multi_match": {"query": query, "fields": ["title", "content", "link"]}},
            "highlight": {"fields": {"title": {}}}
        }
        match_data = self.es.search(index=self.index_name, body=dsl, size=count)
        return match_data

Importing Data from MongoDB

import json
from datetime import datetime
import pymongo
from app.elasticsearchClass import elasticSearch

client = pymongo.MongoClient('127.0.0.1', 27017)
db = client['spider']
sheet = db.get_collection('Spider').find({}, {'_id': 0})

es = elasticSearch(index_type="spider_data", index_name="spider")
es.create_index()

for i in sheet:
    data = {
        'title': i['title'],
        'content': i['data'],
        'link': i['link'],
        'create_time': datetime.now()
    }
    es.insert_one(doc=data)

Simple Baidu Baike Crawler

import requests
import re
import time

exist_urls = []
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36'}

def get_link(url):
    try:
        response = requests.get(url=url, headers=headers)
        response.encoding = 'UTF-8'
        html = response.text
        link_lists = re.findall('.*?<a target=_blank href="/item/([^:#=&<>]*?)".*?</a>', html)
        return link_lists
    except Exception:
        pass
    finally:
        exist_urls.append(url)

def main(start_url, depth=1):
    link_lists = get_link(start_url)
    if link_lists:
        unique_lists = list(set(link_lists) - set(exist_urls))
        for unique_url in unique_lists:
            unique_url = 'https://baike.baidu.com/item/' + unique_url
            with open('url.txt', 'a+') as f:
                f.write(unique_url + '
')
            if depth < 10:
                main(unique_url, depth + 1)

if __name__ == '__main__':
    start_url = 'https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
    main(start_url)

Celery Task for URL Processing

from celery import Celery
import requests
from lxml import etree
import pymongo

app = Celery('tasks', broker='redis://localhost:6379/2')
client = pymongo.MongoClient('localhost', 27017)
db = client['baike']

@app.task
def get_url(link):
    item = {}
    headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
    res = requests.get(link, headers=headers)
    res.encoding = 'UTF-8'
    doc = etree.HTML(res.text)
    content = doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
    item['link'] = link
    data = ''.join(content).replace(' ', '').replace('\t', '').replace('
', '').replace('\r', '')
    item['data'] = data
    if db['Baike'].insert(dict(item)):
        print('is OK ...')
    else:
        print('Fail')

Running the Distributed Workers

celery -A parse worker -l info -P gevent -c 10

The pipeline stores raw data in MongoDB, then bulk‑loads it into Elasticsearch, and finally exposes search functionality through a Flask (or Django/FastAPI) web application.

Elasticsearch example
Elasticsearch example
Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

PythonElasticsearchceleryFlaskWeb Crawlingdata ingestion
Python Crawling & Data Mining
Written by

Python Crawling & Data Mining

Life's short, I code in Python. This channel shares Python web crawling, data mining, analysis, processing, visualization, automated testing, DevOps, big data, AI, cloud computing, machine learning tools, resources, news, technical articles, tutorial videos and learning materials. Join us!

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.