End-to-End Real-Time Web Log Processing with Flume, Kafka, Spark Streaming, HBase, and Spring Boot
This tutorial demonstrates how to generate simulated web access logs in Python, schedule them with Crontab, collect them in real time using Flume, forward them to Kafka, process the streams with Spark Streaming, store results in HBase, and visualize the data via a Spring Boot application with ECharts.
1. Requirement Description
The goal is to count total page views and search‑engine‑referral views for a website, and to store the results for front‑end visualization.
1.1 Requirements
Track overall page visits, visits from search engines, and provide a diagram of the overall architecture.
1.2 User Behavior Log Content
2. Simulated Log Data Generation
Use Python to generate synthetic logs containing URL paths, referrer URLs, search keywords, status codes, and IP addresses.
#coding=UTF-8
import random
import time

url_paths = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "class/145.html",
    "learn/821.html",
    "learn/825.html",
    "course/list"
]

http_refers = [
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "http://search.yahoo.com/search?p={query}",
]

search_keyword = [
    "Spark+Sql",
    "Hadoop",
    "Storm",
    "Spark+Streaming",
    "大数据",  # "big data"
    "面试"     # "job interview"
]

status_codes = ["200", "404", "500"]

ip_slices = [132,156,132,10,29,145,44,30,21,43,1,7,9,23,55,56,241,134,155,163,172,144,158]

def sample_url():
    return random.sample(url_paths, 1)[0]

def sample_ip():
    # Pick 4 octets from ip_slices and join them into a dotted-quad address
    octets = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in octets])

def sample_refer():
    # About 80% of requests have no referrer ("-"); the rest come from a search engine
    if random.uniform(0, 1) > 0.2:
        return "-"
    refer_str = random.sample(http_refers, 1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])

def sample_status():
    return random.sample(status_codes, 1)[0]

def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # "w+" overwrites the file on every run; "with" makes sure it is flushed and closed
    with open("/home/hadoop/tpdata/project/logs/access.log", "w+") as f:
        while count >= 1:
            # Five tab-separated fields: ip, time, "GET /url HTTP/1.1", status, referer
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status}\t{refer}".format(
                local_time=time_str,
                url=sample_url(),
                ip=sample_ip(),
                refer=sample_refer(),
                status=sample_status())
            print(query_log)
            f.write(query_log + "\n")
            count = count - 1

if __name__ == '__main__':
    generate_log(100)
Schedule the script to run every minute with Linux Crontab (cron expression */1 * * * *). First, create a shell wrapper:
vi log_generator.sh
# log_generator.sh contains a single line:
python /home/hadoop/tpdata/log.py

chmod u+x log_generator.sh
Add the job to Crontab:
crontab -e
*/1 * * * * /home/hadoop/tpdata/project/log_generator.sh
3. Flume Real-Time Log Collection
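Flume's exec source turns every line that tail -F prints into one event, so whatever the generator writes reaches Kafka and Spark unchanged. A quick check of the generator's output can therefore save debugging further down the pipeline. The following minimal Python sketch is not part of the original project: validate_line is a hypothetical helper, and it assumes the five tab-separated fields written by generate_log() (IP, yyyy-MM-dd HH:mm:ss timestamp, quoted request, status code, referrer).

```python
import re
from datetime import datetime

# Hypothetical sanity check (not from the original project). Assumed layout, as written by
# generate_log(): ip \t yyyy-MM-dd HH:mm:ss \t "GET /<url> HTTP/1.1" \t status \t referer
IP_RE = re.compile(r"\d{1,3}(\.\d{1,3}){3}")
REQUEST_RE = re.compile(r'"GET /\S+ HTTP/1\.1"')  # the request field keeps its double quotes
STATUS_CODES = {"200", "404", "500"}


def validate_line(line):
    """Return True if `line` has the five-field layout the Spark job parses."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 5:
        return False
    ip, ts, request, status, referer = fields
    if not IP_RE.fullmatch(ip) or not REQUEST_RE.fullmatch(request):
        return False
    try:
        datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return False
    # "-" marks a request without a referrer; anything else should be a URL
    return status in STATUS_CODES and (referer == "-" or referer.startswith("http"))


SAMPLE = '132.10.29.145\t2020-03-11 10:21:01\t"GET /class/112.html HTTP/1.1"\t200\t-'
print(validate_line(SAMPLE))            # True
print(validate_line("not a log line"))  # False
```

To check a whole file, the helper could be applied to every line of access.log, for example with all(validate_line(l) for l in f) on an open file handle.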
Configure Flume to tail the generated log file:
# streaming_project.conf: exec source (tail -F) -> memory channel -> logger sink
exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel

exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c

exec-memory-logger.channels.memory-channel.type = memory

exec-memory-logger.sinks.logger-sink.type = logger

exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel
Start Flume:
flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/tpdata/project/streaming_project.conf \
-Dflume.root.logger=INFO,console
3.1 Connect Flume to Kafka
Create a second agent configuration (streaming_project2.conf) that keeps the exec source but replaces the logger sink with a Kafka sink:
# streaming_project2.conf: exec source (tail -F) -> memory channel -> Kafka sink (topic streamingtopic)
exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel

exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

exec-memory-kafka.channels.memory-channel.type = memory

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1

exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
Start the updated Flume agent:
flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/tpdata/project/streaming_project2.conf \
-Dflume.root.logger=INFO,console
4. Spark Streaming Integration with Kafka
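The Scala application is built with Maven; pom.xml declares the dependencies (Spark 2.2.0, Scala 2.11, Kafka 0.9.0.0, the Flume integration, etc.). The code uses the Kafka 0.8 direct API (KafkaUtils with StringDecoder), which the spark-submit command in section 5 pulls in as spark-streaming-kafka-0-8_2.11:2.2.0.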
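The application in 4.1 reads from Kafka with the direct (receiver-less) API and a 60-second batch interval, then prints how many log lines arrived in each batch. As a rough mental model only, the Python sketch below buckets (arrival time, (key, value)) records into 60-second batches and counts them, mirroring messages.map(_._2).count(). It is an approximation: the real direct stream assigns each batch a range of Kafka offsets, and the function name and record layout here are illustrative assumptions.

```python
from collections import Counter

BATCH_SECONDS = 60  # mirrors Seconds(60) in the StreamingContext


def count_per_batch(records):
    """Toy model of messages.map(_._2).count() over 60-second batches.

    records: iterable of (arrival_seconds, (key, value)) pairs, where value is a log line.
    Returns {batch_start_seconds: number_of_records}. Illustrative only: the real direct
    stream gives each batch a range of Kafka offsets instead of bucketing by timestamp.
    """
    counts = Counter()
    for arrival_s, (_key, _value) in records:
        counts[(arrival_s // BATCH_SECONDS) * BATCH_SECONDS] += 1
    return dict(sorted(counts.items()))


records = [(5, (None, "line a")), (59, (None, "line b")), (60, (None, "line c")), (130, (None, "line d"))]
print(count_per_batch(records))  # {0: 2, 60: 1, 120: 1}
```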
4.1 Spark Streaming Application (Direct Kafka)
package com.taipark.spark.project

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WebStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args
    // local[2] is for local testing; remove setMaster when submitting to a cluster
    val sparkConf = new SparkConf().setAppName("WebStatStreamingApp").setMaster("local[2]")
    // 60-second micro-batches
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    // Direct (receiver-less) stream from the Kafka 0.8 integration
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
    // Each message is a (key, value) pair; the log line is the value. Print one count per batch.
    messages.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
4.2 ETL Logic
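Parse each log line and extract the IP, timestamp, URL, status code, and referrer. The course ID comes from URLs of the form /class/<id>.html; other pages are discarded. Timestamps are reformatted from yyyy-MM-dd HH:mm:ss to the compact yyyyMMddHHmmss form (despite the helper's name parseToMinute, the seconds are kept), and the aggregations use only the first eight characters, i.e. the day.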
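The same parsing rules can be expressed in a few lines of Python, which is handy for trying them on sample lines before running the streaming job. This is a sketch that mirrors the Scala map/filter in 4.4, not code from the original project; click_log is a hypothetical name, and like DateUtils.parseToMinute it keeps the seconds (yyyyMMddHHmmss).

```python
from collections import namedtuple
from datetime import datetime

# Mirrors the Scala case class ClickLog(ip, time, courseId, statusCode, referer)
ClickLog = namedtuple("ClickLog", ["ip", "time", "course_id", "status_code", "referer"])


def click_log(line):
    """Parse one generated log line; return None for pages that are not /class/<id>.html."""
    ip, ts, request, status, referer = line.rstrip("\n").split("\t")
    url = request.split(" ")[1]  # '"GET /class/112.html HTTP/1.1"' -> '/class/112.html'
    course_id = 0
    if url.startswith("/class"):
        page = url.split("/")[2]                 # '112.html'
        course_id = int(page[:page.rfind(".")])  # 112
    # Same reformatting as DateUtils.parseToMinute: yyyy-MM-dd HH:mm:ss -> yyyyMMddHHmmss
    compact = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d%H%M%S")
    if course_id == 0:
        return None  # same effect as .filter(_.courseId != 0)
    return ClickLog(ip, compact, course_id, int(status), referer)


line = '10.29.145.44\t2020-03-11 10:21:01\t"GET /class/112.html HTTP/1.1"\t404\t-'
print(click_log(line))
# ClickLog(ip='10.29.145.44', time='20200311102101', course_id=112, status_code=404, referer='-')
```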
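package com.taipark.spark.project.utils

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat

object DateUtils {
  // FastDateFormat is thread-safe (unlike SimpleDateFormat), so shared instances are fine
  // Input format written by the Python generator
  val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  // Output format; it keeps the seconds even though the method is called parseToMinute
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")

  def getTime(time: String): Long = YYYYMMDDHHMMSS_FORMAT.parse(time).getTime

  def parseToMinute(time: String): String = TARGET_FORMAT.format(new Date(getTime(time)))
}
Define case classes for the cleaned log records and the HBase entities: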
case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)
case class CourseClickCount(day_course: String, click_count: Long)
case class CourseSearchClickCount(day_search_course: String, click_count: Long)
4.3 Write Results to HBase
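Both result tables use composite row keys: day_course (for example 20200311_112) for page views and day_search_course (for example 20200311_www.baidu.com_112) for search referrals. Because each 60-second batch only carries that batch's counts, the DAO below adds them to the stored value with incrementColumnValue rather than overwriting it with a put. The following toy model shows the difference; FakeCounterTable is an in-memory stand-in invented for illustration, not an HBase API.

```python
class FakeCounterTable:
    """In-memory stand-in for an HBase table of counter cells (illustration only)."""

    def __init__(self):
        self.cells = {}  # (row_key, family, qualifier) -> int

    def increment_column_value(self, row, family, qualifier, amount):
        """Add `amount` to the cell (missing cells start at 0) and return the new total."""
        key = (row, family, qualifier)
        self.cells[key] = self.cells.get(key, 0) + amount
        return self.cells[key]

    def put(self, row, family, qualifier, value):
        """Overwrite the cell, as a plain put would."""
        self.cells[(row, family, qualifier)] = value

    def get(self, row, family="info", qualifier="click_count"):
        return self.cells.get((row, family, qualifier), 0)


# Two consecutive batches both contain clicks on course 112 for 2020-03-11
batches = [[("20200311_112", 8), ("20200311_128", 3)], [("20200311_112", 5)]]

incremented, overwritten = FakeCounterTable(), FakeCounterTable()
for batch in batches:
    for day_course, click_count in batch:
        incremented.increment_column_value(day_course, "info", "click_count", click_count)
        overwritten.put(day_course, "info", "click_count", click_count)

print(incremented.get("20200311_112"))  # 13: totals accumulate across batches
print(overwritten.get("20200311_112"))  # 5: only the last batch survives
```

In real HBase the incremented cell holds an 8-byte long, which is why the DAO's count method decodes it with Bytes.toLong.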
Java utility for HBase operations (singleton pattern):
package com.taipark.spark.project.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

// Lazily created singleton around the HBase 1.x client API (HBaseAdmin / HTable)
public class HBaseUtils {
    private HBaseAdmin admin = null;
    private Configuration configuration = null;

    private HBaseUtils() {
        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
        configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");
        try { admin = new HBaseAdmin(configuration); } catch (IOException e) { e.printStackTrace(); }
    }

    private static HBaseUtils instance = null;

    public static synchronized HBaseUtils getInstance() {
        if (instance == null) { instance = new HBaseUtils(); }
        return instance;
    }

    // Opens a new table handle (null on failure); the caller is responsible for closing it
    public HTable getTable(String tableName) {
        try { return new HTable(configuration, tableName); } catch (IOException e) { e.printStackTrace(); return null; }
    }

    // Writes one string cell: row key, column family, qualifier, value
    public void put(String tableName, String rowkey, String cf, String column, String value) {
        HTable table = getTable(tableName);
        if (table == null) { return; }
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try { table.close(); } catch (IOException e) { e.printStackTrace(); }
        }
    }
}
Scala DAO for batch increment:
package com.taipark.spark.project.dao

import com.taipark.spark.project.domian.CourseClickCount
import com.taipark.spark.project.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

object CourseClickCountDAO {
  val tableName = "web_course_clickcount"
  val cf = "info"
  val qualifier = "click_count"

  // Adds each batch's counts to the stored totals (incrementColumnValue, not put),
  // so counts from successive batches accumulate under the same row key
  def save(list: ListBuffer[CourseClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    try {
      for (ele <- list) {
        table.incrementColumnValue(Bytes.toBytes(ele.day_course), Bytes.toBytes(cf),
          Bytes.toBytes(qualifier), ele.click_count)
      }
    } finally {
      table.close()
    }
  }

  // Reads the current total for a row key such as 20200311_112 (0 if the row is absent)
  def count(day_course: String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    try {
      val get = new Get(Bytes.toBytes(day_course))
      val value = table.get(get).getValue(cf.getBytes, qualifier.getBytes)
      if (value == null) 0L else Bytes.toLong(value)
    } finally {
      table.close()
    }
  }
}
4.4 Complete Spark Streaming Job
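The job computes two aggregations per batch, and the row-key construction is easiest to see in isolation. The Python sketch below applies the same rules to a small hand-made batch of ClickLog-like tuples; the function names are hypothetical, and referer_host reproduces the Scala approach of collapsing "//" to "/" and taking the second path segment as the host.

```python
from collections import Counter


def referer_host(referer):
    """Same rule as the Scala job: collapse '//' to '/', split on '/', keep segment 1."""
    parts = referer.replace("//", "/").split("/")
    return parts[1] if len(parts) > 2 else ""  # "-" (no referrer) -> ""


def day_course_counts(logs):
    """Requirement 1: count per row key yyyyMMdd_courseId."""
    return Counter(f"{time[:8]}_{course_id}" for _ip, time, course_id, _status, _ref in logs)


def day_search_course_counts(logs):
    """Requirement 2: count per row key yyyyMMdd_host_courseId, search-engine referrals only."""
    return Counter(
        f"{time[:8]}_{referer_host(ref)}_{course_id}"
        for _ip, time, course_id, _status, ref in logs
        if referer_host(ref) != ""
    )


# One hand-made batch of cleaned records: (ip, time, course_id, status_code, referer)
batch = [
    ("1.7.9.23", "20200311102101", 112, 200, "-"),
    ("9.23.55.56", "20200311102130", 112, 200, "http://www.baidu.com/s?wd=Hadoop"),
    ("55.56.241.134", "20200311102145", 128, 404, "https://www.sogou.com/web?query=Storm"),
]
print(dict(day_course_counts(batch)))         # {'20200311_112': 2, '20200311_128': 1}
print(dict(day_search_course_counts(batch)))  # {'20200311_www.baidu.com_112': 1, '20200311_www.sogou.com_128': 1}
```

In the Spark job these per-batch counts come from reduceByKey and are then added to HBase with increments, so the stored values are running totals.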
package com.taipark.spark.project.spark

import com.taipark.spark.project.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
import com.taipark.spark.project.domian.{ClickLog, CourseClickCount, CourseSearchClickCount}
import com.taipark.spark.project.utils.DateUtils
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object WebStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args
    val sparkConf = new SparkConf().setAppName("WebStatStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // ETL: tab-separated log line -> ClickLog; keep only /class/<id>.html pages
    val logs = messages.map(_._2)
    val cleanData = logs.map { line =>
      val infos = line.split("\t")
      // The request field is "GET /class/112.html HTTP/1.1" (quotes included); token 1 is the path
      val url = infos(2).split(" ")(1)
      var courseId = 0
      if (url.startsWith("/class")) {
        val courseIdHTML = url.split("/")(2) // e.g. 112.html
        courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
      }
      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }.filter(_.courseId != 0)

    // Requirement 1: page views per day per course, row key yyyyMMdd_courseId
    cleanData.map(x => (x.time.substring(0, 8) + "_" + x.courseId, 1))
      .reduceByKey(_ + _)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partitionRecords =>
          val list = new ListBuffer[CourseClickCount]
          partitionRecords.foreach { case (key, cnt) => list.append(CourseClickCount(key, cnt)) }
          CourseClickCountDAO.save(list)
        }
      }

    // Requirement 2: search-engine referrals per day per course, row key yyyyMMdd_host_courseId
    cleanData.map { x =>
      // "https://www.sogou.com/web?query=..." -> "https:/www.sogou.com/web?query=..." -> host www.sogou.com
      val referer = x.referer.replaceAll("//", "/")
      val splits = referer.split("/")
      var host = ""
      if (splits.length > 2) host = splits(1)
      (host, x.courseId, x.time)
    }.filter(_._1 != "")
      .map { case (host, cid, time) => (time.substring(0, 8) + "_" + host + "_" + cid, 1) }
      .reduceByKey(_ + _)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partitionRecords =>
          val list = new ListBuffer[CourseSearchClickCount]
          partitionRecords.foreach { case (key, cnt) => list.append(CourseSearchClickCount(key, cnt)) }
          CourseSearchClickCountDAO.save(list)
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
5. Production Deployment
For cluster submission, remove the hard-coded .setAppName and .setMaster calls (spark-submit supplies --name and --master instead), comment out the Scala source directories in pom.xml, and package the jar.
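spark-submit's --jars option expects a comma-separated list, whereas a shell glob expands to space-separated paths; that is why the submit command in this section pipes the HBase lib/*.jar glob through tr ' ' ','. A Python equivalent, sketched here with hypothetical helper names, makes the transformation explicit and also avoids two quirks of the shell version: in bash, a glob that matches nothing is passed through literally, and paths containing spaces would be split apart.

```python
import glob
import os


def jars_arg(paths):
    """Join .jar paths with commas, the format spark-submit's --jars option expects."""
    return ",".join(sorted(p for p in paths if p.endswith(".jar")))


def jars_in_dir(lib_dir):
    """Comma-separated list of every *.jar directly inside lib_dir ('' if there are none)."""
    return jars_arg(glob.glob(os.path.join(lib_dir, "*.jar")))


print(jars_arg(["/lib/b.jar", "/lib/a.jar", "/lib/README.txt"]))  # /lib/a.jar,/lib/b.jar
```

On the machine used in this tutorial, jars_in_dir("/home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib") would produce the value passed to --jars.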
Submit with spark‑submit, adding required packages and HBase jars:
./spark-submit \
--master local[5] \
--name WebStatStreamingApp \
--class com.taipark.spark.project.spark.WebStatStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
--jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
/home/hadoop/tplib/sparktrain-1.0.jar \
hadoop000:9092 streamingtopic
6. Spring Boot Front-End with ECharts
Build a Spring Boot project, add spring-boot-starter-thymeleaf and HBase client dependencies, and create a controller that returns JSON data for the chart.
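The controller's core job is a small transformation: each HBase row key of the form yyyyMMdd_courseId becomes a {name, value} record whose name is a course title, which is the shape the ECharts pie series consumes. The Python sketch below illustrates that mapping under two assumptions: that CourseClickDAO.query(day), which is not shown, yields (row key, count) pairs for that day, and that an unmapped course ID yields no name, just as courses.get() returns null in Java. to_chart_records and COURSES are illustrative names.

```python
# Stand-in for the Java `courses` map (only the two entries shown in the original)
COURSES = {
    "112": "某些外国人对中国有多不了解?",  # "How little do some foreigners know about China?"
    "128": "你认为有哪些失败的建筑?",      # "Which buildings do you consider failures?"
}


def to_chart_records(rows, day):
    """Turn (row_key, click_count) pairs such as ("20200311_112", 13) into ECharts pie data."""
    prefix = day + "_"
    records = []
    for row_key, count in rows:
        if not row_key.startswith(prefix):
            continue  # assumption: only the requested day is charted
        course_id = row_key[len(prefix):]  # equals Java's substring(9) when day has 8 characters
        records.append({"name": COURSES.get(course_id), "value": count})
    return records


rows = [("20200311_112", 13), ("20200311_128", 3), ("20200312_112", 7)]
print(to_chart_records(rows, "20200311"))
```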
package com.taipark.spark.web.spark;

import com.taipark.spark.web.dao.CourseClickDAO;
import com.taipark.spark.web.domain.CourseClickCount;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.ModelAndView;

import java.util.*;

@RestController
public class WebStatApp {

    // Maps course IDs to display names (sample titles)
    private static Map<String, String> courses = new HashMap<>();
    static {
        courses.put("112", "某些外国人对中国有多不了解?");  // "How little do some foreigners know about China?"
        courses.put("128", "你认为有哪些失败的建筑?");      // "Which buildings do you consider failures?"
        // ... other mappings ...
    }

    @Autowired
    CourseClickDAO courseClickDAO;

    // Returns one day's click counts as JSON for the chart
    @RequestMapping(value = "/course_clickcount_dynamic", method = RequestMethod.POST)
    @ResponseBody
    public List<CourseClickCount> courseClickCount() throws Exception {
        List<CourseClickCount> list = courseClickDAO.query("20200311");
        for (CourseClickCount model : list) {
            // Row keys look like 20200311_112; substring(9) skips "yyyyMMdd_" to get the course ID
            model.setName(courses.get(model.getName().substring(9)));
        }
        return list;
    }

    // Renders the "echarts" Thymeleaf template
    @RequestMapping(value = "/echarts", method = RequestMethod.GET)
    public ModelAndView echarts() {
        return new ModelAndView("echarts");
    }
}
The ECharts HTML page fetches the JSON via AJAX and renders a pie chart of page views per course:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>web_stat</title>
    <script src="js/echarts.min.js"></script>
    <script src="js/jquery-3.4.1.min.js"></script>
</head>
<body>
<div id="main" style="width:960px;height:540px;position:absolute;top:50%;left:50%;margin-top:-200px;margin-left:-300px;"></div>
<script type="text/javascript">
    var myChart = echarts.init(document.getElementById('main'));
    var option = {
        // text: "Real-time stream-processing visit statistics for a site", subtext: "Page visits"
        title: {text: '某站点实时流处理访问量统计', subtext: '网页访问次数', left: 'center'},
        // {a} series name, {b} slice name, {c} value, {d} percentage
        tooltip: {trigger: 'item', formatter: '{a} <br/>{b} : {c} ({d}%)'},
        legend: {orient: 'vertical', left: 'left'},
        series: [{
            name: '访问次数',  // "Visit count"
            type: 'pie',
            radius: '55%',
            center: ['50%', '60%'],
            data: (function () {
                var datas = [];
                // Synchronous request, so the data is ready before setOption is called
                $.ajax({
                    type: "POST",
                    url: "/taipark/course_clickcount_dynamic",
                    dataType: "json",
                    async: false,
                    success: function (result) {
                        for (var i = 0; i < result.length; i++) {
                            datas.push({value: result[i].value, name: result[i].name});
                        }
                    }
                });
                return datas;
            })(),
            emphasis: {itemStyle: {shadowBlur: 10, shadowOffsetX: 0, shadowColor: 'rgba(0,0,0,0.5)'}}
        }]
    };
    myChart.setOption(option);
</script>
</body>
</html>
Package the Spring Boot application as a jar and run it on the server:
java -jar web-0.0.1.jar
Then open the echarts page in a browser to see the pie chart of page views. The data is fetched once when the page loads, so reload the page to pick up newly aggregated counts.
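In the pie tooltip template '{a} <br/>{b} : {c} ({d}%)', ECharts substitutes the series name, the slice name, the slice value, and the slice's percentage of the total. The Python sketch below reproduces that text for a small data set to show where {d} comes from; pie_tooltips is a hypothetical name, rounding to two decimals is an assumption, and ECharts' own percentage rounding and number formatting may differ.

```python
def pie_tooltips(series_name, data):
    """Render '{a} <br/>{b} : {c} ({d}%)' for each slice, with d = value / total * 100."""
    total = sum(item["value"] for item in data)
    tooltips = []
    for item in data:
        percent = round(item["value"] * 100 / total, 2) if total else 0
        tooltips.append(f'{series_name} <br/>{item["name"]} : {item["value"]} ({percent}%)')
    return tooltips


data = [{"name": "112", "value": 3}, {"name": "128", "value": 1}]
for tip in pie_tooltips("clicks", data):
    print(tip)
# clicks <br/>112 : 3 (75.0%)
# clicks <br/>128 : 1 (25.0%)
```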
Conclusion
The end‑to‑end pipeline demonstrates log generation, scheduling, real‑time ingestion with Flume, message queuing with Kafka, stream processing and aggregation with Spark Streaming, persistent storage in HBase, and interactive visualization using a Spring Boot front‑end powered by ECharts.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
