
Real‑Time Student Performance Analytics with Flink and Spark

This article demonstrates how to build a real‑time education analytics system by streaming answer data through Kafka into Flink or Spark, performing per‑question, per‑grade, and per‑subject aggregations, and optionally accelerating development with UFlink SQL.

UCloud Tech

1. Business Scenario

Modern OLTP systems increasingly need to deliver computed and analyzed business data instantly, which requires real‑time stream processing frameworks such as Flink. In a K12 education system, students answer exercises via mobile apps, generating detailed answer records that can be used to quantify learning status and pinpoint weak knowledge points.

2. Data Schema

Each answer record contains fields such as student_id, textbook_id, grade_id, subject_id, chapter_id, question_id, score, answer_time, and ts. Example JSON:

{
  "student_id": "学生ID_16",
  "textbook_id": "教材ID_1",
  "grade_id": "年级ID_1",
  "subject_id": "科目ID_2_语文",
  "chapter_id": "章节ID_chapter_2",
  "question_id": "题目ID_100",
  "score": 2,
  "answer_time": "2019-09-11 12:44:01",
  "ts": "Sep 11, 2019 12:44:01 PM"
}
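
A record like this could be modeled in Scala roughly as follows. This is only a sketch: the name `AnswerRecord`, the field types, and the two parsing helpers are assumptions for illustration, and the two timestamp patterns are inferred from the single sample record above.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

object AnswerSchema {
  // Field names follow the sample JSON; the types are assumed.
  final case class AnswerRecord(
    student_id: String,
    textbook_id: String,
    grade_id: String,
    subject_id: String,
    chapter_id: String,
    question_id: String,
    score: Int,
    answer_time: String, // e.g. "2019-09-11 12:44:01"
    ts: String           // e.g. "Sep 11, 2019 12:44:01 PM"
  )

  // Patterns inferred from the sample record (assumption).
  private val answerTimeFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  private val tsFmt = DateTimeFormatter.ofPattern("MMM d, yyyy h:mm:ss a", Locale.ENGLISH)

  def parseAnswerTime(s: String): LocalDateTime = LocalDateTime.parse(s, answerTimeFmt)
  def parseTs(s: String): LocalDateTime = LocalDateTime.parse(s, tsFmt)
}
```

In the sample record both fields describe the same instant, so the two helpers should return equal values for it.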

The required analysis tasks are:

Real‑time count of answer frequency per question.

Real‑time count of answer frequency per grade.

Real‑time count of answer frequency per subject and question.
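
Before moving to a streaming engine, it may help to see what the three tasks compute on a small in-memory batch. The sketch below uses plain Scala collections; the object name, the reduced `Answer` class, and the function names are hypothetical, and a streaming job would maintain the same counts incrementally rather than recomputing them.

```scala
object FrequencySketch {
  // Only the fields the three tasks group on; names follow the sample JSON.
  final case class Answer(grade_id: String, subject_id: String, question_id: String)

  // Task 1: answer frequency per question.
  def perQuestion(answers: Seq[Answer]): Map[String, Long] =
    answers.groupBy(_.question_id).map { case (k, v) => k -> v.size.toLong }

  // Task 2: answer frequency per grade.
  def perGrade(answers: Seq[Answer]): Map[String, Long] =
    answers.groupBy(_.grade_id).map { case (k, v) => k -> v.size.toLong }

  // Task 3: answer frequency per (subject, question) pair.
  def perSubjectAndQuestion(answers: Seq[Answer]): Map[(String, String), Long] =
    answers.groupBy(a => (a.subject_id, a.question_id)).map { case (k, v) => k -> v.size.toLong }
}
```

Each function corresponds to one `GROUP BY ... COUNT` query in the sections that follow.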

3. Technical Solution Selection

The pipeline sends data to Kafka, reads it with a stream processing framework, performs the analysis, and writes the aggregated results back to another Kafka topic for downstream consumption (e.g., MySQL, HBase, ES). The two candidate frameworks are Flink and Spark.

Flink 1.6.0 introduced state expiration, RocksDB support, and enhanced SQL capabilities (UDXF, DML, multi‑time‑type event handling, Kafka Table Sink). Subsequent 1.6.x releases added many optimizations, which makes Flink a strong choice.

Spark originally offered Spark Streaming, an RDD‑based API that required hand‑written RDD transformations. Spark 2.0 introduced Structured Streaming, which is built on the DataFrame/Dataset API and offers faster stream processing that is comparable to Flink.

Architecture diagram:

4. Real‑Time Computation with Flink

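4.1 Read data from Kafka

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// env is the StreamExecutionEnvironment built in 4.2
val props = new Properties()
props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
props.setProperty("group.id", "group_consumer_learning_test01")
val flinkKafkaSource = new FlinkKafkaConsumer011[String]("test_topic_learning_1", new SimpleStringSchema(), props)
val eventStream = env.addSource[String](flinkKafkaSource)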

4.2 Build Flink environment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

4.3 Parse JSON

val answerDS = eventStream.map(s => {
  val gson = new Gson()
  val answer = gson.fromJson(s, classOf[Answer])
  answer
})
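
A parser that throws on malformed input will fail the task that runs it, so it can be worth dropping bad records instead. The sketch below shows the idea with plain Scala; `MiniAnswer` and `parse` are hypothetical stand-ins for the `Answer` class and the `gson.fromJson` call, using a trivial "question_id,score" line format so the example stays self-contained.

```scala
import scala.util.Try

object SafeParseSketch {
  // Hypothetical, simplified record; the real job would use the Answer class.
  final case class MiniAnswer(question_id: String, score: Int)

  // Stand-in for gson.fromJson: parses "question_id,score" and throws on bad input.
  def parse(line: String): MiniAnswer =
    line.split(",") match {
      case Array(q, s) => MiniAnswer(q.trim, s.trim.toInt)
      case _           => throw new IllegalArgumentException(s"malformed record: $line")
    }

  // Converts any parse failure into None so the caller can drop the record.
  def safeParse(line: String): Option[MiniAnswer] = Try(parse(line)).toOption
}
```

On a collection, `lines.flatMap(SafeParseSketch.safeParse)` keeps only the well-formed records. In a streaming job the equivalent would typically be a flatMap that emits zero or one record per input line.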

4.4 Register temporary view

val tableEnv = StreamTableEnvironment.create(env)
val table = tableEnv.fromDataStream(answerDS)
tableEnv.registerTable("t_answer", table)

4.5 Analysis queries

// Question frequency
val result1 = tableEnv.sqlQuery("""
SELECT question_id, COUNT(1) AS frequency
FROM t_answer
GROUP BY question_id
""".stripMargin)
// Grade frequency
val result2 = tableEnv.sqlQuery("""
SELECT grade_id, COUNT(1) AS frequency
FROM t_answer
GROUP BY grade_id
""".stripMargin)
// Subject‑question frequency
val result3 = tableEnv.sqlQuery("""
SELECT subject_id, question_id, COUNT(1) AS frequency
FROM t_answer
GROUP BY subject_id, question_id
""".stripMargin)

4.6 Output results to Kafka

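// Result1 is assumed to be a case class matching the query columns,
// e.g. case class Result1(question_id: String, frequency: Long)
tableEnv.toRetractStream[Result1](result1)
  .filter(_._1) // keep only "add" messages; drop retractions of outdated counts
  .map(_._2)
  .map(new Gson().toJson(_))
  .addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_2", new SimpleStringSchema()))
// Similar sinks for result2 and result3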
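
The reason for `filter(_._1)` is easier to see with a small simulation. A retract stream carries `(Boolean, row)` pairs: `true` adds a result and `false` withdraws a previously emitted one, so an updated count arrives as a retraction of the old value followed by the new value. The sketch below imitates that behavior for a running count using plain Scala; it is not Flink code, and `RetractSketch` and `retractStream` are hypothetical names.

```scala
object RetractSketch {
  // One output message: (isAdd, (question_id, frequency)).
  type Msg = (Boolean, (String, Long))

  def retractStream(questionIds: Seq[String]): Vector[Msg] = {
    val start = (Map.empty[String, Long], Vector.empty[Msg])
    val (_, out) = questionIds.foldLeft(start) { case ((counts, acc), id) =>
      val next = counts.getOrElse(id, 0L) + 1L
      // Withdraw the previous result for this key, if there is one.
      val retraction: Vector[Msg] = counts.get(id).map(old => (false, (id, old))).toVector
      // Then emit the updated result.
      val addition: Msg = (true, (id, next))
      (counts.updated(id, next), (acc ++ retraction) :+ addition)
    }
    out
  }
}
```

For the input `q1, q2, q1`, the second `q1` produces a retraction of `(q1, 1)` followed by an addition of `(q1, 2)`. Filtering on the Boolean flag leaves only the additions, so a downstream consumer can treat the latest message per key as the current count.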

4.7 Execute the job

env.execute("Flink StreamingAnalysis")

Result screenshots:

5. Real‑Time Computation with Spark Structured Streaming

5.1 Build SparkSession

val sparkConf = new SparkConf()
  .setAppName("StreamingAnalysis")
  .set("spark.local.dir", "F:\\temp")
  .set("spark.default.parallelism", "3")
  .set("spark.sql.shuffle.partitions", "3")
  .set("spark.executor.instances", "3")
val spark = SparkSession.builder.config(sparkConf).getOrCreate()

5.2 Read from Kafka

val inputDataFrame1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
  .option("subscribe", "test_topic_learning_1")
  .load()

5.3 Parse JSON

import java.sql.Timestamp
import spark.implicits._ // encoders required by .as[...] and .map

val keyValueDataset1 = inputDataFrame1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val answerDS = keyValueDataset1.map(t => {
  val gson = new Gson()
  val answer = gson.fromJson(t._2, classOf[Answer])
  answer
})
// Define Answer at the top level (outside the method that uses it) so Spark can derive its encoder.
case class Answer(student_id: String, textbook_id: String, grade_id: String, subject_id: String, chapter_id: String, question_id: String, score: Int, answer_time: String, ts: Timestamp)

5.4 Register temporary view

answerDS.createTempView("t_answer")

5.5 Example query (question frequency)

val result1 = spark.sql("""
SELECT question_id, COUNT(1) AS frequency
FROM t_answer
GROUP BY question_id
""".stripMargin).toJSON

5.6 Write results to Kafka

result1.writeStream
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(0))
  .format("kafka")
  .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
  .option("topic", "test_topic_learning_2")
  .option("checkpointLocation", "./checkpoint_chapter11_1")
  .start()

// Block the driver so the streaming query keeps running
spark.streams.awaitAnyTermination()
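
With `outputMode("update")`, each trigger writes only the rows whose aggregate changed since the previous trigger, not the full result table. The plain-Scala sketch below imitates that per-batch behavior for the question frequency query; it is an illustration of the semantics, not Spark code, and `UpdateModeSketch` and `processBatch` are hypothetical names.

```scala
object UpdateModeSketch {
  // Processes one micro-batch of question_ids.
  // Returns (new running counts, rows emitted for this batch).
  def processBatch(
      state: Map[String, Long],
      batch: Seq[String]
  ): (Map[String, Long], Map[String, Long]) = {
    // Count occurrences inside this batch only.
    val delta = batch.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
    // Only keys seen in this batch get a new total, so only they are emitted.
    val changed = delta.map { case (k, n) => k -> (state.getOrElse(k, 0L) + n) }
    (state ++ changed, changed)
  }
}
```

If a first batch contains `q1, q1, q2` and a second batch contains only `q2`, the second batch emits just the updated row for `q2`, while the count for `q1` stays in state without being written again. Consumers of the result topic therefore need to keep the latest value per key.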

6. Accelerating Development with UFlink SQL

UFlink SQL provides a SQL‑only development kit on top of Flink, allowing developers to write only SQL statements to accomplish the three analysis tasks.

6.1 Create UKafka cluster (configuration steps omitted).

6.2 Create UFlink cluster (configuration steps omitted).

6.3 Define source and result tables

CREATE TABLE t_answer(
  student_id VARCHAR,
  textbook_id VARCHAR,
  grade_id VARCHAR,
  subject_id VARCHAR,
  chapter_id VARCHAR,
  question_id VARCHAR,
  score INT,
  answer_time VARCHAR,
  ts TIMESTAMP
) WITH (
  type = 'kafka11',
  bootstrapServers = 'ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',
  zookeeperQuorum = 'ukafka-mqacnjxk-kafka001:2181/ukafka',
  topic = 'test_topic_learning_1',
  groupId = 'group_consumer_learning_test01',
  parallelism = '3'
);

CREATE TABLE t_result1(
  question_id VARCHAR,
  frequency INT
) WITH (
  type = 'kafka11',
  bootstrapServers = 'ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',
  zookeeperQuorum = 'ukafka-mqacnjxk-kafka001:2181/ukafka',
  topic = 'test_topic_learning_2',
  parallelism = '3'
);
-- Similar definitions for t_result2 and t_result3

6.4 Insert query results

INSERT INTO t_result1
SELECT question_id, COUNT(1) AS frequency FROM t_answer GROUP BY question_id;
INSERT INTO t_result2
SELECT grade_id, COUNT(1) AS frequency FROM t_answer GROUP BY grade_id;
INSERT INTO t_result3
SELECT subject_id, question_id, COUNT(1) AS frequency FROM t_answer GROUP BY subject_id, question_id;

Submitting these SQL statements in the UFlink UI starts the real‑time analytics job; no Scala or Java code is required.

7. Conclusion

UFlink, built on Apache Flink, offers full compatibility with open‑source Flink while adding modules like UFlink SQL to improve development efficiency and lower the entry barrier. Upcoming support for Flink 1.9.0 will further enhance batch‑stream convergence and SQL capabilities.
