Big Data 25 min read

Big Data Practice Exercises: Spark, Kafka, and MySQL Integration with Scala and Java

This article presents a series of hands‑on big‑data exercises, including Spark Scala data analysis, Kafka topic creation and custom partitioning, and MySQL table design with Scala‑based streaming calculations, providing complete source code and step‑by‑step solutions for each task.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Practice Exercises: Spark, Kafka, and MySQL Integration with Scala and Java

The article introduces a collection of practical big‑data problems aimed at students and junior developers, covering data preparation, Spark analysis, Kafka integration, and MySQL persistence.

Data set : a text file test.txt containing records in the format class name age gender subject score (e.g., 12 宋江 25 男 chinese 50).

Exercise 1 – Spark Scala solution reads the file and answers 23 analytical questions (counts, averages, max/min, etc.). The full source code is:

object test {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setMaster("local[*]").setAppName("test")
    val sc = new SparkContext(config)
    // 1.读取文件的数据test.txt
    val data: RDD[String]  = sc.textFile("E:\\2020大数据新学年\\BigData\\05-Spark\\0403\\test.txt")
    // 2. 一共有多少个小于20岁的人参加考试?2
    val count1: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt<20).groupBy(_(1)).count()
    // 3. 一共有多少个等于20岁的人参加考试?2
    val count2: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt==20).groupBy(_(1)).count()
    // 4. 一共有多少个大于20岁的人参加考试?2
    val count3: Long = data.map(x=>x.split(" ")).filter(x=>x(2).toInt>20).groupBy(_(1)).count()
    // 5. 一共有多个男生参加考试?4
    val count4: Long = data.map(x=>x.split(" ")).filter(x=>x(3).equals("男")).groupBy(_(1)).count()
    // 6.  一共有多少个女生参加考试?2
    val count5: Long = data.map(x=>x.split(" ")).filter(x=>x(3).equals("女")).groupBy(_(1)).count()
    // ... (remaining calculations omitted for brevity) ...
  }
}

Exercise 2 – Kafka topic creation creates the topic rng_comment with two partitions and two replicas:

/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 2 --topic rng_comment

Exercise 3 – Data filtering and writing to a new file uses Spark to read, filter out empty lines or rows with missing fields, and write the cleaned data:

object test01_filter {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("demo01").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val rddInfo: RDD[String] = sc.textFile("E:\\rng_comment.txt")
    val RNG_INFO: RDD[String] = rddInfo.filter(data => data.split("\t").length == 11 && !data.trim.isEmpty)
    import spark.implicits._
    val df: DataFrame = RNG_INFO.toDF()
    df.repartition(1).write.text("E:\\outputtest")
    sc.stop()
    spark.stop()
  }
}

Exercise 4 – Custom Kafka partitioner (Java) determines the target partition based on the record id (odd/even):

public class ProducerPartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String[] str = value.toString().split("\t");
        if (Integer.parseInt(str[0]) % 2 == 0){
            return 0;
        } else {
            return 1;
        }
    }
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

Exercise 5 – Kafka producer (Java) reads the cleaned file and sends each line to the rng_comment topic using the custom partitioner:

public class test02_send {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.class", "com.czxy.scala.demo12_0415.han.ProducerPartition");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        File file = new File("E:\\outputtest\\part-00000-fe536dc7-523d-4fdd-b0b5-1a045b8cb1ab-c000.txt");
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
        String tempString;
        while ((tempString = bufferedReader.readLine()) != null) {
            kafkaProducer.send(new ProducerRecord<>("rng_comment", tempString));
            System.out.println("已发送:" + tempString);
        }
        System.out.println("数据发送完毕!");
        kafkaProducer.close();
    }
}

Exercise 6 – MySQL table creation defines three tables to store processed data:

