How Flink SQL Simplifies Real-Time Data Cleaning Compared to Storm
This article introduces Flink’s background, architecture, and ecosystem, then demonstrates a step‑by‑step tutorial on using Flink SQL to clean and transform streaming data from Kafka, highlighting its advantages over Storm for real‑time ETL.
Background
Baixing.com processes massive user behavior and system data in real time to monitor business status, originally using a Storm‑based pipeline that required Java code changes for every new data‑processing rule, creating a heavy burden for the SQL‑savvy data team.
About Flink
Flink's Origin
Flink is a next‑generation stream processing framework that joined the Apache Foundation in 2014, written mainly in Java and now led by Data Artisans. It supports both stream and batch processing, with a core Stream API wrapped by Table API and SQL, offering native exactly‑once semantics without extra effort.
Flink Architecture Overview
Flink’s architecture resembles other distributed engines and consists of two main components:
JobManager: handles submitted jobs, performs scheduling, and monitors the cluster.
TaskManager: executes tasks assigned by the JobManager and exchanges data among TaskManagers.
Each TaskManager runs as a JVM instance with fine‑grained memory allocation and can spill data to off‑heap binary streams, reducing OOM risk.
The diagram shows a client submitting a program to the JobManager, which then distributes tasks to two TaskManagers for processing.
Flink Ecosystem
A healthy ecosystem is crucial for a framework’s longevity. The following stack illustrates Flink’s components:
The stack shows support for Table & SQL, machine learning, graph computing, and more, approaching Spark’s coverage though some APIs remain in beta.
Practical Example: Using Flink SQL to Process Data
We recreate a Storm‑based data‑cleaning requirement with Flink. The source JSON (type A) is:
{
"type":"A",
"timestamp":"1502323401603",
"msg":{
"userId":"123456",
"id":"sv123",
"start_flag":"true"
}
}and should be transformed into:
{
"type":"B",
"ts":1502323401603,
"uid":"123456",
"vid":"sv123",
"start_flag":1
}The transformation flattens fields and modifies values (e.g., changing "type" from A to B). We build a Maven project with the quick‑start script:
curl https://flink.apache.org/q/quickstart.sh | bashProject structure:
quickstart
├── pom.xml
└── src
└── main
├── java
│ └── org
│ └── myorg
│ └── quickstart
│ ├── BatchJob.java
│ ├── SocketTextStreamWordCount.java
│ ├── StreamingJob.java
│ └── WordCount.java
└── resources
└── log4j.propertiesAdd the Flink Table dependency in pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>1.3.2</version>
</dependency>Ensure flink.version and scala.binary.version match (Flink 1.3.2, Scala 2.11).
...<br/><properties><br/> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><br/> <flink.version>1.3.2</flink.version><br/> <slf4j.version>1.7.7</slf4j.version><br/> <log4j.version>1.2.17</log4j.version><br/> <scala.binary.version>2.11</scala.binary.version><br/></properties><br/>...Create a class TableExample with the following code:
public class TableExample {
public static void main(String args[]) throws Exception {
// streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// table environment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// define Kafka JSON source schema
TypeInformation<Row> typeInfo = Types.ROW(
new String[] { "type", "ts", "msg" },
new TypeInformation<?>[] { Types.STRING(), Types.STRING(), Types.MAP(Types.STRING(), Types.STRING()) }
);
String kafkaTopic = "test";
Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
kafkaProperties.setProperty("group.id", "flink_test11");
KafkaJsonTableSource kafkaTableSource = new Kafka010JsonTableSource(kafkaTopic, kafkaProperties, typeInfo);
tableEnv.registerTableSource("kafka_source", kafkaTableSource);
String execSql = "
SELECT
\tCASE WHEN type = 'A' THEN 'B' ELSE type END AS type,
\tCAST (ts AS DECIMAL) AS ts,
\tmsg['userId'] AS uid,
\tCASE WHEN msg['id'] LIKE 'sv%' THEN msg['id'] END AS vid,
\tCASE WHEN msg['start_flag'] = 'true' THEN 1 ELSE 0 END AS start_flag
FROM kafka_source";
Table play = tableEnv.sql(execSql);
DataStream<Row> playStreaming = tableEnv.toAppendStream(play, Row.class);
playStreaming.print();
env.execute("play table");
}
}The SQL query performs the required field mapping and value conversion.
Run the class and send the following JSON to the Kafka test topic:
{"type":"A","ts":"1502323401603","msg":{"userId":"123456","id":"sv123","start_flag":"true"}}The console output is: B,1502323401603,123456,sv123,1 The result matches expectations, and the output can be converted back to JSON for downstream use.
Compared with Storm, Flink allows each event‑level cleaning rule to be expressed in a single SQL statement, eliminating the need for multiple custom Bolts and reducing maintenance overhead.
Conclusion
This article briefly presented Flink’s features and demonstrated a concrete case of using Flink SQL for real‑time ETL. Flink’s SQL‑first approach lowers the barrier for data engineers, though some APIs are still in beta. Readers are encouraged to explore the official documentation for deeper insights.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Baixing.com Technical Team
A collection of the Baixing.com tech team's insights and learnings, featuring one weekly technical article worth following.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
