End-to-End Real-Time Web Log Processing with Flume, Kafka, Spark Streaming, HBase, and Spring Boot

This tutorial demonstrates how to generate simulated web access logs in Python, schedule them with Crontab, collect them in real time using Flume, forward them to Kafka, process the streams with Spark Streaming, store results in HBase, and visualize the data via a Spring Boot application with ECharts.

Big Data Technology & Architecture

1. Requirement Description

The goal is to count total page views and search‑engine‑referral views for a website, and to store the results for front‑end visualization.

1.1 Requirements

Track overall page visits, visits from search engines, and provide a diagram of the overall architecture.

1.2 User Behavior Log Content

2. Simulated Log Data Generation

Use Python to generate synthetic logs containing URL paths, referrer URLs, search keywords, status codes, and IP addresses.

#coding=UTF-8
import random
import time

url_paths = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "class/145.html",
    "learn/821.html",
    "learn/825.html",
    "course/list"
]

http_refers=[
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "http://search.yahoo.com/search?p={query}",
]

search_keyword = [
    "Spark+Sql",
    "Hadoop",
    "Storm",
    "Spark+Streaming",
    "大数据",
    "面试"
]

status_codes = ["200","404","500"]

ip_slices = [132,156,132,10,29,145,44,30,21,43,1,7,9,23,55,56,241,134,155,163,172,144,158]

def sample_url():
    return random.sample(url_paths,1)[0]

def sample_ip():
    # Pick four octets and join them into a dotted IP string
    slice = random.sample(ip_slices,4)
    return ".".join([str(item) for item in slice])

def sample_refer():
    # Roughly 80% of requests carry no referrer ("-")
    if random.uniform(0,1) > 0.2:
        return "-"
    refer_str = random.sample(http_refers,1)
    query_str = random.sample(search_keyword,1)
    return refer_str[0].format(query=query_str[0])

def sample_status():
    return random.sample(status_codes,1)[0]

def generate_log(count = 10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())

    # "w+" truncates the file on every run; the with-block closes it so all lines are flushed
    with open("/home/hadoop/tpdata/project/logs/access.log","w+") as f:
        while count >= 1:
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status}\t{refer}".format(
                local_time=time_str,
                url=sample_url(),
                ip=sample_ip(),
                refer=sample_refer(),
                status=sample_status())
            print(query_log)
            f.write(query_log + "\n")
            count = count - 1

if __name__ == '__main__':
    generate_log(100)

Schedule the script to run every minute with Linux Crontab, using the schedule expression */1 * * * * (five fields: minute, hour, day of month, month, day of week). Create a shell wrapper:

vi log_generator.sh
python /home/hadoop/tpdata/log.py
chmod u+x log_generator.sh

Add the job to Crontab:

crontab -e
*/1 * * * * /home/hadoop/tpdata/project/log_generator.sh

3. Flume Real‑Time Log Collection

Configure Flume to tail the generated log file:

exec-memory-logger.sources = exec-source
exec-memory-logger.sinks = logger-sink
exec-memory-logger.channels = memory-channel

exec-memory-logger.sources.exec-source.type = exec
exec-memory-logger.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
exec-memory-logger.sources.exec-source.shell = /bin/sh -c

exec-memory-logger.channels.memory-channel.type = memory

exec-memory-logger.sinks.logger-sink.type = logger

exec-memory-logger.sources.exec-source.channels = memory-channel
exec-memory-logger.sinks.logger-sink.channel = memory-channel

Start Flume:

flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/tpdata/project/streaming_project.conf \
-Dflume.root.logger=INFO,console

3.1 Connect Flume to Kafka

Modify the sink to use the Kafka sink:

exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel

exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/tpdata/project/logs/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

exec-memory-kafka.channels.memory-channel.type = memory

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1

exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

Start the updated Flume agent:

flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/tpdata/project/streaming_project2.conf \
-Dflume.root.logger=INFO,console

4. Spark Streaming Integration with Kafka

pom.xml dependencies (Spark 2.2.0, Scala 2.11, Kafka 0.9.0.0, Flume integration, etc.) are defined to build the Scala application.

4.1 Spark Streaming Application (Direct Kafka)

package com.taipark.spark.project

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WebStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if(args.length != 2){
      System.err.println("Usage: WebStatStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers,topics) = args
    val sparkConf = new SparkConf().setAppName("WebStatStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Seconds(60))
    val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
    messages.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}

4.2 ETL Logic

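Parse each log line, extract the IP, timestamp, URL, status code, and referrer, then convert the timestamp to the compact yyyyMMddHHmmss form. Despite its name, parseToMinute keeps the seconds; the streaming job later uses only the first eight characters (the day) when building row keys.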

package com.taipark.spark.project.utils

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat

object DateUtils {
  val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")
  def getTime(time:String) = YYYYMMDDHHMMSS_FORMAT.parse(time).getTime
  def parseToMinute(time:String) = TARGET_FORMAT.format(new Date(getTime(time)))
}

Define case classes for cleaned logs and HBase entities:

case class ClickLog(ip:String,time:String,courseId:Int,statusCode:Int,referer:String)
case class CourseClickCount(day_course:String,click_count:Long)
case class CourseSearchClickCount(day_search_course:String,click_count:Long)
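
As a rough illustration of the cleaning step, the following Python sketch applies the same parsing rules to a single tab-separated line. The function name `parse_line`, the Python `ClickLog` tuple, and the sample line are assumptions made for illustration; the project itself does this in Scala inside the streaming job.

```python
from datetime import datetime
from typing import NamedTuple, Optional


class ClickLog(NamedTuple):
    ip: str
    time: str          # compact yyyyMMddHHmmss string
    course_id: int
    status_code: int
    referer: str


def parse_line(line: str) -> Optional[ClickLog]:
    """Parse one generated log line; return None for non-course URLs."""
    infos = line.split("\t")
    if len(infos) != 5:
        return None
    ip, local_time, request, status, referer = infos
    # request looks like: "GET /class/112.html HTTP/1.1"
    url = request.split(" ")[1]
    if not url.startswith("/class"):
        return None
    page = url.split("/")[2]                      # e.g. 112.html
    course_id = int(page[:page.rindex(".")])      # strip the extension
    compact = datetime.strptime(local_time, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d%H%M%S")
    return ClickLog(ip, compact, course_id, int(status), referer)


# Hypothetical sample line in the generator's format
sample = '10.29.145.44\t2020-03-11 10:05:00\t"GET /class/112.html HTTP/1.1"\t200\t-'
print(parse_line(sample))
```

Lines for pages outside /class (for example learn/821.html) come back as None here, which plays the role of the courseId != 0 filter in the Scala job.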

4.3 Write Results to HBase

Java utility for HBase operations (singleton pattern):

package com.taipark.spark.project.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

public class HBaseUtils {
    private HBaseAdmin admin = null;
    private Configuration configuration = null;
    private HBaseUtils(){
        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum","hadoop000:2181");
        configuration.set("hbase.rootdir","hdfs://hadoop000:8020/hbase");
        try { admin = new HBaseAdmin(configuration); } catch (IOException e) { e.printStackTrace(); }
    }
    private static HBaseUtils instance = null;
    public static synchronized HBaseUtils getInstance(){
        if(instance == null){ instance = new HBaseUtils(); }
        return instance;
    }
    public HTable getTable(String tableName){
        try { return new HTable(configuration,tableName); } catch (IOException e) { e.printStackTrace(); return null; }
    }
    public void put(String tableName,String rowkey,String cf,String column,String value){
        HTable table = getTable(tableName);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(cf),Bytes.toBytes(column),Bytes.toBytes(value));
        try { table.put(put); } catch (IOException e) { e.printStackTrace(); }
    }
}

Scala DAO for batch increment:

package com.taipark.spark.project.dao

import com.taipark.spark.project.domian.CourseClickCount
import com.taipark.spark.project.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer

object CourseClickCountDAO {
  val tableName = "web_course_clickcount"
  val cf = "info"
  val qualifier = "click_count"
  def save(list: ListBuffer[CourseClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    for(ele <- list){
      table.incrementColumnValue(Bytes.toBytes(ele.day_course), Bytes.toBytes(cf), Bytes.toBytes(qualifier), ele.click_count)
    }
  }
  def count(day_course:String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    val get = new Get(Bytes.toBytes(day_course))
    val value = table.get(get).getValue(cf.getBytes, qualifier.getBytes)
    if (value == null) 0L else Bytes.toLong(value)
  }
}
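
The DAO relies on HBase counters: incrementColumnValue adds a delta to the stored value, so each micro-batch only ships its own counts, and Bytes.toLong reads the cell back as an 8-byte big-endian long. The Python sketch below imitates that behaviour with an in-memory dict. `FakeCounterTable` is a hypothetical stand-in for illustration only, not an HBase client API.

```python
import struct


class FakeCounterTable:
    """In-memory stand-in for an HBase counter column (illustration only)."""

    def __init__(self):
        self._cells = {}  # row key -> 8-byte big-endian signed long

    def increment_column_value(self, row_key: str, amount: int) -> int:
        """Add amount to the stored counter and return the new value."""
        updated = self.count(row_key) + amount
        self._cells[row_key] = struct.pack(">q", updated)
        return updated

    def count(self, row_key: str) -> int:
        raw = self._cells.get(row_key)
        # A missing cell is treated as zero, as in CourseClickCountDAO.count
        return 0 if raw is None else struct.unpack(">q", raw)[0]


table = FakeCounterTable()
# Two consecutive micro-batches report counts for the same day_course key
table.increment_column_value("20200311_112", 8)
table.increment_column_value("20200311_112", 5)
print(table.count("20200311_112"))
```

Because the stored value is a binary long rather than a string, reading it with a string decoder would produce garbage; that is why the DAO's count method uses Bytes.toLong.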

4.4 Complete Spark Streaming Job

package com.taipark.spark.project.spark

import com.taipark.spark.project.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
import com.taipark.spark.project.domian.{ClickLog, CourseClickCount, CourseSearchClickCount}
import com.taipark.spark.project.utils.DateUtils
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer

object WebStatStreamingApp {
  def main(args: Array[String]): Unit = {
    if(args.length != 2){ System.err.println("Usage: WebStatStreamingApp <brokers> <topics>"); System.exit(1) }
    val Array(brokers,topics) = args
    val sparkConf = new SparkConf().setAppName("WebStatStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Seconds(60))
    val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
    val logs = messages.map(_._2)
    val cleanData = logs.map { line =>
      val infos = line.split("\t")
      val url = infos(2).split(" ")(1)
      var courseId = 0
      if(url.startsWith("/class")){
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0,courseIdHTML.lastIndexOf(".")).toInt
      }
      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }.filter(_.courseId != 0)
    // Requirement 1: total page views per day per course
    cleanData.map(x => ((x.time.substring(0,8) + "_" + x.courseId),1))
      .reduceByKey(_+_)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partitionRecords =>
          val list = new ListBuffer[CourseClickCount]
          partitionRecords.foreach { case (key, cnt) => list.append(CourseClickCount(key,cnt)) }
          CourseClickCountDAO.save(list)
        }
      }
    // Requirement 2: search‑engine referral counts
    cleanData.map { x =>
      val referer = x.referer.replaceAll("//","/")
      val splits = referer.split("/")
      var host = ""
      if(splits.length > 2) host = splits(1)
      (host, x.courseId, x.time)
    }.filter(_._1 != "")
      .map { case (host, cid, time) => (time.substring(0,8) + "_" + host + "_" + cid, 1) }
      .reduceByKey(_+_)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partitionRecords =>
          val list = new ListBuffer[CourseSearchClickCount]
          partitionRecords.foreach { case (key, cnt) => list.append(CourseSearchClickCount(key,cnt)) }
          CourseSearchClickCountDAO.save(list)
        }
      }
    ssc.start()
    ssc.awaitTermination()
  }
}
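
To make the two aggregations concrete, here is a small Python sketch that builds the same row keys (day_courseId and day_host_courseId) from already-cleaned records and counts them with `collections.Counter`. The sample records and the helper name `referer_host` are invented for illustration; the real job performs this per micro-batch with reduceByKey.

```python
from collections import Counter


def referer_host(referer: str) -> str:
    """Return the host part of a referer URL, or "" when there is none."""
    # Same trick as the Scala job: collapse "//" so the host becomes element 1
    splits = referer.replace("//", "/").split("/")
    return splits[1] if len(splits) > 2 else ""


# (time as yyyyMMddHHmmss, course id, referer); made-up sample records
records = [
    ("20200311100500", 112, "-"),
    ("20200311100500", 112, "http://www.baidu.com/s?wd=Hadoop"),
    ("20200311100600", 128, "https://www.sogou.com/web?query=Storm"),
    ("20200311100600", 112, "http://www.baidu.com/s?wd=Storm"),
]

# Requirement 1: page views per day and course
course_counts = Counter(f"{t[:8]}_{cid}" for t, cid, _ in records)

# Requirement 2: search-engine referrals per day, host and course
search_counts = Counter(
    f"{t[:8]}_{referer_host(ref)}_{cid}"
    for t, cid, ref in records
    if referer_host(ref) != ""
)

print(course_counts)
print(search_counts)
```

Records whose referer is "-" contribute to the first count but are dropped from the second, matching the filter on an empty host in the job.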

5. Production Deployment

Remove hard‑coded .setAppName and .setMaster for cluster submission, comment out Scala source directories in pom.xml, and package the jar.

Submit with spark‑submit, adding required packages and HBase jars:

./spark-submit \
--master local[5] \
--name WebStatStreamingApp \
--class com.taipark.spark.project.spark.WebStatStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
--jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
/home/hadoop/tplib/sparktrain-1.0.jar \
hadoop000:9092 streamingtopic

6. Spring Boot Front‑End with ECharts

Build a Spring Boot project, add spring-boot-starter-thymeleaf and HBase client dependencies, and create a controller that returns JSON data for the chart.

package com.taipark.spark.web.spark;

import com.taipark.spark.web.dao.CourseClickDAO;
import com.taipark.spark.web.domain.CourseClickCount;
import net.sf.json.JSONArray;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.ModelAndView;
import java.util.*;

@RestController
public class WebStatApp {
    private static Map<String,String> courses = new HashMap<>();
    static {
        courses.put("112","某些外国人对中国有多不了解?");
        courses.put("128","你认为有哪些失败的建筑?");
        // ... other mappings ...
    }
    @Autowired
    CourseClickDAO courseClickDAO;
    @RequestMapping(value = "/course_clickcount_dynamic", method = RequestMethod.POST)
    @ResponseBody
    public List<CourseClickCount> courseClickCount() throws Exception {
        List<CourseClickCount> list = courseClickDAO.query("20200311");
        for(CourseClickCount model : list){
            model.setName(courses.get(model.getName().substring(9)));
        }
        return list;
    }
    @RequestMapping(value = "/echarts", method = RequestMethod.GET)
    public ModelAndView echarts(){
        return new ModelAndView("echarts");
    }
}
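
The controller's name lookup depends on the row key layout: a key such as 20200311_112 is an eight-character day, an underscore, then the course id, so substring(9) yields the id. A minimal Python sketch of that mapping follows, with placeholder course titles and a hypothetical helper `display_name`; the rows are made-up values, not query results.

```python
# Placeholder titles; the real application keeps its own id -> title map
courses = {"112": "Course 112", "128": "Course 128"}


def display_name(row_key: str) -> str:
    """Map a day_courseId row key to a display name."""
    course_id = row_key[9:]                     # skip "yyyyMMdd_"
    return courses.get(course_id, course_id)    # fall back to the raw id


# Made-up (row key, count) pairs standing in for an HBase query result
rows = [("20200311_112", 13), ("20200311_128", 4), ("20200311_145", 2)]
chart_data = [{"name": display_name(k), "value": v} for k, v in rows]
print(chart_data)
```

Falling back to the raw id is a choice made in this sketch; the Java controller as written would set the name to null for an id missing from its map.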

ECharts HTML page fetches the JSON via AJAX and renders a pie chart of page views per course.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>web_stat</title>
    <script src="js/echarts.min.js"></script>
    <script src="js/jquery-3.4.1.min.js"></script>
</head>
<body>
    <div id="main" style="width:960px;height:540px;position:absolute;top:50%;left:50%;margin-top:-200px;margin-left:-300px;"></div>
    <script type="text/javascript">
        var myChart = echarts.init(document.getElementById('main'));
        var option = {
            title:{text:'某站点实时流处理访问量统计',subtext:'网页访问次数',left:'center'},
            tooltip:{trigger:'item',formatter:'{a} <br/>{b} : {c} ({d}%)'},
            legend:{orient:'vertical',left:'left'},
            series:[{name:'访问次数',type:'pie',radius:'55%',center:['50%','60%'],data:(function(){
                var datas=[];
                $.ajax({type:"POST",url:"/taipark/course_clickcount_dynamic",dataType:"json",async:false,success:function(result){
                    for(var i=0;i<result.length;i++){
                        datas.push({value:result[i].value,name:result[i].name});
                    }
                }});
                return datas;
            })(),emphasis:{itemStyle:{shadowBlur:10,shadowOffsetX:0,shadowColor:'rgba(0,0,0,0.5)'}}}]
        };
        myChart.setOption(option);
    </script>
</body>
</html>

Package the Spring Boot application as a jar and run it on the server:

java -jar web-0.0.1.jar

Then access the UI to see the live pie chart of page views.

Conclusion

The end‑to‑end pipeline demonstrates log generation, scheduling, real‑time ingestion with Flume, message queuing with Kafka, stream processing and aggregation with Spark Streaming, persistent storage in HBase, and interactive visualization using a Spring Boot front‑end powered by ECharts.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
