
Building a Real-Time Log Monitoring System with ELK, Kafka, and Python

This article details how to construct a log‑monitoring platform using the ELK stack, Kafka buffering, and a Python scheduler to collect, process, and alert on error logs, offering practical configuration tips and performance optimizations for production environments.

Efficient Ops

Introduction

After finishing a payment system, the author built two monitoring solutions for it: a code-level real-time statistics system (CAT) and a log-monitoring platform based on the ELK stack (Elasticsearch, Logstash, Kibana). This article describes the second.

System Architecture

Logstash collects the application logs and pushes them to a Kafka cluster, which acts as a buffer; a second Logstash instance reads from Kafka and writes the events to Elasticsearch. A Python script periodically (every 180 seconds by default) queries Elasticsearch for error logs and sends email or WeChat alerts, while Kibana is used to search and visualize the logs.

Logstash

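Each file input is given tags and a type so that events are easy to filter in Elasticsearch. sincedb_path records how far Logstash has read each file, so it resumes from that position after a restart; sincedb_write_interval (not set here, so the default of 15 seconds applies) controls how often that position is written to disk. The codec => multiline setting on the debug input merges the lines of a stack trace into a single event.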

# Collect application logs and push to Kafka
input {
  file {
    tags => ["pay","site"]
    path => ["/data/www/logs/pay/biz/pay-failure-0?.log","/data/www/logs/pay/biz/pay-success-0?.log"]
    type => "biz"
    sincedb_path => "/tmp/logstash_p_sincedb_biz"
  }
  file {
    tags => ["pay","site"]
    path => ["/data/www/logs/pay/debug/debug-0?.log"]
    type => "debug"
    sincedb_path => "/tmp/logstash_p_sincedb_debug"
    codec => multiline {
        pattern => "^[\\d|\\\\|]"
        negate => true
        what => "previous"
        max_lines => 500
        multiline_tag => ["exception_stack"]
    }
  }
}
output {
  kafka {
    bootstrap_servers => "kafka1.host:9092,kafka2.host:9092,kafka3.host:9092"
    topic_id => "pay_log_topic"
  }
}
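To make the codec settings concrete, here is a small Python model of what negate => true and what => "previous" do. It is a sketch, not Logstash's implementation. It assumes the pattern is meant to treat a line beginning with a digit (such as a timestamp), a backslash or a pipe as the start of a new event; the sample log lines and class names are made up.

```python
import re

# Lines matching this regex start a new event; with negate => true, every
# line that does NOT match is a continuation and (what => "previous") is
# glued onto the event before it. Assumed reading of the config's pattern:
# a leading digit, backslash or pipe.
NEW_EVENT = re.compile(r"^[\d|\\]")
MAX_LINES = 500  # mirrors max_lines => 500


def merge_multiline(lines, max_lines=MAX_LINES):
    """Group raw log lines into events, roughly as the multiline codec does."""
    events = []
    for line in lines:
        starts_new = NEW_EVENT.match(line) is not None
        if starts_new or not events or len(events[-1]) >= max_lines:
            events.append([line])  # open a new event
        else:
            events[-1].append(line)  # continuation of the previous event
    return ["\n".join(event) for event in events]


if __name__ == "__main__":
    sample = [
        "2016-08-10 10:00:01 ERROR pay request failed",
        "java.lang.IllegalStateException: channel closed",
        "\tat com.example.pay.Gateway.call(Gateway.java:42)",
        "2016-08-10 10:00:02 INFO pay request ok",
    ]
    for event in merge_multiline(sample):
        print(repr(event))
```

Because the regex describes where events start rather than what a continuation looks like, Java stack-trace lines ("at ...", "Caused by ...") are absorbed automatically, since none of them begins with a digit.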

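Logstash is started with the following command. In Logstash 2.3, -l sets the Logstash log file, -r enables automatic reloading of the pipeline configuration (checked every 30 seconds because of --reload-interval 30), and -f points to the configuration file: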

nohup /usr/local/logstash/bin/logstash agent -l /usr/local/logstash-2.3.4/logstash_pay.log -r --reload-interval 30 -f /usr/local/logstash-2.3.4/config/pay_to_kafka.conf >/dev/null 2>&1 &

Kafka

Kafka acts as a buffer that absorbs network fluctuations and component restarts, which matters especially here because Elasticsearch and the business servers are in different data centers. The cluster runs three brokers, and each topic has three partitions with three replicas each.
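The sketch below illustrates what "three brokers, three partitions, three replicas" means for placement. It uses a simplified round-robin assignment, not Kafka's exact algorithm (Kafka also randomizes the starting broker); the broker names are taken from the Logstash output config.

```python
from collections import Counter


def assign_replicas(brokers, partitions, replication_factor):
    """Simplified round-robin placement (not Kafka's exact algorithm):
    partition p is led by broker p mod n, followed by the next brokers in the ring."""
    n = len(brokers)
    if replication_factor > n:
        raise ValueError("replication factor cannot exceed the number of brokers")
    return {
        p: [brokers[(p + r) % n] for r in range(replication_factor)]
        for p in range(partitions)
    }


if __name__ == "__main__":
    brokers = ["kafka1.host", "kafka2.host", "kafka3.host"]
    layout = assign_replicas(brokers, partitions=3, replication_factor=3)
    for partition, replicas in layout.items():
        print(f"partition {partition}: leader={replicas[0]} followers={replicas[1:]}")
    # Every broker stores one replica of every partition ...
    print(Counter(b for replicas in layout.values() for b in replicas))
    # ... and leads exactly one partition, which spreads the write load.
    print(Counter(replicas[0] for replicas in layout.values()))
```

With a replication factor equal to the broker count, every broker holds a full copy of the topic, so the buffered logs survive the loss of any single broker while its partitions fail over to another leader.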

Elasticsearch

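Because resources are limited, only one Elasticsearch data node is used; it takes in about 20 GB of logs (roughly 30 million entries) per day. Useful tuning steps: set bootstrap.mlockall: true so the JVM heap is locked in RAM and cannot be swapped out (the setting was renamed bootstrap.memory_lock in Elasticsearch 5.0), raise the OS open-file limit (e.g., ulimit -n 65536), and give the JVM a large enough heap (e.g., -Xms12g -Xmx24g) with the CMS garbage collector. Note that Elastic's own guidance is to set -Xms and -Xmx to the same value.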
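A quick back-of-the-envelope check of what the daily figures mean for the single data node. Reading "20 GB" as binary gigabytes of raw log data is an assumption; the results are averages only, and real traffic is bursty, so peak rates (not given in the article) will be higher.

```python
DAILY_BYTES = 20 * 1024 ** 3   # "about 20 GB" per day, read as binary GB (assumption)
DAILY_ENTRIES = 30_000_000     # "about 30 million entries" per day
SECONDS_PER_DAY = 24 * 60 * 60


def average_load(daily_bytes, daily_entries, seconds=SECONDS_PER_DAY):
    """Return (average bytes per entry, average entries per second)."""
    return daily_bytes / daily_entries, daily_entries / seconds


if __name__ == "__main__":
    size, rate = average_load(DAILY_BYTES, DAILY_ENTRIES)
    # prints: ~716 bytes per entry, ~347 entries/s on average
    print(f"~{size:.0f} bytes per entry, ~{rate:.0f} entries/s on average")
```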

Kibana

Kibana is pointed at Elasticsearch through config/kibana.yml (the elasticsearch.url setting) and started with nohup bin/kibana &. It provides a web UI for querying and visualizing the logs.

Python Monitoring Program

The monitoring script uses APScheduler to run periodically. Each run queries Elasticsearch for error logs, picks the index to search based on the time of day, and sends alerts either by email (including the first 50 lines of each stack trace) or as a WeChat summary. A distributed lock based on the Redlock algorithm lets several copies of the script run for redundancy while only the lock holder actively monitors, which avoids a single point of failure.

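from apscheduler.schedulers.blocking import BlockingScheduler

# monitor_es_error, master_lock_util, handle_error and props are the author's
# own project modules and settings; they are not shown in the article.

def job_monitor_es_error_pay():
    try:
        monitor_es_error.monitor()
    except Exception as e:
        # Route any failure to the project's own error handler
        handle_error('monitor_es_error_pay', e)

if __name__ == '__main__':
    # Block until this instance holds the master lock; standby copies wait here
    master_lock_util.get_lock_block()
    sched = BlockingScheduler()
    es_interval = props['es_err']['interval']  # in seconds, default 180
    sched.add_job(job_monitor_es_error_pay, 'interval', seconds=es_interval)
    sched.start()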
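The article only names master_lock_util.get_lock_block() and says the lock is based on Redlock, so the following is a sketch of the core Redlock idea rather than the author's code. FakeRedisNode is a hypothetical in-memory stand-in for one independent Redis master; a real version would send SET key value NX PX ttl to each master and release with a compare-and-delete Lua script. The lock counts as held only if a majority of nodes granted it and time is left on the TTL.

```python
import time
import uuid


class FakeRedisNode:
    """Hypothetical in-memory stand-in for one independent Redis master.

    Models only SET key value NX PX <ttl> and an owner-checked delete."""

    def __init__(self):
        self._store = {}  # key -> (owner token, expiry time in seconds)

    def set_nx_px(self, key, value, ttl_ms, now):
        held = self._store.get(key)
        if held is not None and held[1] > now:
            return False  # an unexpired lock is held by someone else
        self._store[key] = (value, now + ttl_ms / 1000.0)
        return True

    def release(self, key, value):
        # Delete only if we still own the key (Redis clients use a Lua script).
        held = self._store.get(key)
        if held is not None and held[0] == value:
            del self._store[key]


def redlock_acquire(nodes, key, ttl_ms, clock=time.monotonic):
    """Try to take the lock on a majority of nodes.

    Returns (token, validity_ms) on success, or (None, 0) on failure."""
    token = uuid.uuid4().hex
    start = clock()
    acquired = sum(1 for node in nodes if node.set_nx_px(key, token, ttl_ms, clock()))
    elapsed_ms = (clock() - start) * 1000
    drift_ms = ttl_ms * 0.01 + 2  # clock-drift allowance used by common Redlock clients
    validity_ms = ttl_ms - elapsed_ms - drift_ms
    if acquired >= len(nodes) // 2 + 1 and validity_ms > 0:
        return token, validity_ms
    for node in nodes:  # failed: undo any partial acquisitions
        node.release(key, token)
    return None, 0


if __name__ == "__main__":
    nodes = [FakeRedisNode() for _ in range(3)]
    print(redlock_acquire(nodes, "es-monitor-master", 30_000))  # first caller wins
    print(redlock_acquire(nodes, "es-monitor-master", 30_000))  # second caller: (None, 0)
```

A blocking variant in the spirit of get_lock_block() would simply retry redlock_acquire after a short sleep until it succeeds. Because the lock expires after its TTL, the active instance would also need to re-extend it well before validity_ms runs out; otherwise a standby could take over while the master is still running.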

The monitoring logic:

Determine the time range for the query (previous run end to now).

Build the Elasticsearch query using tags, type, and a timestamp range; optionally exclude keywords.

Send alerts with relevant log snippets; include Kibana URLs and request IDs for deeper investigation.
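The three steps above can be sketched as plain functions that build the request but do not send it. Assumptions: indices follow Logstash's default daily naming logstash-YYYY.MM.dd, which is dated by the UTC @timestamp. That is one plausible reason the script chooses its index by time of day, since on a UTC+8 server the new day's index only appears at 08:00 local time. Field names (tags, type, message, @timestamp) are Logstash defaults, the tag and type values come from the Logstash config, the query uses Elasticsearch 2.x bool syntax, and all function names and the excluded phrase are hypothetical.

```python
from datetime import datetime, timedelta, timezone

INDEX_PREFIX = "logstash-"  # Logstash default daily index: logstash-%{+YYYY.MM.dd} (UTC)


def next_window(last_end, now, interval_s=180):
    """Step 1: query from the end of the previous run up to now.
    On the first run there is no previous end, so look back one interval."""
    start = last_end if last_end is not None else now - timedelta(seconds=interval_s)
    return start, now


def indices_for_window(start, end, prefix=INDEX_PREFIX):
    """Daily index names covering [start, end), using UTC dates as Logstash does."""
    day = start.astimezone(timezone.utc).date()
    last = (end - timedelta(microseconds=1)).astimezone(timezone.utc).date()
    names = []
    while day <= last:
        names.append(prefix + day.strftime("%Y.%m.%d"))
        day += timedelta(days=1)
    return names


def build_error_query(start, end, tag="pay", log_type="debug", exclude=()):
    """Step 2: filter on tag, type and @timestamp; drop events containing excluded phrases."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"tags": tag}},
                    {"term": {"type": log_type}},
                    {"range": {"@timestamp": {
                        "gte": start.astimezone(timezone.utc).isoformat(),
                        "lt": end.astimezone(timezone.utc).isoformat(),
                    }}},
                ],
                "must_not": [{"match_phrase": {"message": p}} for p in exclude],
            }
        },
        "sort": [{"@timestamp": {"order": "asc"}}],
    }


def stack_excerpt(message, max_lines=50):
    """Step 3: keep the first max_lines lines of an event for the alert e-mail."""
    lines = message.splitlines()
    if len(lines) <= max_lines:
        return message
    return "\n".join(lines[:max_lines] + [f"... ({len(lines) - max_lines} more lines)"])


if __name__ == "__main__":
    utc8 = timezone(timedelta(hours=8))
    start, end = next_window(None, datetime(2016, 8, 10, 8, 1, tzinfo=utc8))
    # The window crosses midnight UTC, so two indices are needed:
    print(indices_for_window(start, end))  # ['logstash-2016.08.09', 'logstash-2016.08.10']
    print(build_error_query(start, end, exclude=["connection reset"]))
```

Keeping the previous run's end time as the next start avoids gaps between runs; the exact field names and whether term queries match depend on the index mapping in use.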

Conclusion

The system has been in production for over six months. It turned a previously opaque error-handling process into a near-real-time, transparent one and has helped find and fix numerous issues, including long GC pauses, HTTP connection-pool problems, RPC spikes, and ActiveMQ blockages. All of this runs on limited hardware and was built and maintained in the author's spare time.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by Efficient Ops

This public account is maintained by Xiaotianguo and friends and regularly publishes widely read original technical articles. We focus on operations transformation and aim to accompany you throughout your operations career as we grow together.
