Simulating Log4j Logging with Flume, Kafka, and Spark Structured Streaming

This guide demonstrates how to generate Log4j logs in Java, configure Maven dependencies, set up log4j properties, launch Kafka and Flume agents, and process the streamed data in real time using Spark Structured Streaming for a complete big‑data logging pipeline.

Big Data Technology & Architecture

This article provides a step‑by‑step tutorial for creating Log4j logs in Java and forwarding them through a Flume‑Kafka pipeline to be consumed by Spark Structured Streaming.

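First, add the Log4j, SLF4J binding, and Flume Log4j appender dependencies to pom.xml. The first two entries carry no version, so they rely on a parent POM or a dependencyManagement section to supply one:

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.8.0</version>
</dependency>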

Next, write a small Java class that emits one INFO message per second:

import org.apache.log4j.Logger;

public class LoggerGenerator {
    private static final Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        // Emit one message per second until the process is stopped
        while (true) {
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
    }
}
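
Then configure log4j.properties so that every message goes both to the console and to the Flume Avro source listening on 127.0.0.1:44444:

log4j.rootLogger=INFO,stdout,flume

# Console output for local inspection
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

# Forward events to the Flume Avro source; UnsafeMode stops the appender
# from throwing exceptions when events cannot be delivered
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 127.0.0.1
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true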
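
To make the console layout concrete, here is a minimal Python sketch that parses one line written with the ConversionPattern above. The regular expression, the function name parse_console_line, and the sample line are illustrative assumptions and not part of the original project.

```python
import re
from datetime import datetime

# Mirrors: %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"\[(?P<thread>[^\]]*)\] \[(?P<logger>[^\]]*)\] \[(?P<level>[^\]]*)\] - "
    r"(?P<message>.*)$"
)


def parse_console_line(line):
    """Return the fields of one console log line, or None if it does not match."""
    match = LINE_RE.match(line.rstrip("\n"))
    if match is None:
        return None
    record = match.groupdict()
    # %f accepts the three millisecond digits and stores them as microseconds
    record["ts"] = datetime.strptime(record["ts"], "%Y-%m-%d %H:%M:%S,%f")
    return record


# Hypothetical line in the shape LoggerGenerator would print to stdout
SAMPLE_LINE = "2020-01-01 12:00:00,123 [main] [LoggerGenerator] [INFO] - value : 0\n"
record = parse_console_line(SAMPLE_LINE)
print(record["level"], record["message"])
```

This layout applies to the stdout appender only; the flume appender in the listing has no layout of its own.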

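Start the Kafka broker (ZooKeeper must already be running on 127.0.0.1:2181), then create the topic and confirm that it exists. Creating the topic is optional when the broker has auto.create.topics.enable turned on. On Kafka releases that no longer accept --zookeeper, pass --bootstrap-server 127.0.0.1:9092 instead:

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
$ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic default_flume_topic
$ kafka-topics.sh --list --zookeeper 127.0.0.1:2181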

Configure Flume with an avro‑memory‑kafka agent:

# avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Avro source: receives events from the Log4j appender (same host and port as log4j.properties)
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 127.0.0.1
avro-memory-kafka.sources.avro-source.port = 44444

# Kafka sink: publishes each event to the topic created earlier
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = default_flume_topic

# Memory channel: buffers events between the source and the sink
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.channels.memory-channel.capacity = 1000
avro-memory-kafka.channels.memory-channel.transactionCapacity = 100

# Wire the source and the sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
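
Because the agent only works when every source and sink points at a declared channel name, a quick consistency check can catch a misspelled name before Flume is started. The following is a rough Python sketch of such a check; the helper names parse_properties and check_wiring and the shortened sample agent a1 are hypothetical, and the parser handles only plain key = value lines.

```python
def parse_properties(text):
    """Parse simple key = value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return problems where a source or sink names a channel that is not declared."""
    declared = set(props.get(agent + ".channels", "").split())
    problems = []
    for source in props.get(agent + ".sources", "").split():
        wired = props.get("%s.sources.%s.channels" % (agent, source), "").split()
        if not wired:
            problems.append("source %s is not attached to any channel" % source)
        for channel in wired:
            if channel not in declared:
                problems.append("source %s uses undeclared channel %s" % (source, channel))
    for sink in props.get(agent + ".sinks", "").split():
        channel = props.get("%s.sinks.%s.channel" % (agent, sink), "")
        if channel not in declared:
            problems.append("sink %s uses undeclared channel %s" % (sink, channel))
    return problems


# Hypothetical, shortened agent definition with one deliberate mismatch
SAMPLE = """
a1.sources = src
a1.sinks = snk
a1.channels = ch
a1.sources.src.channels = ch
a1.sinks.snk.channel = chh
"""

print(check_wiring(parse_properties(SAMPLE), "a1"))
```

To check the real file, read conf/avro-memory-kafka.conf into a string and pass "avro-memory-kafka" as the agent name; an empty list means the wiring is consistent.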

Launch Flume in the background, start LoggerGenerator, and verify that the messages reach Kafka with a console consumer. The --new-consumer flag is only accepted by older Kafka releases; drop it on Kafka 2.0 and later:

$ nohup flume-ng agent --conf conf --conf-file conf/avro-memory-kafka.conf --name avro-memory-kafka > avro-memory-kafka.out 2>&1 &
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic default_flume_topic --from-beginning --new-consumer
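
Since LoggerGenerator writes a strictly increasing counter, the consumer output can also be checked for dropped or reordered messages. The sketch below is one possible way to do that in Python; it assumes each consumed line ends with the text "value : N" and that the captured lines come from a single run of the generator. The function names are hypothetical.

```python
import re

# Matches the message body produced by logger.info("value : " + index++)
VALUE_RE = re.compile(r"value : (\d+)\s*$")


def extract_indices(lines):
    """Pull the counter out of every line that carries one, in arrival order."""
    indices = []
    for line in lines:
        match = VALUE_RE.search(line)
        if match:
            indices.append(int(match.group(1)))
    return indices


def find_gaps(indices):
    """Return (previous, current) pairs where current is not previous + 1."""
    return [(a, b) for a, b in zip(indices, indices[1:]) if b != a + 1]


# Hypothetical capture of console consumer output with one message missing
captured = ["value : 0", "value : 1", "value : 3", "value : 4"]
print(find_gaps(extract_indices(captured)))
```

An empty result means every message arrived in sequence; with a memory channel, gaps are possible if the Flume agent is restarted while events are still buffered.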

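Finally, process the streamed logs in real time using Spark Structured Streaming. The Kafka source is not bundled with Spark, so the job must be submitted with the spark-sql-kafka-0-10 package that matches your Spark and Scala versions. The topic name must be the one the Flume sink writes to:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Must match kafka.topic in the Flume sink configuration
topic = 'default_flume_topic'
brokers = "127.0.0.1:9092"

spark = SparkSession.builder.appName("log4j-flume-kafka-structured-streaming").getOrCreate()

# Read from partition 0 starting at offset 7; use "earliest" to read the whole topic
lines = (spark.readStream.format("kafka")
         .option("kafka.bootstrap.servers", brokers)
         .option("subscribe", topic)
         .option("startingOffsets", """{"%s":{"0": 7}}""" % topic)
         .load()
         .selectExpr("CAST(value AS STRING)"))

# Split each message on spaces and keep a running count per token
words = lines.select(explode(split(lines.value, ' ')).alias('word'))
word_counts = words.groupBy('word').count()

# Complete mode reprints the full result table on every trigger
query = word_counts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()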
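
To see what the aggregation computes without starting a cluster, the same split-and-count logic can be imitated in plain Python. This is only an approximation of the streaming query, written under the assumption that each Kafka value is a bare message such as "value : 0"; the function name word_counts is hypothetical.

```python
from collections import Counter


def word_counts(messages):
    """Split every message on single spaces and count each resulting token."""
    counts = Counter()
    for value in messages:
        counts.update(value.split(" "))
    return counts


# Hypothetical batch of three messages as LoggerGenerator would emit them
batch = ["value : 0", "value : 1", "value : 2"]
for word, count in sorted(word_counts(batch).items()):
    print(word, count)
```

For this batch the tokens "value" and ":" each reach a count of 3, while every number appears once. In the streaming job the counts keep growing as new messages arrive.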

By following these steps, you can generate Log4j output, transport it through Flume to Kafka, and perform real‑time analytics with Spark, illustrating a complete end‑to‑end big‑data logging solution.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
