How to Build a Real-Time Security Log Collection and Alert System with ELK, Kafka, and Flume
This guide walks through setting up a security log collection pipeline for WAF, firewall, and Nginx logs using ELK (Elasticsearch, Logstash, Kibana), Kafka, and Flume, and then configuring real-time alerts with Sentinl or ElastAlert, delivered through DingTalk and email.
1. Log Collection and Alert Project Background
Due to a shortage of security testing projects, the author decided to aggregate logs from security devices (Imperva WAF, GreenWAF, Palo Alto firewall) and Nginx, then generate alerts. The environment uses CentOS 7, JDK 1.8, Python 2.7, and ELK version 5.5.2.
Log sources: security device logs (Imperva WAF, GreenWAF, Palo Alto firewall) and Nginx logs.
Log analysis software: ELK.
Alert plugins: Sentinl or ElastAlert.
Alert channels: DingTalk and email.
Pipeline for security device logs: security device → Logstash → Elasticsearch.
Pipeline for Nginx logs (already collected by another team): Nginx → Kafka → Logstash → Elasticsearch.
Grok patterns should be tested with Grokdebug before deployment.
2. Security Device Log Collection
2.1 Imperva WAF Configuration
Configure a custom log alert rule in the WAF console and define the fields to export, such as StartTime, AlarmID, EventID, Alert_level, SrcIP, and DstIP. The message template is:
StartTime=$!{Alert.createTime} AlarmID=$!{Alert.dn} EventID=$!{Event.dn} AggregationInfo=$!{Alert.aggregationInfo.occurrences} Alert_level=$!{Alert.severity} RuleName=$!{Alert.alertMetadata.alertName} Category=$!{Alert.alertType} Alert_description=$!{Alert.description} EventType=$!{Event.eventType} PolicyName=$!{Rule.parent.displayName} SrcIP=$!{Event.sourceInfo.sourceIp} SrcPort=$!{Event.sourceInfo.sourcePort} Proto=$!{Event.sourceInfo.ipProtocol} DstIP=$!{Event.destInfo.serverIp} DstPort=$!{Event.destInfo.serverPort} WebMethod=$!{Event.struct.httpRequest.url.method} Domain=$!{Alert.serverGroupName} URL=$!{Event.struct.httpRequest.url.path} ResponseCode=$!{Event.struct.httpResponse.responseCode} Alert_key=$!{Event.struct.httpRequest.url.queryString} Action=$!{Alert.immediateAction} ResponseTime=$!{Event.struct.responseTime} ResponseSize=$!{Event.struct.responseSize} Headers_value=$!{Event.struct.httpRequest.headers.value} Parameters_value=$!{Event.struct.httpRequest.parameters.value}
Logstash configuration for Imperva WAF (the grok pattern must mirror the template above, including the single space between fields):
input {
  syslog {
    type => "syslog"
    port => 514
  }
}
filter {
  grok {
    match => ["message", "%{GREEDYDATA:StartTime} AlarmID=%{NUMBER:AlarmID} EventID=%{NUMBER:EventID} AggregationInfo=%{NUMBER:AggregationInfo} Alert_level=%{DATA:Alert_level} RuleName=%{GREEDYDATA:RuleName} Category=%{GREEDYDATA:Category} Alert_description=%{GREEDYDATA:Alert_description} EventType=%{DATA:EventType} PolicyName=%{GREEDYDATA:PolicyName} SrcIP=%{IPV4:SrcIP} SrcPort=%{NUMBER:SrcPort} Proto=%{DATA:Proto} DstIP=%{IPV4:DstIP} DstPort=%{NUMBER:DstPort} WebMethod=%{GREEDYDATA:WebMethod} Domain=%{DATA:Domain} URL=%{GREEDYDATA:URL} ResponseCode=%{GREEDYDATA:ResponseCode} Alert_key=%{GREEDYDATA:Alert_key} Action=%{DATA:Action} ResponseTime=%{GREEDYDATA:ResponseTime} ResponseSize=%{GREEDYDATA:ResponseSize} Headers_value=%{GREEDYDATA:Headers_value} Parameters_value=%{GREEDYDATA:Parameters_value}"]
    remove_field => ["message"]
  }
  geoip { source => "SrcIP" }
}
output {
  elasticsearch {
    hosts => "es_address:9200"
    index => "impervasyslog"
  }
}
The GeoIP plugin enriches IP addresses with country and city information, which makes it possible to visualize attack sources.
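Because the grok pattern has to follow the custom log template field for field, it can help to check the layout offline before pointing the WAF at Logstash. The following is a minimal Python sketch and not part of the original setup: it builds a regular expression from a shortened, hypothetical subset of the template fields and runs it against a made-up sample message. The field subset, the sample values, and the helper name `parse_alert` are all assumptions for illustration.

```python
import re

# Hypothetical subset of the fields from the custom log template, in order.
FIELDS = ["AlarmID", "EventID", "Alert_level", "SrcIP", "SrcPort", "DstIP", "DstPort"]

# Build "AlarmID=(?P<AlarmID>\S+) EventID=(?P<EventID>\S+) ..." so that each
# key is followed by a non-space value and fields are separated by one space.
PATTERN = re.compile(" ".join(r"%s=(?P<%s>\S+)" % (name, name) for name in FIELDS))


def parse_alert(message):
    """Return a dict of field values, or None if the layout does not match."""
    match = PATTERN.search(message)
    return match.groupdict() if match else None


# Made-up sample message; real WAF output will differ.
sample = ("AlarmID=1001 EventID=2002 Alert_level=High "
          "SrcIP=203.0.113.7 SrcPort=51234 DstIP=192.0.2.10 DstPort=443")

if __name__ == "__main__":
    print(parse_alert(sample))
    # A message with a missing separator fails to match, which is the same
    # kind of mismatch that Logstash reports with a _grokparsefailure tag.
    print(parse_alert(sample.replace(" EventID", "EventID")))
```

A missing space between two fields is enough to break the match, so the template and the pattern are best edited together.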
2.2 GreenWAF Configuration
Set up Syslog in the GreenWAF console and use the following Logstash Grok pattern:
grok{ match => ["message","%{DATA:syslog_flag} site_id:%{NUMBER:site_id} protect_id:%{NUMBER:protect_id} dst_ip:%{IPV4:dst_ip} dst_port:%{NUMBER:dst_port} src_ip:%{IPV4:src_ip} src_port:%{NUMBER:src_port} method:%{DATA:method} domain:%{DATA:domain} uri:%{DATA:uri} alertlevel:%{DATA:alert_level} event_type:%{DATA:Attack_types} stat_time:%{GREEDYDATA:stat_time} policy_id:%{NUMBER:policy_id} rule_id:%{NUMBER:rule_id} action:%{DATA:action} block:%{DATA:block} block_info:%{DATA:block_info} http:%{GREEDYDATA:URL}"] }
2.3 Palo Alto Firewall Configuration
Configure a syslog server on the firewall (version 6.1) and use the following Logstash Grok pattern:
grok{ match =>["message","%{DATA:PA_Name},%{GREEDYDATA:Time},%{NUMBER:Eventid},%{DATA:Category},%{DATA:Subcategory},%{NUMBER:NULL},%{GREEDYDATA:Generate_Time},%{IPV4:SourceIP},%{IPV4:DestinationIP},%{IPV4:NAT_SourceIP},%{IPV4:NAT_DestinationIP},%{DATA:RuleName},%{DATA:SourceUser},%{DATA:DestinationUser},%{DATA:Application},%{DATA:VMsys},%{DATA:SourceZone},%{DATA:DestinationZone},%{DATA:IN_interface},%{DATA:OUT_interface},%{DATA:Syslog},%{GREEDYDATA:TimeTwo},%{NUMBER:SessionID},%{NUMBER:Repeat},%{NUMBER:SourcePort},%{NUMBER:DestinationPort},%{NUMBER:NAT-SourcePort},%{NUMBER:NAT-DestinationPort},%{DATA:Flag},%{DATA:Proto},%{DATA:Action},%{DATA:NUll2},%{DATA:ThreatName},%{DATA:Category2},%{DATA:Priority},%{DATA:Direction},%{NUMBER:Serialnum},%{GREEDYDATA:NULL3}"] }
3. Nginx Log Collection
Since another team already collects Nginx logs into Kafka, this setup pulls the logs from Kafka to avoid duplication.
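Once Logstash pulls these events from Kafka, each raw access-log line still has to be split into fields. The log format used by the other team is not stated here, so the sketch below assumes Nginx's default `combined` format. The regular expression, the sample line, and the function name `parse_access_line` are illustrative assumptions, not the production pattern. It is meant as a quick way to sanity-check a layout in Python before writing the equivalent grok expression.

```python
import re

# Assumed layout: Nginx default "combined" log format.
COMBINED = re.compile(
    r'(?P<remote_addr>\S+) - (?P<remote_user>\S+) \[(?P<time_local>[^\]]+)\] '
    r'"(?P<request>[^"]*)" (?P<status>\d{3}) (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
)


def parse_access_line(line):
    """Split one access-log line into a dict; return None when it does not match."""
    match = COMBINED.match(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Convert numeric fields so they can be aggregated later.
    fields["status"] = int(fields["status"])
    fields["body_bytes_sent"] = int(fields["body_bytes_sent"])
    return fields


# Made-up sample line for illustration only.
line = ('198.51.100.23 - - [10/Oct/2017:13:55:36 +0800] '
        '"GET /login?user=admin HTTP/1.1" 200 612 "-" "curl/7.29.0"')

if __name__ == "__main__":
    print(parse_access_line(line))
```

If the real log format carries extra fields, such as upstream timing or a forwarded-for header, the pattern would need to be extended accordingly.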
The Flume agent that tails the Nginx access log and publishes each line to the Kafka topic is configured as follows.
Agent components:
a1.sources=s1
a1.channels=c1
a1.sinks=k1
Source configuration (exec tail):
a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /data/log/nginx/crf_crm.access.log
a1.sources.s1.channels=c1
Channel configuration (memory):
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
Sink configuration (Kafka):
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel=c1
a1.sinks.k1.topic=crm_nginx_log_topic
a1.sinks.k1.brokerList=x.x.x.x:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20
Kafka installation and startup commands:
wget http://mirror.bit.edu.cn/apache/kafka/0.8.2.2/kafka_2.9.1-0.8.2.2.tgz
tar -xzf kafka_2.9.1-0.8.2.2.tgz
cd kafka_2.9.1-0.8.2.2
# adjust JVM memory as needed
nohup bin/kafka-server-start.sh config/server.properties &
Create the topic for Nginx logs:
kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 2 --partitions 5 --topic crm_nginx_log_topic
4. Security Alerting
4.1 Sentinl
Install Sentinl as a Kibana plugin:
kibana-plugin install https://github.com/sirensolutions/sentinl/releases/download/tag-5.4.1/sentinl-v5.4.1.zip
Example rule for IP request frequency (run every 2 minutes) with DingTalk webhook integration:
{
  "script": {
    "script": "payload.json='Threshold of 40 exceeded';var match=false;var threshold=40;var first=payload.aggregations.code.buckets;for(var i=0;i<first.length;i++){var ip_count = parseInt(first[i].doc_count);if (ip_count >= threshold){match=true;payload.json += '[ip]:' + first[i].key + ' [count]:' + first[i].doc_count + '\\n';}};match;"
  }
}
Configure the action to send a DingTalk message using the webhook URL.
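The watcher script walks the aggregation buckets, flags every IP whose `doc_count` reaches the threshold, and appends it to the alert text. The same logic can be sketched in Python to try out thresholds offline. The bucket list below imitates the shape of a terms aggregation result (`key`, `doc_count`); the sample data and both function names are hypothetical. The payload uses the text-message shape accepted by DingTalk custom robot webhooks, and nothing is sent over the network.

```python
import json

THRESHOLD = 40  # same threshold as the watcher script


def find_offenders(buckets, threshold=THRESHOLD):
    """Return (ip, count) pairs whose doc_count reaches the threshold."""
    return [(b["key"], int(b["doc_count"])) for b in buckets
            if int(b["doc_count"]) >= threshold]


def build_dingtalk_text(offenders, threshold=THRESHOLD):
    """Build the JSON body for a DingTalk text message, or None if nothing matched."""
    if not offenders:
        return None
    lines = ["Threshold of %d exceeded" % threshold]
    lines += ["[ip]: %s [count]: %d" % (ip, count) for ip, count in offenders]
    return json.dumps({"msgtype": "text", "text": {"content": "\n".join(lines)}})


# Made-up buckets in the shape of a terms aggregation result.
buckets = [
    {"key": "203.0.113.7", "doc_count": 57},
    {"key": "198.51.100.23", "doc_count": 12},
    {"key": "192.0.2.44", "doc_count": 40},
]

if __name__ == "__main__":
    print(build_dingtalk_text(find_offenders(buckets)))
```

Returning `None` when no bucket matches mirrors the `match` flag in the watcher script: no offenders, no message.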
4.2 ElastAlert
Prerequisites: JDK 1.8, Python 2.7, setuptools, gcc, python-devel, libffi-devel.
Installation steps:
git clone https://github.com/Yelp/elastalert.git
cd elastalert
python setup.py install
pip install -r requirements.txt
cp config.yaml.example config.yaml
Key config.yaml settings (run every minute, buffer 15 min, ES host/port, writeback index, alert time limit):
rules_folder: example_rules
run_every:
  minutes: 1
buffer_time:
  minutes: 15
es_host: x.x.x.x
es_port: 9200
writeback_index: elastalert_status
alert_time_limit:
  days: 2
Example DingTalk alert rule (cardinality of src_ip; fires when more than 30 distinct source IPs appear within 5 minutes):
es_host: x.x.x.x
es_port: 9200
name: xxx security alert
type: cardinality
index: nsfocuswaf_syslog
cardinality_field: src_ip
max_cardinality: 30
timeframe:
  minutes: 5
aggregation_key: src_ip
alert:
- "elastalert_modules.dingtalk_alert.DingTalkAlerter"
dingtalk_webhook: "your webhook"
dingtalk_msgtype: text
For email alerts, use a similar rule with type: cardinality and configure the SMTP settings.
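To make the cardinality rule concrete: within each `timeframe`, the rule looks at the distinct values of `cardinality_field` and alerts once their number exceeds `max_cardinality`. The sketch below is a simplified Python illustration of that idea, not ElastAlert's actual implementation. The class name `CardinalityWindow` and the generated IP addresses are hypothetical, and events are assumed to arrive in timestamp order.

```python
from collections import deque
from datetime import datetime, timedelta


class CardinalityWindow:
    """Track distinct values seen inside a sliding time window."""

    def __init__(self, timeframe, max_cardinality):
        self.timeframe = timeframe
        self.max_cardinality = max_cardinality
        self.events = deque()  # (timestamp, value), oldest first

    def add(self, timestamp, value):
        """Record one event; return True when the window exceeds the limit."""
        self.events.append((timestamp, value))
        # Drop events that have fallen out of the window.
        while self.events and timestamp - self.events[0][0] > self.timeframe:
            self.events.popleft()
        distinct = {v for _, v in self.events}
        return len(distinct) > self.max_cardinality


if __name__ == "__main__":
    window = CardinalityWindow(timedelta(minutes=5), max_cardinality=30)
    start = datetime(2017, 10, 10, 13, 0, 0)
    # 31 distinct made-up source IPs, one per second.
    for i in range(31):
        fired = window.add(start + timedelta(seconds=i), "10.0.0.%d" % i)
    print(fired)
```

Note that this counts distinct source IPs, not requests: a single IP sending thousands of requests would not trip this rule on its own.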
5. Summary
Commands to start the components in the background:
nohup su - elasticsearch -c '/opt/elasticsearch-5.5.2/bin/elasticsearch -d' &
nohup ./bin/logstash -f x.x.x.conf &
nohup ./kibana &
flume-ng agent --conf conf --conf-file conf/log_analysis_test.conf --name a1 -Dflume.root.logger=INFO,console &
nohup bin/kafka-server-start.sh config/server.properties &
nohup python -m elastalert.elastalert --config ./config.yaml --verbose --rule ./example_rules/DD_rule.yaml &
These steps provide a complete pipeline for collecting security logs, storing them in Elasticsearch, visualizing them with Kibana, and generating real-time alerts via DingTalk or email.
MaGe Linux Operations
