Integrating Log4j, Flume, Kafka, and Spark Streaming for Real‑Time Data Processing
This tutorial builds a complete end‑to‑end real‑time pipeline: Log4j produces simulated log events, Flume collects them and forwards them to Kafka through a Flume KafkaSink, and Spark Streaming consumes the resulting stream.
1. Log4j configuration – Set up a simple console appender in log4j.properties:
log4j.rootLogger = INFO,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
Write a Java class LoggerGenerator that emits an incrementing value every second:
import org.apache.log4j.Logger;

/** Simulated log generator */
public class LoggerGenerator {
    private static final Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        // Emit one INFO event per second until the process is stopped
        while (true) {
            Thread.sleep(1000);
            logger.info("current value is:" + index++);
        }
    }
}
Running the program produces output such as:
2020-03-07 18:21:37,637 [main] [LoggerGenerator] [INFO] - current value is:0
2020-03-07 18:21:38,639 [main] [LoggerGenerator] [INFO] - current value is:1
...
2. Flume collection – Create streaming.conf to define an Avro source, a memory channel, and a logger sink:
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
# define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
# define channel
agent1.channels.logger-channel.type=memory
# define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
Start Flume with:
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
Add the Flume Log4j appender dependency to pom.xml:
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version>
</dependency>
Update log4j.properties so Log4j also sends events to Flume:
log4j.rootLogger = INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop000
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
After restarting LoggerGenerator, the Flume agent's console shows the collected events.
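Flume agent files use the Java properties format, so the wiring in streaming.conf can be sanity-checked before the agent is started. The sketch below is only an illustration: the class FlumeWiringCheck, its methods, and the SAMPLE_CONF constant are hypothetical names, not part of Flume, and it checks a single rule, namely that every source and sink refers to a channel the agent declares.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/** Hypothetical helper (not part of Flume): checks channel wiring in an agent config. */
public class FlumeWiringCheck {

    /** The streaming.conf content from this section, embedded for the demo. */
    public static final String SAMPLE_CONF = String.join("\n",
            "agent1.sources=avro-source",
            "agent1.channels=logger-channel",
            "agent1.sinks=log-sink",
            "# define source",
            "agent1.sources.avro-source.type=avro",
            "agent1.sources.avro-source.bind=0.0.0.0",
            "agent1.sources.avro-source.port=41414",
            "# define channel",
            "agent1.channels.logger-channel.type=memory",
            "# define sink",
            "agent1.sinks.log-sink.type=logger",
            "agent1.sources.avro-source.channels=logger-channel",
            "agent1.sinks.log-sink.channel=logger-channel");

    /** Returns true if every source and sink of the agent points at a declared channel. */
    public static boolean isWired(String agent, String configText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        Set<String> channels = names(props.getProperty(agent + ".channels"));
        if (channels.isEmpty()) {
            return false;
        }
        // Sources use the plural key "channels": one source may feed several channels.
        for (String source : names(props.getProperty(agent + ".sources"))) {
            Set<String> used = names(props.getProperty(agent + ".sources." + source + ".channels"));
            if (used.isEmpty() || !channels.containsAll(used)) {
                return false;
            }
        }
        // Sinks use the singular key "channel": a sink drains exactly one channel.
        for (String sink : names(props.getProperty(agent + ".sinks"))) {
            String used = props.getProperty(agent + ".sinks." + sink + ".channel");
            if (used == null || !channels.contains(used.trim())) {
                return false;
            }
        }
        return true;
    }

    /** Splits a whitespace-separated list of component names; null yields an empty set. */
    public static Set<String> names(String value) {
        Set<String> result = new HashSet<>();
        if (value != null) {
            for (String name : value.trim().split("\\s+")) {
                if (!name.isEmpty()) {
                    result.add(name);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(isWired("agent1", SAMPLE_CONF)); // prints "true"
    }
}
```

A mistyped channel name in either of the last two lines of the config makes isWired return false, which is a common reason for an agent that starts but never delivers events.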
3. Kafka integration – Start ZooKeeper and Kafka:
./zkServer.sh start
./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
List existing topics and create a new one:
kafka-topics.sh --list --zookeeper hadoop000:2181
kafka-topics.sh --create \
--zookeeper hadoop000:2181 \
--replication-factor 1 \
--partitions 1 \
--topic tp_streamingtopic
Configure a Flume KafkaSink in streaming2.conf:
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
# source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
# channel
agent1.channels.logger-channel.type=memory
# sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = tp_streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
Start Flume with the new configuration:
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
Consume the data from Kafka to verify the pipeline:
./kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic tp_streamingtopic
4. Spark Streaming consumption – A minimal Spark application that reads from Kafka and performs word count:
package com.taipark.spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    // Local mode with two threads and a 5-second batch interval
    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Direct stream: read from the Kafka brokers without a receiver
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // Word count over the message values of each batch
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Run the Spark job with the broker address and the topic name, and you will see the streamed word‑count results.
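To see what each 5‑second batch computes, the flatMap / map / reduceByKey chain can be mimicked on a plain in‑memory list, without Spark or Kafka. This is a rough sketch under assumptions: the class BatchWordCount and its countWords method are hypothetical, and the two sample messages assume that each Kafka record value carries only the log message body, which may differ in your setup.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical stand-in for one micro-batch of the word count (no Spark involved). */
public class BatchWordCount {

    /** Splits each message on single spaces and sums a count of 1 per token. */
    public static Map<String, Integer> countWords(List<String> messages) {
        return messages.stream()
                // flatMap(_.split(" ")): one token per space-separated piece
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // map((_, 1)) + reduceByKey(_ + _): add up a 1 for every occurrence
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        // Assumed record values for one batch (message bodies only)
        List<String> batch = Arrays.asList("current value is:0", "current value is:1");
        System.out.println(countWords(batch));
        // prints {current=2, is:0=1, is:1=1, value=2}
    }
}
```

Because the split is on spaces only, "is:0" and "is:1" count as distinct words, so with these log lines only "current" and "value" accumulate counts across events within a batch.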
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
