Build a Real-Time Log Pipeline with SpringBoot, Kafka, Filebeat, Logstash and Kibana

This guide walks through setting up a complete log‑collection and visualization pipeline—preparing servers, configuring a SpringBoot project with Log4j2, deploying Kafka, installing Filebeat, creating Logstash pipelines, and visualizing logs in Elasticsearch and Kibana—so you can monitor application logs in real time.

macrozheng

Server Preparation

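List the server nodes that will host each service and make sure they can reach one another over the network. The examples in this guide use a Kafka broker at 192.168.11.51, a ZooKeeper ensemble at 192.168.11.111 through 192.168.11.113, and Elasticsearch at 192.168.11.35.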

SpringBoot Project Setup

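Replace SpringBoot's default logging starter (Logback) with Log4j2: exclude spring-boot-starter-logging from the web starter and add spring-boot-starter-log4j2. The disruptor dependency is needed by the async loggers in log4j2.xml. The pom.xml should include the following dependencies: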

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>

The log4j2.xml configuration defines console and rolling file appenders, async loggers, and a root logger:

<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
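        <!-- Logger names are package prefixes, not globs: "com.bfxy" already covers every sub-package. -->
        <!-- additivity="false" stops these events from also reaching the Root appenders and being written twice. -->
        <AsyncLogger name="com.bfxy" level="info" includeLocation="true" additivity="false">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>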
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>

Demo Controllers

IndexController provides two endpoints for testing logs:

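import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class IndexController {
    // Emits one line at each level so both log files receive entries.
    @RequestMapping(value = "/index")
    public String index() {
        InputMDC.putMDC();
        log.info("This is an info log");
        log.warn("This is a warn log");
        log.error("This is an error log");
        return "idx";
    }

    // Forces an ArithmeticException so the log contains a multi-line stack trace.
    @RequestMapping(value = "/err")
    public String err() {
        InputMDC.putMDC();
        try {
            int a = 1 / 0;
        } catch (Exception e) {
            log.error("Arithmetic exception", e);
        }
        return "err";
    }
}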

InputMDC populates MDC fields used in the log pattern:

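import org.slf4j.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
public class InputMDC implements EnvironmentAware {
    // Held statically so putMDC() can be called without an injected instance.
    private static Environment environment;
    @Override
    public void setEnvironment(Environment env) { environment = env; }
    // Fills the keys referenced as %X{hostName}, %X{ip} and %X{applicationName} in log4j2.xml.
    public static void putMDC() {
        MDC.put("hostName", NetUtil.getLocalHostName());
        MDC.put("ip", NetUtil.getLocalIp());
        MDC.put("applicationName", environment.getProperty("spring.application.name"));
    }
}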

NetUtil contains helper methods for IP/host handling (code omitted for brevity).
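Since the original NetUtil is not shown, the following is only a minimal stand-in covering the two methods that InputMDC calls. The method names come from the InputMDC code above; the bodies, the fallback values, and the assumption that the class lives in the same package as InputMDC are mine, not the author's.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical stand-in for the omitted NetUtil class (not the author's code).
final class NetUtil {
    private NetUtil() {}

    // Returns the local host name, or "unknown" if it cannot be resolved.
    public static String getLocalHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "unknown";
        }
    }

    // Returns the address the local host name resolves to, or 127.0.0.1 as a fallback.
    public static String getLocalIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return "127.0.0.1";
        }
    }
}
```

On hosts with several network interfaces, InetAddress.getLocalHost() may resolve to a loopback address, so a production helper would probably enumerate interfaces instead.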

Kafka Installation and Configuration

Download Kafka from kafka.apache.org/downloads.html. After extracting the archive, edit config/server.properties:

# broker configuration
broker.id=0
port=9092
host.name=192.168.11.51
advertised.host.name=192.168.11.51
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181

Create the log directory and start the broker:

mkdir /usr/local/kafka_2.12/kafka-logs
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

Create two topics for the log collector:

kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1

Filebeat Installation and Configuration

Extract Filebeat and edit filebeat.yml to ship the two log files to Kafka:

filebeat.prospectors:
- input_type: log
  paths:
    - /usr/local/logs/app-collector.log
  document_type: "app-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
  fields:
    logbiz: collector
    logtopic: app-log-collector
    evn: dev

- input_type: log
  paths:
    - /usr/local/logs/error-collector.log
  document_type: "error-log"
  multiline:
    pattern: '^\['
    negate: true
    match: after
  fields:
    logbiz: collector
    logtopic: error-log-collector
    evn: dev

output.kafka:
  enabled: true
  hosts: ["192.168.11.51:9092"]
  topic: "%{[fields.logtopic]}"
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1

Test the configuration and start Filebeat:

cd /usr/local/filebeat-6.6.0
./filebeat -c filebeat.yml -configtest   # should output "Config OK"
./filebeat &
ps -ef | grep filebeat   # verify the process is running

Logstash Pipeline

Create logstash-script.conf to consume the two Kafka topics, parse the log lines with Grok, add a local time field, and ship the events to Elasticsearch:

input {
  kafka {
    topics_pattern => "app-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    group_id => "app-log-group"
  }
  kafka {
    topics_pattern => "error-log-.*"
    bootstrap_servers => "192.168.11.51:9092"
    codec => json
    consumer_threads => 1
    group_id => "error-log-group"
  }
}

filter {
  ruby { code => "event.set('index_time', event.timestamp.time.localtime.strftime('%Y.%m.%d'))" }
  # The Grok expression must include the "## " separator that patternLayout writes before the quoted exception.
  if "app-log" in [fields][logtopic] {
    grok { match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"] }
  }
  if "error-log" in [fields][logtopic] {
    grok { match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"] }
  }
}

output {
  stdout { codec => rubydebug }
  if "app-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["192.168.11.35:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
}
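To see how the Grok expression lines up with the patternLayout from log4j2.xml, it can help to test an equivalent regular expression outside Logstash. The sketch below is a rough Java translation, not the Grok engine itself: NOTSPACE becomes \S+, DATA becomes a lazy .*?, and the quoted throwable is simplified to everything between the outer single quotes. The class name and the sample line are hypothetical, and thread-id is renamed threadId because Java group names allow only letters and digits.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LogLineParser {
    // Capture names in the order they appear in the log line.
    private static final String[] FIELDS = {
        "currentDateTime", "level", "threadId", "class", "hostName",
        "ip", "applicationName", "location", "messageInfo", "throwable"
    };

    // Mirrors the layout: ten bracketed or quoted parts, with "## " before the exception.
    private static final Pattern LINE = Pattern.compile(
        "\\[(?<currentDateTime>\\S+)\\] \\[(?<level>\\S+)\\] \\[(?<threadId>\\S+)\\] "
        + "\\[(?<class>\\S+)\\] \\[(?<hostName>.*?)\\] \\[(?<ip>.*?)\\] "
        + "\\[(?<applicationName>.*?)\\] \\[(?<location>.*?)\\] "
        + "\\[(?<messageInfo>.*?)\\] ## '(?<throwable>.*)'",
        Pattern.DOTALL);

    // Returns the captured fields in order, or an empty map if the line does not match.
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher matcher = LINE.matcher(line);
        if (matcher.find()) {
            for (String name : FIELDS) {
                fields.put(name, matcher.group(name));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical sample line in the shape produced by patternLayout.
        String sample = "[2024-01-01T12:00:00.000+08:00] [INFO] [main-1] "
            + "[com.bfxy.Demo] [node1] [192.168.11.31] [collector] "
            + "[Demo.java,12,com.bfxy.Demo,run] [This is an info log] ## ''";
        parse(sample).forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

A line that yields an empty map here would most likely be tagged _grokparsefailure in Logstash, so this is a quick way to check a layout change before redeploying the pipeline.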

Start Logstash:

/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

Elasticsearch & Kibana

After Elasticsearch and Kibana are up, create two index patterns in Kibana, app-log-* and error-log-*, using currentDateTime as the time filter field. Logs from the app and error collectors then appear in the Discover view, with every parsed field available for searching and filtering.
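The Logstash output writes one index per day, named from the topic prefix, the logbiz field, and the index_time value computed in local time by the ruby filter. That naming is why a wildcard such as app-log-* covers every daily index. The sketch below reproduces the naming rule in Java to show how the local time zone affects the date part. The class and method names are hypothetical, and the Asia/Shanghai zone in the usage note is only an example.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class IndexNames {
    // Java equivalent of the Ruby strftime('%Y.%m.%d') format used in the filter.
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // prefix is "app-log" or "error-log"; logbiz is the fields.logbiz value set in filebeat.yml.
    static String indexName(String prefix, String logbiz, Instant timestamp, ZoneId zone) {
        String indexTime = DAY.format(timestamp.atZone(zone));
        return prefix + "-" + logbiz + "-" + indexTime;
    }

    public static void main(String[] args) {
        // Prints today's index name for the "collector" business tag in the system time zone.
        System.out.println(indexName("app-log", "collector", Instant.now(), ZoneId.systemDefault()));
    }
}
```

For example, an event stamped 2024-01-01T20:00:00Z lands in app-log-collector-2024.01.02 when the zone is Asia/Shanghai, but in app-log-collector-2024.01.01 when the zone is UTC.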

Visiting the SpringBoot endpoints (/index and /err) now generates logs that flow through Filebeat → Kafka → Logstash → Elasticsearch and are visualized in Kibana.
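To generate test traffic without a browser, a small client along these lines could call both endpoints. It is only a sketch: the base URL http://localhost:8080 is an assumption (the article does not state the application's port), and the class name is hypothetical. It uses the java.net.http client available since Java 11.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class LogTrafficGenerator {
    // Builds a GET request for one endpoint of the demo application.
    static HttpRequest request(String baseUrl, String path) {
        return HttpRequest.newBuilder(URI.create(baseUrl + path)).GET().build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the application listens on localhost:8080; pass another base URL as the first argument.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8080";
        HttpClient client = HttpClient.newHttpClient();
        for (String path : new String[] {"/index", "/err"}) {
            HttpResponse<String> response =
                client.send(request(baseUrl, path), HttpResponse.BodyHandlers.ofString());
            System.out.println(path + " -> " + response.statusCode() + " " + response.body());
        }
    }
}
```

Each run should add info, warn, and error lines to the app log and a stack trace to the error log, which then show up in Kibana after a short delay.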
