How to Build a Real-Time Security Log Collection and Alert System with ELK, Kafka, and Flume

This guide walks through setting up a comprehensive security log collection pipeline—covering WAF, firewall, and Nginx logs—using ELK, Logstash, Kafka, and Flume, and then configuring real‑time alerts with Sentinl or ElastAlert integrated with DingTalk and email notifications.

MaGe Linux Operations
MaGe Linux Operations
MaGe Linux Operations
How to Build a Real-Time Security Log Collection and Alert System with ELK, Kafka, and Flume

1. Log Collection and Alert Project Background

Due to a shortage of security testing projects, the author decided to aggregate logs from security devices (Imperva WAF, GreenWAF, Palo Alto firewall) and Nginx, then generate alerts. The environment uses CentOS 7, JDK 1.8, Python 2.7, and ELK version 5.5.2.

Log sources: security device logs (Imperva WAF, GreenWAF, Palo Alto firewall), Nginx logs. Log analysis software: ELK; alert plugins: Sentinl or ElastAlert; alert channels: DingTalk and email. Pipeline: security device logs → Logstash → Elasticsearch; Nginx logs (already collected by another team) → Kafka → Logstash → Elasticsearch. Proper Grok patterns must be tested with Grokdebug before deployment. System specs: CentOS 7, JDK 1.8, Python 2.7. ELK unified version: 5.5.2.

2. Security Device Log Collection

2.1 Imperva WAF Configuration

Configure a custom log alert rule in the WAF console and define required fields such as StartTime, AlertID, EventID, AlertLevel, SrcIP, DstIP, etc.

StartTime=$!{Alert.createTime}AlarmID=$!{Alert.dn} EventID=$!{Event.dn}   AggregationInfo=$!{Alert.aggregationInfo.occurrences}Alert_level=$!{Alert.severity}   RuleName=$!{Alert.alertMetadata.alertName} Category=$!{Alert.alertType}   Alert_description=$!{Alert.description}EventType=$!{Event.eventType}   PolicyName=$!{Rule.parent.displayName}SrcIP=$!{Event.sourceInfo.sourceIp}   SrcPort=$!{Event.sourceInfo.sourcePort}Proto=$!{Event.sourceInfo.ipProtocol}   DstIP=$!{Event.destInfo.serverIp}DstPort=$!{Event.destInfo.serverPort}   WebMethod=$!{Event.struct.httpRequest.url.method}Domain=$!{Alert.serverGroupName}   URL=$!{Event.struct.httpRequest.url.path}ResponseCode=$!{Event.struct.httpResponse.responseCode}Alert_key=$!{Event.struct.httpRequest.url.queryString}Action=$!{Alert.immediateAction}   ResponseTime=$!{Event.struct.responseTime}ResponseSize=$!{Event.struct.responseSize}   Headers_value=$!{Event.struct.httpRequest.headers.value}Parameters_value=$!{Event.struct.httpRequest.parameters.value}

Logstash configuration for Imperva WAF:

input{ syslog{ type => "syslog" port => 514 } }
filter{
  grok{
    match =>["message","%{GREEDYDATA:StartTime} AlarmID=%{NUMBER:AlarmID}   EventID=%{NUMBER:EventID}AggregationInfo=%{NUMBER:AggregationInfo}   Alert_level=%{DATA:Alert_level}RuleName=%{GREEDYDATA:RuleName}   Category=%{GREEDYDATA:Category}Alert_description=%{GREEDYDATA:Alert_description}   EventType=%{DATA:EventType}PolicyName=%{GREEDYDATA:PolicyName} SrcIP=%{IPV4:SrcIP}   SrcPort=%{NUMBER:SrcPort}Proto=%{DATA:Proto} DstIP=%{IPV4:DstIP}   DstPort=%{NUMBER:DstPort}WebMethod=%{GREEDYDATA:WebMethod}   Domain=%{DATA:Domain}URL=%{GREEDYDATA:URL}   ResponseCode=%{GREEDYDATA:ResponseCode}Alert_key=%{GREEDYDATA:Alert_key}   Action=%{DATA:Action}ResponseTime=%{GREEDYDATA:ResponseTime}   ResponseSize=%{GREEDYDATA:ResponseSize}Headers_value=%{GREEDYDATA:Headers_value}Parameters_value=%{GREEDYDATA:Parameters_value}"]
    remove_field => ["message"]
  }
  geoip{ source => "SrcIP" }
}
output{ elasticsearch{ hosts => "es_address:9200" index => "impervasyslog" } }

The GeoIP plugin enriches IP addresses with country and city information for visualizing attack sources.

2.2 GreenWAF Configuration

Set up Syslog in the GreenWAF console and use the following Logstash Grok pattern:

