Big Data 9 min read

Integrating Log4j, Flume, Kafka, and Spark Streaming for Real‑Time Data Processing

This tutorial demonstrates how to configure Log4j for simulated logging, collect the logs with Flume, forward them to Kafka via a Flume KafkaSink, and finally consume the stream using Spark Streaming, providing a complete end‑to‑end big‑data pipeline example.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Integrating Log4j, Flume, Kafka, and Spark Streaming for Real‑Time Data Processing

This article walks through building a real‑time data pipeline using Log4j, Flume, Kafka, and Spark Streaming.

1. Log4j configuration – Set up a simple console appender in log4j.properties:

log4j.rootLogger = INFO,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

Write a Java class LoggerGenerator that emits an incremental value every second:

import org.apache.log4j.Logger;

/** Simulated log generator */
public class LoggerGenerator {
    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
    public static void main(String[] args) throws Exception {
        int index = 0;
        while (true) {
            Thread.sleep(1000);
            logger.info("value:" + index++);
        }
    }
}

Running the program produces output such as:

2020-03-07 18:21:37,637 [main] [LoggerGenerator] [INFO] - current value is:0
2020-03-07 18:21:38,639 [main] [LoggerGenerator] [INFO] - current value is:1
...

2. Flume collection – Create streaming.conf to define an Avro source, a memory channel, and a logger sink:

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
# define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
# define channel
agent1.channels.logger-channel.type=memory
# define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

Start Flume with:

flume-ng agent \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/streaming.conf \
  --name agent1 \
  -Dflume.root.logger=INFO,console

Add the Flume Log4j appender dependency to pom.xml:

<dependency>
  <groupId>org.apache.flume.flume-ng-clients</groupId>
  <artifactId>flume-ng-log4jappender</artifactId>
  <version>1.6.0</version>
</dependency>

Update log4j.properties so Log4j also sends events to Flume:

log4j.rootLogger = INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop000
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

After restarting Log4j, Flume reports successful collection.

3. Kafka integration – Start Zookeeper and Kafka:

./zkServer.sh start
./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties

List existing topics and create a new one:

kafka-topics.sh --list --zookeeper hadoop000:2181
kafka-topics.sh --create \
  --zookeeper hadoop000:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic tp_streamingtopic

Configure a Flume KafkaSink in streaming2.conf:

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
# source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
# channel
agent1.channels.logger-channel.type=memory
# sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = tp_streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

Start Flume with the new configuration:

flume-ng agent \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/streaming2.conf \
  --name agent1 \
  -Dflume.root.logger=INFO,console

Consume the data from Kafka to verify the pipeline:

./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_streamingtopic

4. Spark Streaming consumption – A minimal Spark application that reads from Kafka and performs word count:

package com.taipark.spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args
    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Run the Spark job with the broker address and the topic name, and you will see the streamed word‑count results.

The article concludes with a reminder to like, bookmark, and share the tutorial.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

log4jbig-dataspark-streaming
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.