
Build a Real‑Time Log Collection Pipeline with SpringBoot, Kafka, Filebeat, Logstash & Kibana

This guide walks through setting up a complete log‑collection and visualization pipeline—including SpringBoot log4j2 configuration, Kafka broker creation, Filebeat forwarding, Logstash processing, and Kibana dashboard setup—so you can capture, ship, and analyze application logs in real time.

Java High-Performance Architecture

Overall Process

The following steps outline a full‑stack log collection and visualization solution.

Server Preparation

Identify the server nodes that will host the services (SpringBoot, Kafka, Elasticsearch, Kibana, etc.).

SpringBoot Project Setup

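Replace SpringBoot's default logging (Logback, pulled in by spring-boot-starter-logging) with Log4j2. The demo project includes the necessary pom.xml dependencies and a log4j2.xml configuration that defines console and rolling file appenders, a pattern layout, and async loggers. The disruptor dependency is required by Log4j2's async loggers.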

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>

The log4j2.xml configuration:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
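        <!-- Logger names are hierarchical prefixes, not wildcards: "com.bfxy" covers every class under that package. -->
        <!-- additivity="false" keeps these events from being written a second time through Root. -->
        <AsyncLogger name="com.bfxy" level="info" includeLocation="true" additivity="false">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>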
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>
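
To make the routing in this configuration concrete, here is a minimal sketch in plain Java (not Log4j2 code) of which file appender receives an event at each level. It assumes the logger level `info` and the `ThresholdFilter level="warn"` from the configuration above. The names `LevelRouting`, `Level`, and `appendersFor` are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics the level checks configured in log4j2.xml.
// This is not Log4j2 API; LevelRouting and Level are hypothetical names.
class LevelRouting {
    // Ordered from least to most severe, like Log4j2's standard levels.
    enum Level { TRACE, DEBUG, INFO, WARN, ERROR, FATAL }

    static final Level LOGGER_LEVEL = Level.INFO;     // level="info" on the loggers
    static final Level ERROR_THRESHOLD = Level.WARN;  // <ThresholdFilter level="warn">

    // Returns the file appenders that would write an event of the given level.
    static List<String> appendersFor(Level eventLevel) {
        List<String> targets = new ArrayList<>();
        if (eventLevel.compareTo(LOGGER_LEVEL) < 0) {
            return targets; // dropped by the logger before reaching any appender
        }
        targets.add("appAppender"); // no filter: receives everything the logger passes
        if (eventLevel.compareTo(ERROR_THRESHOLD) >= 0) {
            targets.add("errorAppender"); // filter accepts warn and above
        }
        return targets;
    }

    public static void main(String[] args) {
        for (Level level : Level.values()) {
            System.out.println(level + " -> " + appendersFor(level));
        }
    }
}
```

Under these assumptions, a WARN or ERROR event is written to both app-collector.log and error-collector.log, while an INFO event goes only to app-collector.log.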

Sample controller code (IndexController) demonstrates logging at different levels and an exception scenario. InputMDC populates the MDC fields (hostName, ip, applicationName) for each request, while NetUtil provides utilities for IP/host handling.
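
The source of InputMDC and NetUtil is not reproduced here, so the following is only a rough sketch of how the three MDC values might be resolved with the JDK alone. `MdcFieldsSketch` and `resolve` are hypothetical names, and the application name `collector` is an assumption. To stay dependency-free, the values are collected in a map instead of being written to the MDC.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the demo's InputMDC/NetUtil classes, whose source is not shown.
class MdcFieldsSketch {
    // Builds the values that the log pattern reads via %X{hostName}, %X{ip}, %X{applicationName}.
    static Map<String, String> resolve(String applicationName) {
        String hostName;
        String ip;
        try {
            InetAddress local = InetAddress.getLocalHost();
            hostName = local.getHostName();
            ip = local.getHostAddress();
        } catch (UnknownHostException e) {
            // Fall back to placeholders so logging never fails because of name resolution.
            hostName = "unknown";
            ip = "unknown";
        }
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("hostName", hostName);
        fields.put("ip", ip);
        fields.put("applicationName", applicationName);
        return fields;
    }

    public static void main(String[] args) {
        // In the real application each entry would be passed to org.slf4j.MDC.put(key, value)
        // or Log4j2's ThreadContext.put(key, value). "collector" is an assumed application name.
        resolve("collector").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Because the pattern layout wraps each `%X{...}` value in square brackets, a missing MDC entry shows up as an empty `[]` in the log line rather than breaking the format.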

Running the Application

Start the SpringBoot service and access /index and /err. Two log files are generated in the logs directory (LOG_HOME, resolved relative to the application's working directory): app-collector.log and error-collector.log.

Figure: Log files generated

Kafka Installation and Enablement

Download Kafka from the official site, extract it, rename the directory, and edit server.properties to set broker ID, port, host name, log directory, partitions, and Zookeeper connection.

tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local
mv /usr/local/kafka_2.12-2.1.0 /usr/local/kafka_2.12
vim /usr/local/kafka_2.12/config/server.properties

# Example settings in server.properties
# (port, host.name and advertised.host.name are deprecated in favor of listeners and advertised.listeners)
broker.id=0
port=9092
host.name=192.168.11.51
advertised.host.name=192.168.11.51
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

# Create the log directory, then start the broker in the background
mkdir -p /usr/local/kafka_2.12/kafka-logs
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

Create two topics for application and error logs:

kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1

Figure: Kafka topics

Filebeat Installation and Enablement

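Extract Filebeat, rename the directory, and edit filebeat.yml to collect the two log files, define multiline handling, add custom fields, and configure Kafka output. The paths in this excerpt assume the application's log directory is /usr/local/logs; adjust them to wherever the files are actually written.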

cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local
mv /usr/local/filebeat-6.6.0-linux-x86_64 /usr/local/filebeat-6.6.0
vim /usr/local/filebeat-6.6.0/filebeat.yml
# (excerpt of the configuration)
filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/logs/app-collector.log
  document_type: "app-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: app-log-collector
    evn: dev
- input_type: log
  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: error-log-collector
    evn: dev
output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]
  topic: "%{[fields.logtopic]}"
  compression: gzip
  required_acks: 1
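
The multiline settings above (`pattern: '^\['`, `negate: true`, `match: after`) mean that any line that does not start with `[` is appended to the preceding line that does, so a stack trace stays attached to its log entry. The following is a simplified sketch of that grouping rule in Java, not Filebeat's implementation. It ignores `max_lines` and `timeout`, and `MultilineSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Simplified model of Filebeat's multiline grouping with negate: true and match: after.
class MultilineSketch {
    // Same regular expression as the "pattern" setting in filebeat.yml.
    private static final Pattern EVENT_START = Pattern.compile("^\\[");

    static List<String> group(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            boolean startsNewEvent = EVENT_START.matcher(line).find();
            if (startsNewEvent || current == null) {
                if (current != null) {
                    events.add(current.toString()); // flush the previous event
                }
                current = new StringBuilder(line);
            } else {
                // Continuation line (for example a stack frame): append to the open event.
                current.append('\n').append(line);
            }
        }
        if (current != null) {
            events.add(current.toString());
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "[2019-01-01T10:00:00.000+08:00] [ERROR] ... ## 'java.lang.ArithmeticException: / by zero",
            "\tat com.bfxy.Example.err(Example.java:10)'",
            "[2019-01-01T10:00:01.000+08:00] [INFO] ... ## ''");
        group(lines).forEach(event -> System.out.println("EVENT: " + event));
    }
}
```

The sample lines in `main` are made up for illustration. The first two are merged into a single event and the third starts a new one.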

Validate the configuration and start Filebeat:

cd /usr/local/filebeat-6.6.0
./filebeat test config -c filebeat.yml   # Config OK (replaces the deprecated -configtest flag)
./filebeat &

Figure: Filebeat running

Logstash Installation

Create a Logstash pipeline script (logstash-script.conf) that consumes the two Kafka topics, decodes the JSON events produced by Filebeat, parses the message field with Grok, adds an index_time date field, and outputs to Elasticsearch (and to stdout for testing).

input {
  kafka {
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    group_id => "app-log-group"
  }
  kafka {
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    group_id => "error-log-group"
  }
}
filter {
  ruby { code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))" }
  if "app-log" in [fields][logtopic] {
    grok { match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"] }
  }
  if "error-log" in [fields][logtopic] {
    grok { match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"] }
  }
}
output {
  stdout { codec => rubydebug }
  if "app-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
}
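
To see what the grok expression extracts, the sketch below approximates it with a Java regular expression. It is an illustration, not the grok engine: `NOTSPACE` is written as `\S+`, `DATA` as `.*?`, and `QUOTEDSTRING` is reduced to its single-quote case. The group is named `threadId` because Java group names cannot contain a hyphen. `GrokLineSketch` and the sample line are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Approximates the grok pattern from logstash-script.conf with java.util.regex.
class GrokLineSketch {
    private static final String[] FIELDS = {
        "currentDateTime", "level", "threadId", "class", "hostName",
        "ip", "applicationName", "location", "messageInfo", "throwable"
    };

    private static final Pattern LINE = Pattern.compile(
        "\\[(?<currentDateTime>\\S+)\\] \\[(?<level>\\S+)\\] \\[(?<threadId>\\S+)\\] "
        + "\\[(?<class>\\S+)\\] \\[(?<hostName>.*?)\\] \\[(?<ip>.*?)\\] "
        + "\\[(?<applicationName>.*?)\\] \\[(?<location>.*?)\\] \\[(?<messageInfo>.*?)\\] "
        // Either an empty '' or a single-quoted throwable (simplified QUOTEDSTRING).
        + "## (?:''|(?<throwable>'(?:\\\\.|[^\\\\'])+'))");

    // Returns the extracted fields, or an empty map when the line does not match
    // (Logstash would tag such an event with _grokparsefailure instead).
    static Map<String, String> parse(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher matcher = LINE.matcher(message);
        if (!matcher.find()) {
            return fields;
        }
        for (String name : FIELDS) {
            String value = matcher.group(name);
            if (value != null) {
                fields.put(name, value);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Made-up line in the layout defined by patternLayout in log4j2.xml.
        String line = "[2019-01-01T10:00:00.000+08:00] [INFO] [main-1] "
            + "[com.bfxy.collector.web.IndexController] [host-1] [192.168.11.31] [collector] "
            + "[IndexController.java,20,com.bfxy.collector.web.IndexController,index] "
            + "[this is an info log] ## ''";
        parse(line).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

When the line ends with `## ''` there is no exception, so no `throwable` field is produced. That mirrors the `(''|%{QUOTEDSTRING:throwable})` alternation in the grok pattern.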

Start Logstash with the script:

/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

Elasticsearch and Kibana

After Elasticsearch and Kibana are running, create an index pattern in Kibana (e.g., app-log-*) and set currentDateTime as the time filter field. The logs collected from the SpringBoot application are now searchable and visualizable in Kibana.
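
The link between the daily index names that Logstash writes and the Kibana index pattern can be sketched as follows. This is only an illustration under stated assumptions: `IndexNameSketch` and its methods are hypothetical, the date is arbitrary, and the wildcard check handles just a single trailing `*` as in `app-log-*`.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Illustrates how index names from the Logstash output line up with a Kibana index pattern.
class IndexNameSketch {
    // Same shape as strftime('%Y.%m.%d') in the ruby filter that sets index_time.
    private static final DateTimeFormatter INDEX_DATE = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Mirrors index => "app-log-%{[fields][logbiz]}-%{index_time}".
    static String indexName(String prefix, String logbiz, LocalDate day) {
        return prefix + "-" + logbiz + "-" + day.format(INDEX_DATE);
    }

    // Simplified wildcard check: only a single trailing '*' is supported.
    static boolean matchesIndexPattern(String indexPattern, String index) {
        if (indexPattern.endsWith("*")) {
            String stem = indexPattern.substring(0, indexPattern.length() - 1);
            return index.startsWith(stem);
        }
        return indexPattern.equals(index);
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2019, 1, 15); // arbitrary example date
        String appIndex = indexName("app-log", "collector", day);
        String errorIndex = indexName("error-log", "collector", day);
        System.out.println(appIndex + " matches app-log-*: " + matchesIndexPattern("app-log-*", appIndex));
        System.out.println(errorIndex + " matches app-log-*: " + matchesIndexPattern("app-log-*", errorIndex));
    }
}
```

Because the error indices start with `error-log-`, they are not covered by `app-log-*` and need their own index pattern (for example `error-log-*`) to appear in Kibana.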

Figure: Kibana index pattern

Visiting the application endpoints again generates new log entries that appear in the Kibana dashboard, completing the end‑to‑end log collection and visualization workflow.
