
Building a Real-Time Log Monitoring System with ELK, Kafka, and Python

This article details how to construct a log‑monitoring platform using the ELK stack, Kafka buffering, and a Python scheduler to collect, process, and alert on error logs, offering practical configuration tips and performance optimizations for production environments.


Introduction

After completing a payment system, the author built two monitoring solutions: a code‑level real‑time statistics system (CAT) and a log‑monitoring platform based on the ELK stack (Elasticsearch, Logstash, Kibana).

System Architecture

The architecture uses Logstash to collect logs and push them to a Kafka cluster for buffering; another Logstash instance reads from Kafka and stores the data in Elasticsearch. A Python script periodically queries Elasticsearch for error logs and sends email or WeChat alerts.

Logstash

Logstash is configured with <code>tags</code> and <code>type</code> fields to make searching in Elasticsearch easier. <code>sincedb_path</code> records the position up to which each file has been read, and <code>sincedb_write_interval</code> controls how often that position is written (default 15 seconds). The <code>codec => multiline</code> option merges stack-trace lines into a single event.

<code># Collect application logs and push to Kafka
input {
  file {
    tags => ["pay","site"]
    path => ["/data/www/logs/pay/biz/pay-failure-0?.log","/data/www/logs/pay/biz/pay-success-0?.log"]
    type => "biz"
    sincedb_path => "/tmp/logstash_p_sincedb_biz"
  }
  file {
    tags => ["pay","site"]
    path => ["/data/www/logs/pay/debug/debug-0?.log"]
    type => "debug"
    sincedb_path => "/tmp/logstash_p_sincedb_debug"
    codec => multiline {
        pattern => "^[\\d|\\\\|]"
        negate => true
        what => "previous"
        max_lines => 500
        multiline_tag => ["exception_stack"]
    }
  }
}
output {
  kafka {
    bootstrap_servers => "kafka1.host:9092,kafka2.host:9092,kafka3.host:9092"
    topic_id => "pay_log_topic"
  }
}
</code>

Logstash is started with:

<code>nohup /usr/local/logstash/bin/logstash agent -l /usr/local/logstash-2.3.4/logstash_pay.log -r --reload-interval 30 -f /usr/local/logstash-2.3.4/config/pay_to_kafka.conf >/dev/null 2>&1 &</code>
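
The second Logstash instance, which reads from Kafka and writes to Elasticsearch, is not shown above. A minimal sketch in Logstash 2.x syntax might look like the following; the ZooKeeper and Elasticsearch host names and the consumer group ID are assumptions, while the topic follows the producer config above:

<code># Consume from Kafka and index into Elasticsearch (Logstash 2.x syntax)
input {
  kafka {
    zk_connect => "zk1.host:2181,zk2.host:2181,zk3.host:2181"  # assumed ZooKeeper hosts
    topic_id => "pay_log_topic"
    group_id => "logstash_es_consumer"                          # assumed consumer group
  }
}
output {
  elasticsearch {
    hosts => ["es.host:9200"]             # assumed Elasticsearch address
    index => "logstash-%{+YYYY.MM.dd}"    # one index per day
  }
}
</code>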

Kafka

Kafka provides buffering to handle network fluctuations and component restarts, especially when Elasticsearch and the business servers are in different data centers. The cluster runs three brokers; each topic has three partitions and three replicas.
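
A topic matching that description could be created with the standard Kafka admin tool (ZooKeeper address assumed; Kafka 0.8/0.9-era syntax, to match the Logstash 2.x setup):

<code>bin/kafka-topics.sh --create --zookeeper zk1.host:2181 \
    --partitions 3 --replication-factor 3 --topic pay_log_topic
</code>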

Elasticsearch

Only one data node is used due to limited resources; daily log volume is about 20 GB (≈30 million entries). Performance tips include enabling <code>bootstrap.mlockall: true</code> to lock the JVM heap in memory and avoid swapping, raising the OS file-handle limit (e.g., <code>ulimit -n 65536</code>), and allocating sufficient heap with <code>-Xms</code> and <code>-Xmx</code> set to the same value (e.g., <code>-Xms12g -Xmx12g</code>, and no more than half of physical memory) so the heap is never resized at runtime, together with the CMS garbage collector.
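
Expressed as configuration, the tips above might look like this (Elasticsearch 2.x settings; paths and values are illustrative):

<code># elasticsearch.yml
bootstrap.mlockall: true

# /etc/security/limits.conf -- raise the open-file limit for the ES user
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536

# Environment for the ES 2.x startup script: equal min/max heap, CMS collector
ES_HEAP_SIZE=12g
ES_JAVA_OPTS="-XX:+UseConcMarkSweepGC"
</code>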

Kibana

Kibana is configured via <code>config/kibana.yml</code> to point at the Elasticsearch URL and started with <code>nohup bin/kibana &</code>. It provides a web UI for querying and visualizing the collected logs.
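
For this setup, a minimal <code>config/kibana.yml</code> (Kibana 4.x era, host name assumed) might contain only:

<code># config/kibana.yml
server.port: 5601
elasticsearch.url: "http://es.host:9200"
</code>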

Python Monitoring Program

The monitoring script uses <code>apscheduler</code> for periodic execution. It queries Elasticsearch for error logs, picks the appropriate daily index based on the time of day, and sends alerts by email (including the first 50 lines of each stack trace) or as a WeChat summary. Several instances can run in parallel: a distributed lock (based on Redlock) ensures only one is active at a time, avoiding a single point of failure.

<code>from apscheduler.schedulers.blocking import BlockingScheduler

def job_monitor_es_error_pay():
    try:
        monitor_es_error.monitor()  # query Elasticsearch and send alerts
    except Exception as e:
        handle_error('monitor_es_error_pay', e)

if __name__ == '__main__':
    # Block until this instance holds the distributed (Redlock-based) master lock
    master_lock_util.get_lock_block()
    sched = BlockingScheduler()
    es_interval = props['es_err']['interval']  # polling interval in seconds, default 180
    sched.add_job(job_monitor_es_error_pay, 'interval', seconds=es_interval)
    sched.start()
</code>

The monitoring logic:

1. Determine the time range for the query (from the end of the previous run to now).

2. Build the Elasticsearch query from tags, type, and a timestamp range; optionally exclude known-noise keywords.

3. Send alerts with the relevant log snippets, including Kibana URLs and request IDs for deeper investigation.
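
The first two steps, plus the stack-trace truncation mentioned earlier, can be sketched in plain Python. The index pattern, field names, and tags here are assumptions for illustration, not the author's actual code:

<code>from datetime import datetime, timedelta

def build_error_query(start, end, exclude_keywords=None):
    """Build an Elasticsearch bool query for error logs in [start, end)."""
    must = [
        {"term": {"type": "debug"}},                      # assumed type field
        {"terms": {"tags": ["pay", "exception_stack"]}},  # assumed tags
        {"range": {"@timestamp": {"gte": start.isoformat(), "lt": end.isoformat()}}},
    ]
    query = {"query": {"bool": {"must": must}}}
    if exclude_keywords:
        # Drop hits containing known-noise phrases
        query["query"]["bool"]["must_not"] = [
            {"match_phrase": {"message": kw}} for kw in exclude_keywords
        ]
    return query

def indices_for_range(start, end, pattern="logstash-%Y.%m.%d"):
    """Daily indices covering the query window (handles ranges spanning midnight)."""
    names, day = [], start.date()
    while day <= end.date():
        names.append(datetime(day.year, day.month, day.day).strftime(pattern))
        day += timedelta(days=1)
    return names

def truncate_stack(message, max_lines=50):
    """Keep only the first max_lines lines of a stack trace for the alert email."""
    return "\n".join(message.splitlines()[:max_lines])
</code>

A run shortly after midnight would then query both the previous day's and the current day's index, which is the "determine the index based on the time of day" step in the text.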

Conclusion

The system has been in production for over six months, turning a previously opaque error‑handling process into a near‑real‑time, transparent workflow. It has helped identify and fix numerous issues such as long GC pauses, HTTP connection‑pool problems, RPC spikes, and ActiveMQ blockages, all built on limited hardware and maintained in the author's spare time.

Tags: monitoring, Python, Elasticsearch, Kafka, ELK, Logstash, Kibana
Written by

Efficient Ops

This public account is maintained by Xiaotianguo and friends and regularly publishes original technical articles, focusing on operations transformation and accompanying readers throughout their operations careers.
