How to Build an AI‑Powered Log Analysis & Alert System with ELK and Feishu
This guide shows how to combine ELK, DeepSeek’s LLM API, and a Feishu chatbot in Python to automatically collect, analyze, classify, and alert on system logs, using a config file, custom utilities, and a simple historical fault database for intelligent DevOps monitoring.
Preparation
Requirements: ELK stack, Feishu chatbot webhook, DeepSeek API key, and a Python environment.
Test Log Samples
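The sample logs below simulate Redis reconnections, MQ request errors, and network connection failures. The Chinese messages are kept verbatim because the historical fault matching later looks for them: "mq请求异常开始/结束" means "MQ request exception start/end", and "网络连接异常开始/结束" means "network connection exception start/end".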
2025-03-11 10:35:05,100 [lettuce-epollEventLoop-4-3] INFO io.lettuce.core.protocol.ReconnectionHandler:177 - Reconnected to 192.168.6.15:6379
2025-03-11 10:36:03,197 [lettuce-eventExecutorLoop-1-2] INFO io.lettuce.core.protocol.ConnectionWatchdog:173 - Reconnecting, last destination was 192.168.6.13/192.168.6.13:6379
2025-03-11 10:36:03,200 [lettuce-epollEventLoop-4-4] INFO io.lettuce.core.protocol.ReconnectionHandler:177 - Reconnected to 192.168.6.11:6379
2025-03-11 10:36:43,096 [lettuce-eventExecutorLoop-1-3] INFO io.lettuce.core.protocol.ConnectionWatchdog:173 - Reconnecting, last destination was 192.168.6.13/192.168.6.13:6379
2025-03-11 10:36:43,099 [lettuce-epollEventLoop-4-1] INFO io.lettuce.core.protocol.ReconnectionHandler:177 - Reconnected to 192.168.6.13:6379
2025-03-11 10:37:39,272 [SimpleAsyncTaskExecutor-5] ERROR cn.com.easypay.aspect.WebLogAspect:39 - mq请求异常开始
2025-03-11 10:37:39,272 [SimpleAsyncTaskExecutor-5] ERROR cn.com.easypay.aspect.WebLogAspect:39 - mq请求异常结束
2025-03-11 10:37:48,139 网络连接异常开始
2025-03-11 10:37:48,139 网络连接异常结束
Configuration (config.ini)
[elasticsearch]
host = your_es_ip
port = 9200
username = your_elastic_user
password = your_elastic_password
[deepseek]
api_key = sk-fc5c4a54XXXXXXXXXX61a25eb
deepseek_api_url = https://api.deepseek.com/chat/completions
[feishu]
webhook_url = https://open.feishu.cn/open-apis/bot/v2/hook/bee9699d-9490-4226-97bd-xxxxxxx
[elasticsearch_index]
index_name = your_es_index_name
Code Structure
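The settings above are read with Python's configparser. One caveat worth illustrating: `ConfigParser.read` silently skips a file it cannot open, so a missing config.ini only surfaces later as a confusing "no section" error. The following is a minimal sketch, not the article's code, that opens the file explicitly and checks that every key from the sample config is present. The helper names `parse_config` and `load_config` and the `REQUIRED_KEYS` table are hypothetical.

```python
import configparser

# Sections and keys taken from the sample config.ini in this article.
REQUIRED_KEYS = {
    "elasticsearch": ["host", "port", "username", "password"],
    "deepseek": ["api_key", "deepseek_api_url"],
    "feishu": ["webhook_url"],
    "elasticsearch_index": ["index_name"],
}


def parse_config(text):
    """Parse config.ini text and raise ValueError listing any missing keys."""
    config = configparser.ConfigParser()
    config.read_string(text)
    missing = [
        f"{section}.{key}"
        for section, keys in REQUIRED_KEYS.items()
        for key in keys
        if not config.has_option(section, key)
    ]
    if missing:
        raise ValueError("missing config keys: " + ", ".join(missing))
    return config


def load_config(path="config.ini"):
    """Open the file explicitly so a missing file raises instead of passing silently."""
    with open(path, encoding="utf-8") as handle:
        return parse_config(handle.read())
```

Failing at startup with the full list of missing keys is usually easier to debug than a failure halfway through an alerting run.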
elasticsearch_utils.py
Thin wrapper around Elasticsearch to fetch recent logs (last hour, up to 10 entries).
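The class that follows leaves both method bodies as stubs. As a hedged sketch of what "last hour, up to 10 entries" could translate to, the two helpers below build a search body and pull the log text out of a search response. They are illustrative only: the function names are hypothetical, and the `@timestamp` and `message` field names are assumptions based on common Logstash and Beats defaults, so adjust them to the actual index mapping.

```python
def build_recent_logs_query(window="1h", size=10, time_field="@timestamp"):
    """Return a search body for the newest `size` documents within `window`."""
    return {
        "size": size,
        # Newest documents first, so the size limit keeps the most recent logs.
        "sort": [{time_field: {"order": "desc"}}],
        # Elasticsearch date math: "now-1h" means one hour before the request time.
        "query": {"range": {time_field: {"gte": f"now-{window}", "lte": "now"}}},
    }


def extract_messages(response, message_field="message"):
    """Pull the log text out of each hit, skipping hits that lack the field."""
    hits = response.get("hits", {}).get("hits", [])
    return [
        hit["_source"][message_field]
        for hit in hits
        if message_field in hit.get("_source", {})
    ]
```

The dictionary returned by `build_recent_logs_query` is a request body for Elasticsearch's `_search` endpoint. How it is handed to the Python client depends on the client version, which the article does not specify.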
class ElasticsearchClient:
    def __init__(self, host, port, username, password):
        # initialize Elasticsearch client
        pass

    def get_logs_from_es(self, index_name):
        # query the last hour and return up to 10 log entries
        pass

feishu_utils.py
Encapsulates sending interactive card messages to Feishu.
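import logging

import requests


class FeishuClient:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def send_message(self, title, content):
        # Build an interactive card: a fixed header plus two Markdown sections.
        data = {
            "msg_type": "interactive",
            "card": {
                # Header text: "DeepSeek log analysis alert"
                "header": {"title": {"tag": "plain_text", "content": "DeepSeek日志分析告警推送"}},
                "elements": [
                    # "Message from DeepSeek", followed by the caller's title
                    {"tag": "div", "text": {"tag": "lark_md", "content": f"**来自Deepseek的消息**:\n{title}"}},
                    # "DeepSeek analysis result", followed by the analysis text
                    {"tag": "div", "text": {"tag": "lark_md", "content": f"**deepseek分析结果**:\n{content}"}},
                ],
            },
        }
        try:
            # A timeout keeps a stalled webhook from blocking the whole run.
            response = requests.post(
                self.webhook_url,
                json=data,
                headers={"Content-Type": "application/json"},
                timeout=10,
            )
            response.raise_for_status()
            logging.info("Message sent to Feishu.")
        except requests.HTTPError as e:
            # Must come before RequestException: HTTPError is a subclass of it,
            # so the original ordering made this branch unreachable.
            logging.error(f"Failed to send message, status {e.response.status_code}")
        except requests.RequestException as e:
            logging.error(f"Request error: {e}")

main.py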
Orchestrates configuration loading, log retrieval, DeepSeek classification, historical fault matching, and Feishu notification.
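import configparser
import logging
import time

import requests

from elasticsearch_utils import ElasticsearchClient
from feishu_utils import FeishuClient

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

config = configparser.ConfigParser()
config.read('config.ini')

es_client = ElasticsearchClient(
    config.get('elasticsearch', 'host'),
    config.getint('elasticsearch', 'port'),
    config.get('elasticsearch', 'username'),
    config.get('elasticsearch', 'password'),
)
feishu_client = FeishuClient(config.get('feishu', 'webhook_url'))

# Static historical fault database: fault name -> substrings to look for in a log line.
# Keys, in order: "Redis connection pool exhausted", "database connection timeout",
# "network connection exception (verified against history)".
# The last list pairs the keyword "网络连接异常" (network connection exception) with a
# root-cause note ("fault root cause: leased-line packet loss, switch version bug").
historical_fault_db = {
    "Redis连接池耗尽": ["redis connection pool exhausted", "redis pool full"],
    "数据库连接超时": ["database connection timeout", "db connect timed out"],
    "网络连接异常历史验证": ["网络连接异常", "故障根因:专线丢包,交换机版本bug"],
}


def analyze_logs_with_deepseek(logs):
    headers = {
        "Authorization": f"Bearer {config.get('deepseek', 'api_key')}",
        "Content-Type": "application/json",
    }
    combined = "\n".join(logs)
    # Prompt, in English: "As a senior operations expert with 30 years of experience,
    # classify the logs by fault type ('database exception', 'program exception',
    # 'network exception', 'redis exception', 'mq exception', etc.), assess the
    # impact level (P0-P3), and give recommendations:"
    data = {
        "model": "deepseek-chat",
        "messages": [{
            "role": "user",
            "content": f"作为拥有30年经验的资深运维专家,把日志按照故障类型'数据库异常'、'程序异常'、'网络异常'、'redis异常'、'mq异常'等进行分类,并评估影响范围(P0-P3)给出建议:\n{combined}",
        }],
    }
    # Try up to three times, pausing 2 seconds after each failure.
    for _ in range(3):
        try:
            # The timeout value is a judgment call; without one a stalled request never returns.
            resp = requests.post(config.get('deepseek', 'deepseek_api_url'), headers=headers, json=data, timeout=60)
            resp.raise_for_status()
            return resp.json()
        except requests.RequestException as e:
            logging.error(f"Request error: {e}")
            time.sleep(2)
    return None


def match_with_historical_db(logs):
    matches = []
    for log in logs:
        for fault, keywords in historical_fault_db.items():
            # Record each (log, fault) pair once, even if several keywords hit.
            if any(kw.lower() in log.lower() for kw in keywords):
                matches.append((log, fault))
    return matches


def main():
    index_name = config.get('elasticsearch_index', 'index_name')
    logs = es_client.get_logs_from_es(index_name)
    if not logs:
        logging.warning("No logs retrieved from Elasticsearch.")
        return
    result = analyze_logs_with_deepseek(logs)
    if not result:
        logging.error("Log analysis failed.")
        return
    message = result["choices"][0]["message"]["content"]
    historical_matches = match_with_historical_db(logs)
    if historical_matches:
        # "Historical fault database matches:"
        hist_msg = "历史故障库匹配结果:\n"
        for log, fault in historical_matches:
            # "Log content: ..." and "Matched fault: ..."
            hist_msg += f"日志内容: {log}\n匹配故障: {fault}\n"
    else:
        # "No logs matched the historical fault database."
        hist_msg = "未找到与历史故障库匹配的日志。"
    # "Combined log classification:" followed by the model output and the match section
    full_message = f"整合日志分类:\n{message}\n{hist_msg}"
    # Card title: "Combined log analysis result"
    feishu_client.send_message("整合日志分析结果", full_message)


if __name__ == "__main__":
    main()

Result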
The system fetches the most recent logs from Elasticsearch, asks DeepSeek to classify them by fault type (database, program, network, Redis, or MQ exceptions) and to rate their impact (P0‑P3), matches each log against a static historical fault dictionary, and pushes the combined result to Feishu as an interactive card.
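One step in this flow that can fail abruptly is reading the model's reply: main.py indexes `result["choices"][0]["message"]["content"]` directly, so a response without that shape (for example an error object or an empty `choices` list) raises an exception instead of producing an alert. A minimal defensive sketch, assuming only the response layout that main.py itself relies on, with `extract_analysis_text` as a hypothetical helper name:

```python
def extract_analysis_text(result):
    """Return the assistant's reply text, or None if the response is malformed."""
    try:
        content = result["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        # Missing keys, an empty choices list, or a non-dict response.
        return None
    if not isinstance(content, str) or not content.strip():
        return None
    return content.strip()
```

A caller could treat a `None` return the same way main.py treats a failed request: log the error and skip the Feishu notification, or send a short "analysis unavailable" card instead.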
Conclusion
Integrating an LLM into an ELK monitoring pipeline enables automated fault classification and actionable recommendations. The approach can be extended with richer fault databases, refined prompts, or on‑premise LLM deployments.
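As one possible direction for a richer fault database, the sketch below separates matching keywords from root-cause notes, which the article's dictionary stores side by side in one list. This is an illustrative design rather than the author's implementation: `FaultRecord`, `FAULT_DB`, and `match_faults` are hypothetical names, and the entries are adapted (with English labels) from `historical_fault_db`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FaultRecord:
    name: str             # fault label shown in the alert
    keywords: tuple       # substrings that identify the fault in a log line
    root_cause: str = ""  # past root cause, kept out of the keyword list


# Hypothetical entries adapted from the article's historical_fault_db.
FAULT_DB = [
    FaultRecord(
        "Redis connection pool exhausted",
        ("redis connection pool exhausted", "redis pool full"),
    ),
    FaultRecord(
        "Network connection exception",
        ("网络连接异常",),
        root_cause="leased-line packet loss, switch version bug",
    ),
]


def match_faults(logs, fault_db=FAULT_DB):
    """Return one (log, FaultRecord) pair per log and matching fault."""
    matches = []
    for log in logs:
        lowered = log.lower()
        for record in fault_db:
            # Case-insensitive substring match, as in the article's matcher.
            if any(keyword.lower() in lowered for keyword in record.keywords):
                matches.append((log, record))
    return matches
```

Keeping the root cause as its own field lets the alert show it next to the matched fault without it ever being used as a search keyword.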
dbaplus Community
Enterprise-level professional community for Database, BigData, and AIOps. Daily original articles, weekly online tech talks, monthly offline salons, and quarterly XCOPS&DAMS conferences—delivered by industry experts.
