How to Connect Python to Elasticsearch for Powerful Search and Data Ingestion
This guide walks through installing the Python Elasticsearch client, building a reusable class with CRUD methods, importing data from MongoDB, writing a simple Baidu Baike crawler, and scaling the workflow with Celery and Flask for a complete search‑engine pipeline.
Elasticsearch Overview
Elasticsearch is an open‑source search engine built on top of Apache Lucene™.
Install the Python client:
pip install elasticsearchPython Wrapper Class
from elasticsearch import Elasticsearch
class elasticSearch():
def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
# self.es = Elasticsearch([ip], http_auth=('username', 'password'), port=9200)
self.es = Elasticsearch("localhost:9200")
self.index_type = index_type
self.index_name = index_name
def create_index(self):
if self.es.indices.exists(index=self.index_name):
self.es.indices.delete(index=self.index_name)
self.es.indices.create(index=self.index_name, ignore=400)
def delete_index(self):
try:
self.es.indices.delete(index=self.index_name)
except:
pass
def get_doc(self, uid):
return self.es.get(index=self.index_name, id=uid)
def insert_one(self, doc: dict):
self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
def insert_array(self, docs: list):
for doc in docs:
self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
def search(self, query, count: int = 30):
dsl = {
"query": {"multi_match": {"query": query, "fields": ["title", "content", "link"]}},
"highlight": {"fields": {"title": {}}}
}
match_data = self.es.search(index=self.index_name, body=dsl, size=count)
return match_dataImporting Data from MongoDB
import json
from datetime import datetime
import pymongo
from app.elasticsearchClass import elasticSearch
client = pymongo.MongoClient('127.0.0.1', 27017)
db = client['spider']
sheet = db.get_collection('Spider').find({}, {'_id': 0})
es = elasticSearch(index_type="spider_data", index_name="spider")
es.create_index()
for i in sheet:
data = {
'title': i['title'],
'content': i['data'],
'link': i['link'],
'create_time': datetime.now()
}
es.insert_one(doc=data)Simple Baidu Baike Crawler
import requests
import re
import time
exist_urls = []
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36'}
def get_link(url):
try:
response = requests.get(url=url, headers=headers)
response.encoding = 'UTF-8'
html = response.text
link_lists = re.findall('.*?<a target=_blank href="/item/([^:#=&<>]*?)".*?</a>', html)
return link_lists
except Exception:
pass
finally:
exist_urls.append(url)
def main(start_url, depth=1):
link_lists = get_link(start_url)
if link_lists:
unique_lists = list(set(link_lists) - set(exist_urls))
for unique_url in unique_lists:
unique_url = 'https://baike.baidu.com/item/' + unique_url
with open('url.txt', 'a+') as f:
f.write(unique_url + '
')
if depth < 10:
main(unique_url, depth + 1)
if __name__ == '__main__':
start_url = 'https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
main(start_url)Celery Task for URL Processing
from celery import Celery
import requests
from lxml import etree
import pymongo
app = Celery('tasks', broker='redis://localhost:6379/2')
client = pymongo.MongoClient('localhost', 27017)
db = client['baike']
@app.task
def get_url(link):
item = {}
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
res = requests.get(link, headers=headers)
res.encoding = 'UTF-8'
doc = etree.HTML(res.text)
content = doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
item['link'] = link
data = ''.join(content).replace(' ', '').replace('\t', '').replace('
', '').replace('\r', '')
item['data'] = data
if db['Baike'].insert(dict(item)):
print('is OK ...')
else:
print('Fail')Running the Distributed Workers
celery -A parse worker -l info -P gevent -c 10The pipeline stores raw data in MongoDB, then bulk‑loads it into Elasticsearch, and finally exposes search functionality through a Flask (or Django/FastAPI) web application.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Python Crawling & Data Mining
Life's short, I code in Python. This channel shares Python web crawling, data mining, analysis, processing, visualization, automated testing, DevOps, big data, AI, cloud computing, machine learning tools, resources, news, technical articles, tutorial videos and learning materials. Join us!
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
