Building a Log Collection and Visualization Pipeline with SpringBoot, Log4j2, Kafka, Filebeat, Logstash, Elasticsearch, and Kibana

This tutorial walks through the end‑to‑end setup of a logging pipeline: a SpringBoot application writes log files with Log4j2, Filebeat collects those files and forwards them to Kafka, Logstash consumes and parses the events, Elasticsearch stores them, and Kibana visualizes them. It covers server preparation, configuration files, and essential code snippets.

Top Architect

Overall Process

The article describes a complete workflow: prepare the servers, configure a SpringBoot project with Log4j2, generate log files, collect them with Filebeat and ship them to Kafka, consume and process them with Logstash, store them in Elasticsearch, and visualize them in Kibana.

Server Preparation

The original article lists the server nodes (images omitted) so that readers can map each component to a physical machine.

SpringBoot Project Setup

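Replace Spring Boot's default logging starter with Log4j2. The pom.xml dependencies include the Spring Boot web starter (with the default logging starter excluded), the Log4j2 starter, and Disruptor, which Log4j2 needs for its async loggers: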

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>

The log4j2.xml defines console and rolling file appenders, MDC properties (hostName, ip, applicationName), and async loggers for the package com.bfxy.*:

<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <!-- errorAppender omitted for brevity -->
    </Appenders>
    <Loggers>
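        <!-- Logger names are hierarchical prefixes, not wildcards: "com.bfxy" covers every class under that package. -->
        <!-- additivity="false" keeps events from also reaching the Root appenders and being written twice. -->
        <AsyncLogger name="com.bfxy" level="info" includeLocation="true" additivity="false">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>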
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>

Sample Controllers

IndexController provides two endpoints (/index and /err) that log info, warn, and error messages and demonstrate MDC usage:

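import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class IndexController {
    @RequestMapping(value = "/index")
    public String index() {
        // Populate hostName, ip, and applicationName before logging.
        InputMDC.putMDC();
        log.info("This is an info log");
        log.warn("This is a warn log");
        log.error("This is an error log");
        return "idx";
    }

    @RequestMapping(value = "/err")
    public String err() {
        InputMDC.putMDC();
        try {
            int a = 1 / 0; // deliberately triggers an ArithmeticException
        } catch (Exception e) {
            // Passing the exception makes the pattern's %ex print the stack trace.
            log.error("Arithmetic exception", e);
        }
        return "err";
    }
}

InputMDC populates the MDC fields used in the Log4j2 pattern: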

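import org.slf4j.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
public class InputMDC implements EnvironmentAware {
    // Set once by Spring at startup, then read by the static helper below.
    private static Environment environment;

    @Override
    public void setEnvironment(Environment env) { environment = env; }

    // Copies host and application details into the MDC of the current thread.
    public static void putMDC() {
        MDC.put("hostName", NetUtil.getLocalHostName());
        MDC.put("ip", NetUtil.getLocalIp());
        MDC.put("applicationName", environment.getProperty("spring.application.name"));
    }
}

NetUtil contains helper methods for IP/host handling (code omitted for brevity).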
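
Because the article does not show NetUtil, the following is only a hypothetical stand-in for it, assuming the two methods called by InputMDC simply wrap java.net.InetAddress. The method names come from InputMDC; the bodies and the fallback values are assumptions, and the original implementation may resolve addresses differently (for example by scanning network interfaces).

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical sketch: the original NetUtil is not shown in the article.
final class NetUtil {
    private NetUtil() {}

    /** Returns the local host name, or "unknown" if it cannot be resolved (assumed fallback). */
    static String getLocalHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }

    /** Returns the address the local host name resolves to, or the loopback address as a fallback. */
    static String getLocalIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return InetAddress.getLoopbackAddress().getHostAddress();
        }
    }
}
```

On hosts whose name resolves to a loopback address, getLocalIp() in this sketch returns that loopback address, which is one reason a production helper might inspect the network interfaces instead.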

Running the Project

After starting the SpringBoot app, accessing /index and /err creates two log files: app-collector.log and error-collector.log.

Kafka Installation and Usage

Download Kafka, extract the archive, rename the directory, and edit server.properties to set the broker ID, port, host name, log directories, default partition count, and the Zookeeper connection string (three nodes). Then create two topics:

kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1

Verify topics with kafka-topics.sh --describe.

Filebeat Installation and Configuration

Extract Filebeat 6.6.0, edit filebeat.yml to monitor the two log files, define document types, multiline settings, and add a Kafka output pointing to the broker at 192.168.11.51:9092. Example snippet:

filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/logs/app-collector.log
  document_type: "app-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: app-log-collector
    evn: dev

- input_type: log
  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
    max_lines: 2000
    timeout: 2s
  fields:
    logbiz: collector
    logtopic: error-log-collector
    evn: dev

output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]
  topic: "%{[fields.logtopic]}"
  compression: gzip
  required_acks: 1
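
The multiline settings above (pattern '^\[', negate: true, match: after) tell Filebeat that any line which does not start with "[" belongs to the preceding event, which is what keeps a Java stack trace attached to its log line. The grouping rule can be sketched in Java as follows. MultilineGrouper is a hypothetical name, and the sketch deliberately ignores max_lines and timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Illustrative only: mimics the grouping rule, not Filebeat's actual implementation.
final class MultilineGrouper {
    // Same regular expression as multiline.pattern in filebeat.yml.
    private static final Pattern EVENT_START = Pattern.compile("^\\[");

    /** Groups raw lines into events; a line that does not start with '[' continues the previous event. */
    static List<String> group(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            boolean startsEvent = EVENT_START.matcher(line).find();
            if (startsEvent || current == null) {
                // Flush the event collected so far and begin a new one.
                if (current != null) {
                    events.add(current.toString());
                }
                current = new StringBuilder(line);
            } else {
                // Continuation line (for example a stack frame): append to the open event.
                current.append('\n').append(line);
            }
        }
        if (current != null) {
            events.add(current.toString());
        }
        return events;
    }
}
```

Feeding it an error line followed by "\tat ..." stack frames and then a new "[..." line yields two events, with the frames folded into the first one.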

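Test the configuration with ./filebeat -c filebeat.yml -configtest, then start Filebeat in the background. On Filebeat 6.x the documented form of this check is the test config subcommand: ./filebeat test config -c filebeat.yml.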
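
The topic setting in output.kafka is a format string: "%{[fields.logtopic]}" is replaced per event with the logtopic value from that input's fields section, so a single output routes each log file to its own Kafka topic. The substitution can be illustrated with a small Java sketch. TopicResolver and its methods are hypothetical names, and the code only imitates the lookup rather than reproducing Filebeat's implementation.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: resolves %{[a.b]} references against a nested event map.
final class TopicResolver {
    private static final Pattern FIELD_REF = Pattern.compile("%\\{\\[([^\\]]+)\\]\\}");

    /** Replaces every %{[dotted.path]} reference in the format string with the event's value. */
    static String resolve(String format, Map<String, Object> event) {
        Matcher m = FIELD_REF.matcher(format);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            Object value = lookup(event, m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("missing field: " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value.toString()));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** Walks a dotted path such as "fields.logtopic" through nested maps. */
    private static Object lookup(Map<String, Object> event, String dottedPath) {
        Object current = event;
        for (String key : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }
}
```

With an event whose fields map contains logtopic set to app-log-collector, resolving "%{[fields.logtopic]}" gives app-log-collector, which matches one of the two topics created earlier.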

Logstash Configuration

Create logstash-script.conf. It reads from the two Kafka topics, decodes the JSON events published by Filebeat, parses the Log4j2 line held in the message field with Grok, adds an index_time field computed in the Logstash host's local time zone, and writes to both the console and Elasticsearch indices:

input {
  kafka {
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    group_id => "app-log-group"
  }
  kafka {
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    group_id => "error-log-group"
  }
}

filter {
  ruby { code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))" }
  if "app-log" in [fields][logtopic] {
    grok { match => { "message" => "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})" } }
  }
  if "error-log" in [fields][logtopic] {
    grok { match => { "message" => "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})" } }
  }
}

output {
  stdout { codec => rubydebug }
  if "app-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
}
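
To make the Grok expression in the filter section easier to read, here is a rough Java regular-expression equivalent. It is a sketch under assumptions: NOTSPACE is written as \S+, DATA as a lazy .*?, and QUOTEDSTRING is simplified to a single-quoted span. The thread-id capture is renamed threadId because Java group names cannot contain a hyphen. LogLineParser and the sample line in main are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: approximates the Grok expression, not Logstash itself.
final class LogLineParser {
    private static final String[] FIELDS = {
        "currentDateTime", "level", "threadId", "class", "hostName",
        "ip", "applicationName", "location", "messageInfo", "throwable"
    };

    // DOTALL lets the throwable capture span the lines of a stack trace.
    private static final Pattern LINE = Pattern.compile(
        "\\[(?<currentDateTime>\\S+)\\] \\[(?<level>\\S+)\\] "
        + "\\[(?<threadId>\\S+)\\] \\[(?<class>\\S+)\\] "
        + "\\[(?<hostName>.*?)\\] \\[(?<ip>.*?)\\] \\[(?<applicationName>.*?)\\] "
        + "\\[(?<location>.*?)\\] \\[(?<messageInfo>.*?)\\] "
        + "## (?:''|'(?<throwable>.*)')",
        Pattern.DOTALL);

    /** Returns the captured fields, or empty if the line does not match the layout. */
    static Optional<Map<String, String>> parse(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return Optional.empty(); // Logstash would tag such an event with _grokparsefailure
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (String name : FIELDS) {
            String value = m.group(name);
            if (value != null) { // throwable is absent when the line ends with ''
                fields.put(name, value);
            }
        }
        return Optional.of(fields);
    }

    public static void main(String[] args) {
        // Made-up sample line in the shape produced by the patternLayout property.
        String sample = "[2019-10-13T01:01:01.123+08:00] [INFO] [main-1] "
            + "[com.bfxy.IndexController] [host-a] [192.168.11.31] [collector] "
            + "[IndexController.java,20,com.bfxy.IndexController,index] "
            + "[This is an info log] ## ''";
        parse(sample).ifPresent(f -> f.forEach((k, v) -> System.out.println(k + " = " + v)));
    }
}
```

Each bracketed segment of the layout maps to one named capture, so a change to patternLayout in log4j2.xml has to be mirrored in the Grok expression or the events will fail to parse.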

Start Logstash with

/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

and verify logs appear in the console.
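
The ruby filter in the configuration computes index_time from the event timestamp in the Logstash host's local time zone, and the elasticsearch outputs append it to the index name. The point of the conversion is that the day boundary follows local time rather than UTC. A minimal Java sketch of the same calculation follows; IndexNames and indexFor are hypothetical names, and the time zone is passed explicitly here instead of being taken from the host.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustrative only: mirrors strftime('%Y.%m.%d') applied to a zone-adjusted timestamp.
final class IndexNames {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /** Builds names such as app-log-collector-2019.10.13 from prefix, logbiz, and the event time. */
    static String indexFor(String prefix, String logbiz, Instant timestamp, ZoneId zone) {
        return prefix + "-" + logbiz + "-" + DAY.format(timestamp.atZone(zone));
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2019-10-12T17:30:00Z");
        System.out.println(indexFor("app-log", "collector", ts, ZoneId.of("Asia/Shanghai")));
        System.out.println(indexFor("app-log", "collector", ts, ZoneId.of("UTC")));
    }
}
```

For an event stamped 2019-10-12T17:30:00Z, the Asia/Shanghai zone gives app-log-collector-2019.10.13 while UTC gives app-log-collector-2019.10.12, so events logged shortly after local midnight land in the index for the local day.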

Elasticsearch & Kibana

After installing ES and Kibana, create index patterns such as app-log-* with currentDateTime as the time filter field. Access Kibana at 192.168.11.35:5601 to view the collected logs, which now include full MDC information.

By following these steps, a complete log collection, processing, and visualization pipeline is built.
