How to Build a Real‑Time Log Collection Pipeline with SpringBoot, Kafka, Logstash & Kibana
This guide walks through building a complete log collection and visualization pipeline. A SpringBoot application writes its logs with Log4j2, Filebeat harvests the log files and forwards them to Kafka, Logstash consumes and parses them from Kafka, and Elasticsearch and Kibana store and visualize them. The steps are server preparation, SpringBoot project setup, Kafka installation, Filebeat installation, Logstash configuration, and Elasticsearch/Kibana setup.
Server Preparation
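The examples use the following hosts, which you should map to your own environment: the Kafka broker at 192.168.11.51, a three-node ZooKeeper ensemble at 192.168.11.111, 192.168.11.112 and 192.168.11.113, and Elasticsearch at 192.168.11.35. Filebeat runs on the application host, where it reads the log files under /usr/local/logs.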
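Before wiring the components together, it can help to confirm that each service port is reachable from the machine you are working on. The following is a minimal Java sketch that is not part of the original setup. The class name PreflightCheck is hypothetical, and the host/port pairs are the ones used in this guide's configuration files (Kafka 9092, ZooKeeper 2181, Elasticsearch 9200).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical preflight helper (not part of the original guide): checks that a TCP
// connection can be opened to each service before the pipeline is configured.
final class PreflightCheck {

    // True if a TCP connection to host:port is established within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses taken from this guide's configuration files; replace with your own.
        Map<String, Integer> services = new LinkedHashMap<>();
        services.put("192.168.11.51", 9092);   // Kafka broker
        services.put("192.168.11.111", 2181);  // ZooKeeper node 1
        services.put("192.168.11.112", 2181);  // ZooKeeper node 2
        services.put("192.168.11.113", 2181);  // ZooKeeper node 3
        services.put("192.168.11.35", 9200);   // Elasticsearch
        services.forEach((host, port) -> System.out.printf(
                "%s:%d reachable=%b%n", host, port, isReachable(host, port, 2000)));
    }
}
```

With Java 11 or later it can be run directly with `java PreflightCheck.java`. A plain TCP connect only shows that something is listening on the port; it does not check that the service behind it is healthy.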
SpringBoot Project Preparation
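Replace SpringBoot's default logging (Logback) with Log4j2: exclude spring-boot-starter-logging from spring-boot-starter-web and add spring-boot-starter-log4j2. The LMAX Disruptor dependency is required because the Log4j2 configuration uses asynchronous loggers. Add the following to pom.xml: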
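<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <!-- Remove the default Logback-based logging starter -->
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!-- Disruptor: required by Log4j2 asynchronous loggers -->
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
    <!-- Lombok: provides the @Slf4j annotation used in IndexController (version managed by Spring Boot) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
Log4j2 configuration (log4j2.xml):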
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <!-- Every event that passes the logger level (INFO and above) -->
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <!-- Only WARN and above -->
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- Logger names are package prefixes, not wildcards: "com.bfxy" covers every class under com.bfxy.
             additivity="false" keeps these events from being written a second time by the Root appenders. -->
        <AsyncLogger name="com.bfxy" level="info" includeLocation="true" additivity="false">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>
IndexController
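import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class IndexController {

    @RequestMapping(value = "/index")
    public String index() {
        // Fill hostName, ip and applicationName for the %X{...} fields of the log pattern
        InputMDC.putMDC();
        log.info("This is an info log");
        log.warn("This is a warn log");
        log.error("This is an error log");
        return "idx";
    }

    @RequestMapping(value = "/err")
    public String err() {
        InputMDC.putMDC();
        try {
            int a = 1 / 0;
        } catch (Exception e) {
            // Passing the exception lets %ex write the multi-line stack trace
            log.error("Arithmetic exception", e);
        }
        return "err";
    }
}
InputMDC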
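import org.slf4j.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

// Puts host name, IP and application name into the SLF4J MDC so that the
// %X{hostName}, %X{ip} and %X{applicationName} conversions in log4j2.xml are filled
@Component
public class InputMDC implements EnvironmentAware {

    private static Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        InputMDC.environment = environment;
    }

    public static void putMDC() {
        MDC.put("hostName", NetUtil.getLocalHostName());
        MDC.put("ip", NetUtil.getLocalIp());
        MDC.put("applicationName", environment.getProperty("spring.application.name"));
    }
}
NetUtil (method bodies omitted):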
public class NetUtil {
    public static String normalizeAddress(String address) { /* ... */ }
    public static String getLocalAddress(String address) { /* ... */ }
    public static String getLocalIp(String ipPreference) { /* ... */ }
    public static String getLocalIp() { return getLocalIp("* >10 >172 >192 >127"); }
    public static String remoteAddress(SocketChannel channel) { /* ... */ }
    public static String localAddress(SocketChannel channel) { /* ... */ }
    public static String getPid() { /* ... */ }
    public static String getLocalHostName() { /* ... */ }
}
Start the SpringBoot application and request /index and /err. Two log files are created under LOG_HOME (logs/, relative to the working directory): app-collector.log, which receives every INFO-and-above event, and error-collector.log, which receives only WARN and ERROR events. The Filebeat configuration reads them from /usr/local/logs/, so either start the application from /usr/local or adjust LOG_HOME or the Filebeat paths.
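Because NetUtil's method bodies are omitted, the demo needs some implementation of getLocalHostName() and getLocalIp() before it compiles. The following is a hypothetical stand-in using only java.net, named NetUtilSketch to keep it distinct from the original class. It does not implement the original "* >10 >172 >192 >127" preference string; it simply returns the first IPv4 address of an interface that is up and not a loopback.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration;

// Hypothetical stand-in for the two NetUtil methods used by InputMDC.
// It ignores the original ip-preference string.
final class NetUtilSketch {

    // Host name of this machine, or "unknown" if it cannot be resolved.
    static String getLocalHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }

    // First IPv4 address on an interface that is up and not loopback; falls back to 127.0.0.1.
    static String getLocalIp() {
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics != null) {
                for (NetworkInterface nic : Collections.list(nics)) {
                    if (!nic.isUp() || nic.isLoopback()) {
                        continue;
                    }
                    for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                        if (addr instanceof Inet4Address) {
                            return addr.getHostAddress();
                        }
                    }
                }
            }
        } catch (SocketException e) {
            // Fall through to the loopback default below
        }
        return "127.0.0.1";
    }

    public static void main(String[] args) {
        System.out.println(getLocalHostName() + " / " + getLocalIp());
    }
}
```

On hosts with several network cards (Docker bridges, VPNs) the first match may not be the address you want in the logs, which is presumably what the original preference string addresses.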
Kafka Installation and Startup
Download Kafka from kafka.apache.org/downloads.html. Install ZooKeeper first (a three-node ensemble), then configure config/server.properties:
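broker.id=0
# listeners/advertised.listeners replace the deprecated port, host.name and advertised.host.name settings
listeners=PLAINTEXT://192.168.11.51:9092
advertised.listeners=PLAINTEXT://192.168.11.51:9092
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
Start the broker with bin/kafka-server-start.sh -daemon config/server.properties, then create the log topics: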
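kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
On Kafka 2.2 and later, use --bootstrap-server 192.168.11.51:9092 instead of --zookeeper 192.168.11.111:2181; the --zookeeper option was removed from kafka-topics.sh in Kafka 3.0.
Filebeat Installation and Startup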
Download and extract Filebeat 6.6.0, then edit filebeat.yml (excerpt):
# Filebeat 6.3+ syntax: filebeat.inputs and type replace filebeat.prospectors and input_type;
# document_type was removed in 6.0, so the custom fields below identify each log type
filebeat.inputs:
- type: log
  paths:
    - /usr/local/logs/app-collector.log
  # Lines that do not start with '[' (e.g. stack-trace frames) are appended to the previous event
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: app-log-collector
    env: dev
- type: log
  paths:
    - /usr/local/logs/error-collector.log
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: error-log-collector
    env: dev
output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]
  # Each event goes to the topic named in its fields.logtopic
  topic: "%{[fields.logtopic]}"
  compression: gzip
  required_acks: 1
Validate the configuration and start Filebeat:
./filebeat test config -c filebeat.yml   # prints "Config OK"
./filebeat -c filebeat.yml &
Logstash Installation
Create a script directory under the Logstash home (/usr/local/logstash-6.6.0/script) and add logstash-script.conf to it:
input {
    # One Kafka consumer group per log type; topics are selected by regex
    kafka {
        topics_pattern => "app-log-.*"
        bootstrap_servers => "192.168.11.51:9092"
        codec => json
        consumer_threads => 1
        decorate_events => true
        group_id => "app-log-group"
    }
    kafka {
        topics_pattern => "error-log-.*"
        bootstrap_servers => "192.168.11.51:9092"
        codec => json
        consumer_threads => 1
        decorate_events => true
        group_id => "error-log-group"
    }
}
filter {
    # Local date of the event in yyyy.MM.dd form, used as the daily index suffix
    ruby { code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))" }
    if "app-log" in [fields][logtopic] {
        grok { match => { "message" => "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\]" } }
    }
    if "error-log" in [fields][logtopic] {
        grok { match => { "message" => "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\]" } }
    }
}
output {
    # Route each log type to its own daily index (without the first condition,
    # error-log events would also be written to the app-log index)
    if "app-log" in [fields][logtopic] {
        elasticsearch {
            hosts => ["192.168.11.35:9200"]
            user => "elastic"
            password => "123456"
            index => "app-log-%{[fields][logbiz]}-%{index_time}"
            sniffing => true
            template_overwrite => true
        }
    }
    if "error-log" in [fields][logtopic] {
        elasticsearch {
            hosts => ["192.168.11.35:9200"]
            user => "elastic"
            password => "123456"
            index => "error-log-%{[fields][logbiz]}-%{index_time}"
            sniffing => true
            template_overwrite => true
        }
    }
}
Start Logstash:
/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &
Elasticsearch and Kibana
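After installing Elasticsearch and Kibana, create index patterns in Kibana (for example app-log-* and error-log-*) and choose currentDateTime as the time filter field. Kibana only offers currentDateTime as a time field if Elasticsearch has mapped it as a date; if it has not, choose @timestamp instead. The SpringBoot application's logs are then searchable and can be visualized.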
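The fields available in Kibana, including currentDateTime, are produced by the grok pattern in the Logstash filter. To see what each field captures, here is a rough Java regex equivalent. It is an approximation for illustration, not how grok is implemented: NOTSPACE is written as \S+ and DATA as .*? (lazy), and because Java group names cannot contain '-', the grok field thread-id is called threadId here. The class name GrokSketch and the sample line are hypothetical; the sample only follows the shape of the Log4j2 patternLayout.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Rough Java equivalent of the Logstash grok pattern, for illustration only.
final class GrokSketch {

    private static final String[] FIELDS = {
            "currentDateTime", "level", "threadId", "class", "hostName",
            "ip", "applicationName", "location", "messageInfo"};

    // \S+ stands in for NOTSPACE and .*? for DATA, as in the grok pattern definitions.
    private static final Pattern LINE = Pattern.compile(
            "\\[(?<currentDateTime>\\S+)\\] \\[(?<level>\\S+)\\] \\[(?<threadId>\\S+)\\] \\[(?<class>\\S+)\\] "
            + "\\[(?<hostName>.*?)\\] \\[(?<ip>.*?)\\] \\[(?<applicationName>.*?)\\] "
            + "\\[(?<location>.*?)\\] \\[(?<messageInfo>.*?)\\]");

    // Extracted fields in pattern order, or an empty map if the line does not match.
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = LINE.matcher(line);
        if (m.find()) {
            for (String name : FIELDS) {
                fields.put(name, m.group(name));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical sample line in the shape produced by patternLayout
        String line = "[2019-02-26T17:24:23.123+08:00] [INFO] [http-nio-8080-exec-1-30] "
                + "[com.bfxy.IndexController] [demo-host] [10.0.0.5] [demo-app] "
                + "[IndexController.java,25,com.bfxy.IndexController,index] [This is an info log] ## ''";
        parse(line).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

As with grok's DATA, the lazy match stops at the first "]", so a message that itself contains "] [" would be split at that point.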
Requesting /err again shows the complete error entry in Kibana, with the multi-line stack trace kept in a single event by Filebeat's multiline settings.
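The stack trace stays in one event because of the multiline settings in filebeat.yml (pattern '^\[', negate: true, match: after): any line that does not start with '[' is appended to the preceding line that does. The grouping rule can be modeled in a few lines of Java. This is a simplified sketch with a hypothetical class name, not Filebeat's implementation, and it ignores max_lines and timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Simplified model of Filebeat multiline with pattern '^\[', negate: true, match: after.
// max_lines and timeout are not modeled.
final class MultilineSketch {

    private static final Pattern EVENT_START = Pattern.compile("^\\[");

    // Groups raw lines into events: a line starting with '[' opens a new event,
    // every other line is appended (with a newline) to the current event.
    static List<String> group(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            boolean startsEvent = EVENT_START.matcher(line).find();
            if (startsEvent || current == null) {
                if (current != null) {
                    events.add(current.toString());
                }
                current = new StringBuilder(line);
            } else {
                // Continuation line, e.g. a stack-trace frame
                current.append('\n').append(line);
            }
        }
        if (current != null) {
            events.add(current.toString());
        }
        return events;
    }

    public static void main(String[] args) {
        // Hypothetical, shortened lines: an error with a stack trace, then a normal event
        List<String> lines = Arrays.asList(
                "[2019-02-26T17:25:01.456+08:00] [ERROR] ... [Arithmetic exception] ## 'java.lang.ArithmeticException: / by zero",
                "\tat com.bfxy.IndexController.err(IndexController.java:30)",
                "'",
                "[2019-02-26T17:25:02.001+08:00] [INFO] ... [This is an info log] ## ''");
        List<String> events = group(lines);
        System.out.println(events.size() + " events");
        events.forEach(e -> System.out.println("---\n" + e));
    }
}
```

This is also why every line written by the Log4j2 pattern starts with "[": the bracket marks the beginning of a new event for Filebeat.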
With these components linked, you now have a complete end‑to‑end log collection, processing, and visualization solution.
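To recap how an event travels through the linked components: Filebeat publishes it to the Kafka topic named in fields.logtopic, the Logstash kafka input whose topics_pattern matches that topic consumes it under its group_id, and the elasticsearch output writes it to a daily index built from fields.logbiz and index_time. The following Java sketch models the last two rules. It is illustrative only: the class and method names are hypothetical, and it assumes, as Kafka's pattern subscription does, that topics_pattern must match the whole topic name.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustrative model (not the tools' own code) of two routing rules in this guide.
final class RoutingSketch {

    private static final DateTimeFormatter INDEX_DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Mirrors the topics_pattern/group_id pairs of the two Logstash kafka inputs.
    static String consumerGroup(String topic) {
        if (topic.matches("app-log-.*")) {
            return "app-log-group";
        }
        if (topic.matches("error-log-.*")) {
            return "error-log-group";
        }
        return null; // not consumed by this Logstash pipeline
    }

    // Mirrors index => "<prefix>-%{[fields][logbiz]}-%{index_time}", where the ruby filter
    // formats the event timestamp in the Logstash host's local time zone.
    static String indexName(String prefix, String logbiz, Instant timestamp, ZoneId zone) {
        return prefix + "-" + logbiz + "-" + INDEX_DAY.format(timestamp.atZone(zone));
    }

    public static void main(String[] args) {
        String topic = "error-log-collector"; // fields.logtopic from filebeat.yml
        System.out.println(consumerGroup(topic));
        System.out.println(indexName("error-log", "collector", Instant.now(), ZoneId.systemDefault()));
    }
}
```

The Kibana index pattern app-log-* matches every daily index of the form app-log-collector-yyyy.MM.dd. Because index_time uses the Logstash host's local zone, an event logged shortly after local midnight lands in that local day's index even though its @timestamp, stored in UTC, may still show the previous date.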
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Java High-Performance Architecture
Sharing Java development articles and resources, including SSM architecture and the Spring ecosystem (Spring Boot, Spring Cloud, MyBatis, Dubbo, Docker), Zookeeper, Redis, architecture design, microservices, message queues, Git, etc.
