Real‑Time Student Performance Analytics with Flink and Spark
This article demonstrates how to build a real‑time education analytics system by streaming answer data through Kafka into Flink or Spark, performing per‑question, per‑grade, and per‑subject aggregations, and optionally accelerating development with UFlink SQL.
1. Business Scenario
Modern OLTP systems increasingly need to deliver computed and analyzed business data instantly, which requires real‑time stream processing frameworks such as Flink. In a K12 education system, students answer exercises via mobile apps, generating detailed answer records that can be used to quantify learning status and pinpoint weak knowledge points.
2. Data Schema
Each answer record contains fields such as student_id, textbook_id, grade_id, subject_id, chapter_id, question_id, score, answer_time, and ts. Example JSON:
{
"student_id": "学生ID_16",
"textbook_id": "教材ID_1",
"grade_id": "年级ID_1",
"subject_id": "科目ID_2_语文",
"chapter_id": "章节ID_chapter_2",
"question_id": "题目ID_100",
"score": 2,
"answer_time": "2019-09-11 12:44:01",
"ts": "Sep 11, 2019 12:44:01 PM"
}
The required analysis tasks are:
Real‑time count of answer frequency per question.
Real‑time count of answer frequency per grade.
Real‑time count of answer frequency per subject and question.
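Before any streaming wiring, the three aggregations can be sanity-checked on a static collection; a minimal plain-Scala sketch (the `Answer` fields are trimmed and `ts` is omitted for brevity, and the sample values are hypothetical):

```scala
// Simplified record mirroring the schema above (subset of fields)
case class Answer(student_id: String, grade_id: String, subject_id: String, question_id: String, score: Int)

val answers = Seq(
  Answer("student_ID_16", "grade_ID_1", "subject_ID_2", "question_ID_100", 2),
  Answer("student_ID_17", "grade_ID_1", "subject_ID_2", "question_ID_100", 1),
  Answer("student_ID_18", "grade_ID_2", "subject_ID_1", "question_ID_7", 3)
)

// Task 1: answer frequency per question
val perQuestion: Map[String, Int] =
  answers.groupBy(_.question_id).map { case (q, xs) => q -> xs.size }

// Task 2: answer frequency per grade
val perGrade: Map[String, Int] =
  answers.groupBy(_.grade_id).map { case (g, xs) => g -> xs.size }

// Task 3: answer frequency per (subject, question)
val perSubjectQuestion: Map[(String, String), Int] =
  answers.groupBy(a => (a.subject_id, a.question_id)).map { case (k, xs) => k -> xs.size }
```

The streaming SQL queries in sections 4 and 6 compute exactly these `groupBy`/count results, continuously updated as records arrive.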
3. Technical Solution Selection
The pipeline sends data to Kafka, reads it with a stream processing framework, performs the analysis, and writes the aggregated results back to another Kafka topic for downstream consumption (e.g., MySQL, HBase, ES). The two candidate frameworks are Flink and Spark.
Flink 1.6.0 introduced state expiration (TTL), RocksDB state backend support, and enhanced SQL capabilities (UDX functions, DML, handling of multiple time attributes, a Kafka table sink), and subsequent 1.6.x releases added many optimizations, making it a strong choice.
Spark originally offered Spark Streaming, an RDD-based micro-batch API that required manual RDD transformations. Spark 2.0 introduced Structured Streaming, a DataFrame/Dataset-based API whose streaming performance is comparable to Flink's.
Architecture diagram: apps → Kafka → Flink/Spark aggregation → Kafka → downstream consumers (MySQL, HBase, ES).
4. Real‑Time Computation with Flink
4.1 Consume data from Kafka
val props = new Properties()
props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
props.setProperty("group.id", "group_consumer_learning_test01")
val flinkKafkaSource = new FlinkKafkaConsumer011[String]("test_topic_learning_1", new SimpleStringSchema(), props)
// env is the StreamExecutionEnvironment created in 4.2
val eventStream = env.addSource[String](flinkKafkaSource)
4.2 Build Flink environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
4.3 Parse JSON
val answerDS = eventStream.map(s => {
val gson = new Gson()
val answer = gson.fromJson(s, classOf[Answer])
answer
})
4.4 Register temporary view
val tableEnv = StreamTableEnvironment.create(env)
val table = tableEnv.fromDataStream(answerDS)
tableEnv.registerTable("t_answer", table)
4.5 Analysis queries
// Question frequency
val result1 = tableEnv.sqlQuery("""
SELECT question_id, COUNT(1) AS frequency
FROM t_answer
GROUP BY question_id
""".stripMargin)
// Grade frequency
val result2 = tableEnv.sqlQuery("""
SELECT grade_id, COUNT(1) AS frequency
FROM t_answer
GROUP BY grade_id
""".stripMargin)
// Subject‑question frequency
val result3 = tableEnv.sqlQuery("""
SELECT subject_id, question_id, COUNT(1) AS frequency
FROM t_answer
GROUP BY subject_id, question_id
""".stripMargin)4.6 Output results to Kafka
// Convert the continuously updating result into a retract stream of
// (flag, row) pairs, keep only additions (flag == true), serialize to JSON
tableEnv.toRetractStream[Result1](result1)
.filter(_._1)
.map(_._2)
.map(new Gson().toJson(_))
.addSink(new FlinkKafkaProducer011[String]("linux01:9092,linux02:9092,linux03:9092", "test_topic_learning_2", new SimpleStringSchema()))
// Similar sinks for result2 and result3
4.7 Execute the job
env.execute("Flink StreamingAnalysis")
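Because these are unbounded GROUP BY queries, Flink emits a retract stream: each element is a (flag, row) pair, where `true` adds a row and `false` retracts a superseded one, which is why the sink keeps only records passing `filter(_._1)`. A minimal plain-Scala sketch of this semantics (hypothetical data, not the Flink API itself):

```scala
// A retract stream for "count per question": (isAdd, (question_id, frequency)).
// When a question's count changes, the old row is retracted and a new one added.
val retractStream = Seq(
  (true,  ("question_ID_100", 1L)), // first answer arrives
  (false, ("question_ID_100", 1L)), // old count retracted...
  (true,  ("question_ID_100", 2L)), // ...and replaced by the new one
  (true,  ("question_ID_7",   1L))
)

// Keeping only additions and taking the latest value per key
// reconstructs the current counts downstream.
val currentCounts: Map[String, Long] =
  retractStream.collect { case (true, kv) => kv }
    .foldLeft(Map.empty[String, Long]) { case (acc, (q, n)) => acc + (q -> n) }
```

A downstream consumer that upserts by key (e.g., into MySQL or HBase) therefore always converges to the latest count, even though the topic contains every intermediate value.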
5. Real‑Time Computation with Spark Structured Streaming
5.1 Build SparkSession
val sparkConf = new SparkConf()
.setAppName("StreamingAnalysis")
.set("spark.local.dir", "F:\temp")
.set("spark.default.parallelism", "3")
.set("spark.sql.shuffle.partitions", "3")
.set("spark.executor.instances", "3")
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
5.2 Read from Kafka
val inputDataFrame1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
.option("subscribe", "test_topic_learning_1")
.load()
5.3 Parse JSON
val keyValueDataset1 = inputDataFrame1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val answerDS = keyValueDataset1.map(t => {
val gson = new Gson()
val answer = gson.fromJson(t._2, classOf[Answer])
answer
})
// requires import java.sql.Timestamp
case class Answer(student_id: String, textbook_id: String, grade_id: String, subject_id: String, chapter_id: String, question_id: String, score: Int, answer_time: String, ts: Timestamp)
5.4 Register temporary view
answerDS.createTempView("t_answer")
5.5 Example query (question frequency)
val result1 = spark.sql("""
SELECT question_id, COUNT(1) AS frequency
FROM t_answer
GROUP BY question_id
""".stripMargin).toJSON5.6 Write results to Kafka
result1.writeStream
.outputMode("update")
.trigger(Trigger.ProcessingTime(0))
.format("kafka")
.option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
.option("topic", "test_topic_learning_2")
.option("checkpointLocation", "./checkpoint_chapter11_1")
.start()
6. Accelerating Development with UFlink SQL
UFlink SQL provides a SQL‑only development kit on top of Flink, allowing developers to write only SQL statements to accomplish the three analysis tasks.
6.1 Create UKafka cluster (configuration steps omitted).
6.2 Create UFlink cluster (configuration steps omitted).
6.3 Define source and result tables
CREATE TABLE t_answer(
student_id VARCHAR,
textbook_id VARCHAR,
grade_id VARCHAR,
subject_id VARCHAR,
chapter_id VARCHAR,
question_id VARCHAR,
score INT,
answer_time VARCHAR,
ts TIMESTAMP
) WITH (
type = 'kafka11',
bootstrapServers = 'ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',
zookeeperQuorum = 'ukafka-mqacnjxk-kafka001:2181/ukafka',
topic = 'test_topic_learning_1',
groupId = 'group_consumer_learning_test01',
parallelism = '3'
);
CREATE TABLE t_result1(
question_id VARCHAR,
frequency INT
) WITH (
type = 'kafka11',
bootstrapServers = 'ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',
zookeeperQuorum = 'ukafka-mqacnjxk-kafka001:2181/ukafka',
topic = 'test_topic_learning_2',
parallelism = '3'
);
-- Similar definitions for t_result2 and t_result3
6.4 Insert query results
INSERT INTO t_result1
SELECT question_id, COUNT(1) AS frequency FROM t_answer GROUP BY question_id;
INSERT INTO t_result2
SELECT grade_id, COUNT(1) AS frequency FROM t_answer GROUP BY grade_id;
INSERT INTO t_result3
SELECT subject_id, question_id, COUNT(1) AS frequency FROM t_answer GROUP BY subject_id, question_id;
Submitting these SQL statements in the UFlink UI executes the real-time analytics instantly.
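Conceptually, each of these continuous queries maintains a running count per key and re-emits the affected row every time a new answer arrives. A plain-Scala sketch of that incremental behavior (illustrative only, not the UFlink runtime; the keys are hypothetical):

```scala
// Incoming answers reduced to their (subject_id, question_id) grouping key,
// processed one record at a time, as a stream processor would see them.
val events = Seq(
  ("subject_ID_2", "question_ID_100"),
  ("subject_ID_2", "question_ID_100"),
  ("subject_ID_1", "question_ID_7")
)

// scanLeft exposes the state after each event: every step corresponds to one
// updated row emitted to the result topic, mirroring the unbounded GROUP BY.
val states = events.scanLeft(Map.empty[(String, String), Long]) {
  case (state, key) => state + (key -> (state.getOrElse(key, 0L) + 1L))
}
```

The first element of `states` is the empty initial state, and each subsequent element reflects one more consumed record, which is why downstream consumers see monotonically updated frequencies per key.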
7. Conclusion
UFlink, built on Apache Flink, offers full compatibility with open‑source Flink while adding modules like UFlink SQL to improve development efficiency and lower the entry barrier. Upcoming support for Flink 1.9.0 will further enhance batch‑stream convergence and SQL capabilities.
UCloud Tech
UCloud is a leading neutral cloud provider in China, developing its own IaaS, PaaS, AI service platform, and big data exchange platform, and delivering comprehensive industry solutions for public, private, hybrid, and dedicated clouds.