Operations 17 min read

Build a Real‑Time Log Collection Pipeline with Spring Boot, Kafka, Filebeat, Logstash & Kibana

This step‑by‑step guide shows how to configure a Spring Boot application with Log4j2, ship logs via Filebeat to Kafka, process them with Logstash, and visualize the data in Elasticsearch and Kibana, covering server setup, Maven dependencies, configuration files, and deployment commands.

Code Ape Tech Column
Code Ape Tech Column
Code Ape Tech Column
Build a Real‑Time Log Collection Pipeline with Spring Boot, Kafka, Filebeat, Logstash & Kibana

The article walks through constructing a complete log‑collection and visualization pipeline for a Spring Boot service.

Server preparation

List the server nodes that will host the services so readers can map the later steps to specific machines.

Spring Boot project preparation

Add Log4j2 to replace Spring Boot's default logging. The pom.xml should include the Spring Boot starter web dependency with the logging starter excluded, then add spring-boot-starter-log4j2 and the Disruptor library.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!-- exclude spring-boot-starter-logging -->
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>

The log4j2.xml defines properties, console and rolling file appenders, async loggers for the application package, and a root logger that writes to both console and the rolling files.

<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
            <AppenderRef ref="appAppender"/>
        </AsyncLogger>
        <AsyncLogger name="com.bfxy.*" level="info" includeLocation="true">
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>

Two simple controllers ( IndexController) are provided to generate info, warn, and error logs and to trigger an arithmetic exception for error logging.

@Slf4j
@RestController
public class IndexController {
    @RequestMapping(value = "/index")
    public String index() {
        InputMDC.putMDC();
        log.info("I am an info log");
        log.warn("I am a warn log");
        log.error("I am an error log");
        return "idx";
    }

    @RequestMapping(value = "/err")
    public String err() {
        InputMDC.putMDC();
        try {
            int a = 1/0;
        } catch (Exception e) {
            log.error("Arithmetic exception", e);
        }
        return "err";
    }
}

The InputMDC component populates MDC fields ( hostName, ip, applicationName) using NetUtil utilities.

Kafka installation and enablement

Download Kafka from the official site, extract it, rename the directory, and edit server.properties to set broker ID, ports, host names, log directory, partitions, and Zookeeper connection (three‑node Zookeeper ensemble).

tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local
mv kafka_2.12-2.1.0/ kafka_2.12
vim /usr/local/kafka_2.12/config/server.properties
# modify:
broker.id=0
port=9092
host.name=192.168.11.51
advertised.host.name=192.168.11.51
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181
mkdir /usr/local/kafka_2.12/kafka-logs
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

Create two topics for the application and error logs.

kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1

Filebeat installation and enablement

cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0
vim /usr/local/filebeat-6.6.0/filebeat.yml   # configure as shown below

The Filebeat configuration defines two prospector inputs that read app-collector.log and error-collector.log, apply multiline handling, add custom fields ( logbiz, logtopic, evn), and forward events to Kafka.

filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/logs/app-collector.log
  document_type: "app-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: app-log-collector
    evn: dev

- input_type: log
  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: error-log-collector
    evn: dev

output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]
  topic: "%{[fields.logtopic]}"
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true

Validate the configuration with ./filebeat -c filebeat.yml -configtest and start Filebeat in the background.

Logstash installation

mkdir script
cd script
vim logstash-script.conf

The Logstash pipeline reads from the two Kafka topics, extracts fields using Grok patterns that match the Log4j2 layout, adds a local time field, and outputs to Elasticsearch indices named app-log-* and error-log-*. A stdout output is also defined for debugging.

input {
  kafka {
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    group_id => "app-log-group"
  }
  kafka {
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    group_id => "error-log-group"
  }
}

filter {
  ruby { code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))" }
  if "app-log" in [fields][logtopic] {
    grok {
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## ('|'%{QUOTEDSTRING:throwable})"]
    }
  }
  if "error-log" in [fields][logtopic] {
    grok {
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## ('|'%{QUOTEDSTRING:throwable})"]
    }
  }
}

output {
  stdout { codec => rubydebug }
  if "app-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
}

Start Logstash with the pipeline configuration:

/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

Elasticsearch and Kibana

After installing Elasticsearch and Kibana, open Kibana at http://192.168.11.35:5601, go to Management → Index Patterns, and create a pattern app-log-* (or error-log-*) using currentDateTime as the time filter field. Once the pattern is created, visiting the Spring Boot endpoints ( /index and /err) generates logs that appear in Kibana’s Discover view.

At this point the end‑to‑end log collection, enrichment, and visualization pipeline is fully operational.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

ElasticsearchKafkaloggingSpring Bootlog4j2LogstashKibanaFilebeat
Code Ape Tech Column
Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.