create table vip_rank(
  `index` varchar(100) null comment '数据id',
  child_comment varchar(100) null comment '回复数量',
  comment_time DATE null comment '评论时间',
  content TEXT null comment '评论内容',
  da_v varchar(100) null comment '微博个人认证',
  like_status varchar(100) null comment '赞',
  pic varchar(100) null comment '图片评论url',
  user_id varchar(100) null comment '微博用户id',
  user_name varchar(100) null comment '微博用户名',
  vip_rank int null comment '微博会员等级',
  stamp varchar(100) null comment '时间戳'
);
create table like_status(
  `index` varchar(100) null comment '数据id',
  child_comment varchar(100) null comment '回复数量',
  comment_time DATE null comment '评论时间',
  content varchar(10000) null comment '评论内容',
  da_v varchar(100) null comment '微博个人认证',
  like_status varchar(100) null comment '赞',
  pic varchar(100) null comment '图片评论url',
  user_id varchar(100) null comment '微博用户id',
  user_name varchar(100) null comment '微博用户名',
  vip_rank int null comment '微博会员等级',
  stamp varchar(100) null comment '时间戳'
);
create table count_comment(
  time DATE null comment '时间',
  count int null comment '出现的次数',
  constraint rng_comment_pk primary key (time)
);

Exercise 7 – Spark Streaming calculations reads from Kafka, filters records with VIP rank 5 or likes > 10, and writes them to the corresponding MySQL tables:

object test03_calculate {
  def ConnectToMysql() = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_comment?characterEncoding=UTF-8", "root", "root")
  }
  def saveDataToMysql(tableName: String, data: List[String]): Unit = {
    val connection = ConnectToMysql()
    val sql = s"insert into ${tableName} (`index`, child_comment, comment_time, content, da_v,like_status,pic,user_id,user_name,vip_rank,stamp) values (?,?,?,?,?,?,?,?,?,?,?)"
    val ps = connection.prepareStatement(sql)
    data.zipWithIndex.foreach { case (v, i) => ps.setString(i+1, v) }
    ps.executeUpdate()
    connection.close()
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(3))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val kafkaDatas = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams)
    )
    kafkaDatas.foreachRDD(rdd => rdd.foreachPartition(line => {
      for (row <- line) {
        val str = row.value().split("\t")
        val list = str.toList
        if (list(9) == "5") saveDataToMysql("vip_rank", list)
        if (Integer.parseInt(list(5)) > 10) saveDataToMysql("like_status", list)
      }
    }))
    ssc.start()
    ssc.awaitTermination()
  }
}

Exercise 8 – Daily comment count aggregates comment counts per day (2018/10/20‑23) and stores them in count_comment:

object test04_count {
  def ConnectToMysql() = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_test?characterEncoding=UTF-8", "root", "root")
  }
  def saveDataToMysql(time: String, count: Int): Unit = {
    if (time.matches("2018/10/(20|21|22|23)")) {
      val connection = ConnectToMysql()
      val sql = "INSERT INTO count_comment (time,count) VALUES (?,?) ON DUPLICATE KEY UPDATE count = ?"
      val ps = connection.prepareStatement(sql)
      ps.setString(1, time)
      ps.setInt(2, count)
      ps.setInt(3, count)
      ps.executeUpdate()
      connection.close()
    }
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./TmpCount")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val kafkaDatas = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams)
    )
    val kafkaWordOne = kafkaDatas.map(z => z.value().split("\t")(2).split(" ")(0)).map((_,1))
    val wordCounts = kafkaWordOne.updateStateByKey(updateFunc)
    wordCounts.foreachRDD(rdd => rdd.foreachPartition(line => {
      for (row <- line) saveDataToMysql(row._1, row._2)
    }))
    ssc.start()
    ssc.awaitTermination()
  }
  def updateFunc(currentValues: Seq[Int], historyValue: Option[Int]): Option[Int] = {
    Some(currentValues.sum + historyValue.getOrElse(0))
  }
}

The article concludes with screenshots of the resulting MySQL tables and a friendly reminder to like, collect, and share the post.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Big DataStreamingKafkamysqlSparkScala
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.