grok{ match => ["message",%{DATA:syslog_flag}  site_id:%{NUMBER:site_id}   protect_id:%{NUMBER:protect_id}  dst_ip:%{IPV4:dst_ip}  dst_port:%{NUMBER:dst_port}   src_ip:%{IPV4:src_ip}src_port:%{NUMBER:src_port} method:%{DATA:method}   domain:%{DATA:domain}  uri:%{DATA:uri}  alertlevel:%{DATA:alert_level}   event_type:%{DATA:Attack_types} stat_time:%{GREEDYDATA:stat_time}   policy_id:%{NUMBER:policy_id}rule_id:%{NUMBER:rule_id}  action:%{DATA:action}   block:%{DATA:block} block_info:%{DATA:block_info}  http:%{GREEDYDATA:URL}"] }

2.3 Palo Alto Firewall Configuration

Configure a syslog server on the firewall (version 6.1) and use the following Logstash Grok pattern:

grok{ match =>["message","%{DATA:PA_Name},%{GREEDYDATA:Time},%{NUMBER:Eventid},%{DATA:Category},%{DATA:Subcategory},%{NUMBER:NULL},%{GREEDYDATA:Generate_Time},%{IPV4:SourceIP},%{IPV4:DestinationIP},%{IPV4:NAT_SourceIP},%{IPV4:NAT_DestinationIP},%{DATA:RuleName},%{DATA:SourceUser},%{DATA:DestinationUser},%{DATA:Application},%{DATA:VMsys},%{DATA:SourceZone},%{DATA:DestinationZone},%{DATA:IN_interface},%{DATA:OUT_interface},%{DATA:Syslog},%{DATA:GREEDYDATA:TimeTwo},%{NUMBER:SessionID},%{NUMBER:Repeat},%{NUMBER:SourcePort},%{NUMBER:DestinationPort},%{NUMBER:NAT-SourcePort},%{NUMBER:NAT-DestinationPort},%{DATA:Flag},%{DATA:Proto},%{DATA:Action},%{DATA:NUll2},%{DATA:ThreatName},%{DATA:Category2},%{DATA:Priority},%{DATA:Direction},%{NUMBER:Serialnum},%{GREEDYDATA:NULL3}"] }

3. Nginx Log Collection

Since another team already collects Nginx logs into Kafka, this setup pulls the logs from Kafka to avoid duplication.

a1.sources=s1 a1.channels=c1 a1.sinks=k1

Source configuration (exec tail):

a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /data/log/nginx/crf_crm.access.log
a1.sources.s1.channels=c1

Channel configuration (memory):

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

Sink configuration (Kafka):

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel=c1
a1.sinks.k1.topic=crm_nginx_log_topic
a1.sinks.k1.brokerList=x.x.x.x:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20

Kafka installation and startup commands:

wget http://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz
# adjust JVM memory as needed
nohup bin/kafka-server-start.sh config/server.properties &

Create the topic for Nginx logs:

kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 2 --partitions 5 --topic crm_nginx_log_topic

4. Security Alerting

4.1 Sentinl

Install Sentinl as a Kibana plugin:

kibana-plugin install https://github.com/sirensolutions/sentinl/releases/download/tag-5.4.1/sentinl-v5.4.1.zip

Example rule for IP request frequency (run every 2 minutes) with DingTalk webhook integration:

{
  "script": {
    "script":"payload.json='超过阀值40';var match=false;var threshold=40;varfirst=payload.aggregations.code.buckets;for(var i=0;i<first.length;i++){varip_count = parseInt(first[i].doc_count);if (ip_count >= threshold){match=true;payload.json += '【ip】:' + first[i].key + '  【count】:' +first[i].doc_count + '
';}};match;"
  }
}

Configure the action to send a DingTalk message using the webhook URL.

4.2 ElastAlert

Prerequisites: JDK 1.8, Python 2.7, setuptools, gcc, python‑devel, libffi‑devel.

Installation steps:

git clone https://github.com/Yelp/elastalert.git
cd elastalert
python setup.py install
pip install -r requirements.txt
cp config.yaml.example config.yaml

Key config.yaml settings (run every minute, buffer 15 min, ES host/port, writeback index, alert time limit):

rules_folder: example_rules
run_every:
  minutes: 1
buffer_time:
  minutes: 15
es_host: x.x.x.x
es_port: 9200
writeback_index: elastalert_status
alert_time_limit:
  days: 2

Example DingTalk alert rule (cardinality on src_ip, trigger if >30 requests in 5 min):

es_host: x.x.x.x
es_port: 9200
name: xxx安全告警
type: cardinality
index: nsfocuswaf_syslog
cardinality_field: src_ip
max_cardinality: 30
timeframe:
  minutes: 5
aggregation_key: src_ip
alert:
  - "elastalert_modules.dingtalk_alert.DingTalkAlerter"
dingtalk_webhook: "your webhook"
dingtalk_msgtype: text

For email alerts, use a similar rule with type: cardinality and configure SMTP settings.

5. Summary

Commands to start the components in the background:

nohup su - elasticsearch -c '/opt/elasticsearch-5.5.2/bin/elasticsearch -d' &
nohup ./bin/logstash -f x.x.x.conf &
nohup ./kibana &
flume-ng agent --conf conf --conf-file conf/log_analysis_test.conf --name a1 -Dflume.root.logger=INFO,console &
nohup bin/kafka-server-start.sh config/server.properties &
nohup python -m elastalert.elastalert --config ./config.yaml --verbose --rule ./example_rules/DD_rule.yaml &

These steps provide a complete pipeline for collecting security logs, storing them in Elasticsearch, visualizing with Kibana, and generating real‑time alerts via DingTalk or email.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

KafkaAlertingELKFlumesecurity monitoringLogstash
MaGe Linux Operations
Written by

MaGe Linux Operations

Founded in 2009, MaGe Education is a top Chinese high‑end IT training brand. Its graduates earn 12K+ RMB salaries, and the school has trained tens of thousands of students. It offers high‑pay courses in Linux cloud operations, Python full‑stack, automation, data analysis, AI, and Go high‑concurrency architecture. Thanks to quality courses and a solid reputation, it has talent partnerships with numerous internet firms.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.