Build a Real‑Time Log Collection Pipeline with Spring Boot, Kafka, Filebeat, Logstash & Kibana
This step-by-step guide builds a complete log collection and visualization pipeline for a Spring Boot service: the application logs through Log4j2, Filebeat ships the log files to Kafka, Logstash parses the events and writes them to Elasticsearch, and Kibana visualizes the result. It covers server setup, Maven dependencies, configuration files, and deployment commands.
Server preparation
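The commands and configuration files below refer to these addresses: the Kafka broker at 192.168.11.51:9092, a three-node ZooKeeper ensemble at 192.168.11.111, 192.168.11.112, and 192.168.11.113 (port 2181), and Elasticsearch and Kibana at 192.168.11.35 (ports 9200 and 5601). Substitute your own hosts when following along.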
Spring Boot project preparation
Add Log4j2 to replace Spring Boot's default logging. The pom.xml should include the Spring Boot starter web dependency with the logging starter excluded, then add spring-boot-starter-log4j2 and the Disruptor library.
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!-- exclude spring-boot-starter-logging -->
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>
The log4j2.xml defines properties, console and rolling file appenders, async loggers for the application package, and a root logger that writes to both console and the rolling files.
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <!-- only warn and above reach the error file -->
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- Logger names are package prefixes, not wildcards, so "com.bfxy" covers every class below that package.
             A single logger carries all appender references; additivity="false" keeps the root logger from writing each event a second time. -->
        <AsyncLogger name="com.bfxy" level="info" includeLocation="true" additivity="false">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>
A single controller, IndexController, exposes two endpoints: /index writes info, warn, and error logs, and /err triggers an ArithmeticException so that a stack trace is logged at error level.
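import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// InputMDC is the project's own helper; it is assumed to live in the same package (import it otherwise).
@Slf4j
@RestController
public class IndexController {

    // Writes one log line at each of the info, warn, and error levels.
    @RequestMapping(value = "/index")
    public String index() {
        InputMDC.putMDC();
        log.info("I am an info log");
        log.warn("I am a warn log");
        log.error("I am an error log");
        return "idx";
    }

    // Forces an ArithmeticException so that an error log with a stack trace is produced.
    @RequestMapping(value = "/err")
    public String err() {
        InputMDC.putMDC();
        try {
            int a = 1 / 0;
        } catch (Exception e) {
            log.error("Arithmetic exception", e);
        }
        return "err";
    }
}
The InputMDC component populates the MDC fields (hostName, ip, applicationName) using NetUtil utilities. These are the values that the %X{hostName}, %X{ip}, and %X{applicationName} placeholders in the pattern layout print.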
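The article does not show InputMDC or NetUtil, so the following is only a minimal sketch of how the three values could be gathered. It swaps in the JDK's java.net.InetAddress for NetUtil, and the class name InputMdcSketch, the collect method, and the application name "collector" are all hypothetical. To stay runnable without extra dependencies it returns a map; in the application each entry would be handed to org.slf4j.MDC.put(key, value).

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the article's InputMDC helper (JDK only).
final class InputMdcSketch {

    /** Collects the values that the pattern layout prints via %X{hostName}, %X{ip}, %X{applicationName}. */
    public static Map<String, String> collect(String applicationName) {
        String hostName = "unknown";
        String ip = "unknown";
        try {
            InetAddress local = InetAddress.getLocalHost();
            hostName = local.getHostName();
            ip = local.getHostAddress();
        } catch (UnknownHostException e) {
            // Keep the "unknown" placeholders so logging never fails on name resolution.
        }
        Map<String, String> context = new LinkedHashMap<>();
        context.put("hostName", hostName);
        context.put("ip", ip);
        context.put("applicationName", applicationName);
        return context;
    }

    public static void main(String[] args) {
        // "collector" is an assumed application name.
        // In the Spring Boot service each entry would go to org.slf4j.MDC.put(key, value).
        collect("collector").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

MDC values are bound to the current thread, which is presumably why the controller calls InputMDC.putMDC() at the start of every request handler.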
Kafka installation and enablement
Download Kafka from the official site, extract it, rename the directory, and edit server.properties to set the broker ID, port, host names, log directory, default partition count, and ZooKeeper connection (a three-node ZooKeeper ensemble).
tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local
mv /usr/local/kafka_2.12-2.1.0 /usr/local/kafka_2.12
vim /usr/local/kafka_2.12/config/server.properties
# modify the following entries in server.properties:
broker.id=0
port=9092
host.name=192.168.11.51
advertised.host.name=192.168.11.51
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
Create the log directory and start the broker in the background:
mkdir /usr/local/kafka_2.12/kafka-logs
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
Create two topics for the application and error logs.
/usr/local/kafka_2.12/bin/kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
/usr/local/kafka_2.12/bin/kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
Filebeat installation and enablement
cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0
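vim /usr/local/filebeat-6.6.0/filebeat.yml # configure as shown below
The Filebeat configuration defines two prospector inputs that read app-collector.log and error-collector.log, apply multiline handling, add custom fields (logbiz, logtopic, evn), and forward events to Kafka. The /usr/local/logs paths match the relative LOG_HOME of logs in log4j2.xml only when the application is started from /usr/local, so adjust them to wherever the application actually writes its files.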
filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/logs/app-collector.log
  document_type: "app-log"
  multiline:
    pattern: '^\['      # a new event starts with "["
    negate: true        # lines that do NOT match the pattern ...
    match: after        # ... are appended to the preceding line
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: app-log-collector   # used below to pick the Kafka topic
    evn: dev
- input_type: log
  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: error-log-collector
    evn: dev
output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]
  topic: "%{[fields.logtopic]}"
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true
Validate the configuration with ./filebeat -c filebeat.yml -configtest (Filebeat 6.x also offers the subcommand form ./filebeat test config -c filebeat.yml), then start Filebeat in the background, for example with ./filebeat -c filebeat.yml &.
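The multiline settings in the Filebeat configuration (pattern '^\[', negate: true, match: after) mean that any line not starting with "[" is appended to the line before it, so a stack trace travels to Kafka as part of a single event. The sketch below imitates only that grouping rule in plain Java to make the behavior concrete. It is not Filebeat's implementation, it ignores max_lines and timeout, and the class name and sample log lines are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative model of "pattern: '^\[', negate: true, match: after".
final class MultilineSketch {

    // Same regular expression as the multiline.pattern setting.
    private static final Pattern EVENT_START = Pattern.compile("^\\[");

    /** Groups raw lines into events; a line that does not match the pattern joins the previous event. */
    public static List<String> group(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            boolean startsEvent = EVENT_START.matcher(line).find();
            if (startsEvent || current == null) {
                if (current != null) {
                    events.add(current.toString());
                }
                current = new StringBuilder(line);
            } else {
                current.append('\n').append(line);
            }
        }
        if (current != null) {
            events.add(current.toString());
        }
        return events;
    }

    public static void main(String[] args) {
        // Made-up sample lines: an error entry whose stack trace spans two lines, then an info entry.
        List<String> raw = Arrays.asList(
            "[2019-01-01T10:00:00.000+0800] [ERROR] [Arithmetic exception] ## 'java.lang.ArithmeticException: / by zero",
            "\tat com.bfxy.collector.IndexController.err(IndexController.java:30)'",
            "[2019-01-01T10:00:01.000+0800] [INFO] [I am an info log] ## ''");
        for (String event : group(raw)) {
            System.out.println("EVENT: " + event);
        }
    }
}
```

Running the example yields two events: the first contains both the error line and its stack-trace line, the second is the info line on its own.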
Logstash installation
cd /usr/local/logstash-6.6.0
mkdir script
cd script
vim logstash-script.conf
The Logstash pipeline reads from the two Kafka topics, extracts fields using Grok patterns that match the Log4j2 layout, adds an index_time field holding the event's local date, and outputs to Elasticsearch indices whose names start with app-log- and error-log-. A stdout output is also defined for debugging.
input {
  kafka {
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    group_id => "app-log-group"
  }
  kafka {
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    group_id => "error-log-group"
  }
}
filter {
  # index_time is the event date in local time, used as the index name suffix
  ruby { code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))" }
  # The tail of each pattern matches '' when no exception was logged, otherwise the quoted stack trace
  if "app-log" in [fields][logtopic] {
    grok {
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"]
    }
  }
  if "error-log" in [fields][logtopic] {
    grok {
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"]
    }
  }
}
output {
  stdout { codec => rubydebug }
  if "app-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
}
Start Logstash with the pipeline configuration:
/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &
Elasticsearch and Kibana
After installing Elasticsearch and Kibana, call the Spring Boot endpoints (/index and /err) at least once so that log events flow through the pipeline and the indices exist. Then open Kibana at http://192.168.11.35:5601, go to Management → Index Patterns, and create a pattern app-log-* (or error-log-*) using currentDateTime as the time filter field. Further requests to the endpoints then show up in Kibana's Discover view.
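The fields visible in Discover come from the Grok pattern in the Logstash filter. As a rough way to check which values that pattern is expected to extract from a Log4j2 line, the sketch below re-expresses it as a plain Java regular expression. It is an approximation, not the Grok engine: NOTSPACE becomes \S+, DATA becomes .*?, QUOTEDSTRING is simplified to '...' (quotes included), and thread-id is renamed threadId because Java group names cannot contain a hyphen. The class name and the sample line are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Approximate Java counterpart of the Grok pattern used in the Logstash filter.
final class LogLineParserSketch {

    private static final String[] FIELDS = {
        "currentDateTime", "level", "threadId", "class", "hostName",
        "ip", "applicationName", "location", "messageInfo", "throwable"
    };

    // DOTALL lets the throwable group span the lines of a stack trace.
    private static final Pattern LINE = Pattern.compile(
        "\\[(?<currentDateTime>\\S+)\\] \\[(?<level>\\S+)\\] \\[(?<threadId>\\S+)\\] "
            + "\\[(?<class>\\S+)\\] \\[(?<hostName>.*?)\\] \\[(?<ip>.*?)\\] "
            + "\\[(?<applicationName>.*?)\\] \\[(?<location>.*?)\\] "
            + "\\[(?<messageInfo>.*?)\\] ## (?:''|(?<throwable>'.*'))",
        Pattern.DOTALL);

    /** Returns the extracted fields, or an empty map when the line does not match the layout. */
    public static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher matcher = LINE.matcher(line);
        if (matcher.find()) {
            for (String name : FIELDS) {
                String value = matcher.group(name);
                if (value != null) {
                    fields.put(name, value);
                }
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Made-up line in the shape produced by the patternLayout property.
        String sample = "[2019-01-01T10:00:00.000+0800] [INFO] [http-nio-8001-exec-1-25] "
            + "[com.bfxy.collector.web.IndexController] [host-1] [192.168.11.31] [collector] "
            + "[IndexController.java,18,com.bfxy.collector.web.IndexController,index] "
            + "[I am an info log] ## ''";
        parse(sample).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

For a line without an exception the throwable field is simply absent, which mirrors the '' alternative at the end of the Grok pattern.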
At this point the end‑to‑end log collection, enrichment, and visualization pipeline is fully operational.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
