Build a Real-Time Security Log Collection & Alert System with ELK, Kafka, and Sentinl
This guide walks through collecting security device and Nginx logs using ELK 5.5.2, Logstash grok patterns, Kafka and Flume pipelines on CentOS 7, and configuring Sentinl or ElastAlert for DingTalk and email alerts, complete with code snippets and deployment commands.
1. Log Collection and Alert Project Background
Due to a shortage of security testing projects, the author decided to collect logs from security devices (Imperva WAF, NSFOCUS WAF, Palo Alto firewall) and Nginx, and set up alerting.
Log sources: security device logs (Imperva WAF, NSFOCUS WAF, Palo Alto firewall), Nginx logs Open‑source analysis software: ELK; alert plugins: Sentinl or ElastAlert; alert channels: DingTalk and email Pipeline: security device logs → Logstash → Elasticsearch; Nginx logs (already collected by another team) → Kafka → Logstash → Elasticsearch (ensure correct grok patterns) System: CentOS 7, JDK 1.8, Python 2.7 ELK unified version: 5.5.2
Download links (ES, Kibana, Logstash) and Grokdebug testing URL are provided.
2. Security Device Log Collection
2.1 Imperva WAF Configuration
Custom log fields are defined according to the manual. Example field list includes StartTime, AlertID, EventID, Alert_level, SrcIP, DstIP, etc.
StartTime=$!{Alert.createTime}AlarmID=$!{Alert.dn} EventID=$!{Event.dn} AggregationInfo=$!{Alert.aggregationInfo.occurrences}Alert_level=$!{Alert.severity} RuleName=$!{Alert.alertMetadata.alertName} Category=$!{Alert.alertType} Alert_description=$!{Alert.description}EventType=$!{Event.eventType} PolicyName=$!{Rule.parent.displayName}SrcIP=$!{Event.sourceInfo.sourceIp} SrcPort=$!{Event.sourceInfo.sourcePort}Proto=$!{Event.sourceInfo.ipProtocol} DstIP=$!{Event.destInfo.serverIp}DstPort=$!{Event.destInfo.serverPort} WebMethod=$!{Event.struct.httpRequest.url.method}Domain=$!{Alert.serverGroupName} URL=$!{Event.struct.httpRequest.url.path}ResponseCode=$!{Event.struct.httpResponse.responseCode}Alert_key=$!{Event.struct.httpRequest.url.queryString}Action=$!{Alert.immediateAction} ResponseTime=$!{Event.struct.responseTime}ResponseSize=$!{Event.struct.responseSize} Headers_value=$!{Event.struct.httpRequest.headers.value}Parameters_value=$!{Event.struct.httpRequest.parameters.value}Logstash configuration for Imperva WAF:
input{ syslog{ type => "syslog" port => 514 } }filter{ grok{ match =>["message","%{GREEDYDATA:StartTime} AlarmID=%{NUMBER:AlarmID} EventID=%{NUMBER:EventID}AggregationInfo=%{NUMBER:AggregationInfo} Alert_level=%{DATA:Alert_level}RuleName=%{GREEDYDATA:RuleName} Category=%{GREEDYDATA:Category}Alert_description=%{GREEDYDATA:Alert_description} EventType=%{DATA:EventType}PolicyName=%{GREEDYDATA:PolicyName} SrcIP=%{IPV4:SrcIP} SrcPort=%{NUMBER:SrcPort}Proto=%{DATA:Proto} DstIP=%{IPV4:DstIP} DstPort=%{NUMBER:DstPort}WebMethod=%{GREEDYDATA:WebMethod} Domain=%{DATA:Domain}URL=%{GREEDYDATA:URL} ResponseCode=%{GREEDYDATA:ResponseCode}Alert_key=%{GREEDYDATA:Alert_key} Action=%{DATA:Action}ResponseTime=%{GREEDYDATA:ResponseTime} ResponseSize=%{GREEDYDATA:ResponseSize}Headers_value=%{GREEDYDATA:Headers_value}Parameters_value=%{GREEDYDATA:Parameters_value}"] remove_field => ["message"] } geoip{ source => "SrcIP" } }output{ elasticsearch{ hosts => "<es_host>:9200" index => "impervasyslog" } }The built‑in geoip plugin resolves IP location for map visualizations.
2.2 NSFOCUS WAF Configuration
Log management → Syslog configuration on the device. Logstash grok snippet for NSFOCUS logs:
grok { match => ["message", "%{DATA:syslog_flag} site_id:%{NUMBER:site_id} protect_id:%{NUMBER:protect_id} dst_ip:%{IPV4:dst_ip} dst_port:%{NUMBER:dst_port} src_ip:%{IPV4:src_ip}src_port:%{NUMBER:src_port} method:%{DATA:method} domain:%{DATA:domain} uri:%{DATA:uri} alertlevel:%{DATA:alert_level} event_type:%{DATA:Attack_types} stat_time:%{GREEDYDATA:stat_time} policy_id:%{NUMBER:policy_id}rule_id:%{NUMBER:rule_id} action:%{DATA:action} block:%{DATA:block} block_info:%{DATA:block_info} http:%{GREEDYDATA:URL}" ] }2.3 Palo Alto Firewall Configuration
Configure a syslog server on the firewall to forward logs. Logstash grok snippet for Palo Alto logs:
grok { match =>["message","%{DATA:PA_Name},%{GREEDYDATA:Time},%{NUMBER:Eventid},%{DATA:Category},%{DATA:Subcategory},%{NUMBER:NULL},%{GREEDYDATA:Generate_Time},%{IPV4:SourceIP},%{IPV4:DestinationIP},%{IPV4:NAT_SourceIP},%{IPV4:NAT_DestinationIP},%{DATA:RuleName},%{DATA:SourceUser},%{DATA:DestinationUser},%{DATA:Application},%{DATA:VMsys},%{DATA:SourceZone},%{DATA:DestinationZone},%{DATA:IN_interface},%{DATA:OUT_interface},%{DATA:Syslog},%{DATA:GREEDYDATA:TimeTwo},%{NUMBER:SessionID},%{NUMBER:Repeat},%{NUMBER:SourcePort},%{NUMBER:DestinationPort},%{NUMBER:NAT-SourcePort},%{NUMBER:NAT-DestinationPort},%{DATA:Flag},%{DATA:Proto},%{DATA:Action},%{DATA:NUll2},%{DATA:ThreatName},%{DATA:Category2},%{DATA:Priority},%{DATA:Direction},%{NUMBER:Serialnum},%{GREEDYDATA:NULL3}"] }3. Nginx Log Collection
Since Nginx logs are already collected by another team, they are pulled from Kafka. Flume configuration (source, channel, sink) is shown, followed by Kafka topic creation and producer/consumer commands.
a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /data/log/nginx/crf_crm.access.log
a1.sources.s1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel=c1
a1.sinks.k1.topic=crm_nginx_log_topic
a1.sinks.k1.brokerList=x.x.x.x:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20 kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 2 --partitions 5 --topic crm_nginx_log_topic bin/kafka-console-producer.sh --broker-list x.x.x.x:9092 --topic crm_nginx_log_topic kafka-console-consumer.sh --zookeeper x.x.x.x:2181 --topic crm_nginx_log_topicLogstash configuration to consume Kafka messages:
input{ kafka{ codec => "plain" group_id => "logstash1" auto_offset_reset => "smallest" reset_beginning => true topic_id => "crm_nginx_log_topic" zk_connect => "x.x.x.x:2181" } grok{ ... } }output{ elasticsearch{ hosts => "<es_host>:9200" index => "impervasyslog" } }4. Security Alert
4.1 Sentinl
Install Sentinl plugin for Kibana:
kibana-plugin install https://github.com/sirensolutions/sentinl/releases/download/tag-5.4/sentinl-v5.4.1.zipExample rule monitors IP request frequency every 2 minutes, sends alerts to DingTalk via webhook, and can also send email alerts.
4.2 ElastAlert
Prerequisites: JDK 1.8, Python 2.7, setuptools, gcc, python‑devel, libffi‑devel.
Installation steps:
git clone https://github.com/Yelp/elastalert.git
cd elastalert
python setup.py install
pip install -r requirements.txt
cp config.yaml.example config.yamlConfigure config.yaml (run_every, buffer_time, es_host, es_port, writeback_index, alert_time_limit, etc.). Example DingTalk rule (cardinality on src_ip, threshold 30 in 5 minutes) and email rule are provided.
es_host: x.x.x.x
es_port: 9200
name: xxx安全告警
type: cardinality
index: nsfocuswaf_syslog
cardinality_field: src_ip
max_cardinality: 30
timeframe:
minutes: 5
aggregation_key: src_ip
summary_table_fields:
- src_ip
- dst_ip
- dst_port
- Attack_types
- stat_time
alert:
- "elastalert_modules.dingtalk_alert.DingTalkAlerter"
dingtalk_webhook: "your webhook"
dingtalk_msgtype: textCode modifications are suggested to send multiple alerts and to extract custom fields from the message when only message exists.
5. Summary
Commands to start essential services:
nohup su - elasticsearch -c '/opt/elasticsearch-5.5.2/bin/elasticsearch -d' & nohup ./bin/logstash -f x.x.x.conf & nohup ./kibana & flume-ng agent --conf conf --conf-file conf/log_analysis_test.conf --name a1 -Dflume.root.logger=INFO,console nohup bin/kafka-server-start.sh config/server.properties & nohup python -m elastalert.elastalert --config ./config.yaml --verbose --rule ./example_rules/DD_rule.yaml &Typical alert strategies include regex from security devices, high‑frequency IP requests, error status codes, and characteristic codes in Nginx requests.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
MaGe Linux Operations
Founded in 2009, MaGe Education is a top Chinese high‑end IT training brand. Its graduates earn 12K+ RMB salaries, and the school has trained tens of thousands of students. It offers high‑pay courses in Linux cloud operations, Python full‑stack, automation, data analysis, AI, and Go high‑concurrency architecture. Thanks to quality courses and a solid reputation, it has talent partnerships with numerous internet firms.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
