Real-time Kafka Message Consumption and MySQL Sink with Apache Flink
This tutorial explains how to consume Kafka messages in real time using Apache Flink and persist them into a MySQL database by adding the JDBC dependency, implementing a custom RichSinkFunction, and configuring a Flink job with a Kafka source and MySQL sink.
This article demonstrates how to consume real-time messages from Kafka and write them into MySQL using Apache Flink.
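First, add the MySQL JDBC driver dependency to your Maven project. The job also uses FlinkKafkaConsumer, so the Flink streaming API and the Flink Kafka connector (flink-connector-kafka) must be on the classpath as well.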
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>
Next, implement a custom RichSinkFunction (MysqlSink) that establishes a JDBC connection, prepares a REPLACE INTO statement, and writes each Tuple3<Integer, String, Integer> (id, num, price) to the MySQL table.
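REPLACE INTO behaves like INSERT, except that when an existing row has the same PRIMARY KEY or UNIQUE index value, MySQL deletes that row before inserting the new one. That matters for this job: after a failure Flink resumes from the last checkpoint and may re-deliver records that were already written, and a keyed REPLACE turns those replays into overwrites instead of duplicates. The plain-Java sketch below models that behavior in memory. It assumes `id` is the table's primary key (the article only says the table has columns id, num, price), and `ReplaceIntoModel` is a hypothetical illustration, not part of Flink or JDBC.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of MySQL REPLACE INTO, assuming `id` is the
// table's primary key. It only illustrates the semantics; no database is involved.
public class ReplaceIntoModel {
    // One row of the assumed (id, num, price) table
    static final class Row {
        final int id;
        final String num;
        final int price;

        Row(int id, String num, int price) {
            this.id = id;
            this.num = num;
            this.price = price;
        }
    }

    private final Map<Integer, Row> rowsById = new HashMap<>();

    // Like REPLACE INTO: drop any existing row with the same key, then insert the new one.
    // Returns true if an existing row was replaced, false if this was a fresh insert.
    public boolean replaceInto(int id, String num, int price) {
        Row previous = rowsById.remove(id);
        rowsById.put(id, new Row(id, num, price));
        return previous != null;
    }

    public int size() {
        return rowsById.size();
    }

    public Row get(int id) {
        return rowsById.get(id);
    }

    public static void main(String[] args) {
        ReplaceIntoModel table = new ReplaceIntoModel();
        table.replaceInto(1, "abc", 100);
        // Replaying the same record (e.g. after restoring from a checkpoint) does not add a second row
        table.replaceInto(1, "abc", 100);
        // A newer value for the same id overwrites the old row
        table.replaceInto(1, "abc", 120);
        System.out.println(table.size() + " row(s), price=" + table.get(1).price); // prints "1 row(s), price=120"
    }
}
```

A plain INSERT in the same position would either fail on the duplicate key or, without a key, store the replayed record twice.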
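import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MysqlSink extends RichSinkFunction<Tuple3<Integer, String, Integer>> {
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    String username = "";
    String password = "";
    String drivername = ""; // configure your driver, e.g. com.mysql.jdbc.Driver for Connector/J 5.1
    String dburl = "";      // configure your DB URL, e.g. jdbc:mysql://host:3306/dbname

    // Called once per parallel sink instance: connect and prepare the statement up front
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl, username, password);
        // "your_table" is a placeholder: `table` itself is a reserved word in MySQL.
        // The table is assumed to have columns id, num, price.
        String sql = "replace into your_table(id,num,price) values(?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
    }

    // Called once per record: bind the tuple fields and execute the statement
    @Override
    public void invoke(Tuple3<Integer, String, Integer> value, Context context) throws Exception {
        preparedStatement.setInt(1, value.f0);
        preparedStatement.setString(2, value.f1);
        preparedStatement.setInt(3, value.f2);
        preparedStatement.executeUpdate();
    }

    // Called when the sink shuts down: release the JDBC resources
    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
Opening the connection in open() and closing it in close() gives each parallel sink instance a single connection for its whole lifetime, instead of reconnecting and re-preparing the statement for every record.
Finally, create the Flink job main class (MysqlSinkTest) that sets up the StreamExecutionEnvironment, enables checkpointing and a fixed-delay restart strategy, configures the Kafka consumer, parses incoming CSV strings into Tuple3 objects, and adds the MysqlSink to the data stream.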
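The job below splits each message on commas and converts the first and third fields with Integer.valueOf, so a single malformed message throws an exception and fails the task. Because the job resumes from the last checkpointed Kafka offsets, the same message is read again after every restart, and one bad record can use up all five attempts of the fixed-delay restart strategy. The sketch below shows a more defensive parser in plain Java, under the assumption that malformed lines should be skipped rather than fail the job; `CsvOrderParser` and `ParsedRecord` are hypothetical names, not part of the original code or of Flink.

```java
import java.util.Optional;

// Hypothetical defensive parser for "id,num,price" lines such as "1,abc,100".
// Malformed input yields Optional.empty() instead of throwing.
public class CsvOrderParser {
    // Plain holder for the three parsed fields (mirrors Tuple3<Integer, String, Integer>)
    static final class ParsedRecord {
        final int id;
        final String num;
        final int price;

        ParsedRecord(int id, String num, int price) {
            this.id = id;
            this.num = num;
            this.price = price;
        }
    }

    public static Optional<ParsedRecord> parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return Optional.empty(); // same role as the blank-line filter in the job
        }
        // limit -1 keeps trailing empty fields, so "1,abc,100," counts as 4 fields and is rejected
        String[] fields = line.split(",", -1);
        if (fields.length != 3) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(fields[0].trim());
            int price = Integer.parseInt(fields[2].trim());
            return Optional.of(new ParsedRecord(id, fields[1], price));
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric id or price
        }
    }

    public static void main(String[] args) {
        String[] samples = {"1,abc,100", "2,xyz", "x,abc,100", " "};
        for (String s : samples) {
            System.out.println("'" + s + "' -> " + (parse(s).isPresent() ? "ok" : "skipped"));
        }
    }
}
```

In a Flink job, a check like this would typically sit inside a flatMap that emits only the successfully parsed records, replacing the separate filter and map steps.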
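import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MysqlSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "mysql-sink-test"); // placeholder consumer group id
        // Example data: 1,abc,100 (could also be complex JSON)
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);

        // Suppress sysout logging; deprecated in later Flink 1.x releases, delete this line if it does not compile
        env.getConfig().disableSysoutLogging();
        // Restart up to 5 times, waiting 5000 ms between attempts
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));
        env.enableCheckpointing(2000); // checkpoint every 2000 ms

        DataStream<String> stream = env.addSource(consumer);
        DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream
                .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
                .map((MapFunction<String, Tuple3<Integer, String, Integer>>) value -> {
                    // Split "id,num,price" into its three fields
                    String[] fields = value.split(",");
                    return new Tuple3<>(Integer.valueOf(fields[0]), fields[1], Integer.valueOf(fields[2]));
                })
                // Java lambdas lose Tuple3's generic types to erasure, so declare the output type explicitly
                .returns(Types.TUPLE(Types.INT, Types.STRING, Types.INT));
        sourceStream.addSink(new MysqlSink());
        env.execute("data to mysql start");
    }
}
Replace the placeholder configuration values (driver name, DB URL, username, password, and the table name in the SQL statement) with your own settings, then compile and run the job; each Kafka message is written to MySQL as soon as it is consumed.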
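Rather than editing string fields inside MysqlSink, the connection settings can live in a properties file and be passed to the sink, for example through a constructor; plain String fields are serializable, so Flink can ship them to the task managers along with the sink. The sketch below loads and validates such settings with `java.util.Properties`. The key names (`mysql.driver`, `mysql.url`, `mysql.username`, `mysql.password`), the `MysqlSettings` class, and the sample values are hypothetical; the driver class `com.mysql.jdbc.Driver` assumes Connector/J 5.1, matching the dependency above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical holder for the JDBC settings that MysqlSink leaves as placeholders.
public class MysqlSettings {
    final String driverName;
    final String dbUrl;
    final String username;
    final String password;

    private MysqlSettings(String driverName, String dbUrl, String username, String password) {
        this.driverName = driverName;
        this.dbUrl = dbUrl;
        this.username = username;
        this.password = password;
    }

    // Reads the hypothetical mysql.* keys and fails fast if a required one is missing
    public static MysqlSettings load(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);
        return new MysqlSettings(
                required(props, "mysql.driver"),
                required(props, "mysql.url"),
                required(props, "mysql.username"),
                props.getProperty("mysql.password", "")); // an empty password is allowed
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required setting: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // Sample file contents; all values are placeholders
        String file = String.join("\n",
                "mysql.driver=com.mysql.jdbc.Driver",
                "mysql.url=jdbc:mysql://localhost:3306/test",
                "mysql.username=flink",
                "mysql.password=secret");
        MysqlSettings settings = load(new StringReader(file));
        System.out.println(settings.driverName + " " + settings.dbUrl);
    }
}
```

Failing fast on a missing key surfaces a configuration mistake when the job is built, rather than as a JDBC error after the job has started.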
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
